Publisher sends data but Subscriber cannot receive on Windows 11 - No RTPS packets captured in Wireshark #399

@781226451

Environment:

  • OS: Windows 11
  • RustDDS version: 0.11.8
  • Working platforms: Linux (Ubuntu 22.04), macOS
  • Network setup: Same machine, loopback/localhost

Description:

On Windows 11, the publisher reports that it sends data successfully, but the subscriber never receives any messages. Wireshark captures contain no RTPS packets at all, which suggests the problem lies at the network layer.
The same code works correctly on Linux (Ubuntu 22.04) and macOS, where:

  • Publisher and subscriber communicate successfully
  • Wireshark captures RTPS packets with vendorId=01.18 (Atostek - RustDDS)

Steps to Reproduce:

  1. Install Npcap and manually place Packet.lib in the pnet project's lib/x64 directory (workaround for issue #375, "using pnet may break builds")
  2. Run the attached example code on Windows 11
  3. Monitor network traffic with Wireshark while running the application
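
For step 3, it can help to know which UDP ports to filter on in Wireshark. The sketch below computes the default well-known ports from the RTPS specification's port-mapping formula (port base 7400, domain gain 250, participant gain 2). It assumes RustDDS uses these spec defaults unchanged, which has not been verified here. The names `RtpsPorts` and `default_rtps_ports` are hypothetical helpers, not part of the RustDDS API.

```rust
// Default RTPS port-mapping constants from the RTPS specification
// ("well-known ports"). Assumption: RustDDS uses these defaults as-is.
const PB: u32 = 7400; // port base
const DG: u32 = 250; // domain ID gain
const PG: u32 = 2; // participant ID gain
const D0: u32 = 0; // offset for discovery (SPDP) multicast
const D1: u32 = 10; // offset for discovery unicast
const D2: u32 = 1; // offset for user-data multicast
const D3: u32 = 11; // offset for user-data unicast

/// Hypothetical helper type: the four default UDP ports of one participant.
#[derive(Debug, PartialEq, Eq)]
pub struct RtpsPorts {
    pub discovery_multicast: u32,
    pub discovery_unicast: u32,
    pub user_multicast: u32,
    pub user_unicast: u32,
}

/// Computes the default ports for a domain ID and a participant ID
/// (participant IDs are normally assigned per process, starting at 0).
pub fn default_rtps_ports(domain_id: u32, participant_id: u32) -> RtpsPorts {
    let base = PB + DG * domain_id;
    RtpsPorts {
        discovery_multicast: base + D0,
        discovery_unicast: base + D1 + PG * participant_id,
        user_multicast: base + D2,
        user_unicast: base + D3 + PG * participant_id,
    }
}
```

For `domain_id = 0` as in the example code, and the first participant on the machine, this gives ports 7400, 7410, 7401 and 7411, so a Wireshark display filter such as `udp.port >= 7400 && udp.port <= 7411` should cover the traffic of interest under these assumptions.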

Code

use rustdds::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
use std::time::Duration;

#[derive(Debug, Clone, Serialize, Deserialize)]
struct MyMessage {
    id: i32,
    content: String,
}

// Number of messages to send and expect (a count, not a size in bytes).
const MAX_MESSAGE_SIZE: i32 = 1000;

fn main() {
    env_logger::Builder::from_default_env()
        .filter_level(log::LevelFilter::Info)
        .init();

    println!("Starting RustDDS pub/sub example...\n");

    let domain_id = 0;
    let participant = DomainParticipant::new(domain_id).expect("Failed to create DomainParticipant");

    let topic_name = "MyMessageTopic";

    let qos = QosPolicyBuilder::new()
        .reliability(policy::Reliability::Reliable {
            max_blocking_time: rustdds::Duration::from_secs(1),
        })
        .build();

    let topic = participant
        .create_topic(topic_name.to_string(), "MyMessage".to_string(), &qos, TopicKind::NoKey)
        .expect("Failed to create topic");

    println!("Created topic: {}\n", topic_name);

    let participant_pub = participant.clone();
    let topic_pub = topic.clone();
    let participant_sub = participant.clone();
    let topic_sub = topic.clone();

    let qos_cloned = qos.clone();
    let qos_cloned2 = qos.clone();

    // Improvement 1: Use Barrier to synchronize the startup of both threads
    let barrier = Arc::new(Barrier::new(2));
    let barrier_pub = barrier.clone();
    let barrier_sub = barrier.clone();

    // Thread 1: Publisher
    let publisher_thread = thread::spawn(move || {
        println!("[Publisher Thread] Starting...");

        let publisher = participant_pub
            .create_publisher(&qos_cloned)
            .expect("Failed to create publisher");

        let writer = publisher
            .create_datawriter_no_key::<MyMessage, CDRSerializerAdapter<_>>(&topic_pub, None)
            .expect("Failed to create data writer");

        println!("[Publisher Thread] DataWriter created");

        // Wait for Subscriber to be ready as well
        barrier_pub.wait();

        // Improvement 2: Give discovery mechanism some time to establish connection
        thread::sleep(Duration::from_millis(500));
        println!("[Publisher Thread] Starting to send messages\n");

        for i in 1..=MAX_MESSAGE_SIZE {
            let message = MyMessage {
                id: i,
                content: format!("Message number {}", i),
            };

            match writer.write(message.clone(), None) {
                Ok(_) => println!("[Publisher] Sent: {:?}", message),
                Err(e) => eprintln!("[Publisher] Failed to send: {:?}", e),
            }

            thread::sleep(Duration::from_millis(500)); // Shorter interval to speed up testing
        }

        println!("\n[Publisher Thread] Finished sending messages");
    });

    // Thread 2: Subscriber
    let subscriber_thread = thread::spawn(move || {
        println!("[Subscriber Thread] Starting...");

        let subscriber = participant_sub
            .create_subscriber(&qos_cloned2)
            .expect("Failed to create subscriber");

        let mut reader = subscriber
            .create_datareader_no_key::<MyMessage, CDRDeserializerAdapter<_>>(&topic_sub, None)
            .expect("Failed to create datareader");

        println!("[Subscriber Thread] DataReader created");

        // Wait for Publisher to be ready as well
        barrier_sub.wait();
        println!("[Subscriber Thread] Starting to receive messages\n");

        let mut received_count = 0;
        let mut last_id_received = 0;

        // Improvement 3: Add timeout mechanism to ensure receiving all messages
        let timeout_duration = Duration::from_secs(15);
        let start_time = std::time::Instant::now();
        let mut no_data_count = 0;

        while received_count < MAX_MESSAGE_SIZE && start_time.elapsed() < timeout_duration {
            thread::sleep(Duration::from_millis(200));

            match reader.take_next_sample() {
                Ok(Some(sample)) => {
                    println!("[Subscriber] Received: id={}, content={}", sample.value().id, sample.value().content);

                    last_id_received = sample.value().id;
                    received_count += 1;
                    no_data_count = 0; // Reset no-data counter
                }
                Ok(None) => {
                    no_data_count += 1;
                    // If no data received multiple times in a row, still continue waiting
                }
                Err(e) => {
                    eprintln!("[Subscriber] Error reading sample: {:?}", e);
                }
            }
        }

        println!("\n[Subscriber Thread] Finished receiving {} messages (last id: {})",
                 received_count, last_id_received);

        if received_count < MAX_MESSAGE_SIZE {
            eprintln!("[Subscriber Thread] WARNING: Missed {} messages!", MAX_MESSAGE_SIZE - received_count);
        }
    });

    subscriber_thread.join().expect("Subscriber thread panicked");
    publisher_thread.join().expect("Publisher thread panicked");

    thread::sleep(Duration::from_millis(100));

    println!("\nExample completed successfully!");
}

Observed Behavior:

  • Publisher thread reports successful message sending
  • Subscriber thread receives 0 messages
  • Wireshark captures no RTPS packets at all
  • No errors or warnings in logs
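
To separate "UDP loopback is blocked on this machine" from "RustDDS is not sending", a check that uses only the Rust standard library may be useful. The following is a minimal sketch, not part of RustDDS: the function name `udp_loopback_roundtrip` is hypothetical, and it exercises plain unicast UDP on 127.0.0.1 only, so it says nothing about multicast or about what Wireshark is able to capture.

```rust
use std::io;
use std::net::UdpSocket;
use std::time::Duration;

/// Sends `payload` from one UDP socket to another over 127.0.0.1 and
/// returns the bytes that arrived. Returns an error if binding or sending
/// fails, or if nothing is received within two seconds.
pub fn udp_loopback_roundtrip(payload: &[u8]) -> io::Result<Vec<u8>> {
    // Port 0 lets the OS pick a free port, so this does not collide
    // with any RTPS ports already in use.
    let receiver = UdpSocket::bind("127.0.0.1:0")?;
    receiver.set_read_timeout(Some(Duration::from_secs(2)))?;
    let target = receiver.local_addr()?;

    let sender = UdpSocket::bind("127.0.0.1:0")?;
    sender.send_to(payload, target)?;

    // 1500 bytes is enough for the small test payloads used here.
    let mut buf = [0u8; 1500];
    let (len, _from) = receiver.recv_from(&mut buf)?;
    Ok(buf[..len].to_vec())
}
```

Calling `udp_loopback_roundtrip(b"ping")` from a small `main` on the affected Windows 11 machine should return the same bytes if basic loopback UDP works. A timeout or error there would point to the local network setup (for example a firewall rule) rather than to RustDDS.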

Expected Behavior:

  • Publisher sends messages
  • Subscriber receives messages
  • Wireshark captures RTPS packets with vendorId=01.18

Additional Context:

  • Already applied the workaround from issue #375, "using pnet may break builds" (Npcap + Packet.lib)
  • The issue appears to be specific to how RustDDS interacts with the Windows 11 network stack
  • Using env_logger with Info level - no relevant errors shown
  • Both threads start and synchronize correctly via Barrier

Question:
Is there a known limitation or additional configuration required for Windows 11? Could this be related to pnet or WinPcap/npcap compatibility issues on newer Windows versions?
