Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libs/gl-plugin/src/stager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ impl Stage {
12, // WIRE_HSMD_SIGN_DELAYED_PAYMENT_TO_US
13, // WIRE_HSMD_SIGN_REMOTE_HTLC_TO_US
14, // WIRE_HSMD_SIGN_PENALTY_TO_US
18, // WIRE_HSMD_GET_PER_COMMITMENT_POINT (onchaind key derivation)
20, // WIRE_HSMD_SIGN_REMOTE_HTLC_TX
21, // WIRE_HSMD_SIGN_MUTUAL_CLOSE_TX
28, // WIRE_HSMD_CHECK_PUBKEY
Expand Down
5 changes: 4 additions & 1 deletion libs/gl-signerproxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ tonic-build = "0.8"
anyhow = { workspace = true }
env_logger = { workspace = true }
# Minimal tokio - only for gRPC client runtime
tokio = { version = "1", features = ["rt", "net", "io-util"] }
tokio = { version = "1", features = ["rt", "net", "io-util", "sync"] }
tonic = { version = "0.8", features = ["tls", "transport"] }
prost = "0.11"
log = "*"
tower = "0.4"
which = "4.4.2"
libc = "0.2"
byteorder = "1.5.0"

[dev-dependencies]
tokio = { version = "1", features = ["time"] }
233 changes: 216 additions & 17 deletions libs/gl-signerproxy/src/hsmproxy.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Implementation of the server-side hsmd. It collects requests and passes
// them on to the clients which actually have access to the keys.
use crate::pb::{hsm_client::HsmClient, Empty, HsmRequest, HsmRequestContext};
use crate::pb::{hsm_client::HsmClient, Empty, HsmRequest, HsmRequestContext, HsmResponse};
use crate::wire::{DaemonConnection, Message};
use anyhow::{anyhow, Context};
use anyhow::{Error, Result};
Expand All @@ -16,6 +16,7 @@ use std::sync::atomic;
use std::sync::Arc;
use std::thread;
use tokio::runtime::Runtime;
use tokio::sync::{mpsc, oneshot};
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;
use which::which;
Expand Down Expand Up @@ -46,14 +47,51 @@ fn setup_node_stream() -> Result<DaemonConnection, Error> {
Ok(DaemonConnection::new(ms))
}

/// Messages sent from blocking handler threads to the async gRPC dispatcher.
enum GrpcMessage {
Ping {
response_tx: oneshot::Sender<Result<(), tonic::Status>>,
},
Request {
request: HsmRequest,
response_tx: oneshot::Sender<Result<HsmResponse, tonic::Status>>,
},
}

/// Async dispatcher running on the single Tokio runtime. Each message is
/// spawned as an independent task so a stuck gRPC call (e.g. type 143 waiting
/// for a signer that never connects) cannot block other concurrent requests.
async fn grpc_dispatcher(server: GrpcClient, mut rx: mpsc::Receiver<GrpcMessage>) {
while let Some(msg) = rx.recv().await {
let mut s = server.clone();
tokio::spawn(async move {
match msg {
GrpcMessage::Ping { response_tx } => {
let res = s.ping(Empty::default()).await.map(|_| ());
let _ = response_tx.send(res);
}
GrpcMessage::Request {
request,
response_tx,
} => {
let res = s
.request(tonic::Request::new(request))
.await
.map(|r| r.into_inner());
let _ = response_tx.send(res);
}
}
});
}
}

fn start_handler(
local: NodeConnection,
counter: Arc<atomic::AtomicUsize>,
grpc: GrpcClient,
runtime: Arc<Runtime>,
tx: mpsc::Sender<GrpcMessage>,
) {
thread::spawn(move || {
match process_requests(local, counter, grpc, runtime).context("processing requests") {
match process_requests(local, counter, tx).context("processing requests") {
Ok(()) => panic!("why did the hsmproxy stop processing requests without an error?"),
Err(e) => warn!("hsmproxy stopped processing requests with error: {}", e),
}
Expand All @@ -63,13 +101,20 @@ fn start_handler(
fn process_requests(
node_conn: NodeConnection,
request_counter: Arc<atomic::AtomicUsize>,
mut server: GrpcClient,
runtime: Arc<Runtime>,
tx: mpsc::Sender<GrpcMessage>,
) -> Result<(), Error> {
let conn = node_conn.conn;
let context = node_conn.context;

info!("Pinging server");
runtime.block_on(server.ping(Empty::default()))?;
let (ping_tx, ping_rx) = oneshot::channel();
tx.blocking_send(GrpcMessage::Ping { response_tx: ping_tx })
.context("dispatcher gone")?;
ping_rx
.blocking_recv()
.context("dispatcher dropped before ping response")?
.map_err(|e| anyhow!("ping failed: {}", e))?;

loop {
if let Ok(msg) = conn.read() {
match msg.msgtype() {
Expand All @@ -89,9 +134,7 @@ fn process_requests(
let remote = remote.as_raw_fd();
let msg = Message::new_with_fds(vec![0, 109], &vec![remote]);

let grpc = server.clone();
// Start new handler for the client
start_handler(local, request_counter.clone(), grpc, runtime.clone());
start_handler(local, request_counter.clone(), tx.clone());
if let Err(e) = conn.write(msg) {
error!("error writing msg to node_connection: {:?}", e);
return Err(e);
Expand All @@ -104,21 +147,32 @@ fn process_requests(
},
_ => {
// By default we forward to the remote HSMd
let req = tonic::Request::new(HsmRequest {
let req = HsmRequest {
context: context.clone(),
raw: msg.body.clone(),
request_id: request_counter.fetch_add(1, atomic::Ordering::Relaxed) as u32,
requests: Vec::new(),
signer_state: Vec::new(),
});
};

eprintln!(
"WIRE: lightningd -> hsmd: Got a message from node: {:?}",
&req
);
let start_time = tokio::time::Instant::now();
let start_time = std::time::Instant::now();
debug!("Got a message from node: {:?}", &req);
let res = runtime.block_on(server.request(req))?.into_inner();

let (resp_tx, resp_rx) = oneshot::channel();
tx.blocking_send(GrpcMessage::Request {
request: req,
response_tx: resp_tx,
})
.context("dispatcher gone")?;
let res = resp_rx
.blocking_recv()
.context("dispatcher dropped before response")?
.map_err(|e| anyhow!("gRPC error: {}", e))?;

let delta = start_time.elapsed();
let msg = Message::from_raw(res.raw);
eprintln!(
Expand Down Expand Up @@ -163,6 +217,143 @@ fn grpc_connect(runtime: &Runtime) -> Result<GrpcClient, Error> {
})
}

#[cfg(test)]
mod tests {
use super::*;
use crate::pb::HsmResponse;
use byteorder::{BigEndian, ByteOrder};
use std::io::{Read, Write};
use std::os::unix::net::UnixStream as StdUnixStream;
use std::sync::atomic::AtomicUsize;
use std::time::{Duration, Instant};

/// Write a CLN wire message: 4-byte big-endian length prefix followed by body.
fn write_cln_msg(stream: &mut StdUnixStream, body: &[u8]) {
let mut len_buf = [0u8; 4];
BigEndian::write_u32(&mut len_buf, body.len() as u32);
stream.write_all(&len_buf).unwrap();
stream.write_all(body).unwrap();
}

/// Read a CLN wire message, returning the body (without the length prefix).
fn read_cln_msg(stream: &mut StdUnixStream) -> Vec<u8> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf).unwrap();
let len = BigEndian::read_u32(&len_buf) as usize;
let mut buf = vec![0u8; len];
stream.read_exact(&mut buf).unwrap();
buf
}

/// Starts a mock gRPC dispatcher on a background thread.
///
/// Pings are acked immediately. Type 143 requests are held for 60 s to
/// simulate a signer that never connects. All other requests respond at once.
fn start_mock_dispatcher(mut rx: mpsc::Receiver<GrpcMessage>) {
std::thread::spawn(move || {
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
while let Some(msg) = rx.recv().await {
tokio::spawn(async move {
match msg {
GrpcMessage::Ping { response_tx } => {
let _ = response_tx.send(Ok(()));
}
GrpcMessage::Request {
request,
response_tx,
} => {
let type_id = ((request.raw[0] as u16) << 8)
| request.raw[1] as u16;
if type_id == 143 {
// Simulate a signer that never arrives.
tokio::time::sleep(Duration::from_secs(60)).await;
}
let _ = response_tx.send(Ok(HsmResponse {
request_id: request.request_id,
raw: vec![0, 0],
signer_state: vec![],
error: "".into(),
}));
}
}
});
}
});
});
}

/// Spawns a `process_requests` loop in a background thread using the given
/// socket end and channel sender.
fn spawn_proxy_connection(
proxy_end: StdUnixStream,
counter: Arc<AtomicUsize>,
tx: mpsc::Sender<GrpcMessage>,
) {
std::thread::spawn(move || {
let conn = NodeConnection {
conn: DaemonConnection::new(proxy_end),
context: None,
};
let _ = process_requests(conn, counter, tx);
});
}

/// A type 143 request that blocks indefinitely (no signer) must not
/// prevent a concurrent type 27 request on a different connection from
/// completing in a timely fashion.
#[test]
fn type_143_lockup_does_not_block_other_requests() {
let (tx, rx) = mpsc::channel::<GrpcMessage>(64);
start_mock_dispatcher(rx);

let counter = Arc::new(AtomicUsize::new(1000));

// Connection 1 — onchaind analogue; will send the blocking type 143.
let (mut cln_1, proxy_1) = StdUnixStream::pair().unwrap();
spawn_proxy_connection(proxy_1, counter.clone(), tx.clone());

// Connection 2 — any other subdaemon; should be unaffected.
let (mut cln_2, proxy_2) = StdUnixStream::pair().unwrap();
spawn_proxy_connection(proxy_2, counter.clone(), tx.clone());

// Allow both connections to complete the startup ping.
std::thread::sleep(Duration::from_millis(100));

// Send a blocking type 143 on connection 1.
write_cln_msg(&mut cln_1, &[0, 143, 0, 0, 0, 0]);

// Brief pause so the type 143 task is in-flight before we send the
// next request — this is the race condition the fix resolves.
std::thread::sleep(Duration::from_millis(50));

// Send a fast type 27 on connection 2.
write_cln_msg(&mut cln_2, &[0, 27, 0, 0]);

// The type 27 response must arrive well before the 60-second type 143
// timeout; 5 seconds is a generous bound for CI.
cln_2
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
let start = Instant::now();
let response = read_cln_msg(&mut cln_2);
let elapsed = start.elapsed();

assert!(
elapsed < Duration::from_secs(5),
"type 27 response took {:?} — it was blocked by the type 143 request",
elapsed
);
assert!(
!response.is_empty(),
"expected a non-empty response to the type 27 request"
);
}
}

pub fn run() -> Result<(), Error> {
let args: Vec<String> = std::env::args().collect();

Expand All @@ -184,16 +375,24 @@ pub fn run() -> Result<(), Error> {
.context("failed to create tokio runtime")?,
);

let node = setup_node_stream()?;
let (tx, rx) = mpsc::channel::<GrpcMessage>(64);
let grpc = grpc_connect(&runtime)?;

// Dedicated thread drives the runtime and the async dispatcher.
// Each incoming GrpcMessage is spawned as an independent task so
// concurrent requests cannot block each other.
let rt = runtime.clone();
thread::spawn(move || {
rt.block_on(grpc_dispatcher(grpc, rx));
});

let node = setup_node_stream()?;
process_requests(
NodeConnection {
conn: node,
context: None,
},
request_counter,
grpc,
runtime,
tx,
)
}
Loading