Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions shared/src/bee_msg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use anyhow::{Context, Result, anyhow};
use bee_serde_derive::BeeSerde;
use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::time::Duration;

pub mod buddy_group;
pub mod misc;
Expand All @@ -26,6 +27,8 @@ pub trait BaseMsg: Any + std::fmt::Debug + Send + Sync + 'static {}
pub trait Msg: BaseMsg + Default + Clone {
/// Message type as defined in NetMessageTypes.h
const ID: MsgId;
/// How long to wait to receive this message as a response
const RESPONSE_TIME_LIMIT: Duration = Duration::from_secs(2);
}

impl<M> BaseMsg for M where M: Msg {}
Expand Down
4 changes: 4 additions & 0 deletions shared/src/bee_msg/quota.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ pub struct GetQuotaInfoResp {

impl Msg for GetQuotaInfoResp {
const ID: MsgId = 2098;
/// This value is large enough to fix timeout issues for a customer whose storage servers
/// took >30m to process the requests. For anyone else, this should also be more than enough,
/// as on a typical system this isn't supposed to be taking that long by far.
const RESPONSE_TIME_LIMIT: Duration = Duration::from_mins(40);
}

/// Sets exceeded quota information on server nodes.
Expand Down
5 changes: 5 additions & 0 deletions shared/src/conn.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
//! Connection to other BeeGFS nodes

use std::time::Duration;

mod async_queue;
pub mod incoming;
pub mod msg_dispatch;
Expand All @@ -16,3 +18,6 @@ const TCP_BUF_LEN: usize = 4 * 1024 * 1024;
/// Must match the `DGRAMMR_(RECV|SEND)BUF_SIZE` value in `DatagramListener.*` in the C/C++
/// codebase. Must be smaller than TCP_BUF_LEN;
const UDP_BUF_LEN: usize = 65536;

/// Reasonable time limit for most write and read operations
const DEFAULT_TIME_LIMIT: Duration = Duration::from_secs(2);
6 changes: 4 additions & 2 deletions shared/src/conn/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ async fn read_stream(
stream_authentication_required: bool,
) -> Result<()> {
// Read header
stream.read_exact(&mut buf[0..Header::LEN]).await?;
stream
.read_exact(&mut buf[0..Header::LEN], DEFAULT_TIME_LIMIT)
.await?;

let header = deserialize_header(&buf[0..Header::LEN])?;

Expand All @@ -156,7 +158,7 @@ async fn read_stream(

// Read body
stream
.read_exact(&mut buf[Header::LEN..header.msg_len()])
.read_exact(&mut buf[Header::LEN..header.msg_len()], DEFAULT_TIME_LIMIT)
.await?;

// Forward to the dispatcher. The dispatcher is responsible for deserializing, dispatching to
Expand Down
5 changes: 4 additions & 1 deletion shared/src/conn/msg_dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use super::stream::Stream;
use crate::bee_msg::{Header, Msg, deserialize_body, serialize};
use crate::bee_serde::{Deserializable, Serializable};
use crate::conn::DEFAULT_TIME_LIMIT;
use anyhow::Result;
use std::fmt::Debug;
use std::future::Future;
Expand Down Expand Up @@ -40,7 +41,9 @@ pub struct StreamRequest<'a> {
impl Request for StreamRequest<'_> {
async fn respond<M: Msg + Serializable>(self, msg: &M) -> Result<()> {
let msg_len = serialize(msg, self.buf)?;
self.stream.write_all(&self.buf[0..msg_len]).await
self.stream
.write_all(&self.buf[0..msg_len], DEFAULT_TIME_LIMIT)
.await
}

fn authenticate_connection(&mut self) {
Expand Down
44 changes: 28 additions & 16 deletions shared/src/conn/outgoing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use super::store::Store;
use crate::bee_msg::misc::AuthenticateChannel;
use crate::bee_msg::{Header, Msg, deserialize_body, deserialize_header, serialize};
use crate::bee_serde::{Deserializable, Serializable};
use crate::conn::TCP_BUF_LEN;
use crate::conn::store::StoredStream;
use crate::conn::stream::Stream;
use crate::conn::{DEFAULT_TIME_LIMIT, TCP_BUF_LEN};
use crate::types::{AuthSecret, Uid};
use anyhow::{Context, Result, bail};
use std::fmt::Debug;
Expand Down Expand Up @@ -58,7 +58,9 @@ impl Pool {
let mut buf = self.store.pop_buf_or_create();

let msg_len = serialize(msg, &mut buf)?;
let resp_header = self.comm_stream(node_uid, &mut buf, msg_len, true).await?;
let resp_header = self
.comm_stream(node_uid, &mut buf, msg_len, Some(M::RESPONSE_TIME_LIMIT))
.await?;
let resp_msg = deserialize_body(&resp_header, &buf[Header::LEN..])?;

self.store.push_buf(buf);
Expand All @@ -75,7 +77,7 @@ impl Pool {
let mut buf = self.store.pop_buf_or_create();

let msg_len = serialize(msg, &mut buf)?;
self.comm_stream(node_uid, &mut buf, msg_len, false).await?;
self.comm_stream(node_uid, &mut buf, msg_len, None).await?;

self.store.push_buf(buf);

Expand All @@ -94,19 +96,21 @@ impl Pool {
/// 2. Get a permit that allows opening a new stream. Try to open a new stream using the
/// available addresses.
/// 3. Pop an open stream from the store, waiting until one gets available.
///
/// If `response_time_limit` is set to `Some(t)`, a response is expected.
async fn comm_stream(
&self,
node_uid: Uid,
buf: &mut [u8],
send_len: usize,
expect_response: bool,
response_time_limit: Option<Duration>,
) -> Result<Header> {
debug_assert_eq!(buf.len(), TCP_BUF_LEN);

// 1. Pop open streams until communication succeeds or none are left
while let Some(stream) = self.store.try_pop_stream(node_uid) {
match self
.write_and_read_stream(buf, stream, send_len, expect_response)
.write_and_read_stream(buf, stream, send_len, response_time_limit)
.await
{
Ok(header) => return Ok(header),
Expand All @@ -132,7 +136,7 @@ impl Pool {
continue;
}

match Stream::connect_tcp(addr).await {
match Stream::connect_tcp(addr, DEFAULT_TIME_LIMIT).await {
Ok(stream) => {
let mut stream = StoredStream::from_stream(stream, permit);

Expand All @@ -152,7 +156,7 @@ impl Pool {

stream
.as_mut()
.write_all(&auth_buf[0..msg_len])
.write_all(&auth_buf[0..msg_len], DEFAULT_TIME_LIMIT)
.await
.with_context(err_context)?;

Expand All @@ -162,7 +166,7 @@ impl Pool {
// Communication using the newly opened stream should usually not fail. If
// it does, abort. It might be better to just try the next address though.
let resp_header = self
.write_and_read_stream(buf, stream, send_len, expect_response)
.write_and_read_stream(buf, stream, send_len, response_time_limit)
.await
.with_context(err_context)?;

Expand All @@ -189,7 +193,7 @@ impl Pool {
})?;

let resp_header = self
.write_and_read_stream(buf, stream, send_len, expect_response)
.write_and_read_stream(buf, stream, send_len, response_time_limit)
.await
.with_context(|| {
format!("Communication using existing stream to node with uid {node_uid} failed")
Expand All @@ -205,20 +209,28 @@ impl Pool {
buf: &mut [u8],
mut stream: StoredStream<Uid>,
send_len: usize,
expect_response: bool,
response_time_limit: Option<Duration>,
) -> Result<Header> {
stream.as_mut().write_all(&buf[0..send_len]).await?;
stream
.as_mut()
.write_all(&buf[0..send_len], DEFAULT_TIME_LIMIT)
.await?;

let header = if expect_response {
// Read header
stream.as_mut().read_exact(&mut buf[0..Header::LEN]).await?;
let header = if let Some(tl) = response_time_limit {
// Read header - wait for the per-message defined time limit.
stream
.as_mut()
.read_exact(&mut buf[0..Header::LEN], tl)
.await?;
let header = deserialize_header(&buf[0..Header::LEN])?;

// Read body
// Read body - the per-message time limit has already been respected by the header
// above, for the rest of the data we can just use the default short limit.
stream
.as_mut()
.read_exact(&mut buf[Header::LEN..header.msg_len()])
.read_exact(&mut buf[Header::LEN..header.msg_len()], DEFAULT_TIME_LIMIT)
.await?;

header
} else {
Header::default()
Expand Down
23 changes: 10 additions & 13 deletions shared/src/conn/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::time::timeout;

const TIMEOUT: Duration = Duration::from_secs(2);

/// A connected generic stream.
///
/// Provides functionality to communicate with the connected peer. Can support multiple
Expand Down Expand Up @@ -39,9 +37,9 @@ impl From<TcpStream> for Stream {
impl Stream {
/// Connect to peer using TCP and obtain a [Stream] object.
///
/// Times out after [TIMEOUT].
pub async fn connect_tcp(addr: &SocketAddr) -> Result<Self> {
let stream = match timeout(TIMEOUT, TcpStream::connect(addr)).await {
/// Times out after [time_limit].
pub async fn connect_tcp(addr: &SocketAddr, time_limit: Duration) -> Result<Self> {
let stream = match timeout(time_limit, TcpStream::connect(addr)).await {
Ok(res) => res?,
Err(_) => bail!("Connecting a TCP stream to {addr} timed out"),
};
Expand Down Expand Up @@ -74,13 +72,12 @@ impl Stream {
/// Reads from the stream into the provided buffer.
///
/// The buffer will be filled completely before the future completes. Times out after
/// [TIMEOUT].
/// [time_limit].
///
/// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused.
// Clippy: Suppress false positive
#[allow(clippy::needless_pass_by_ref_mut)]
pub async fn read_exact(&mut self, buf: &mut [u8]) -> Result<()> {
match timeout(TIMEOUT, async {
pub async fn read_exact(&mut self, buf: &mut [u8], time_limit: Duration) -> Result<()> {
match timeout(time_limit, async {
match &mut self.stream {
InnerStream::Tcp(s) => {
s.read_exact(buf).await?;
Expand All @@ -98,11 +95,11 @@ impl Stream {
/// Writes to the stream from the provided buffer.
///
/// The buffer will be written completely before the future completes. Times out after
/// [TIMEOUT].
/// [time_limit].
///
/// **Important**: Not cancel safe. If a timeout occurs, the stream may not be reused.
pub async fn write_all(&mut self, buf: &[u8]) -> Result<()> {
match timeout(TIMEOUT, async {
pub async fn write_all(&mut self, buf: &[u8], time_limit: Duration) -> Result<()> {
match timeout(time_limit, async {
match &mut self.stream {
InnerStream::Tcp(s) => {
s.write_all(buf).await?;
Expand All @@ -113,7 +110,7 @@ impl Stream {
.await
{
Ok(res) => res,
Err(_) => Err(anyhow!("Writing to a stream to {} timed out", self.addr())),
Err(_) => Err(anyhow!("Writing to stream to {} timed out", self.addr())),
}
}

Expand Down
Loading