diff --git a/pyopendds/DataReader.py b/pyopendds/DataReader.py index f411ce7..2db34c2 100644 --- a/pyopendds/DataReader.py +++ b/pyopendds/DataReader.py @@ -1,5 +1,7 @@ from __future__ import annotations +import sys + from .Topic import Topic from .constants import StatusKind from .util import TimeDurationType, normalize_time_duration @@ -13,13 +15,12 @@ class DataReader: def __init__(self, subscriber: Subscriber, topic: Topic, qos=None, listener=None): self.topic = topic - self.qos = qos self.listener = listener self.subscriber = subscriber subscriber.readers.append(self) from _pyopendds import create_datareader - create_datareader(self, subscriber, topic) + create_datareader(self, subscriber, topic, self.onDataAvailCallback) def wait_for(self, status: StatusKind, timeout: TimeDurationType): from _pyopendds import datareader_wait_for @@ -27,3 +28,20 @@ def wait_for(self, status: StatusKind, timeout: TimeDurationType): def take_next_sample(self): return self.topic._ts_package.take_next_sample(self) + + def onDataAvailCallback(self): + sample = None + #print(f"------ onDataAvailCallback") + if hasattr(self, 'topic'): + sample = self.take_next_sample() + #print(f"---------- Sample {sample}") + else: + print("------ Error, no topic in self => " + self.__qualname__) + if sample is not None: + self.listener(sample) + else: + print("------ Error, data not valid") + + def update_reader_qos(self, qos: DataReaderQos): + from _pyopendds import update_reader_qos + return update_reader_qos(self, qos) \ No newline at end of file diff --git a/pyopendds/DataWriter.py b/pyopendds/DataWriter.py index a6beee8..3527112 100644 --- a/pyopendds/DataWriter.py +++ b/pyopendds/DataWriter.py @@ -1,2 +1,32 @@ +from __future__ import annotations + +from .Topic import Topic +from .constants import StatusKind +from .util import TimeDurationType, normalize_time_duration + +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from .Publisher import Publisher + + class DataWriter: - pass + + def __init__(self, publisher: Publisher, topic: Topic, qos=None, listener=None): + self.topic = topic + self.listener = listener + self.publisher = publisher + publisher.writers.append(self) + + from _pyopendds import create_datawriter + create_datawriter(self, publisher, topic) + + def wait_for(self, status: StatusKind, timeout: TimeDurationType): + from _pyopendds import datareader_wait_for + return datareader_wait_for(self, status, *normalize_time_duration(timeout)) + + def write(self, sample): + return self.topic._ts_package.write(self, sample) + + def update_writer_qos(self, qos: DataWriterQos): + from _pyopendds import update_writer_qos + return update_writer_qos(self, qos) \ No newline at end of file diff --git a/pyopendds/DomainParticipant.py b/pyopendds/DomainParticipant.py index 862b423..935f57c 100644 --- a/pyopendds/DomainParticipant.py +++ b/pyopendds/DomainParticipant.py @@ -22,7 +22,7 @@ def __del__(self): participant_cleanup(self) def create_topic(self, - name: str, topic_type: type, qos=None, listener=None) -> Topic: + name: str, topic_type: type, qos=None, listener=None) -> Topic: return Topic(self, name, topic_type, qos, listener) def create_subscriber(self, qos=None, listener=None) -> Subscriber: diff --git a/pyopendds/Publisher.py b/pyopendds/Publisher.py index a292b85..d5bc2d4 100644 --- a/pyopendds/Publisher.py +++ b/pyopendds/Publisher.py @@ -1,5 +1,6 @@ from __future__ import annotations +from .DataWriter import DataWriter from .Topic import Topic from typing import TYPE_CHECKING @@ -18,5 +19,5 @@ def __init__(self, participant: DomainParticipant, qos=None, listener=None): from _pyopendds import create_publisher create_publisher(self, participant) - def create_datawriter(self, topic: Topic, qos=None, listener=None): - pass + def create_datawriter(self, topic: Topic, listener=None) -> DataWriter: + return DataWriter(self, topic, listener) diff --git a/pyopendds/Qos.py b/pyopendds/Qos.py new file mode 100644 index 0000000..92f50fa --- /dev/null +++ b/pyopendds/Qos.py @@ -0,0 +1,49 @@ +from enum import IntEnum + + +class DurabilityQosPolicyKind(IntEnum): + VOLATILE_DURABILITY_QOS = 0, + TRANSIENT_LOCAL_DURABILITY_QOS = 1, + TRANSIENT_DURABILITY_QOS = 2, + PERSISTENT_DURABILITY_QOS = 3 + + +class ReliabilityQosPolicyKind(IntEnum): + BEST_EFFORT_RELIABILITY_QOS = 0, + RELIABLE_RELIABILITY_QOS = 1 + + +class HistoryQosPolicyKind(IntEnum): + KEEP_LAST_HISTORY_QOS = 0, + KEEP_ALL_HISTORY_QOS = 1 + + +class DurabilityQosPolicy: + def __init__(self): + self.kind = DurabilityQosPolicyKind.PERSISTENT_DURABILITY_QOS + + +class ReliabilityQosPolicy: + def __init__(self): + self.kind = ReliabilityQosPolicyKind.RELIABLE_RELIABILITY_QOS + self.max_blocking_time = 0 + + +class HistoryQosPolicy: + def __init__(self): + self.kind = HistoryQosPolicyKind.KEEP_LAST_HISTORY_QOS + self.depth = 0 + + +class DataWriterQos: + def __init__(self): + self.durability = DurabilityQosPolicy() + self.reliability = ReliabilityQosPolicy() + self.history = HistoryQosPolicy() + + +class DataReaderQos: + def __init__(self): + self.durability = DurabilityQosPolicy() + self.reliability = ReliabilityQosPolicy() + self.history = HistoryQosPolicy() diff --git a/pyopendds/dev/include/pyopendds/user.hpp b/pyopendds/dev/include/pyopendds/user.hpp index aa7c7fd..d5d7348 100644 --- a/pyopendds/dev/include/pyopendds/user.hpp +++ b/pyopendds/dev/include/pyopendds/user.hpp @@ -22,6 +22,40 @@ class Type /*{ static void python_to_cpp(PyObject* py, T& cpp); }*/; + +template +class BooleanType { +public: + static PyObject* get_python_class() + { + return Py_False; + } + + static void cpp_to_python(const T& cpp, PyObject*& py) + { + if ( ! cpp ) { + py = Py_False; + } else { + py = Py_True; + } + } + + static void python_to_cpp(PyObject* py, T& cpp) + { + if (PyBool_Check(py)) { + throw Exception("Not a boolean", PyExc_ValueError); + } + if(py) { + cpp = true; + } else { + cpp = false; + } + } +}; + +//typedef ::CORBA::Boolean bool; +template<> class Type: public BooleanType {}; + template class IntegerType { public: @@ -30,39 +64,64 @@ class IntegerType { static PyObject* get_python_class() { - return PyLong_Type; + return PyLong_FromLong(0); } static void cpp_to_python(const T& cpp, PyObject*& py) { if (limits::is_signed) { - py = PyLong_FromLong(cpp); + if (sizeof(cpp) > sizeof(long)) { + py = PyLong_FromLongLong(cpp); + } else { + py = PyLong_FromLong(cpp); + } } else { - py = PyLong_FromUnsignedLong(cpp); + if (sizeof(cpp) > sizeof(long)) { + py = PyLong_FromUnsignedLongLong(cpp); + } else { + py = PyLong_FromUnsignedLong(cpp); + } } if (!py) throw Exception(); } static void python_to_cpp(PyObject* py, T& cpp) { - LongType value; + T value; //todo: change to LongType if (limits::is_signed) { - value = PyLong_AsLong(py); + if (sizeof(cpp) > sizeof(long)) { + value = PyLong_AsLongLong(py); + } else { + value = PyLong_AsLong(py); + } } else { - value = PyLong_AsUnsignedLong(py); + if (sizeof(cpp) > sizeof(long)) { + value = PyLong_AsUnsignedLongLong(py); + } else { + value = PyLong_AsUnsignedLong(py); + } } if (value < limits::min() || value > limits::max()) { throw Exception( "Integer Value is Out of Range for IDL Type", PyExc_ValueError); } if (value == -1 && PyErr_Occurred()) throw Exception(); - cpp = value; + cpp = T(value); } + }; +typedef ::CORBA::LongLong i64; +template<> class Type: public IntegerType {}; + typedef ::CORBA::Long i32; template<> class Type: public IntegerType {}; +typedef ::CORBA::Short i16; +template<> class Type: public IntegerType {}; + +typedef ::CORBA::Char c8; +template<> class Type: public IntegerType {}; // TODO: Put Other Integer Types Here const char* string_data(const std::string& cpp) @@ -90,7 +149,7 @@ class StringType { public: static PyObject* get_python_class() { - return PyUnicode_Type; + return PyUnicode_FromString(""); } static void cpp_to_python(const T& cpp, PyObject*& py, const char* encoding) @@ -101,9 +160,16 @@ class StringType { py = o; } - static void python_to_cpp(PyObject* py, T& cpp) + static void python_to_cpp(PyObject* py, T& cpp, const char* encoding) { - // TODO: Encode or Throw Unicode Error + PyObject* repr = PyObject_Str(py); + if (!repr) throw Exception(); + PyObject* str = PyUnicode_AsEncodedString(repr, encoding, NULL); + if (!str) throw Exception(); + const char *bytes = PyBytes_AS_STRING(str); + cpp = T(bytes); + Py_XDECREF(repr); + Py_XDECREF(str); } }; @@ -119,6 +185,39 @@ typedef template<> class Type: public StringType {}; // TODO: Put Other String/Char Types Here +template +class FloatingType { +public: + typedef std::numeric_limits limits; + + static PyObject* get_python_class() + { + return PyFloat_FromDouble(0); + } + + static void cpp_to_python(const T& cpp, PyObject*& py) + { + py = PyFloat_FromDouble((double)cpp); + if (!py) throw Exception(); + } + + static void python_to_cpp(PyObject* py, T& cpp) + { + double value; + value = PyFloat_AsDouble(py); + if (value < limits::min() || value > limits::max()) { + throw Exception( + "Floating Value is Out of Range for IDL Type", PyExc_ValueError); + } + if (value == -1 && PyErr_Occurred()) throw Exception(); + cpp = value; + } +}; + +typedef ::CORBA::Float f32; +typedef ::CORBA::Double f64; +template<> class Type: public FloatingType {}; +template<> class Type: public FloatingType {}; // TODO: FloatingType for floating point type class TopicTypeBase { @@ -127,6 +226,7 @@ class TopicTypeBase { virtual const char* type_name() = 0; virtual void register_type(PyObject* pyparticipant) = 0; virtual PyObject* take_next_sample(PyObject* pyreader) = 0; + virtual PyObject* write(PyObject* pywriter, PyObject* pysample) = 0; typedef std::shared_ptr Ptr; typedef std::map TopicTypes; @@ -215,7 +315,7 @@ class TopicType : public TopicTypeBase { DDS::WaitSet_var ws = new DDS::WaitSet; ws->attach_condition(read_condition); DDS::ConditionSeq active; - const DDS::Duration_t max_wait_time = {10, 0}; + const DDS::Duration_t max_wait_time = {60, 0}; if (Errors::check_rc(ws->wait(active, max_wait_time))) { throw Exception(); } @@ -223,17 +323,43 @@ class TopicType : public TopicTypeBase { reader_impl->delete_readcondition(read_condition); IdlType sample; - DDS::SampleInfo info; + DDS::SampleInfo info; if (Errors::check_rc(reader_impl->take_next_sample(sample, info))) { throw Exception(); } PyObject* rv = nullptr; - Type::cpp_to_python(sample, rv); + if (info.valid_data) + Type::cpp_to_python(sample, rv); + else + rv = Py_None; return rv; } + PyObject* write(PyObject* pywriter, PyObject* pysample) + { + DDS::DataWriter* writer = get_capsule(pywriter); + if (!writer) throw Exception(); + + DataWriter* writer_impl = DataWriter::_narrow(writer); + if (!writer_impl) { + throw Exception("Could not narrow writer implementation", Errors::PyOpenDDS_Error()); + } + + IdlType rv; + Type::python_to_cpp(pysample, rv); + + DDS::ReturnCode_t rc = writer_impl->write(rv, DDS::HANDLE_NIL); + if (rc != DDS::RETCODE_OK) { + throw Exception( + "WRITE ERROR", Errors::PyOpenDDS_Error()); + } + if (Errors::check_rc(rc)) return nullptr; + + return PyLong_FromLong(rc); + } + PyObject* get_python_class() { return Type::get_python_class(); diff --git a/pyopendds/dev/itl2py/CppOutput.py b/pyopendds/dev/itl2py/CppOutput.py index b922eb0..869d7e4 100644 --- a/pyopendds/dev/itl2py/CppOutput.py +++ b/pyopendds/dev/itl2py/CppOutput.py @@ -1,6 +1,6 @@ from jinja2 import Environment -from .ast import PrimitiveType, StructType, EnumType +from .ast import PrimitiveType, StructType, EnumType, SequenceType, ArrayType from .Output import Output @@ -13,6 +13,10 @@ def cpp_type_name(type_node): return type_node.kind.name elif isinstance(type_node, (StructType, EnumType)): return cpp_name(type_node.name.parts) + elif isinstance(type_node, (SequenceType)): + return cpp_name(type_node.name.parts); + elif isinstance(type_node, (ArrayType)): + return cpp_name(type_node.name.parts); else: raise NotImplementedError @@ -44,13 +48,16 @@ def visit_struct(self, struct_type): struct_to_lines = [ 'Ref field_value;', ] - struct_from_lines = [] + struct_from_lines = [ + 'Ref field_value;', + ] for field_name, field_node in struct_type.fields.items(): to_lines = [] from_lines = [] pyopendds_type = '' is_string = isinstance(field_node.type_node, PrimitiveType) and \ field_node.type_node.is_string() + is_sequence = isinstance(field_node.type_node, SequenceType) to_lines = [ 'Type<{pyopendds_type}>::cpp_to_python(cpp.{field_name}', @@ -61,16 +68,35 @@ def visit_struct(self, struct_type): + (', "{default_encoding}"' if is_string else '') + ');', ] + from_lines = [ + 'if (PyObject_HasAttrString(py, "{field_name}")) {{', + ' *field_value = PyObject_GetAttrString(py, "{field_name}");', + '}}', + 'if (!field_value) {{', + ' throw Exception();', + '}}' + ] + pyopendds_type = cpp_type_name(field_node.type_node) if to_lines: to_lines.extend([ - 'if (!field_value || PyObject_SetAttrString(' + 'if (!field_value || PyObject_SetAttrString(', 'py, "{field_name}", *field_value)) {{', ' throw Exception();', '}}' ]) + if from_lines: + from_lines.extend([ + 'Type<{pyopendds_type}>::python_to_cpp(*field_value, cpp.{field_name}', + '#ifdef CPP11_IDL', + ' ()', + '#endif', + ' ' + + (', "{default_encoding}"' if is_string else '') + ');' + ]) + def line_process(lines): return [''] + [ s.format( @@ -107,9 +133,65 @@ def visit_enum(self, enum_type): 'args = PyTuple_Pack(1, PyLong_FromLong(static_cast(cpp)));', ]), 'to_lines': '', - 'from_lines': '\n'.join([ - '', - '// left unimplemented' - ]), + 'from_lines': '', 'is_topic_type': False, }) + + def visit_sequence(self, sequence_type): + sequence_to_lines = [ + 'Ref field_value;', + ] + sequence_from_lines = [] + to_lines = [ + 'Ref field_elem;', + 'field_value = PyList_New(0);', + 'for (int i = 0; i < cpp.length(); i++) {{', + ' {pyopendds_type} elem = cpp[i];', + ' field_elem = nullptr;', + ' Type<{pyopendds_type}>::cpp_to_python(elem', + ' #ifdef CPP11_IDL', + ' ()', + ' #endif', + ' , *field_elem);', + ' PyList_Append(py, *field_elem);', + '}}' + ] + + pyopendds_type = cpp_type_name(sequence_type.base_type) + from_lines = [ + 'cpp.length(PyList_Size(py));', + 'for (int i = 0; i < PyList_Size(py); i++) {{', + ' {pyopendds_type} elem = cpp[i];', + ' Type<{pyopendds_type}>::python_to_cpp(PyList_GetItem(py, i), elem', + '#ifdef CPP11_IDL', + ' ()', + '#endif', + ' );', + ' cpp[i] = elem;', + '}}' + ] + + def line_process(lines): + return [''] + [ + s.format( + default_encoding=self.context['default_encoding'], + pyopendds_type=pyopendds_type, + ) for s in lines + ] + + sequence_to_lines.extend(line_process(to_lines)) + sequence_from_lines.extend(line_process(from_lines)) + + self.context['types'].append({ + 'cpp_name': cpp_name(sequence_type.name.parts), + 'name_parts': sequence_type.parent_name().parts, + 'local_name': sequence_type.local_name(), + 'to_lines': '\n'.join(sequence_to_lines), + 'from_lines': '\n'.join(sequence_from_lines), + 'new_lines': '\n'.join([ + 'args = nullptr;' + ]), + 'is_topic_type': sequence_type.is_topic_type, + 'sequence': True, + 'to_replace': False, + }) diff --git a/pyopendds/dev/itl2py/PythonOutput.py b/pyopendds/dev/itl2py/PythonOutput.py index 42920d3..b57ca86 100644 --- a/pyopendds/dev/itl2py/PythonOutput.py +++ b/pyopendds/dev/itl2py/PythonOutput.py @@ -1,4 +1,4 @@ -from .ast import PrimitiveType, StructType, EnumType +from .ast import PrimitiveType, StructType, EnumType, SequenceType, ArrayType from .Output import Output @@ -72,6 +72,10 @@ def get_python_default_value_string(self, field_type): return type_name + '()' elif isinstance(field_type, EnumType): return type_name + '.' + field_type.default_member + elif isinstance(field_type, SequenceType): + return 'field(default_factory=list)' + elif isinstance(field_type, ArrayType): + return 'field(default_factory=list)' else: raise NotImplementedError(repr(field_type) + " is not supported") @@ -99,3 +103,14 @@ def visit_enum(self, enum_type): ], ), )) + + def visit_sequence(self, sequence_type): + self.context['has_sequence'] = True + self.context['types'].append(dict( + local_name=sequence_type.local_name(), + type_support=self.context['native_package_name'] if sequence_type.is_topic_type else None, + sequence=dict( + type=sequence_type.base_type, + len=sequence_type.max_count, + ), + )) diff --git a/pyopendds/dev/itl2py/ast.py b/pyopendds/dev/itl2py/ast.py index 28a59a4..76d5d5b 100644 --- a/pyopendds/dev/itl2py/ast.py +++ b/pyopendds/dev/itl2py/ast.py @@ -195,6 +195,7 @@ def __init__(self, base_type, dimensions): def accept(self, visitor): visitor.visit_array(self) + pass def __repr__(self): return self.repr_template( @@ -210,11 +211,15 @@ def __init__(self, base_type, max_count): def accept(self, visitor): visitor.visit_sequence(self) + pass def __repr__(self): return self.repr_template(repr(self.base_type) + ("max " + str(self.max_count) if self.max_count else "no max")) + def repr_name(self): + if self.name: + return '::' + self.name.join('::') + '::_tao_seq_' + self.base_type + '_' class NodeVisitor: @@ -231,10 +236,12 @@ def visit_enum(self, enum_type): raise NotImplementedError def visit_array(self, array_type): - raise NotImplementedError + pass + #array_type.accept(self) def visit_sequence(self, sequence_type): raise NotImplementedError + #sequence_type.accept(self) def get_ast(types: dict) -> Module: diff --git a/pyopendds/dev/itl2py/itl.py b/pyopendds/dev/itl2py/itl.py index d447ee3..1ffb186 100644 --- a/pyopendds/dev/itl2py/itl.py +++ b/pyopendds/dev/itl2py/itl.py @@ -86,9 +86,16 @@ def parse_sequence(types, details): def parse_record(types, details): struct_type = StructType() for field_dict in details['fields']: - struct_type.add_field( - field_dict['name'], parse_type(types, field_dict['type']), - field_dict.get('optional', False)) + if 'sequence' in field_dict['type']: + sequence = parse_sequence(types, {'type': field_dict['type'], 'capacity': 1, 'size': None}) + sequence.set_name(itl_name=sequence.base_type.name.itl_name) + struct_type.add_field( + field_dict['name'], sequence, + field_dict.get('optional', False)) + else: + struct_type.add_field( + field_dict['name'], parse_type(types, field_dict['type']), + field_dict.get('optional', False)) return struct_type @@ -148,3 +155,4 @@ def parse_itl(types, itl): # just use the first definition we found. if parsed_type.name.itl_name not in types: types[parsed_type.name.itl_name] = parsed_type + diff --git a/pyopendds/dev/itl2py/templates/user.cpp b/pyopendds/dev/itl2py/templates/user.cpp index e57c9ca..9275491 100644 --- a/pyopendds/dev/itl2py/templates/user.cpp +++ b/pyopendds/dev/itl2py/templates/user.cpp @@ -43,6 +43,10 @@ class Type { /*{{ type.new_lines | indent(4) }}*/ py = PyObject_CallObject(cls, args); /*{% else %}*/ + /*{% if type.sequence %}*/ + if (py) Py_DECREF(py); + py = nullptr; + /*{% endif %}*/ if (py) { if (PyObject_IsInstance(cls, py) != 1) { throw Exception("Not a {{ type.py_name }}", PyExc_TypeError); @@ -59,7 +63,20 @@ class Type { static void python_to_cpp(PyObject* py, /*{{ type.cpp_name }}*/& cpp) { PyObject* cls = get_python_class(); - /*{{ type.from_lines | indent(4) }}*/ + /*{% if type.to_replace %}*/ + cpp = static_cast(PyLong_AsLong(py)); + /*{% else %}*/ + if (py) { + if (PyObject_IsInstance(py, cls) != 1) { + throw Exception("Not a {{ type.py_name }}", PyExc_TypeError); + } + } else { + PyObject* args; + /*{{ type.new_lines | indent(6) }}*/ + py = PyObject_CallObject(cls, args); + } + /*{% if type.from_lines %}*//*{{ type.from_lines | indent(4) }}*//*{% endif %}*/ + /*{% endif %}*/ } }; @@ -119,9 +136,31 @@ PyObject* pytake_next_sample(PyObject* self, PyObject* args) } } +PyObject* pywrite(PyObject* self, PyObject* args) +{ + Ref pywriter; + Ref pysample; + if (!PyArg_ParseTuple(args, "OO", &*pywriter, &*pysample)) return nullptr; + pywriter++; + pysample++; + + // Try to Get Reading Type and Do write + Ref pytopic = PyObject_GetAttrString(*pywriter, "topic"); + if (!pytopic) return nullptr; + Ref pytype = PyObject_GetAttrString(*pytopic, "type"); + if (!pytype) return nullptr; + + try { + return TopicTypeBase::find(*pytype)->write(*pywriter, *pysample); + } catch (const Exception& e) { + return e.set(); + } +} + PyMethodDef /*{{ native_package_name }}*/_Methods[] = { {"register_type", pyregister_type, METH_VARARGS, ""}, {"type_name", pytype_name, METH_VARARGS, ""}, + {"write", pywrite, METH_VARARGS, ""}, {"take_next_sample", pytake_next_sample, METH_VARARGS, ""}, {nullptr, nullptr, 0, nullptr} }; diff --git a/pyopendds/dev/itl2py/templates/user.py b/pyopendds/dev/itl2py/templates/user.py index 63aaf37..6ca3313 100644 --- a/pyopendds/dev/itl2py/templates/user.py +++ b/pyopendds/dev/itl2py/templates/user.py @@ -1,10 +1,14 @@ {% if has_struct -%} from dataclasses import dataclass as _pyopendds_struct +from dataclasses import field {%- endif %} {% if has_enum -%} from enum import IntFlag as _pyopendds_enum {%- endif %} +{% if has_sequence -%} +{%- endif %} {% for type in types -%} + {%- if type.struct %} @_pyopendds_struct @@ -20,6 +24,10 @@ class {{ type.local_name }}(_pyopendds_enum): {%- for member in type.enum.members %} {{ member.name }} = {{ member.value }} {%- endfor %} +{%- elif type.sequence %} + +class {{ type.local_name }}(list): + pass {%- else %} # {{ type.local_name }} was left unimplmented {% endif -%} diff --git a/pyopendds/ext/_pyopendds.cpp b/pyopendds/ext/_pyopendds.cpp index 0f16aaa..273ab86 100644 --- a/pyopendds/ext/_pyopendds.cpp +++ b/pyopendds/ext/_pyopendds.cpp @@ -17,6 +17,75 @@ PyObject* Errors::PyOpenDDS_Error_ = nullptr; PyObject* Errors::ReturnCodeError_ = nullptr; namespace { +class DataReaderListenerImpl : public virtual OpenDDS::DCPS::LocalObject { +public: + + DataReaderListenerImpl(PyObject * self, PyObject *callback); + + virtual void on_requested_deadline_missed( + DDS::DataReader_ptr reader, + const DDS::RequestedDeadlineMissedStatus& status) {} + + virtual void on_requested_incompatible_qos( + DDS::DataReader_ptr reader, + const DDS::RequestedIncompatibleQosStatus& status) {} + + virtual void on_sample_rejected( + DDS::DataReader_ptr reader, + const DDS::SampleRejectedStatus& status) {} + + virtual void on_liveliness_changed( + DDS::DataReader_ptr reader, + const DDS::LivelinessChangedStatus& status) {} + + virtual void on_data_available( + DDS::DataReader_ptr reader); + + virtual void on_subscription_matched( + DDS::DataReader_ptr reader, + const DDS::SubscriptionMatchedStatus& status) {} + + virtual void on_sample_lost( + DDS::DataReader_ptr reader, + const DDS::SampleLostStatus& status) {} + + private: + PyObject *_callback; + PyObject * _self; +}; + +DataReaderListenerImpl::DataReaderListenerImpl(PyObject * self, PyObject *callback): OpenDDS::DCPS::LocalObject() { + _self = self; + Py_XINCREF(_self); + _callback = callback; + Py_XINCREF(_callback); +} + +void +DataReaderListenerImpl::on_data_available(DDS::DataReader_ptr reader) +{ + PyObject *callable = _callback; + PyObject *result = NULL; + + PyGILState_STATE gstate; + gstate = PyGILState_Ensure(); + try{ + if(PyCallable_Check(callable)) { + result = PyObject_CallFunctionObjArgs(callable,nullptr); + if(result == NULL) + PyErr_Print(); + Py_XDECREF(result); + } else { + throw Exception("function is not a callback", PyExc_TypeError); + } + } catch (Exception& e ) { + // Py_XDECREF(callable); + PyGILState_Release(gstate); + throw e; + } + //Py_XDECREF(callable); + PyGILState_Release(gstate); +} /// Global Participant Factory DDS::DomainParticipantFactory_var participant_factory; @@ -321,25 +390,31 @@ void delete_datareader_var(PyObject* reader_capsule) if (PyCapsule_CheckExact(reader_capsule)) { DDS::DataReader_var reader = static_cast( PyCapsule_GetPointer(reader_capsule, nullptr)); + DDS::DataReaderListener_ptr listener = DDS::DataReader::_narrow(reader)->get_listener(); + free(listener); + listener = nullptr; reader = nullptr; } } /** - * create_datareader(datareader: DataReader, subscriber: Subscriber, topic: Topic) -> None + * create_datareader(datareader: DataReader, subscriber: Subscriber, topic: Topic, listener: pyObject) -> None */ PyObject* create_datareader(PyObject* self, PyObject* args) { Ref pydatareader; Ref pysubscriber; Ref pytopic; - if (!PyArg_ParseTuple(args, "OOO", - &*pydatareader, &*pysubscriber, &*pytopic)) { + Ref pycallback; + + if (!PyArg_ParseTuple(args, "OOOO", + &*pydatareader, &*pysubscriber, &*pytopic, &*pycallback)) { return nullptr; } pydatareader++; pysubscriber++; pytopic++; + pycallback++; // Get Subscriber DDS::Subscriber* subscriber = get_capsule(*pysubscriber); @@ -349,9 +424,20 @@ PyObject* create_datareader(PyObject* self, PyObject* args) DDS::Topic* topic = get_capsule(*pytopic); if (!topic) return nullptr; + DataReaderListenerImpl * listener = nullptr; + if(*pycallback != Py_None) { + if(PyCallable_Check(*pycallback)) { + listener = new DataReaderListenerImpl(*pydatareader, *pycallback); + } + else { + throw Exception("Callback provided is not a callable object", PyExc_TypeError); + } + + } + // Create DataReader DDS::DataReader* datareader = subscriber->create_datareader( - topic, DATAREADER_QOS_DEFAULT, nullptr, + topic, DATAREADER_QOS_DEFAULT, listener, OpenDDS::DCPS::DEFAULT_STATUS_MASK); if (!datareader) { PyErr_SetString(Errors::PyOpenDDS_Error(), "Failed to Create DataReader"); @@ -366,6 +452,59 @@ PyObject* create_datareader(PyObject* self, PyObject* args) Py_RETURN_NONE; } +/** + * Callback for Python to Call when the DataWriter Capsule is Deleted + */ +void delete_datawriter_var(PyObject* writer_capsule) +{ + if (PyCapsule_CheckExact(writer_capsule)) { + DDS::DataWriter_var writer = static_cast( + PyCapsule_GetPointer(writer_capsule, nullptr)); + writer = nullptr; + } +} + +/** + * create_datawriter(datawriter: DataWriter, publisher: Publisher, topic: Topic) -> None + */ +PyObject* create_datawriter(PyObject* self, PyObject* args) +{ + Ref pydatawriter; + Ref pypublisher; + Ref pytopic; + if (!PyArg_ParseTuple(args, "OOO", + &*pydatawriter, &*pypublisher, &*pytopic)) { + return nullptr; + } + pydatawriter++; + pypublisher++; + pytopic++; + + // Get Publisher + DDS::Publisher* publisher = get_capsule(*pypublisher); + if (!publisher) return nullptr; + + // Get Topic + DDS::Topic* topic = get_capsule(*pytopic); + if (!topic) return nullptr; + + // Create DataWriter + DDS::DataWriter* datawriter = publisher->create_datawriter( + topic, DATAWRITER_QOS_DEFAULT, nullptr, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!datawriter) { + PyErr_SetString(Errors::PyOpenDDS_Error(), "Failed to Create DataWriter"); + return nullptr; + } + + // Attach OpenDDS DataWriter to DataWriter Python Object + if (set_capsule(*pydatawriter, datawriter, delete_datawriter_var)) { + return nullptr; + } + + Py_RETURN_NONE; +} + /** * datareader_wait_for( * datareader: DataReader, status: StatusKind, @@ -400,6 +539,187 @@ PyObject* datareader_wait_for(PyObject* self, PyObject* args) Py_RETURN_NONE; } +/** + * datawriter_wait_for( + * datawriter: DataWriter, status: StatusKind, + * seconds: int, nanoseconds: int) -> None + */ +PyObject* datawriter_wait_for(PyObject* self, PyObject* args) +{ + Ref pydatawriter; + unsigned status; + int seconds; + unsigned nanoseconds; + if (!PyArg_ParseTuple(args, "OIiI", + &*pydatawriter, &status, &seconds, &nanoseconds)) { + return nullptr; + } + pydatawriter++; + + // Get DataWriter + DDS::DataWriter* writer = get_capsule(*pydatawriter); + if (!writer) return nullptr; + + // Wait + DDS::StatusCondition_var condition = writer->get_statuscondition(); + condition->set_enabled_statuses(status); + DDS::WaitSet_var waitset = new DDS::WaitSet; + if (!waitset) return PyErr_NoMemory(); + waitset->attach_condition(condition); + DDS::ConditionSeq active; + DDS::Duration_t max_duration = {seconds, nanoseconds}; + if (Errors::check_rc(waitset->wait(active, max_duration))) return nullptr; + + Py_RETURN_NONE; +} + +PyObject* update_writer_qos(PyObject* self, PyObject* args) +{ + Ref pydatawriter; + Ref pyQos; + + Ref pydurability; + Ref pyreliability; + Ref pyhistory; + Ref pydurabilityKind; + Ref pyreliabilityKind; + Ref pyhistoryKind; + + if (!PyArg_ParseTuple(args, "OO", + &*pydatawriter, &*pyQos)) { + return nullptr; + } + pydatawriter++; + pyQos++; + + // Get DataWriter + DDS::DataWriter* writer = get_capsule(*pydatawriter); + if (!writer) return nullptr; + + std::cerr << "get default qos" << std::endl; + // Create Qos for the data writer according to the spec + DDS::DataWriterQos qos; + writer->get_publisher()->get_default_datawriter_qos(qos); + + + std::cerr << "get durability" << std::endl; + pydurability = PyObject_GetAttrString(*pyQos, "durability"); + if (!pydurability) return nullptr; + pydurability ++; + + std::cerr << "get reliability" << std::endl; + pyreliability = PyObject_GetAttrString(*pyQos, "reliability"); + if (!pyreliability) return nullptr; + pyreliability ++; + + std::cerr << "get history" << std::endl; + pyhistory = PyObject_GetAttrString(*pyQos, "history"); + if (!pyhistory) return nullptr; + pyhistory ++; + + + std::cerr << "get dura kind" << std::endl; + pydurabilityKind = PyObject_GetAttrString(*pydurability, "kind"); + if (!pydurabilityKind) return nullptr; + pydurabilityKind ++; + std::cerr << "AsLong" << std::endl; + qos.durability.kind = (DDS::DurabilityQosPolicyKind) PyLong_AsLong(*pydurabilityKind); + + std::cerr << "get rela kind" << std::endl; + pyreliabilityKind = PyObject_GetAttrString(*pyreliability, "kind"); + if (!pyreliabilityKind) return nullptr; + pyreliabilityKind ++; + std::cerr << "AsLong" << std::endl; + qos.reliability.kind = (DDS::ReliabilityQosPolicyKind) PyLong_AsLong(*pyreliabilityKind); + + std::cerr << "get histo kind" << std::endl; + pyhistoryKind = PyObject_GetAttrString(*pyhistory, "kind"); + if (!pyhistoryKind) return nullptr; + pyhistoryKind ++; + + std::cerr << "AsLong" << std::endl; + qos.history.kind = (DDS::HistoryQosPolicyKind) PyLong_AsLong(*pyhistoryKind); + + std::cerr << "set QOS" << std::endl; + writer->set_qos (qos); + + std::cerr << "return" << std::endl; + Py_RETURN_NONE; +} + +PyObject* update_reader_qos(PyObject* self, PyObject* args) +{ + Ref pydatareader; + Ref pyQos; + + Ref pydurability; + Ref pyreliability; + Ref pyhistory; + Ref pydurabilityKind; + Ref pyreliabilityKind; + Ref pyhistoryKind; + Ref pyhistorydepth; + Ref pyreliabilitymax; + + if (!PyArg_ParseTuple(args, "OO", + &*pydatareader, &*pyQos)) { + return nullptr; + } + pydatareader++; + pyQos++; + + // Get DataReader + DDS::DataReader* reader = get_capsule(*pydatareader); + if (!reader) return nullptr; + + // Create Qos for the data writer according to the spec + DDS::DataReaderQos qos; + reader->get_subscriber()->get_default_datareader_qos(qos); + + pydurability = PyObject_GetAttrString(*pyQos, "durability"); + if (!pydurability) return nullptr; + pydurability ++; + + pyreliability = PyObject_GetAttrString(*pyQos, "reliability"); + if (!pyreliability) return nullptr; + pyreliability ++; + + pyhistory = PyObject_GetAttrString(*pyQos, "history"); + if (!pyhistory) return nullptr; + pyhistory ++; + + + pydurabilityKind = PyObject_GetAttrString(*pydurability, "kind"); + if (!pydurabilityKind) return nullptr; + pydurabilityKind ++; + qos.durability.kind = (DDS::DurabilityQosPolicyKind) PyLong_AsLong(*pydurabilityKind); + + pyreliabilityKind = PyObject_GetAttrString(*pyreliability, "kind"); + if (!pyreliabilityKind) return nullptr; + pyreliabilityKind ++; + qos.reliability.kind = (DDS::ReliabilityQosPolicyKind) PyLong_AsLong(*pyreliabilityKind); + + pyreliabilitymax = PyObject_GetAttrString(*pyreliability, "max_blocking_time"); + if (!pyreliabilitymax) return nullptr; + pyreliabilitymax ++; + qos.history.depth = PyLong_AsLong(*pyreliabilitymax); + + + pyhistoryKind = PyObject_GetAttrString(*pyhistory, "kind"); + if (!pyhistoryKind) return nullptr; + pyhistoryKind ++; + + qos.history.kind = (DDS::HistoryQosPolicyKind) PyLong_AsLong(*pyhistoryKind); + + pyhistorydepth = PyObject_GetAttrString(*pyhistory, "depth"); + if (!pyhistorydepth) return nullptr; + pyhistorydepth ++; + qos.history.depth = PyLong_AsLong(*pyhistorydepth); + + reader->set_qos (qos); + Py_RETURN_NONE; +} + /// Documentation for Internal Python Objects const char* internal_docstr = "Internal to PyOpenDDS, not for use directly!"; @@ -414,7 +734,11 @@ PyMethodDef pyopendds_Methods[] = { {"create_publisher", create_publisher, METH_VARARGS, internal_docstr}, {"create_topic", create_topic, METH_VARARGS, internal_docstr}, {"create_datareader", create_datareader, METH_VARARGS, internal_docstr}, + {"create_datawriter", create_datawriter, METH_VARARGS, internal_docstr}, {"datareader_wait_for", datareader_wait_for, METH_VARARGS, internal_docstr}, + {"datawriter_wait_for", datawriter_wait_for, METH_VARARGS, internal_docstr}, + {"update_writer_qos", update_writer_qos, METH_VARARGS, internal_docstr}, + {"update_reader_qos", update_reader_qos, METH_VARARGS, internal_docstr}, {nullptr, nullptr, 0, nullptr} }; diff --git a/pyopendds/init_opendds.py b/pyopendds/init_opendds.py index 8c537ad..ecf1bec 100644 --- a/pyopendds/init_opendds.py +++ b/pyopendds/init_opendds.py @@ -1,6 +1,7 @@ '''Manage the initialization of OpenDDS and related functionality. ''' +import sys def init_opendds(*args, default_rtps=True, @@ -19,7 +20,7 @@ def init_opendds(*args, verbose). It is printed to stdout. ''' - args = list(args) + args = list(sys.argv[1:]) if opendds_debug_level > 0: if not (1 <= opendds_debug_level <= 10): diff --git a/tests/basic_test/CMakeLists.txt b/tests/basic_test/CMakeLists.txt index 4b8b942..c370ca0 100644 --- a/tests/basic_test/CMakeLists.txt +++ b/tests/basic_test/CMakeLists.txt @@ -17,6 +17,18 @@ export( FILE "${CMAKE_CURRENT_BINARY_DIR}/basic_idlConfig.cmake" ) +add_library(airbus_idl SHARED) +if(${CPP11_IDL}) + set(opendds_idl_mapping_option "-Lc++11") +endif() +OPENDDS_TARGET_SOURCES(airbus_idl "airbusdds.idl" + OPENDDS_IDL_OPTIONS "-Gitl" "${opendds_idl_mapping_option}") +target_link_libraries(airbus_idl PUBLIC OpenDDS::Dcps) +export( + TARGETS airbus_idl + FILE "${CMAKE_CURRENT_BINARY_DIR}/airbus_idlConfig.cmake" +) + add_executable(publisher publisher.cpp) target_link_libraries(publisher OpenDDS::OpenDDS basic_idl) if(${CPP11_IDL}) diff --git a/tests/basic_test/basic.idl b/tests/basic_test/basic.idl index 67dbc63..82a7b1c 100644 --- a/tests/basic_test/basic.idl +++ b/tests/basic_test/basic.idl @@ -5,10 +5,18 @@ module basic { acceleration }; + struct Sample { + long value; + string where; + }; + + typedef sequence seqSample; + @topic struct Reading { ReadingKind kind; long value; string where; + seqSample sampleSeq; }; }; diff --git a/tests/basic_test/publisher.py b/tests/basic_test/publisher.py new file mode 100644 index 0000000..9c8f51e --- /dev/null +++ b/tests/basic_test/publisher.py @@ -0,0 +1,70 @@ +import sys +import time +from datetime import timedelta + +from pyopendds import \ + init_opendds, DomainParticipant, StatusKind, PyOpenDDS_Error +from pybasic.basic import * + +if __name__ == "__main__": + try: + # Initialize OpenDDS and Create DDS Entities + init_opendds(opendds_debug_level=1) + time.sleep(1) + domain = DomainParticipant(34) + time.sleep(1) + topic = domain.create_topic('Readings', Reading) + time.sleep(1) + publisher = domain.create_publisher() + time.sleep(1) + writer = publisher.create_datawriter(topic) + time.sleep(1) + + # Wait for Subscriber to Connect + print('Waiting for Subscriber...') + writer.wait_for(StatusKind.PUBLICATION_MATCHED, timedelta(seconds=60)) + print('Found subscriber!') + + write_sample_speed = Reading() + s1 = Sample() + s1.value = 1 + s1.where = "toto1" + + s2 = Sample() + s2.value = 2 + s2.where = "toto2" + + write_sample_accel = Reading() + write_sample_dist = Reading() + + while True: + time.sleep(1) + write_sample_speed.kind = ReadingKind.speed + write_sample_speed.value = 123 + write_sample_speed.where = "somewhere" + write_sample_speed.sampleSeq = seqSample([s1, s2]) + # Read and Print Sample + rc = writer.write(write_sample_speed) + print(rc) + + time.sleep(1) + write_sample_accel.kind = ReadingKind.acceleration + write_sample_accel.value = 2 + write_sample_accel.where = "everywhere" + write_sample_accel.sampleSeq = seqSample([s1, s2]) + # Read and Print Sample + rc = writer.write(write_sample_accel) + print(rc) + + time.sleep(1) + write_sample_dist.kind = ReadingKind.distance + write_sample_dist.value = 543 + write_sample_dist.where = "anywhere" + write_sample_dist.sampleSeq = seqSample([s1, s2]) + # Read and Print Sample + rc = writer.write(write_sample_dist) + print(rc) + print('Done!') + + except PyOpenDDS_Error as e: + sys.exit(e) diff --git a/tests/basic_test/run_test.sh b/tests/basic_test/run_test.sh index 1591bda..a527fe2 100644 --- a/tests/basic_test/run_test.sh +++ b/tests/basic_test/run_test.sh @@ -5,20 +5,45 @@ sub=$! cd $dir ./publisher -DCPSConfigFile ../rtps.ini & pub=$! +cd - exit_status=0 wait $pub pub_status=$? if [ $pub_status -ne 0 ] then - echo "Publisher exited with status $pub_status" 1>&2 + echo "Cpp publisher exited with status $pub_status" 1>&2 exit_status=1 fi wait $sub sub_status=$? if [ $sub_status -ne 0 ] then - echo "Subscriber exited with status $sub_status" 1>&2 + echo "Python subscriber exited with status $sub_status" 1>&2 + exit_status=1 +fi + +cd $dir +./subscriber -DCPSConfigFile ../rtps.ini & +sub=$! +cd - + +LD_LIBRARY_PATH="$LD_LIBRARY_PATH:$dir" python3 publisher.py & +pub=$! + +exit_status=0 +wait $pub +pub_status=$? +if [ $pub_status -ne 0 ] +then + echo "Python publisher exited with status $pub_status" 1>&2 + exit_status=1 +fi +wait $sub +sub_status=$? +if [ $sub_status -ne 0 ] +then + echo "Cpp subscriber exited with status $sub_status" 1>&2 exit_status=1 fi exit $exit_status diff --git a/tests/basic_test/subscriber.cpp b/tests/basic_test/subscriber.cpp new file mode 100644 index 0000000..36789d4 --- /dev/null +++ b/tests/basic_test/subscriber.cpp @@ -0,0 +1,130 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "DataReaderListenerImpl.h" +#include "basicTypeSupportImpl.h" + +#include + +using OpenDDS::DCPS::retcode_to_string; + +int main(int argc, char* argv[]) { + + try { + // Init OpenDDS + TheServiceParticipant->default_configuration_file("rtps.ini"); + DDS::DomainParticipantFactory_var opendds = + TheParticipantFactoryWithArgs(argc, argv); + + DDS::DomainParticipantQos part_qos; + opendds->get_default_participant_qos(part_qos); + DDS::DomainParticipant_var participant = opendds->create_participant( + 34, part_qos, 0, OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!participant) { + std::cerr << "Error: Failed to create participant" << std::endl; + return 1; + } + + basic::ReadingTypeSupport_var ts = new basic::ReadingTypeSupportImpl(); + DDS::ReturnCode_t rc = ts->register_type(participant.in(), ""); + if (rc != DDS::RETCODE_OK) { + std::cerr + << "Error: Failed to register type: " + << retcode_to_string(rc) << std::endl; + return 1; + } + + CORBA::String_var type_name = ts->get_type_name(); + DDS::Topic_var topic = participant->create_topic( + "Readings", type_name.in(), TOPIC_QOS_DEFAULT, 0, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!topic) { + std::cerr << "Error: Failed to create topic" << std::endl; + return 1; + } + + DDS::Subscriber_var subscriber = participant->create_subscriber( + SUBSCRIBER_QOS_DEFAULT, 0, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!subscriber) { + std::cerr << "Error: Failed to create subscriber" << std::endl; + return 1; + } + + DDS::DataReaderListener_var listener(new DataReaderListenerImpl); + DDS::DataReaderQos qos; + subscriber->get_default_datareader_qos(qos); + DDS::DataReader_var reader = subscriber->create_datareader( + topic.in(), qos, listener, + OpenDDS::DCPS::DEFAULT_STATUS_MASK); + if (!reader) { + std::cerr << "Error: Failed to create reader" << std::endl; + return 1; + } + basic::ReadingDataReader_var reader_i = + basic::ReadingDataReader::_narrow(reader); + + if (!reader_i) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" _narrow failed!\n")), + 1); + } + + // Wait for Subscriber + std::cout << "Wating for Subscriber..." << std::endl; + DDS::StatusCondition_var sc = reader->get_statuscondition(); + sc->set_enabled_statuses(DDS::SUBSCRIPTION_MATCHED_STATUS); + DDS::WaitSet_var ws = new DDS::WaitSet; + ws->attach_condition(sc); + const DDS::Duration_t max_wait = {10, 0}; + DDS::SubscriptionMatchedStatus status = {0, 0, 0, 0, 0}; + while (status.current_count < 1) { + DDS::ConditionSeq active; + if (ws->wait(active, max_wait) != DDS::RETCODE_OK) { + std::cerr << "Error: Timedout waiting for subscriber" << std::endl; + return 1; + } + if (reader->get_subscription_matched_status(status) != DDS::RETCODE_OK) { + std::cerr << "Error: Failed to get pub matched status" << std::endl; + return 1; + } + } + ws->detach_condition(sc); + std::cout << "Found Publisher..." << std::endl; + + DDS::SubscriptionMatchedStatus matches; + if (reader->get_subscription_matched_status(matches) != DDS::RETCODE_OK) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" get_subscription_matched_status failed!\n")), + 1); + } + + DDS::ConditionSeq conditions; + DDS::Duration_t timeout = { 60, 0 }; + if (ws->wait(conditions, timeout) != DDS::RETCODE_OK) { + ACE_ERROR_RETURN((LM_ERROR, + ACE_TEXT("ERROR: %N:%l: main() -") + ACE_TEXT(" wait failed!\n")), + 1); + } + + // Cleanup + participant->delete_contained_entities(); + opendds->delete_participant(participant.in()); + TheServiceParticipant->shutdown(); + + } catch (const CORBA::Exception& e) { + e._tao_print_exception("Exception caught in main():"); + return 1; + } + + return 0; +} diff --git a/tests/basic_test/subscriber.py b/tests/basic_test/subscriber.py index f5bb33b..6a3c49e 100644 --- a/tests/basic_test/subscriber.py +++ b/tests/basic_test/subscriber.py @@ -1,28 +1,40 @@ import sys +import time from datetime import timedelta from pyopendds import \ init_opendds, DomainParticipant, StatusKind, PyOpenDDS_Error -from pybasic.basic import Reading +from pybasic.basic import * + + +class TestClass: + def listener_func(self, sample: Reading): + print("main callback !", file=sys.stderr) + print(sample) + # todo: investigate the need of this sleep + time.sleep(1) + if __name__ == "__main__": try: + listener = TestClass() # Initialize OpenDDS and Create DDS Entities - init_opendds(opendds_debug_level=1) + init_opendds(opendds_debug_level=10) domain = DomainParticipant(34) topic = domain.create_topic('Readings', Reading) subscriber = domain.create_subscriber() - reader = subscriber.create_datareader(topic) + reader = subscriber.create_datareader(topic=topic, listener=listener.listener_func) # Wait for Publisher to Connect print('Waiting for Publisher...') - reader.wait_for(StatusKind.SUBSCRIPTION_MATCHED, timedelta(seconds=5)) + reader.wait_for(StatusKind.SUBSCRIPTION_MATCHED, timedelta(seconds=30)) print('Found Publisher!') # Read and Print Sample - print(reader.take_next_sample()) + # print(reader.take_next_sample()) + time.sleep(60) print('Done!') - except PyOpenDDS_Error as e: + except Exception as e: sys.exit(e)