Skip to content

Commit 4bec6bc

Browse files
authored
Make Message Object Instantiable By Users (#2128)
* dev changes to allow message object to be instantiated
* add test cases for message class
* modified some comments
* remove data member definition from type hints
* reformat test_message.py
1 parent 36a9e6c commit 4bec6bc

File tree

3 files changed

+174
-8
lines changed

3 files changed

+174
-8
lines changed

src/confluent_kafka/cimpl.pyi

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -76,9 +76,14 @@ class KafkaException(Exception):
7676
args: Tuple[Any, ...]
7777

7878
class Message:
79-
def topic(self) -> str: ...
80-
def partition(self) -> int: ...
81-
def offset(self) -> int: ...
79+
def __init__(self, topic: Optional[str] = ..., partition: Optional[int] = ..., offset: Optional[int] = ...,
80+
key: Optional[bytes] = ..., value: Optional[bytes] = ...,
81+
headers: Optional[HeadersType] = ..., error: Optional[KafkaError] = ...,
82+
timestamp: Optional[Tuple[int, int]] = ..., latency: Optional[float] = ...,
83+
leader_epoch: Optional[int] = ...) -> None: ...
84+
def topic(self) -> Optional[str]: ...
85+
def partition(self) -> Optional[int]: ...
86+
def offset(self) -> Optional[int]: ...
8287
def key(self) -> Optional[bytes]: ...
8388
def value(self) -> Optional[bytes]: ...
8489
def headers(self) -> Optional[HeadersType]: ...

src/confluent_kafka/src/confluent_kafka.c

Lines changed: 105 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -689,13 +689,114 @@ static int Message_clear (Message *self) {
689689
return 0;
690690
}
691691

692-
693692
static void Message_dealloc (Message *self) {
694693
Message_clear(self);
695694
PyObject_GC_UnTrack(self);
696695
Py_TYPE(self)->tp_free((PyObject *)self);
697696
}
698697

698+
/**
 * @brief Initializer for user-instantiated Message objects (tp_init).
 *
 * All arguments are optional keyword arguments; omitted fields keep their
 * "not available" defaults (NULL object refs, RD_KAFKA_PARTITION_UA,
 * RD_KAFKA_OFFSET_INVALID, timestamp (NOT_AVAILABLE, 0), latency -1,
 * leader_epoch -1).
 *
 * @param self0     The Message instance being initialized.
 * @param args      Positional arguments (topic, partition, offset, ...).
 * @param kwargs    Keyword arguments matching \p kws below.
 * @returns 0 on success, -1 (with a Python exception set) on failure.
 */
static int Message_init (PyObject *self0, PyObject *args, PyObject *kwargs) {
        Message *self = (Message *)self0;
        PyObject *topic = NULL;
        int32_t partition = RD_KAFKA_PARTITION_UA;
        int64_t offset = RD_KAFKA_OFFSET_INVALID;
        PyObject *key = NULL;
        PyObject *value = NULL;
        PyObject *headers = NULL;
        PyObject *error = NULL;
        PyObject *timestamp = NULL;
        double latency = -1;
        int32_t leader_epoch = -1;

        static char *kws[] = { "topic",
                               "partition",
                               "offset",
                               "key",
                               "value",
                               "headers",
                               "error",
                               "timestamp",
                               "latency",
                               "leader_epoch",
                               NULL };

        if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OiLOOOOOdi", kws,
                                         &topic, &partition, &offset,
                                         &key, &value, &headers, &error,
                                         &timestamp, &latency, &leader_epoch)) {
                return -1;
        }

        /* Release any references held from a previous __init__ call so that
         * re-initializing an existing Message does not leak them.
         * Py_CLEAR is a no-op on NULL, so this also covers the fresh
         * tp_alloc'd (zeroed) instance. */
        Py_CLEAR(self->topic);
        Py_CLEAR(self->value);
        Py_CLEAR(self->key);
        Py_CLEAR(self->headers);
#ifdef RD_KAFKA_V_HEADERS
        /* NOTE(review): c_headers is reset without freeing; presumably
         * Message_clear/set_headers own its lifetime — confirm. */
        self->c_headers = NULL;
