Skip to content

Commit 3cdb56e

Browse files
committed
First pass of Modulr Agent
0 parents  commit 3cdb56e

File tree

13 files changed

+1094
-0
lines changed

13 files changed

+1094
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
/target
2+
Cargo.lock

Cargo.toml

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
[package]
2+
name = "modulr_agent"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
webrtc = "0.10"
8+
tokio = { version = "1", features = ["full"] }
9+
tokio-tungstenite = { version = "0.20", features = ["native-tls"] }
10+
tungstenite = "0.20"
11+
serde = { version = "1", features = ["derive"] }
12+
serde_json = "1"
13+
anyhow = "1"
14+
futures-util = "0.3.31"
15+
gstreamer = "0.24.2"
16+
gstreamer-app = "0.24.2"
17+
bytes = "1.10.1"
18+
thiserror = "2.0.17"
19+
20+
# ROS 1 dependencies
21+
rosrust = { version = "0.9.12", optional = true }
22+
rosrust_msg = { version = "0.1.8", optional = true }
23+
24+
# ROS 2 dependencies
25+
rclrs = { version = "*", optional = true }
26+
geometry_msgs = { version = "*", optional = true }
27+
sensor_msgs = { version = "*", optional = true }
28+
std_msgs = { version = "*", optional = true }
29+
env_logger = "0.11.8"
30+
log = "0.4.28"
31+
32+
[features]
33+
default = ["ros2"]
34+
ros1 = ["dep:rosrust", "dep:rosrust_msg"]
35+
ros2 = [
36+
"dep:rclrs",
37+
"dep:geometry_msgs",
38+
"dep:sensor_msgs",
39+
"dep:std_msgs",
40+
]

README.md

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
# Modulr Robot Agent
2+
3+
This package contains the agent to be installed on any robot connecting to the Modulr infrastructure.
4+
5+
## Building the Package
6+
7+
This package has only been tested on Ubuntu systems. To build the package, you will need to install Rust: https://rustup.rs/
8+
9+
You will also need to install GStreamer:
10+
11+
```bash
12+
sudo apt install -y libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev \
13+
gstreamer1.0-plugins-base gstreamer1.0-plugins-good \
14+
gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly \
15+
gstreamer1.0-libav libgstrtspserver-1.0-dev libges-1.0-dev
16+
```
17+
18+
Finally, ROS is required. ROS 1 and ROS 2 are both supported.
19+
20+
## ROS 1
21+
22+
Follow the installation instructions here: https://wiki.ros.org/noetic/Installation/Ubuntu
23+
24+
Then, build the package by running:
25+
26+
```bash
27+
cargo build --features ros1 --no-default-features
28+
```
29+
30+
You can run the package with logging enabled using:
31+
32+
```bash
33+
RUST_LOG=debug cargo run --features ros1 --no-default-features
34+
```
35+
36+
*Alternatively, edit the Cargo.toml file such that ros1 is the default feature, then run without any of the --features or --no-default-features flags.*
37+
38+
## ROS 2
39+
40+
Follow the installation instructions here: https://docs.ros.org/en/jazzy/Installation/Ubuntu-Install-Debs.html
41+
42+
Note that any distro of ROS 2 should be compatible, but only Jazzy has been tested so far.
43+
44+
Some extra installation is required for Rust-related build tooling:
45+
46+
```bash
47+
sudo apt install -y git libclang-dev python3-pip python3-vcstool
48+
pip install git+https://github.com/colcon/colcon-cargo.git --break-system-packages
49+
pip install git+https://github.com/colcon/colcon-ros-cargo.git --break-system-packages
50+
```
51+
52+
Create a ROS 2 workspace for the package:
53+
54+
```bash
55+
mkdir -p ~/modulr_ws/src
56+
cd ~/modulr_ws
57+
git clone <this repo> src/modulr_agent
58+
vcs import src < src/modulr_agent/ros2rust.repos
59+
# Change the following line depending on your installed version of ROS 2
60+
vcs import src < src/ros2_rust/ros2_rust_jazzy.repos
61+
```
62+
63+
Build the package (first time command):
64+
65+
```bash
66+
cd ~/modulr_ws
67+
colcon build
68+
source install/setup.bash
69+
```
70+
71+
Build the package (subsequent calls):
72+
73+
```bash
74+
cd ~/modulr_ws
75+
colcon build --packages-select modulr_agent
76+
```
77+
78+
Run the package with debug logging:
79+
80+
```bash
81+
RUST_LOG=debug ./install/modulr_agent/bin/modulr_agent
82+
```
83+
84+
*Note that cargo commands should be enabled, including cargo build and cargo run, but these did not work on the first system. It is a TODO to fix this build issue.*

