Hello,
First let me share how glad I am that this project exists.
Avoiding bringing in the whole ROS 2 ecosystem is a relief!
However, I am having trouble running the publishers and subscribers properly in my program, which relies a lot on tokio.
Somehow the publisher never hears about the subscriber, and messages never pass.
So I took the new pub/sub test that uses tokio and ran it on my machine (a Mac): it works fine.
Now, if I copy this exact test to my project (which I can't disclose), it fails with a timeout.
I tried aligning the tokio version and its features, but nothing changed: the test from this repo works, but the one from mine does not. I already had a similar issue in the past, and I worked around it by using tokio's multi-thread runtime in my test. But this time, that does not work...
I rewrote the test to avoid any spawned task, and got it working in both projects:
```rust
use std::time::Duration;

use ros2_client::{Context, MessageTypeName, Name, NodeName, NodeOptions, DEFAULT_PUBLISHER_QOS};

#[tokio::test]
async fn pub_and_sub_no_spawn() {
    let ctx = Context::new().unwrap();
    let mut node = ctx
        .new_node(
            NodeName::new("/", "topic_test_sub_node").unwrap(),
            NodeOptions::new(),
        )
        .unwrap();
    let spinner = node.spinner().unwrap();

    let topic = node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let subscriber = node.create_subscription::<String>(&topic, None).unwrap();

    let topic = node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let publisher = node.create_publisher::<String>(&topic, None).unwrap();

    let publishing = async {
        // send messages every 0.25 seconds
        loop {
            publisher
                .async_publish("hello subscriber!".into())
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    };

    tokio::select! {
        _ = spinner.spin() => {}
        // if we don't get a message within five seconds, fail the test.
        res = tokio::time::timeout(
            Duration::from_secs(5),
            subscriber.async_take(),
        ) => {
            let (msg, _msg_info) = res
                .expect("Test timed out - publisher never sent anything!")
                .expect("we should've got a message.");
            assert_eq!(msg, "hello subscriber!");
        }
        _ = publishing => {}
    }
}
```
This version also works, with spawns, single-threaded, but with a single node and a single spin:
```rust
use std::time::Duration;

use ros2_client::{
    Context, MessageTypeName, Name, NodeName, NodeOptions, DEFAULT_PUBLISHER_QOS,
};

#[tokio::test]
async fn pub_sub_spawn_single_node() {
    let ctx = Context::new().unwrap();
    let mut node = ctx
        .new_node(
            NodeName::new("/", "topic_test_sub_node").unwrap(),
            NodeOptions::new(),
        )
        .unwrap();
    let spinner = node.spinner().unwrap();

    let topic = node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let subscriber = node.create_subscription::<String>(&topic, None).unwrap();

    let topic = node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let publisher = node.create_publisher::<String>(&topic, None).unwrap();

    let publishing = async move {
        // send messages every 0.25 seconds
        loop {
            publisher
                .async_publish("hello subscriber!".into())
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    };

    let spinner_task = tokio::spawn(spinner.spin());
    let subscriber_task = tokio::spawn(async move {
        let res = tokio::time::timeout(Duration::from_secs(5), subscriber.async_take()).await;
        let (msg, _msg_info) = res
            .expect("Test timed out - publisher never sent anything!")
            .expect("we should've got a message.");
        assert_eq!(msg, "hello subscriber!");
    });
    let publishing_task = tokio::spawn(publishing);

    // Wait for the first task to complete
    tokio::select! {
        _ = spinner_task => {},
        _ = subscriber_task => {},
        _ = publishing_task => {},
    };
}
```
And here with 2 nodes, 2 spinners, it breaks in my code:
```rust
use std::time::Duration;

use ros2_client::{
    Context, MessageTypeName, Name, NodeName, NodeOptions, DEFAULT_PUBLISHER_QOS,
};

#[tokio::test]
async fn pub_sub_distinct_nodes() {
    let ctx = Context::new().unwrap();
    let mut pub_node = ctx
        .new_node(
            NodeName::new("/", "topic_test_pub_node").unwrap(),
            NodeOptions::new(),
        )
        .unwrap();
    let mut sub_node = ctx
        .new_node(
            NodeName::new("/", "topic_test_sub_node").unwrap(),
            NodeOptions::new(),
        )
        .unwrap();

    let pub_topic = pub_node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let subscriber = pub_node
        .create_subscription::<String>(&pub_topic, None)
        .unwrap();

    let sub_topic = sub_node
        .create_topic(
            &Name::new("/", "pub_sub_topic").unwrap(),
            MessageTypeName::new("std_msgs", "String"),
            &DEFAULT_PUBLISHER_QOS.clone(),
        )
        .unwrap();
    let publisher = pub_node
        .create_publisher::<String>(&sub_topic, None)
        .unwrap();

    let publishing = async move {
        // send messages every 0.25 seconds
        loop {
            publisher
                .async_publish("hello subscriber!".into())
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(250)).await;
        }
    };

    let pub_spinner_task = tokio::spawn(async move {
        let spinner = pub_node.spinner().unwrap();
        spinner.spin().await
    });
    let sub_spinner_task = tokio::spawn(async move {
        let spinner = sub_node.spinner().unwrap();
        spinner.spin().await
    });
    let subscriber_task = tokio::spawn(async move {
        let res = tokio::time::timeout(Duration::from_secs(5), subscriber.async_take()).await;
        let (msg, _msg_info) = res
            .expect("Test timed out - publisher never sent anything!")
            .expect("we should've got a message.");
        assert_eq!(msg, "hello subscriber!");
    });
    let publishing_task = tokio::spawn(publishing);

    // Wait for the first task to complete
    tokio::select! {
        _ = pub_spinner_task => {},
        _ = sub_spinner_task => {},
        _ = subscriber_task => {},
        _ = publishing_task => {},
    };
}
```
My theory is that something is racy, and that subtle differences between the two projects change the task scheduling enough to trigger it in one and not in the other.
For a moment I thought my Mac's firewall was playing tricks on me, but I had allowed both programs. I also tested with the firewall disabled, and it made no difference.
I also had a look at QoS: the same QoS policy is used everywhere. If I make the QoS differ between the subscriber and the publisher (having each use its respective default QoS policy), even my first version with no spawn fails. To me, this test rules out QoS as the cause.
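To make my QoS reasoning explicit: as far as I understand, DDS only matches a writer with a reader when the writer offers at least what the reader requests, policy by policy. Below is a minimal sketch of that rule. `Qos`, `Reliability`, `Durability` and `is_compatible` are hypothetical names I made up for illustration, not ros2_client types, and I have not checked which values the default QoS constants actually use.

```rust
// Hypothetical stand-ins for illustration only; not ros2_client or RustDDS types.
// Variants are declared from weakest to strongest so the derived ordering
// can be used for the "offered >= requested" comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Reliability {
    BestEffort,
    Reliable,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum Durability {
    Volatile,
    TransientLocal,
}

#[derive(Debug, Clone, Copy)]
struct Qos {
    reliability: Reliability,
    durability: Durability,
}

/// Requested-versus-offered check: the writer's offered QoS must be at least
/// as strong as the reader's requested QoS for every policy.
fn is_compatible(offered: Qos, requested: Qos) -> bool {
    offered.reliability >= requested.reliability && offered.durability >= requested.durability
}

fn main() {
    let reliable = Qos {
        reliability: Reliability::Reliable,
        durability: Durability::Volatile,
    };
    let best_effort = Qos {
        reliability: Reliability::BestEffort,
        durability: Durability::Volatile,
    };
    let latched = Qos {
        reliability: Reliability::Reliable,
        durability: Durability::TransientLocal,
    };

    // Identical QoS on both sides always matches.
    println!("same QoS: {}", is_compatible(reliable, reliable));
    // A best-effort writer cannot satisfy a reader that requests reliable delivery.
    println!("best-effort writer, reliable reader: {}", is_compatible(best_effort, reliable));
    // A volatile writer cannot satisfy a reader that requests transient-local durability.
    println!("volatile writer, transient-local reader: {}", is_compatible(reliable, latched));
}
```

Under this rule, identical QoS on both ends (as in my tests) should always match, which is why I do not suspect QoS in the failing case.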
Does anyone understand what happens?
Is this how it's supposed to work?