#endif
        Py_CLEAR(self->error);

        /* Set topic (string) */
        if (topic && topic != Py_None) {
                Py_INCREF(topic);
                self->topic = topic;
        }

        /* Set key (bytes) */
        if (key && key != Py_None) {
                Py_INCREF(key);
                self->key = key;
        }

        /* Set value (bytes) */
        if (value && value != Py_None) {
                Py_INCREF(value);
                self->value = value;
        }

        /* Set headers (list of tuples) */
        if (headers && headers != Py_None) {
                Py_INCREF(headers);
                self->headers = headers;
        }

        /* Set error (KafkaError) */
        if (error && error != Py_None) {
                Py_INCREF(error);
                self->error = error;
        }

        /* Set timestamp (tuple of (timestamp_type, timestamp)) */
        if (timestamp && timestamp != Py_None) {
                if (!PyTuple_Check(timestamp) || PyTuple_Size(timestamp) != 2) {
                        PyErr_SetString(PyExc_TypeError,
                                        "timestamp must be a tuple of (int, int)");
                        return -1;
                }
                self->tstype = (rd_kafka_timestamp_type_t)
                        cfl_PyInt_AsInt(PyTuple_GET_ITEM(timestamp, 0));
                self->timestamp =
                        PyLong_AsLongLong(PyTuple_GET_ITEM(timestamp, 1));
                /* Both conversions return -1 with an exception set on
                 * non-int members; propagate the failure instead of
                 * returning success with a pending exception. */
                if (PyErr_Occurred())
                        return -1;
        }
        else {
                self->tstype = RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
                self->timestamp = 0;
        }

        /* Negative scalar inputs collapse to their "not available"
         * sentinels so the accessors report None. */
        self->partition = partition < 0 ? RD_KAFKA_PARTITION_UA : partition;
        self->offset = offset < 0 ? RD_KAFKA_OFFSET_INVALID : offset;
        self->leader_epoch = leader_epoch < 0 ? -1 : leader_epoch;
        /* latency is stored internally in microseconds. */
        self->latency = (int64_t)(latency < 0 ? -1 : latency * 1000000.0);

        return 0;
}
792+
793+
794+
795+
/**
 * @brief Allocator for Message (tp_new): returns a zeroed instance.
 *
 * All field population is deferred to Message_init; \p args and
 * \p kwargs are intentionally unused here.
 */
static PyObject *Message_new (PyTypeObject *type, PyObject *args,
                              PyObject *kwargs) {
        PyObject *self;

        self = type->tp_alloc(type, 0);
        return self;
}
799+
699800

700801
static int Message_traverse (Message *self,
701802
visitproc visit, void *arg) {
@@ -749,8 +850,6 @@ PyTypeObject MessageType = {
749850
"object is a proper message (error() returns None) or an "
750851
"error/event.\n"
751852
"\n"
752-
"This class is not user-instantiable.\n"
753-
"\n"
754853
".. py:function:: len()\n"
755854
"\n"
756855
" :returns: Message value (payload) size in bytes\n"
@@ -770,8 +869,9 @@ PyTypeObject MessageType = {
770869
0, /* tp_descr_get */
771870
0, /* tp_descr_set */
772871
0, /* tp_dictoffset */
773-
0, /* tp_init */
774-
0 /* tp_alloc */
872+
Message_init, /* tp_init */
873+
0, /* tp_alloc */
874+
Message_new /* tp_new */
775875
};
776876

777877
/**

tests/test_message.py

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/usr/bin/env python
2+
3+
from confluent_kafka import Message, KafkaError
4+
5+
6+
def test_init_no_params():
    """A Message constructed with no arguments reports every field as unset."""
    msg = Message()
    # Every accessor without a stored value returns None...
    for accessor in (msg.topic, msg.partition, msg.offset, msg.key,
                     msg.value, msg.headers, msg.error, msg.latency,
                     msg.leader_epoch):
        assert accessor() is None
    # ...except timestamp, which reports (TIMESTAMP_NOT_AVAILABLE, 0).
    assert msg.timestamp() == (0, 0)
18+
19+
20+
def test_init_all_params():
    """Each constructor argument is echoed back by its matching accessor."""
    params = dict(topic="test", partition=1, offset=2, key=b"key",
                  value=b"value", headers=[("h1", "v1")], error=KafkaError(0),
                  timestamp=(1, 1762499956), latency=0.05,
                  leader_epoch=1762499956)
    msg = Message(**params)
    assert msg.topic() == "test"
    assert msg.partition() == 1
    assert msg.offset() == 2
    assert msg.key() == b"key"
    assert msg.value() == b"value"
    assert msg.headers() == [("h1", "v1")]
    assert msg.error() == KafkaError(0)
    assert msg.timestamp() == (1, 1762499956)
    assert msg.latency() == 0.05
    assert msg.leader_epoch() == 1762499956
34+
35+
36+
def test_init_negative_param_values():
    """Negative sentinel inputs are normalized and read back as None."""
    msg = Message(partition=-1, offset=-1, latency=-1.0,
                  leader_epoch=-1762499956)
    for accessor in (msg.partition, msg.offset, msg.latency,
                     msg.leader_epoch):
        assert accessor() is None
42+
43+
44+
def test_set_headers():
    """set_headers() stores the list and a second call replaces it."""
    msg = Message()
    msg.set_headers([("h1", "v1")])
    assert msg.headers() == [("h1", "v1")]
    # A subsequent call overwrites, not appends.
    msg.set_headers([("h2", "v2")])
    assert msg.headers() == [("h2", "v2")]
50+
51+
52+
def test_set_key():
    """set_key() on a blank Message is readable back via key()."""
    msg = Message()
    msg.set_key(b"key")
    assert msg.key() == b"key"
56+
57+
58+
def test_set_value():
    """set_value() on a blank Message is readable back via value()."""
    msg = Message()
    msg.set_value(b"value")
    assert msg.value() == b"value"

0 commit comments

Comments
 (0)