package.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
<package format="3">
2+
<name>modulr_agent</name>
3+
<version>0.1.0</version>
4+
<description></description>
5+
<maintainer email="michael@modulr.cloud">Michael Hart</maintainer>
6+
<license>MIT-0</license>
7+
8+
<depend>rclrs</depend>
9+
<depend>geometry_msgs</depend>
10+
<depend>sensor_msgs</depend>
11+
<depend>std_msgs</depend>
12+
13+
<export>
14+
<build_type>ament_cargo</build_type>
15+
</export>
16+
</package>

ros2rust.repos

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
repositories:
2+
ros2_rust:
3+
type: git
4+
url: https://github.com/ros2-rust/ros2_rust.git
5+
version: main

src/main.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
mod ros_bridge;
2+
mod video_pipeline;
3+
mod webrtc_link;
4+
mod webrtc_message;
5+
6+
use std::sync::Arc;
7+
8+
use anyhow::Result;
9+
use bytes::Bytes;
10+
use log::{info, warn};
11+
use tokio::sync::Mutex;
12+
13+
use crate::ros_bridge::RosBridge;
14+
use crate::video_pipeline::VideoPipeline;
15+
use crate::video_pipeline::VideoPipelineError;
16+
use crate::webrtc_link::WebRtcLink;
17+
use crate::webrtc_link::WebRtcLinkError;
18+
use crate::webrtc_message::WebRtcMessage;
19+
20+
#[tokio::main]
21+
async fn main() -> Result<()> {
22+
env_logger::init();
23+
24+
let robot_id = "robot1";
25+
let signaling_url = "ws://192.168.132.19:8765";
26+
let webrtc_link = Arc::new(Mutex::new(WebRtcLink::new(robot_id, signaling_url)));
27+
let pipeline = Arc::new(Mutex::new(VideoPipeline::new()));
28+
let bridge = Arc::new(Mutex::new(RosBridge::try_new()?));
29+
30+
info!("Creating system components and callbacks");
31+
32+
// Browser -> WebRTC -> ROS
33+
let bridge_clone = Arc::clone(&bridge);
34+
webrtc_link
35+
.lock()
36+
.await
37+
.on_webrtc_message(Box::new(move |msg: &WebRtcMessage| {
38+
let bridge_clone = Arc::clone(&bridge_clone);
39+
let msg_clone = msg.clone();
40+
Box::pin(async move {
41+
match msg_clone {
42+
WebRtcMessage::MovementCommand(cmd) => {
43+
bridge_clone
44+
.lock()
45+
.await
46+
.post_movement_command(&cmd)
47+
.expect("Failed to post movement command!");
48+
}
49+
}
50+
})
51+
}))
52+
.await;
53+
54+
// TODO: only queue frames in pipeline when WebRTC session is established
55+
56+
// Pipeline -> WebRTC -> Browser
57+
let webrtc_link_clone = Arc::clone(&webrtc_link);
58+
pipeline
59+
.lock()
60+
.await
61+
.on_frame_ready(Box::new(move |frame: &Bytes| {
62+
let webrtc_link_clone = Arc::clone(&webrtc_link_clone);
63+
let frame_clone = Bytes::copy_from_slice(frame);
64+
Box::pin(async move {
65+
let err = webrtc_link_clone
66+
.lock()
67+
.await
68+
.write_frame(frame_clone)
69+
.await;
70+
match err {
71+
Err(WebRtcLinkError::IncorrectStateForOperation) => {
72+
warn!("Still waiting for WebRTC pipeline to connect.")
73+
}
74+
Err(_) => {
75+
panic!("Failed to write frame!")
76+
}
77+
_ => (),
78+
}
79+
})
80+
}))
81+
.await;
82+
83+
// Robot frame -> Pipeline
84+
let pipeline_clone = Arc::clone(&pipeline);
85+
bridge
86+
.lock()
87+
.await
88+
.on_image_frame_received(Box::new(move |data: &Bytes| {
89+
let data_clone = Bytes::copy_from_slice(data);
90+
let pipeline_clone = Arc::clone(&pipeline_clone);
91+
Box::pin(async move {
92+
pipeline_clone
93+
.lock()
94+
.await
95+
.queue_frame(&data_clone)
96+
.await
97+
.expect("Failed to write camera frame to pipeline!");
98+
})
99+
}));
100+
101+
info!("Starting all tasks running");
102+
103+
tokio::spawn(async move {
104+
let mut guard = webrtc_link.lock().await;
105+
guard.try_connect().await?;
106+
guard.try_register().await?;
107+
Ok::<(), WebRtcLinkError>(())
108+
});
109+
let pipeline_clone = Arc::clone(&pipeline);
110+
tokio::spawn(async move {
111+
pipeline_clone.lock().await.launch().await?;
112+
info!("Spawned the launch task successfully");
113+
Ok::<(), VideoPipelineError>(())
114+
});
115+
bridge.lock().await.spin();
116+
117+
let _ = tokio::signal::ctrl_c().await;
118+
info!("Exit requested, cleaning up...");
119+
pipeline.lock().await.stop_pipeline().await?;
120+
Ok(())
121+
}

