-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathadmin_api.py
More file actions
83 lines (67 loc) · 2.96 KB
/
admin_api.py
File metadata and controls
83 lines (67 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
from confluent_kafka.admin import AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigSource
from confluent_kafka import KafkaException
import sys
import threading
import logging
# References:
# https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/adminapi.py
# https://docs.confluent.io/3.0.0/clients/confluent-kafka-python/index.html#
# https://docs.confluent.io/current/clients/python.html
class CustomAdmin:
    """Small convenience wrapper around confluent_kafka's ``AdminClient``.

    Provides helpers to check whether a topic exists, to create topics, and
    to print cluster/topic metadata to stdout.
    """

    def __init__(self, broker):
        """Create the underlying admin client.

        Args:
            broker: Value passed to ``AdminClient`` as ``bootstrap.servers``.
        """
        self.admin = AdminClient({'bootstrap.servers': broker})

    def topic_exists(self, topic_name):
        """Return True if topic exists, False otherwise.

        Args:
            topic_name: Name looked up in the ``topics`` mapping of the
                metadata returned by ``list_topics`` (10 second timeout).
        """
        metadata = self.admin.list_topics(timeout=10)
        return topic_name in metadata.topics

    def create_topics(self, topic_names, num_partitions=3, replication_factor=3):
        """Create topics and print one success/failure line per topic.

        Args:
            topic_names: Iterable of topic names to create.
            num_partitions: Partition count given to every new topic
                (defaults to 3, the previously hard-coded value).
            replication_factor: Replication factor given to every new topic
                (defaults to 3, the previously hard-coded value).

        Returns:
            None. Failures are reported on stdout rather than raised, so one
            failing topic does not hide the outcome of the others.
        """
        new_topics = [
            NewTopic(
                topic_name,
                num_partitions=num_partitions,
                replication_factor=replication_factor,
            )
            for topic_name in topic_names
        ]

        # Call create_topics to asynchronously create topics, a dict
        # of <topic,future> is returned.
        futures = self.admin.create_topics(new_topics)

        # Wait for operation to finish.
        # Timeouts are preferably controlled by passing request_timeout=15.0
        # to the create_topics() call.
        # All futures will finish at the same time.
        for topic, future in futures.items():
            try:
                future.result()  # The result itself is None
            except Exception as e:
                # Deliberately broad: this is a best-effort report per topic.
                print("Failed to create topic {}: {}".format(topic, e))
            else:
                print("Topic created: {}".format(topic))

    def print_all_metadata(self, args=None):
        """List topics and cluster metadata on stdout.

        This is copied almost entirely from the reference adminapi example.

        Args:
            args: Optional sequence whose first element selects what to
                print: ``"all"``, ``"brokers"`` or ``"topics"``. ``None`` or
                an empty sequence means ``"all"``. Any other selector prints
                only the cluster header line.
        """
        # ``None`` replaces the former mutable ``[]`` default; both an
        # omitted and an empty ``args`` still select "all".
        what = args[0] if args else "all"

        md = self.admin.list_topics(timeout=10)

        print("Cluster {} metadata (response from broker {}):".format(md.cluster_id, md.orig_broker_name))

        if what in ("all", "brokers"):
            print(" {} brokers:".format(len(md.brokers)))
            for b in md.brokers.values():
                if b.id == md.controller_id:
                    print(" {} (controller)".format(b))
                else:
                    print(" {}".format(b))

        if what not in ("all", "topics"):
            return

        print(" {} topics:".format(len(md.topics)))
        for t in md.topics.values():
            if t.error is not None:
                errstr = ": {}".format(t.error)
            else:
                errstr = ""

            print(" \"{}\" with {} partition(s){}".format(t, len(t.partitions), errstr))

            for p in t.partitions.values():
                if p.error is not None:
                    errstr = ": {}".format(p.error)
                else:
                    errstr = ""

                # The trailing {} emits errstr; previously it was passed to
                # format() without a placeholder and silently dropped.
                print(" partition {} leader: {}, replicas: {}, isrs: {}{}".format(
                    p.id, p.leader, p.replicas, p.isrs, errstr))