feat(ethexe/node-loader): support multiple validator nodes #5208
Conversation
Summary of Changes (Gemini Code Assist): This pull request introduces support for multiple validator nodes in the node-loader, improving its robustness and its ability to handle API failures. It also includes a script to start a local Vara.eth network with multiple validators, streamlining development and testing.
Code Review
This pull request introduces support for multiple validator nodes in the node-loader. It adds an EthexeRpcPool to manage connections to multiple ethexe-node endpoints, with logic for random selection, reconnection, and retries on failure. A new script, start-local-network.sh, is also included to facilitate setting up a local test network with multiple validators.
My review focuses on the new connection management and retry logic. I've identified a potential race condition in connection handling that could lead to creating unnecessary connections, and significant code duplication in the retry logic for RPC calls. I've provided suggestions to improve both of these aspects for better performance and maintainability.
```rust
async fn reconnect_client(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
) -> Result<Arc<VaraEthApi>> {
    let endpoint = self
        .endpoints
        .get(endpoint_idx)
        .ok_or_else(|| anyhow!("invalid endpoint index: {endpoint_idx}"))?;

    tracing::warn!(
        endpoint_idx,
        endpoint = %endpoint.url,
        "Connecting ethexe RPC client"
    );

    let client = Arc::new(VaraEthApi::new(&endpoint.url, api.clone()).await?);
    let mut lock = endpoint.client.write().await;
    *lock = Some(client.clone());

    tracing::info!(
        endpoint_idx,
        endpoint = %endpoint.url,
        "Connected ethexe RPC client"
    );

    Ok(client)
}

async fn get_or_connect_client(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
) -> Result<Arc<VaraEthApi>> {
    let endpoint = self
        .endpoints
        .get(endpoint_idx)
        .ok_or_else(|| anyhow!("invalid endpoint index: {endpoint_idx}"))?;

    if let Some(client) = endpoint.client.read().await.clone() {
        return Ok(client);
    }

    self.reconnect_client(endpoint_idx, api).await
}
```
There's a potential race condition in get_or_connect_client that can lead to creating unnecessary connections. If multiple threads call this function for an unconnected endpoint, they might all see that the client is None and proceed to call reconnect_client, resulting in multiple connections being established where only one is needed. The last one to acquire the write lock will win, and the other connections will be dropped.
To fix this, you can use a double-checked locking pattern. This involves checking for the client with a read lock, then acquiring a write lock and checking again before creating a new connection. This ensures that only one thread will create the connection.
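In isolation, the double-checked pattern can be exercised with std's synchronous `RwLock` (a minimal sketch; the PR itself uses an async `RwLock`, and the `get_or_init` name and string "client" here are invented for illustration):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::thread;

// Counts how many times the "expensive" client is actually constructed.
static CONSTRUCTED: AtomicUsize = AtomicUsize::new(0);

fn get_or_init(slot: &RwLock<Option<Arc<String>>>) -> Arc<String> {
    // Fast path: read lock only.
    if let Some(client) = slot.read().unwrap().clone() {
        return client;
    }
    // Slow path: take the write lock and re-check, so threads that raced
    // past the read check do not each build their own client.
    let mut guard = slot.write().unwrap();
    if let Some(client) = guard.clone() {
        return client;
    }
    CONSTRUCTED.fetch_add(1, Ordering::SeqCst);
    let client = Arc::new("connected".to_string());
    *guard = Some(client.clone());
    client
}

fn main() {
    let slot = Arc::new(RwLock::new(None));
    let handles: Vec<_> = (0..8)
        .map(|_| {
            let slot = Arc::clone(&slot);
            thread::spawn(move || get_or_init(&slot))
        })
        .collect();
    for h in handles {
        assert_eq!(*h.join().unwrap(), "connected");
    }
    // Despite eight concurrent callers, the client was built exactly once.
    println!("constructed {} time(s)", CONSTRUCTED.load(Ordering::SeqCst));
}
```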
I suggest refactoring get_or_connect_client and inlining reconnect_client to implement this pattern.
```rust
async fn get_or_connect_client(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
) -> Result<Arc<VaraEthApi>> {
    let endpoint = self
        .endpoints
        .get(endpoint_idx)
        .ok_or_else(|| anyhow!("invalid endpoint index: {endpoint_idx}"))?;

    // First, check with a read lock for efficiency.
    if let Some(client) = endpoint.client.read().await.clone() {
        return Ok(client);
    }

    // If no client, acquire a write lock to create one.
    let mut client_guard = endpoint.client.write().await;

    // Double-check in case another thread created the client while we
    // were waiting for the lock.
    if let Some(client) = client_guard.clone() {
        return Ok(client);
    }

    // Still no client, so we are the one to create it.
    tracing::warn!(
        endpoint_idx,
        endpoint = %endpoint.url,
        "Connecting ethexe RPC client"
    );

    let client = Arc::new(VaraEthApi::new(&endpoint.url, api.clone()).await?);
    *client_guard = Some(client.clone());

    tracing::info!(
        endpoint_idx,
        endpoint = %endpoint.url,
        "Connected ethexe RPC client"
    );

    Ok(client)
}
```

```rust
async fn request_code_validation(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
    code: &[u8],
) -> Result<CodeId> {
    for attempt in 1..=RPC_MAX_ATTEMPTS {
        let client = match self.get_or_connect_client(endpoint_idx, api).await {
            Ok(client) => client,
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
                continue;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client"
                );
                return Err(err);
            }
        };

        match client.router().request_code_validation(code).await {
            Ok((_, code_id)) => return Ok(code_id),
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "request_code_validation failed; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "request_code_validation failed"
                );
                return Err(err.into());
            }
        }
    }

    Err(anyhow!("request_code_validation exhausted retries"))
}

async fn wait_for_code_validation(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
    code_id: CodeId,
) -> Result<()> {
    for attempt in 1..=RPC_MAX_ATTEMPTS {
        let client = match self.get_or_connect_client(endpoint_idx, api).await {
            Ok(client) => client,
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
                continue;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client"
                );
                return Err(err);
            }
        };

        match client.router().wait_for_code_validation(code_id).await {
            Ok(_) => return Ok(()),
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "wait_for_code_validation failed; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "wait_for_code_validation failed"
                );
                return Err(err.into());
            }
        }
    }

    Err(anyhow!("wait_for_code_validation exhausted retries"))
}

async fn send_message_injected(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
    actor: ActorId,
    payload: &[u8],
    value: u128,
) -> Result<MessageId> {
    for attempt in 1..=RPC_MAX_ATTEMPTS {
        let client = match self.get_or_connect_client(endpoint_idx, api).await {
            Ok(client) => client,
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
                continue;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client"
                );
                return Err(err);
            }
        };

        match client
            .mirror(actor)
            .send_message_injected(payload, value)
            .await
        {
            Ok(mid) => return Ok(mid),
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "send_message_injected failed; reconnecting and retrying"
                );
                self.invalidate_client(endpoint_idx).await;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "send_message_injected failed"
                );
                return Err(err.into());
            }
        }
    }

    Err(anyhow!("send_message_injected exhausted retries"))
}
```
The retry logic is duplicated across request_code_validation, wait_for_code_validation, and send_message_injected. This makes the code harder to maintain and prone to errors if one of them is updated and the others are not.
You can extract this logic into a generic helper function that takes a closure for the specific RPC call. This would reduce code duplication and improve maintainability.
Here's an example of how such a generic helper function could look:
```rust
async fn with_retry<T, F, Fut>(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
    call_name: &str,
    f: F,
) -> Result<T>
where
    F: Fn(Arc<VaraEthApi>) -> Fut,
    Fut: std::future::Future<Output = Result<T>>,
{
    for attempt in 1..=RPC_MAX_ATTEMPTS {
        let client = match self.get_or_connect_client(endpoint_idx, api).await {
            Ok(client) => client,
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client for {}; reconnecting and retrying",
                    call_name
                );
                self.invalidate_client(endpoint_idx).await;
                continue;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "failed to acquire ethexe RPC client for {}",
                    call_name
                );
                return Err(err);
            }
        };

        match f(client).await {
            Ok(result) => return Ok(result),
            Err(err) if attempt < RPC_MAX_ATTEMPTS && is_retryable_rpc_error(&err) => {
                tracing::warn!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "{} failed; reconnecting and retrying",
                    call_name
                );
                self.invalidate_client(endpoint_idx).await;
            }
            Err(err) => {
                tracing::error!(
                    endpoint_idx,
                    attempt,
                    max_attempts = RPC_MAX_ATTEMPTS,
                    error = %err,
                    "{} failed",
                    call_name
                );
                return Err(err.into());
            }
        }
    }

    Err(anyhow!("{} exhausted retries", call_name))
}
```

You could then refactor request_code_validation like this:
```rust
async fn request_code_validation(
    &self,
    endpoint_idx: usize,
    api: &Ethereum,
    code: &[u8],
) -> Result<CodeId> {
    self.with_retry(endpoint_idx, api, "request_code_validation", |client| async move {
        client
            .router()
            .request_code_validation(code)
            .await
            .map(|(_, code_id)| code_id)
    })
    .await
}
```
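Stripped of async and tracing, the closure-based retry idea can be exercised standalone. This is a hedged sketch: the `RpcError` enum, `MAX_ATTEMPTS` constant, and `is_retryable` helper below are invented stand-ins for the crate's real types.

```rust
const MAX_ATTEMPTS: u32 = 3;

// Invented stand-in for the crate's real error type.
#[derive(Debug)]
enum RpcError {
    Transient, // e.g. a dropped connection; worth retrying
    Fatal,     // e.g. a rejected transaction; retrying won't help
}

fn is_retryable(err: &RpcError) -> bool {
    matches!(err, RpcError::Transient)
}

// Generic retry driver: `f` receives the attempt number and performs
// the actual call; transient errors are retried up to MAX_ATTEMPTS.
fn with_retry<T>(
    call_name: &str,
    mut f: impl FnMut(u32) -> Result<T, RpcError>,
) -> Result<T, String> {
    for attempt in 1..=MAX_ATTEMPTS {
        match f(attempt) {
            Ok(v) => return Ok(v),
            Err(err) if attempt < MAX_ATTEMPTS && is_retryable(&err) => {
                eprintln!("{call_name} failed on attempt {attempt}/{MAX_ATTEMPTS} ({err:?}); retrying");
            }
            Err(err) => return Err(format!("{call_name} failed: {err:?}")),
        }
    }
    Err(format!("{call_name} exhausted retries"))
}

fn main() {
    // Succeeds on the third attempt after two transient failures.
    let result = with_retry("request_code_validation", |attempt| {
        if attempt < 3 { Err(RpcError::Transient) } else { Ok(42u32) }
    });
    assert_eq!(result, Ok(42));

    // A fatal error aborts immediately, without retrying.
    let result: Result<u32, String> = with_retry("send_message", |_| Err(RpcError::Fatal));
    assert!(result.unwrap_err().contains("send_message failed"));
    println!("ok");
}
```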
Added support for multiple validator nodes in node-loader. It randomly switches between them and reconnects when an API call fails. If an endpoint fails too many times, its connection is removed; once all connections are dead, the program terminates.
Also added a start-local-network.sh script that lets users start a proper Vara.eth network with multiple validators, all connected to a single Anvil instance. It can also start the node-loader automatically.
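As a rough illustration of the behavior described above (random endpoint selection, removal after repeated failures, termination once no endpoints remain), here is a minimal stdlib-only sketch. The `Pool` type, `MAX_FAILURES` threshold, and URLs are invented for the example and the real node-loader's internals may differ:

```rust
use std::collections::HashMap;

const MAX_FAILURES: u32 = 3;

// Illustrative endpoint pool with per-endpoint failure counters.
struct Pool {
    endpoints: Vec<String>,
    failures: HashMap<String, u32>,
}

impl Pool {
    fn new(endpoints: Vec<String>) -> Self {
        Self { endpoints, failures: HashMap::new() }
    }

    // Pick an endpoint pseudo-randomly; a seed parameter keeps the
    // sketch deterministic (a real implementation would use an RNG).
    fn pick(&self, seed: usize) -> Option<&String> {
        if self.endpoints.is_empty() {
            return None;
        }
        self.endpoints.get(seed % self.endpoints.len())
    }

    // Record a failure; after MAX_FAILURES the endpoint is dropped.
    fn report_failure(&mut self, url: &str) {
        let count = self.failures.entry(url.to_string()).or_insert(0);
        *count += 1;
        if *count >= MAX_FAILURES {
            self.endpoints.retain(|e| e.as_str() != url);
        }
    }

    // When every endpoint is gone, the loader has nothing left to call.
    fn is_dead(&self) -> bool {
        self.endpoints.is_empty()
    }
}

fn main() {
    let mut pool = Pool::new(vec![
        "ws://127.0.0.1:9944".to_string(),
        "ws://127.0.0.1:9945".to_string(),
    ]);
    assert!(pool.pick(7).is_some());

    // Fail the first endpoint out of the pool.
    for _ in 0..MAX_FAILURES {
        pool.report_failure("ws://127.0.0.1:9944");
    }
    assert_eq!(pool.endpoints, vec!["ws://127.0.0.1:9945".to_string()]);

    // Fail the last one too: the pool is dead and the loader would terminate.
    for _ in 0..MAX_FAILURES {
        pool.report_failure("ws://127.0.0.1:9945");
    }
    println!("pool dead: {}", pool.is_dead());
}
```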