src/ros_bridge/mod.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
mod ros_bridge_error;
2+
use std::pin::Pin;
3+
4+
use bytes::Bytes;
5+
pub use ros_bridge_error::RosBridgeError;
6+
7+
pub type OnCameraImageHdlrFn =
8+
Box<dyn (FnMut(&Bytes) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>) + Send + Sync>;
9+
10+
#[cfg(feature = "ros1")]
11+
mod ros1_bridge;
12+
#[cfg(feature = "ros1")]
13+
pub use ros1_bridge::RosBridge;
14+
15+
#[cfg(feature = "ros2")]
16+
mod ros2_bridge;
17+
#[cfg(feature = "ros2")]
18+
pub use ros2_bridge::RosBridge;
19+
20+
#[cfg(all(feature = "ros1", feature = "ros2"))]
21+
compile_error!("select only one ros feature for RosBridge");

src/ros_bridge/ros1_bridge.rs

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use std::sync::{Arc, Mutex};
2+
3+
use bytes::Bytes;
4+
use log::{debug, error};
5+
use rosrust::{Publisher, Subscriber};
6+
use std::pin::Pin;
7+
use tokio::sync::mpsc;
8+
9+
use crate::{
10+
ros_bridge::{OnCameraImageHdlrFn, RosBridgeError},
11+
webrtc_message::MovementCommand,
12+
};
13+
14+
pub struct RosBridge {
15+
_image_sub: Subscriber,
16+
image_listeners: Arc<Mutex<Vec<RosBridgeOnFrameCallback>>>,
17+
mvmt_pub: Publisher<rosrust_msg::geometry_msgs::Twist>,
18+
}
19+
20+
impl RosBridge {
21+
pub fn try_new() -> Result<Self, RosBridgeError> {
22+
rosrust::init("modulr_agent");
23+
24+
let image_listeners: Arc<Mutex<Vec<OnCameraImageHdlrFn>>> = Arc::new(Mutex::new(vec![]));
25+
26+
let (tx, mut rx) = mpsc::channel::<Bytes>(10);
27+
28+
let listeners_clone = Arc::clone(image_listeners);
29+
let image_sub = rosrust::subscribe(
30+
"/camera/image_raw",
31+
1,
32+
move |v: rosrust_msg::sensor_msgs::Image| {
33+
let buffer = Bytes::from(v.data);
34+
if tx.try_send(buffer).is_err() {
35+
error!("Failed to send image buffer to channel!");
36+
}
37+
},
38+
)
39+
.map_err(|_| RosBridgeError::SubscriptionCreateFailure)?;
40+
41+
let listeners_clone = Arc::clone(&image_listeners);
42+
tokio::spawn(async move {
43+
while let Some(buffer) = rx.recv().await {
44+
// debug!("Received frame from ROS camera subscription");
45+
for listener in listeners_clone.lock().unwrap().iter_mut() {
46+
tokio::spawn(listener(&buffer));
47+
}
48+
}
49+
});
50+
51+
let mvmt_pub =
52+
rosrust::publish("/cmd_vel", 2).map_err(|_| RosBridgeError::PublisherCreateFailure)?;
53+
54+
Ok(Self {
55+
_image_sub: image_sub,
56+
image_listeners,
57+
mvmt_pub,
58+
})
59+
}
60+
61+
// Register a callback for camera images
62+
pub fn on_image_frame_received(&mut self, listener: OnCameraImageHdlrFn) {
63+
if let Ok(mut listeners) = self.image_listeners.try_lock() {
64+
listeners.push(listener);
65+
}
66+
}
67+
68+
// Post a new movement command
69+
pub fn post_movement_command(
70+
&mut self,
71+
command: &MovementCommand,
72+
) -> Result<(), RosBridgeError> {
73+
debug!("Received movement command: {:?}", command);
74+
let msg = rosrust_msg::geometry_msgs::Twist {
75+
linear: rosrust_msg::geometry_msgs::Vector3 {
76+
x: command.forward,
77+
y: 0f64,
78+
z: 0f64,
79+
},
80+
angular: rosrust_msg::geometry_msgs::Vector3 {
81+
x: 0f64,
82+
y: 0f64,
83+
z: command.turn,
84+
},
85+
};
86+
self.mvmt_pub
87+
.send(msg)
88+
.map_err(|_| RosBridgeError::PublishFailed)?;
89+
Ok(())
90+
}
91+
92+
pub fn spin(&self) {
93+
std::thread::spawn(|| {
94+
rosrust::spin();
95+
});
96+
}
97+
}

0 commit comments

Comments
 (0)