Skip to content

No incoming data after one minute - Communication between RustDDS publisher and RTI Connext client #252

@BetterCallBene

Description

@BetterCallBene

Hello guys, I got an example running with RustDDS as the publisher and RTI Connext as the subscriber.

I created the following structure on the C++/IDL side:

// IDL type definitions for the HelloWorld example.
module dds_tutorials
{
    module HelloWorldData
    {
        // Timestamp split into whole seconds and a sub-second remainder.
        struct time {
            long long sec; // seconds since epoch in UTC
            unsigned long nanosec; // nanoseconds since above second
        };

        // Message type used for the "tutorials.helloworld" topic.
        struct Msg
        {
            long user_id;
            string<256> message; // bounded string, at most 256 characters
            time time_stamp;
        };
    };
};

and the C++ client looks as follows:

#include <dds/sub/ddssub.hpp>
#include <dds/core/ddscore.hpp>
#include <dds/domain/DomainParticipant.hpp>


/**
 * Subscribes to the "tutorials.helloworld" topic on the given domain and
 * prints every valid sample, as well as liveliness changes of matched
 * writers, until `shutdownRequested` becomes true.
 *
 * NOTE(review): despite its name this function creates a Subscriber and a
 * DataReader; the name is kept so existing callers still link.
 *
 * @param domainID DDS domain the participant joins.
 */
static void RunPublisher(const int domainID)
{
    // Fixed: the original referenced an undeclared `domain_id` here.
    const dds::domain::DomainParticipant participant(domainID);

    const dds::topic::Topic<dds_tutorials::HelloWorldData::Msg> topic(participant, "tutorials.helloworld");

    const dds::sub::Subscriber subscriber(participant);

    dds::sub::DataReader<dds_tutorials::HelloWorldData::Msg> reader(subscriber, topic);

    // Only liveliness changes should trigger the status condition.
    dds::core::cond::StatusCondition status_condition(reader);
    status_condition.enabled_statuses(dds::core::status::StatusMask::liveliness_changed());

    // Handler invoked by WaitSet::dispatch() when the status condition triggers.
    status_condition->handler([&reader]() {
        // Get the status changes so we can check which status condition
        // triggered
        const dds::core::status::StatusMask status_mask = reader.status_changes();
        // In case of liveliness changed
        if ((status_mask & dds::core::status::StatusMask::liveliness_changed()).any()) {
            const dds::core::status::LivelinessChangedStatus st = reader.liveliness_changed_status();
            std::cout << "Liveliness changed => Active writers = " << st.alive_count() << std::endl;
        }
    });

    // Handler invoked by WaitSet::dispatch() whenever samples in any state
    // are available on the reader.
    dds::sub::cond::ReadCondition read_condition(
        reader,
        dds::sub::status::DataState::any(),
        [&reader]() {
            // Take all samples
            dds::sub::LoanedSamples<dds_tutorials::HelloWorldData::Msg> samples = reader.take();
            for (const auto& sample : samples) {
                if (!sample.info().valid()) {
                    continue;  // No data payload in this sample
                }
                const auto& msg = sample.data();
                std::cout << "=== [Subscriber] message received :"
                          << "\n";
                std::cout << "    userID    :   " << msg.user_id() << "\n";
                std::cout << "    Message   : \"" << msg.message() << "\"";
                const auto nanoseconds = std::chrono::nanoseconds(msg.time_stamp().nanosec());
                const auto seconds = std::chrono::seconds(msg.time_stamp().sec());
                // system_clock::duration may be coarser than nanoseconds, in
                // which case the implicit conversion does not compile; cast
                // explicitly.
                const auto duration =
                    std::chrono::duration_cast<std::chrono::system_clock::duration>(seconds + nanoseconds);
                const auto tp = std::chrono::time_point<std::chrono::system_clock>{duration};
                const auto t_c = std::chrono::system_clock::to_time_t(tp);
                // NOTE(review): the format string ends in ".\n", so the closing
                // quote is printed on the following line - confirm this is intended.
                std::cout << "    time point: \"" << std::put_time(std::localtime(&t_c), "%F %T.\n") << "\"" << std::endl;
            }
        }  // The LoanedSamples destructor returns the loan
    );

    // Create a WaitSet and attach both ReadCondition and StatusCondition
    dds::core::cond::WaitSet waitset;
    waitset += read_condition;
    waitset += status_condition;

    while (!shutdownRequested) {
        waitset.dispatch(dds::core::Duration(4));  // Wait up to 4s each time
    }
}

On the other side, I created a publisher with RustDDS:

