From b7d8e45470ce70a88799f892bdb7b3d8776d0331 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Wed, 3 Jun 2026 23:59:44 +0200 Subject: [PATCH 1/2] plugin: Add GET_PER_COMMITMENT_POINT to stuck HSM types Type 18 (WIRE_HSMD_GET_PER_COMMITMENT_POINT) is used by onchaind to derive historical commitment keys during force-close resolution. If the signer doesn't respond it blocks block processing just like the signing types already in the list. Observed on several deeply-lagged nodes where onchaind fires for a closing channel and gets stuck on a GET_PER_COMMITMENT_POINT request before the 10-minute bgsync preemption kicks in. Co-Authored-By: Claude Sonnet 4.6 --- libs/gl-plugin/src/stager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/gl-plugin/src/stager.rs b/libs/gl-plugin/src/stager.rs index 2919a352a..8c21a87ef 100644 --- a/libs/gl-plugin/src/stager.rs +++ b/libs/gl-plugin/src/stager.rs @@ -105,6 +105,7 @@ impl Stage { 12, // WIRE_HSMD_SIGN_DELAYED_PAYMENT_TO_US 13, // WIRE_HSMD_SIGN_REMOTE_HTLC_TO_US 14, // WIRE_HSMD_SIGN_PENALTY_TO_US + 18, // WIRE_HSMD_GET_PER_COMMITMENT_POINT (onchaind key derivation) 20, // WIRE_HSMD_SIGN_REMOTE_HTLC_TX 21, // WIRE_HSMD_SIGN_MUTUAL_CLOSE_TX 28, // WIRE_HSMD_CHECK_PUBKEY From 581e8f60c79044cb65ea25e9a963b7cd8fff6a91 Mon Sep 17 00:00:00 2001 From: Christian Decker Date: Sun, 7 Jun 2026 17:51:25 +0200 Subject: [PATCH 2/2] gl-signerproxy: replace shared block_on with message-passing gRPC dispatcher A single current_thread Tokio runtime was shared across all OS threads via Arc<Runtime>. When onchaind called block_on() for a type 143 request and blocked forever (no signer connected), no other thread could enter block_on, so type 27 init requests and all subsequent HSM calls were serialised behind the stuck type 143. This prevented type 143 from ever reaching the plugin stager, so stuck_request_types() returned empty and the bgsync session ran the full 10-minute timeout instead of aborting early. 
Fix: keep one runtime, remove direct block_on from handler threads. Add an mpsc channel to a grpc_dispatcher async task that spawns each gRPC call as an independent tokio::spawn'd task. Handler threads block on oneshot::blocking_recv for their own response; a permanently-stuck type 143 task cannot delay any other task. Adds a regression test that confirms a type 143 lockup does not block a concurrent type 27 request on a different connection. Co-Authored-By: Claude Sonnet 4.6 --- libs/gl-signerproxy/Cargo.toml | 5 +- libs/gl-signerproxy/src/hsmproxy.rs | 233 ++++++++++++++++++++++++++-- 2 files changed, 220 insertions(+), 18 deletions(-) diff --git a/libs/gl-signerproxy/Cargo.toml b/libs/gl-signerproxy/Cargo.toml index 366938742..9be8af230 100644 --- a/libs/gl-signerproxy/Cargo.toml +++ b/libs/gl-signerproxy/Cargo.toml @@ -21,7 +21,7 @@ tonic-build = "0.8" anyhow = { workspace = true } env_logger = { workspace = true } # Minimal tokio - only for gRPC client runtime -tokio = { version = "1", features = ["rt", "net", "io-util"] } +tokio = { version = "1", features = ["rt", "net", "io-util", "sync"] } tonic = { version = "0.8", features = ["tls", "transport"] } prost = "0.11" log = "*" @@ -29,3 +29,6 @@ tower = "0.4" which = "4.4.2" libc = "0.2" byteorder = "1.5.0" + +[dev-dependencies] +tokio = { version = "1", features = ["time"] } diff --git a/libs/gl-signerproxy/src/hsmproxy.rs b/libs/gl-signerproxy/src/hsmproxy.rs index 36790e187..8a7d8c46d 100644 --- a/libs/gl-signerproxy/src/hsmproxy.rs +++ b/libs/gl-signerproxy/src/hsmproxy.rs @@ -1,6 +1,6 @@ // Implementation of the server-side hsmd. It collects requests and passes // them on to the clients which actually have access to the keys. 
-use crate::pb::{hsm_client::HsmClient, Empty, HsmRequest, HsmRequestContext}; +use crate::pb::{hsm_client::HsmClient, Empty, HsmRequest, HsmRequestContext, HsmResponse}; use crate::wire::{DaemonConnection, Message}; use anyhow::{anyhow, Context}; use anyhow::{Error, Result}; @@ -16,6 +16,7 @@ use std::sync::atomic; use std::sync::Arc; use std::thread; use tokio::runtime::Runtime; +use tokio::sync::{mpsc, oneshot}; use tonic::transport::{Endpoint, Uri}; use tower::service_fn; use which::which; @@ -46,14 +47,51 @@ fn setup_node_stream() -> Result { Ok(DaemonConnection::new(ms)) } +/// Messages sent from blocking handler threads to the async gRPC dispatcher. +enum GrpcMessage { + Ping { + response_tx: oneshot::Sender>, + }, + Request { + request: HsmRequest, + response_tx: oneshot::Sender>, + }, +} + +/// Async dispatcher running on the single Tokio runtime. Each message is +/// spawned as an independent task so a stuck gRPC call (e.g. type 143 waiting +/// for a signer that never connects) cannot block other concurrent requests. 
+async fn grpc_dispatcher(server: GrpcClient, mut rx: mpsc::Receiver) { + while let Some(msg) = rx.recv().await { + let mut s = server.clone(); + tokio::spawn(async move { + match msg { + GrpcMessage::Ping { response_tx } => { + let res = s.ping(Empty::default()).await.map(|_| ()); + let _ = response_tx.send(res); + } + GrpcMessage::Request { + request, + response_tx, + } => { + let res = s + .request(tonic::Request::new(request)) + .await + .map(|r| r.into_inner()); + let _ = response_tx.send(res); + } + } + }); + } +} + fn start_handler( local: NodeConnection, counter: Arc, - grpc: GrpcClient, - runtime: Arc, + tx: mpsc::Sender, ) { thread::spawn(move || { - match process_requests(local, counter, grpc, runtime).context("processing requests") { + match process_requests(local, counter, tx).context("processing requests") { Ok(()) => panic!("why did the hsmproxy stop processing requests without an error?"), Err(e) => warn!("hsmproxy stopped processing requests with error: {}", e), } @@ -63,13 +101,20 @@ fn start_handler( fn process_requests( node_conn: NodeConnection, request_counter: Arc, - mut server: GrpcClient, - runtime: Arc, + tx: mpsc::Sender, ) -> Result<(), Error> { let conn = node_conn.conn; let context = node_conn.context; + info!("Pinging server"); - runtime.block_on(server.ping(Empty::default()))?; + let (ping_tx, ping_rx) = oneshot::channel(); + tx.blocking_send(GrpcMessage::Ping { response_tx: ping_tx }) + .context("dispatcher gone")?; + ping_rx + .blocking_recv() + .context("dispatcher dropped before ping response")? 
+ .map_err(|e| anyhow!("ping failed: {}", e))?; + loop { if let Ok(msg) = conn.read() { match msg.msgtype() { @@ -89,9 +134,7 @@ fn process_requests( let remote = remote.as_raw_fd(); let msg = Message::new_with_fds(vec![0, 109], &vec![remote]); - let grpc = server.clone(); - // Start new handler for the client - start_handler(local, request_counter.clone(), grpc, runtime.clone()); + start_handler(local, request_counter.clone(), tx.clone()); if let Err(e) = conn.write(msg) { error!("error writing msg to node_connection: {:?}", e); return Err(e); @@ -104,21 +147,32 @@ fn process_requests( }, _ => { // By default we forward to the remote HSMd - let req = tonic::Request::new(HsmRequest { + let req = HsmRequest { context: context.clone(), raw: msg.body.clone(), request_id: request_counter.fetch_add(1, atomic::Ordering::Relaxed) as u32, requests: Vec::new(), signer_state: Vec::new(), - }); + }; eprintln!( "WIRE: lightningd -> hsmd: Got a message from node: {:?}", &req ); - let start_time = tokio::time::Instant::now(); + let start_time = std::time::Instant::now(); debug!("Got a message from node: {:?}", &req); - let res = runtime.block_on(server.request(req))?.into_inner(); + + let (resp_tx, resp_rx) = oneshot::channel(); + tx.blocking_send(GrpcMessage::Request { + request: req, + response_tx: resp_tx, + }) + .context("dispatcher gone")?; + let res = resp_rx + .blocking_recv() + .context("dispatcher dropped before response")? 
+ .map_err(|e| anyhow!("gRPC error: {}", e))?; + let delta = start_time.elapsed(); let msg = Message::from_raw(res.raw); eprintln!( @@ -163,6 +217,143 @@ fn grpc_connect(runtime: &Runtime) -> Result { }) } +#[cfg(test)] +mod tests { + use super::*; + use crate::pb::HsmResponse; + use byteorder::{BigEndian, ByteOrder}; + use std::io::{Read, Write}; + use std::os::unix::net::UnixStream as StdUnixStream; + use std::sync::atomic::AtomicUsize; + use std::time::{Duration, Instant}; + + /// Write a CLN wire message: 4-byte big-endian length prefix followed by body. + fn write_cln_msg(stream: &mut StdUnixStream, body: &[u8]) { + let mut len_buf = [0u8; 4]; + BigEndian::write_u32(&mut len_buf, body.len() as u32); + stream.write_all(&len_buf).unwrap(); + stream.write_all(body).unwrap(); + } + + /// Read a CLN wire message, returning the body (without the length prefix). + fn read_cln_msg(stream: &mut StdUnixStream) -> Vec { + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).unwrap(); + let len = BigEndian::read_u32(&len_buf) as usize; + let mut buf = vec![0u8; len]; + stream.read_exact(&mut buf).unwrap(); + buf + } + + /// Starts a mock gRPC dispatcher on a background thread. + /// + /// Pings are acked immediately. Type 143 requests are held for 60 s to + /// simulate a signer that never connects. All other requests respond at once. + fn start_mock_dispatcher(mut rx: mpsc::Receiver) { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { + while let Some(msg) = rx.recv().await { + tokio::spawn(async move { + match msg { + GrpcMessage::Ping { response_tx } => { + let _ = response_tx.send(Ok(())); + } + GrpcMessage::Request { + request, + response_tx, + } => { + let type_id = ((request.raw[0] as u16) << 8) + | request.raw[1] as u16; + if type_id == 143 { + // Simulate a signer that never arrives. 
+ tokio::time::sleep(Duration::from_secs(60)).await; + } + let _ = response_tx.send(Ok(HsmResponse { + request_id: request.request_id, + raw: vec![0, 0], + signer_state: vec![], + error: "".into(), + })); + } + } + }); + } + }); + }); + } + + /// Spawns a `process_requests` loop in a background thread using the given + /// socket end and channel sender. + fn spawn_proxy_connection( + proxy_end: StdUnixStream, + counter: Arc, + tx: mpsc::Sender, + ) { + std::thread::spawn(move || { + let conn = NodeConnection { + conn: DaemonConnection::new(proxy_end), + context: None, + }; + let _ = process_requests(conn, counter, tx); + }); + } + + /// A type 143 request that blocks indefinitely (no signer) must not + /// prevent a concurrent type 27 request on a different connection from + /// completing in a timely fashion. + #[test] + fn type_143_lockup_does_not_block_other_requests() { + let (tx, rx) = mpsc::channel::(64); + start_mock_dispatcher(rx); + + let counter = Arc::new(AtomicUsize::new(1000)); + + // Connection 1 — onchaind analogue; will send the blocking type 143. + let (mut cln_1, proxy_1) = StdUnixStream::pair().unwrap(); + spawn_proxy_connection(proxy_1, counter.clone(), tx.clone()); + + // Connection 2 — any other subdaemon; should be unaffected. + let (mut cln_2, proxy_2) = StdUnixStream::pair().unwrap(); + spawn_proxy_connection(proxy_2, counter.clone(), tx.clone()); + + // Allow both connections to complete the startup ping. + std::thread::sleep(Duration::from_millis(100)); + + // Send a blocking type 143 on connection 1. + write_cln_msg(&mut cln_1, &[0, 143, 0, 0, 0, 0]); + + // Brief pause so the type 143 task is in-flight before we send the + // next request — this is the race condition the fix resolves. + std::thread::sleep(Duration::from_millis(50)); + + // Send a fast type 27 on connection 2. 
+ write_cln_msg(&mut cln_2, &[0, 27, 0, 0]); + + // The type 27 response must arrive well before the 60-second type 143 + // timeout; 5 seconds is a generous bound for CI. + cln_2 + .set_read_timeout(Some(Duration::from_secs(5))) + .unwrap(); + let start = Instant::now(); + let response = read_cln_msg(&mut cln_2); + let elapsed = start.elapsed(); + + assert!( + elapsed < Duration::from_secs(5), + "type 27 response took {:?} — it was blocked by the type 143 request", + elapsed + ); + assert!( + !response.is_empty(), + "expected a non-empty response to the type 27 request" + ); + } +} + pub fn run() -> Result<(), Error> { let args: Vec = std::env::args().collect(); @@ -184,16 +375,24 @@ pub fn run() -> Result<(), Error> { .context("failed to create tokio runtime")?, ); - let node = setup_node_stream()?; + let (tx, rx) = mpsc::channel::(64); let grpc = grpc_connect(&runtime)?; + // Dedicated thread drives the runtime and the async dispatcher. + // Each incoming GrpcMessage is spawned as an independent task so + // concurrent requests cannot block each other. + let rt = runtime.clone(); + thread::spawn(move || { + rt.block_on(grpc_dispatcher(grpc, rx)); + }); + + let node = setup_node_stream()?; process_requests( NodeConnection { conn: node, context: None, }, request_counter, - grpc, - runtime, + tx, ) }