From 437356a8e28844d9dc56d7706ca9f31aa8d188f6 Mon Sep 17 00:00:00 2001 From: Rusty Bee <145002912+rustybee42@users.noreply.github.com> Date: Tue, 2 Jun 2026 12:06:33 +0200 Subject: [PATCH] fix: Per message setting of response time limit Add a new associated const to the Msg trait that defines how long to wait for this message when receiving it as a response. To achieve this, make the timeout setting for stream writes and reads a function argument and pass the message specific time limit (default: 2s). For GetQuotaInfoResp, set this to 40m. This should fix running into timeouts while fetching quota if the storage server needs a long time to process the request. --- shared/src/bee_msg.rs | 3 +++ shared/src/bee_msg/quota.rs | 4 +++ shared/src/conn.rs | 5 ++++ shared/src/conn/incoming.rs | 6 +++-- shared/src/conn/msg_dispatch.rs | 5 +++- shared/src/conn/outgoing.rs | 44 +++++++++++++++++++++------------ shared/src/conn/stream.rs | 23 ++++++++--------- 7 files changed, 58 insertions(+), 32 deletions(-) diff --git a/shared/src/bee_msg.rs b/shared/src/bee_msg.rs index d176d0e4..b8793da3 100644 --- a/shared/src/bee_msg.rs +++ b/shared/src/bee_msg.rs @@ -6,6 +6,7 @@ use anyhow::{Context, Result, anyhow}; use bee_serde_derive::BeeSerde; use std::any::Any; use std::collections::{HashMap, HashSet}; +use std::time::Duration; pub mod buddy_group; pub mod misc; @@ -26,6 +27,8 @@ pub trait BaseMsg: Any + std::fmt::Debug + Send + Sync + 'static {} pub trait Msg: BaseMsg + Default + Clone { /// Message type as defined in NetMessageTypes.h const ID: MsgId; + /// How long to wait to receive this message as a response + const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(2); } impl BaseMsg for M where M: Msg {} diff --git a/shared/src/bee_msg/quota.rs b/shared/src/bee_msg/quota.rs index 19be936f..7806d874 100644 --- a/shared/src/bee_msg/quota.rs +++ b/shared/src/bee_msg/quota.rs @@ -119,6 +119,10 @@ pub struct GetQuotaInfoResp { impl Msg for GetQuotaInfoResp { const ID: MsgId = 2098; + /// This value is large enough to fix timeout issues for a customer whose storage servers + /// took >30m to process the requests. For anyone else, this should also be more than enough, + /// as on a typical system this isn't supposed to be taking that long by far. + const RESPONSE_TIME_LIMIT: Duration = Duration::from_mins(40); } /// Sets exceeded quota information on server nodes. diff --git a/shared/src/conn.rs b/shared/src/conn.rs index 559da65d..366257bc 100644 --- a/shared/src/conn.rs +++ b/shared/src/conn.rs @@ -1,5 +1,7 @@ //! Connection to other BeeGFS nodes +use std::time::Duration; + mod async_queue; pub mod incoming; pub mod msg_dispatch; @@ -16,3 +18,6 @@ const TCP_BUF_LEN: usize = 4 * 1024 * 1024; /// Must match the `DGRAMMR_(RECV|SEND)BUF_SIZE` value in `DatagramListener.*` in the C/C++ /// codebase. Must be smaller than TCP_BUF_LEN; const UDP_BUF_LEN: usize = 65536; + +/// Reasonable time limit for most write and read operations +const DEFAULT_TIME_LIMIT: Duration = Duration::from_secs(2); diff --git a/shared/src/conn/incoming.rs b/shared/src/conn/incoming.rs index 45ed69b5..114d2d2a 100644 --- a/shared/src/conn/incoming.rs +++ b/shared/src/conn/incoming.rs @@ -139,7 +139,9 @@ async fn read_stream( stream_authentication_required: bool, ) -> Result<()> { // Read header - stream.read_exact(&mut buf[0..Header::LEN]).await?; + stream + .read_exact(&mut buf[0..Header::LEN], DEFAULT_TIME_LIMIT) + .await?; let header = deserialize_header(&buf[0..Header::LEN])?; @@ -156,7 +158,7 @@ async fn read_stream( // Read body stream - .read_exact(&mut buf[Header::LEN..header.msg_len()]) + .read_exact(&mut buf[Header::LEN..header.msg_len()], DEFAULT_TIME_LIMIT) .await?; // Forward to the dispatcher. The dispatcher is responsible for deserializing, dispatching to diff --git a/shared/src/conn/msg_dispatch.rs b/shared/src/conn/msg_dispatch.rs index 4bf78242..fc220d3e 100644 --- a/shared/src/conn/msg_dispatch.rs +++ b/shared/src/conn/msg_dispatch.rs @@ -3,6 +3,7 @@ use super::stream::Stream; use crate::bee_msg::{Header, Msg, deserialize_body, serialize}; use crate::bee_serde::{Deserializable, Serializable}; +use crate::conn::DEFAULT_TIME_LIMIT; use anyhow::Result; use std::fmt::Debug; use std::future::Future; @@ -40,7 +41,9 @@ pub struct StreamRequest<'a> { impl Request for StreamRequest<'_> { async fn respond(self, msg: &M) -> Result<()> { let msg_len = serialize(msg, self.buf)?; - self.stream.write_all(&self.buf[0..msg_len]).await + self.stream + .write_all(&self.buf[0..msg_len], DEFAULT_TIME_LIMIT) + .await } fn authenticate_connection(&mut self) { diff --git a/shared/src/conn/outgoing.rs b/shared/src/conn/outgoing.rs index 78fcd2cd..5d0457d2 100644 --- a/shared/src/conn/outgoing.rs +++ b/shared/src/conn/outgoing.rs @@ -3,9 +3,9 @@ use super::store::Store; use crate::bee_msg::misc::AuthenticateChannel; use crate::bee_msg::{Header, Msg, deserialize_body, deserialize_header, serialize}; use crate::bee_serde::{Deserializable, Serializable}; -use crate::conn::TCP_BUF_LEN; use crate::conn::store::StoredStream; use crate::conn::stream::Stream; +use crate::conn::{DEFAULT_TIME_LIMIT, TCP_BUF_LEN}; use crate::types::{AuthSecret, Uid}; use anyhow::{Context, Result, bail}; use std::fmt::Debug; @@ -58,7 +58,9 @@ impl Pool { let mut buf = self.store.pop_buf_or_create(); let msg_len = serialize(msg, &mut buf)?; - let resp_header = self.comm_stream(node_uid, &mut buf, msg_len, true).await?; + let resp_header = self + .comm_stream(node_uid, &mut buf, msg_len, Some(M::RESPONSE_TIME_LIMIT)) + .await?; let resp_msg = deserialize_body(&resp_header, &buf[Header::LEN..])?; self.store.push_buf(buf); @@ -75,7 +77,7 @@ impl Pool { let mut buf = self.store.pop_buf_or_create(); let msg_len = serialize(msg, &mut buf)?; - self.comm_stream(node_uid, &mut buf, msg_len, false).await?; + self.comm_stream(node_uid, &mut buf, msg_len, None).await?; self.store.push_buf(buf); @@ -94,19 +96,21 @@ impl Pool { /// 2. Get a permit that allows opening a new stream. Try to open a new stream using the /// available addresses. /// 3. Pop an open stream from the store, waiting until one gets available. + /// + /// If `response_time_limit` is set to `Some(t)`, a response is expected. async fn comm_stream( &self, node_uid: Uid, buf: &mut [u8], send_len: usize, - expect_response: bool, + response_time_limit: Option, ) -> Result
{ debug_assert_eq!(buf.len(), TCP_BUF_LEN); // 1. Pop open streams until communication succeeds or none are left while let Some(stream) = self.store.try_pop_stream(node_uid) { match self - .write_and_read_stream(buf, stream, send_len, expect_response) + .write_and_read_stream(buf, stream, send_len, response_time_limit) .await { Ok(header) => return Ok(header), @@ -132,7 +136,7 @@ impl Pool { continue; } - match Stream::connect_tcp(addr).await { + match Stream::connect_tcp(addr, DEFAULT_TIME_LIMIT).await { Ok(stream) => { let mut stream = StoredStream::from_stream(stream, permit); @@ -152,7 +156,7 @@ impl Pool { stream .as_mut() - .write_all(&auth_buf[0..msg_len]) + .write_all(&auth_buf[0..msg_len], DEFAULT_TIME_LIMIT) .await .with_context(err_context)?; @@ -162,7 +166,7 @@ impl Pool { // Communication using the newly opened stream should usually not fail. If // it does, abort. It might be better to just try the next address though. let resp_header = self - .write_and_read_stream(buf, stream, send_len, expect_response) + .write_and_read_stream(buf, stream, send_len, response_time_limit) .await .with_context(err_context)?; @@ -189,7 +193,7 @@ impl Pool { })?; let resp_header = self - .write_and_read_stream(buf, stream, send_len, expect_response) + .write_and_read_stream(buf, stream, send_len, response_time_limit) .await .with_context(|| { format!("Communication using existing stream to node with uid {node_uid} failed") @@ -205,20 +209,28 @@ impl Pool { buf: &mut [u8], mut stream: StoredStream, send_len: usize, - expect_response: bool, + response_time_limit: Option, ) -> Result
{ - stream.as_mut().write_all(&buf[0..send_len]).await?; + stream + .as_mut() + .write_all(&buf[0..send_len], DEFAULT_TIME_LIMIT) + .await?; - let header = if expect_response { - // Read header - stream.as_mut().read_exact(&mut buf[0..Header::LEN]).await?; + let header = if let Some(tl) = response_time_limit { + // Read header - wait for the per-message defined time limit. + stream + .as_mut() + .read_exact(&mut buf[0..Header::LEN], tl) + .await?; let header = deserialize_header(&buf[0..Header::LEN])?; - // Read body + // Read body - the per-message time limit has already been respected by the header + // above, for the rest of the data we can just use the default short limit. stream .as_mut() - .read_exact(&mut buf[Header::LEN..header.msg_len()]) + .read_exact(&mut buf[Header::LEN..header.msg_len()], DEFAULT_TIME_LIMIT) .await?; + header } else { Header::default() diff --git a/shared/src/conn/stream.rs b/shared/src/conn/stream.rs index a98f88fe..f73d4bbe 100644 --- a/shared/src/conn/stream.rs +++ b/shared/src/conn/stream.rs @@ -9,8 +9,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use tokio::time::timeout; -const TIMEOUT: Duration = Duration::from_secs(2); - /// A connected generic stream. /// /// Provides functionality to communicate with the connected peer. Can support multiple @@ -39,9 +37,9 @@ impl From for Stream { impl Stream { /// Connect to peer using TCP and obtain a [Stream] object. /// - /// Times out after [TIMEOUT]. - pub async fn connect_tcp(addr: &SocketAddr) -> Result { - let stream = match timeout(TIMEOUT, TcpStream::connect(addr)).await { + /// Times out after [time_limit]. + pub async fn connect_tcp(addr: &SocketAddr, time_limit: Duration) -> Result { + let stream = match timeout(time_limit, TcpStream::connect(addr)).await { Ok(res) => res?, Err(_) => bail!("Connecting a TCP stream to {addr} timed out"), }; @@ -74,13 +72,12 @@ impl Stream { /// Reads from the stream into the provided buffer. /// /// The buffer will be filled completely before the future completes. Times out after - /// [TIMEOUT]. + /// [time_limit]. /// /// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused. // Clippy: Suppress false positive - #[allow(clippy::needless_pass_by_ref_mut)] - pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> { - match timeout(TIMEOUT, async { + pub async fn read_exact(&mut self, buf: &mut [u8], time_limit: Duration) -> Result<()> { + match timeout(time_limit, async { match &mut self.stream { InnerStream::Tcp(s) => { s.read_exact(buf).await?; @@ -98,11 +95,11 @@ impl Stream { /// Writes to the stream from the provided buffer. /// /// The buffer will be written completely before the future completes. Times out after - /// [TIMEOUT]. + /// [time_limit]. /// /// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused. - pub async fn write_all(&mut self, buf: &[u8]) -> Result<()> { - match timeout(TIMEOUT, async { + pub async fn write_all(&mut self, buf: &[u8], time_limit: Duration) -> Result<()> { + match timeout(time_limit, async { match &mut self.stream { InnerStream::Tcp(s) => { s.write_all(buf).await?; @@ -113,7 +110,7 @@ impl Stream { .await { Ok(res) => res, - Err(_) => Err(anyhow!("Writing to a stream to {} timed out", self.addr())), + Err(_) => Err(anyhow!("Writing to stream to {} timed out", self.addr())), } }