/// Publishes a `HelloWorldData` sample on the `tutorials.helloworld` topic
/// roughly every two seconds until Ctrl-C is pressed.
///
/// Each sample carries the current wall-clock time, split into whole seconds
/// since the Unix epoch and the sub-second remainder in nanoseconds.
///
/// # Panics
///
/// Panics if the poller, the Ctrl-C handler, the domain participant, the
/// topic, the publisher or the data writer cannot be set up, if polling or
/// writing a sample fails, or if the system clock is before the Unix epoch.
fn main() {
    configure_logging();
    let poll = Poll::new().unwrap();
    let mut events = Events::with_capacity(1);

    // Set Ctrl-C handler
    let (stop_sender, stop_receiver) = channel::channel();
    ctrlc::set_handler(move || {
        stop_sender.send(()).unwrap_or(());
        // ignore errors, as we are quitting anyway
    })
    .expect("Error setting Ctrl-C handler");
    println!("Press Ctrl-C to quit.");

    poll.register(
        &stop_receiver,
        STOP_PROGRAM,
        Ready::readable(),
        PollOpt::edge(),
    )
    .unwrap();

    let domain_id = 10u16;
    let domain_participant = DomainParticipant::new(domain_id)
        .unwrap_or_else(|e| panic!("Domain Participant can not be created: Reason: {}", e));

    // QoS shared by the topic and the publisher: best-effort, volatile,
    // automatic liveliness with an infinite lease, keep the last 10 samples.
    let qos = QosPolicyBuilder::new()
        .reliability(policy::Reliability::BestEffort)
        .durability(policy::Durability::Volatile)
        .liveliness(rustdds::policy::Liveliness::Automatic {
            lease_duration: rustdds::Duration::DURATION_INFINITE,
        })
        .history(policy::History::KeepLast { depth: 10 })
        .build();

    println!("Following topics are found:");
    let discovered_topics = domain_participant.discovered_topics();
    for topic in discovered_topics.iter() {
        println!("{}", topic.topic_name());
    }

    let hello_topic = domain_participant
        .create_topic(
            "tutorials.helloworld".to_string(),
            "dds_tutorials::HelloWorldData::Msg".to_string(),
            &qos,
            TopicKind::NoKey,
        )
        .unwrap_or_else(|e| panic!("Topic can not be created: Reason: {}", e));

    let publisher = domain_participant.create_publisher(&qos).unwrap();

    let writer = publisher
        .create_datawriter_no_key_cdr::<HelloWorldData>(&hello_topic, None)
        .unwrap();

    let mut last_write = Instant::now();
    let loop_delay = time::Duration::from_secs(2);
    // Template sample; only the timestamp changes between writes.
    let data = HelloWorldData {
        userID: 10,
        message: "Hello World".to_string(),
        time_stamp: Time {
            sec: 0i64,
            nanosec: 0u32,
        },
    };
    loop {
        // Wake up on Ctrl-C or after `loop_delay`, whichever comes first.
        poll.poll(&mut events, Some(loop_delay)).unwrap();

        for event in &events {
            match event.token() {
                STOP_PROGRAM => {
                    if stop_receiver.try_recv().is_ok() {
                        println!("Done");
                        return;
                    }
                }
                other_token => {
                    println!("Polled event is {:?}. WTF?", other_token);
                }
            }
        }

        let now = Instant::now();
        if last_write + loop_delay < now {
            println!("Publishing new data:");
            let mut new_data = data.clone();

            let since_epoch = time::SystemTime::now()
                .duration_since(time::UNIX_EPOCH)
                .expect("Time went backwards");

            // Seconds since the epoch stay far below i64::MAX, so the cast is lossless.
            new_data.time_stamp.sec = since_epoch.as_secs() as i64;
            // Fixed: the original subtracted the second count from the total
            // nanosecond count and truncated to u32, which is not the
            // sub-second part. `subsec_nanos` is always below 1_000_000_000.
            new_data.time_stamp.nanosec = since_epoch.subsec_nanos();

            writer
                .write(new_data, None)
                .unwrap_or_else(|e| panic!("DataWriter write failed: {:?}", e));
            last_write = now;
        }
    }
}

The connection between subscriber and publisher works without any problems, but after approximately one minute the subscriber prints out that the liveliness has changed and no writer is alive, and no more data arrives. Additionally, I can see in the RTI Admin Console that the DataWriter disappears. If I run the same scenario with RustDDS as both subscriber and publisher, this issue does not appear. Can you help me with this problem? Thanks

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions