def save_plan(args, move_partitions):
    """Persist the generated reassignment plan as JSON, if a path was requested.

    Args:
        args: Parsed CLI namespace; only ``save_plan_path`` is read. A falsy
            value (option not given) makes this function a no-op.
        move_partitions: Iterable of objects exposing
            ``dict_for_reassignment()``; one JSON entry is written per element.

    Raises:
        FileExistsError: ``save_plan_path`` names an existing directory.
        FileNotFoundError: The directory that should hold the plan file does
            not exist.
    """
    plan_path = args.save_plan_path
    if not plan_path:
        return

    if os.path.isdir(plan_path):
        raise FileExistsError("Given file path={} is a directory not File".format(plan_path))

    # os.path.dirname copes with bare filenames ("plan.json") and root-level
    # paths ("/plan.json"), both of which broke the previous "/"-split logic.
    folder = os.path.dirname(plan_path) or os.curdir
    if not os.path.isdir(folder):
        raise FileNotFoundError(
            "Directory={} for plan file path={} does not exist".format(folder, plan_path)
        )

    # Serialize before opening so a serialization failure cannot leave an
    # existing plan file truncated.
    plans_as_json = json.dumps([p.dict_for_reassignment() for p in move_partitions], indent=4)
    with open(plan_path, "w") as f:
        f.write(plans_as_json)
    log.info("Saved plan at location={}".format(plan_path))


def get_throttle_limit(args):
    """Return the replication throttle (bytes/sec) to use for the next batch.

    The value stored in ``args.throttle_limit_file_path`` takes precedence over
    ``args.throttle``. If the file cannot be read or does not contain an
    integer, the problem is logged and ``args.throttle`` is returned instead.

    Args:
        args: Parsed CLI namespace providing ``throttle`` and
            ``throttle_limit_file_path``.

    Returns:
        The throttle read from the file, or ``args.throttle`` as fallback.
    """
    throttle = args.throttle
    limit_path = args.throttle_limit_file_path
    if not limit_path:
        return throttle

    try:
        # open() is inside the try on purpose: an unreadable file falls back to
        # the CLI value exactly like unparsable content does.
        with open(limit_path) as f:
            throttle = int(f.read().strip())
    except (OSError, ValueError) as e:
        log.error("Error while getting throttle limit from file {}".format(e))
    return throttle
+ # if include_topics is non-empty then except include topics all will be in exclude_topics list + if args.include_topics and len(args.include_topics) > 0: + for topic in cluster.topics.keys(): + if topic not in args.include_topics: + args.exclude_topics.append(topic) + run_plugins_at_step(plugins, 'set_arguments', args) tools_path = get_tools_path(args.tools_path) @@ -124,6 +155,7 @@ def main(): print_leadership("after", newcluster, args.leadership) move_partitions = cluster.changed_partitions(action_to_run.cluster) + save_plan(args, move_partitions) batches = split_partitions_into_batches(move_partitions, batch_size=args.moves, use_class=Reassignment) run_plugins_at_step(plugins, 'set_batches', batches) @@ -133,7 +165,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, get_throttle_limit(args), plugins, dry_run) run_plugins_at_step(plugins, 'before_ple') diff --git a/kafka/tools/assigner/actions/balance.py b/kafka/tools/assigner/actions/balance.py index d57afc9..a16d70c 100644 --- a/kafka/tools/assigner/actions/balance.py +++ b/kafka/tools/assigner/actions/balance.py @@ -27,7 +27,20 @@ class ActionBalance(ActionModule): needs_sizes = True def __init__(self, args, cluster): - if "rackaware" in args.types and args.types[len(args.types)-1] != "rackaware": + + + if "topic_partition_in_rack" in args.types: + if args.types[len(args.types)-1] != "topic_partition_in_rack": + raise BalanceException( + "In order to work properly, topic_partition_in_rack must always be the last module specified" + ) + + elif "rackaware" in args.types and args.types[len(args.types)-2] != "rackaware": + raise BalanceException( + "In order to work properly, if you specify topic_partition_in_rack and " + + "rackaware both module then rackaware must 
class ActionBalanceTopicPartitionInRack(ActionBalanceModule):
    """Balance module that evens out one topic's replicas, then leaders, inside each rack.

    For every non-excluded topic the module first levels the number of
    replicas held by the brokers of each rack, then levels the number of
    partition leaders, and finally logs a before/after table.

    Only brokers that already hold at least one replica of the topic take part
    in the balancing; brokers of the rack without any replica of the topic are
    never chosen as receivers.
    """

    name = "topic_partition_in_rack"
    helpstr = "Reassign topic partition replicas to assure they are almost equal distributed among brokers in a racks"

    def __init__(self, args, cluster):
        super(ActionBalanceTopicPartitionInRack, self).__init__(args, cluster)

    def __calculate_max_possible_count(self, broker_count):
        """Return the per-broker ceiling: ceil(sum of counts / number of brokers).

        ``broker_count`` is a non-empty list of ``[broker_id, count]`` pairs.
        """
        total_count = sum(entry[1] for entry in broker_count)
        return int(math.ceil(total_count * 1.0 / len(broker_count)))

    @staticmethod
    def __sorted_broker_counts(counts_by_broker):
        """Return mutable ``[broker_id, count]`` pairs, highest count first."""
        return sorted(
            (list(item) for item in counts_by_broker.items()),
            key=lambda x: x[1],
            reverse=True
        )

    @staticmethod
    def __skip_full_receivers(broker_count, left, right, count):
        """Move ``right`` leftwards past receivers that already hold ``count`` items.

        Returns the new ``right`` index; it never moves below ``left``.
        """
        while left < right and broker_count[right][1] >= count:
            right -= 1
        return right

    def _unskew_topic_partition_in_rack(self, topic, partitions):
        """Level the replica count of one topic across the brokers of each rack.

        Algorithm:

        1. Group the replicas by the rack of the broker holding them.
        2. Per rack, sort the brokers by replica count, highest first.
        3. Walk two pointers towards each other: the left broker hands
           replicas to the right broker until the left one is down to the
           per-broker ceiling; receivers that reached the ceiling are skipped.

        Args:
            topic: Topic name (not used by the algorithm itself).
            partitions: Partitions of the topic; their replica lists are
                modified in place through ``swap_replicas``.
        """
        # rack id -> {broker id -> replica count}, and broker id -> partitions
        # held at the time of this snapshot (not refreshed after swaps).
        rack_broker_count, broker_partition = {}, {}
        for p in partitions:
            for broker in p.replicas:
                per_rack = rack_broker_count.setdefault(broker.rack, {})
                per_rack[broker.id] = per_rack.get(broker.id, 0) + 1
                broker_partition.setdefault(broker.id, set()).add(p)

        for rack_id in rack_broker_count:
            broker_count = self.__sorted_broker_counts(rack_broker_count[rack_id])

            # Maximum number of replicas a broker should end up with
            count = self.__calculate_max_possible_count(broker_count)

            left, right = 0, len(broker_count) - 1
            while left < right:
                bleft = broker_count[left]
                bid = bleft[0]
                adjusted_count = bleft[1] - count
                if adjusted_count <= 0:
                    left += 1
                    continue

                for p in broker_partition[bid]:
                    # Re-select the receiver on every iteration. Previously the
                    # receiver that had just become full was still used once
                    # more, pushing it above the ceiling.
                    right = self.__skip_full_receivers(broker_count, left, right, count)
                    if left >= right or adjusted_count == 0:
                        break

                    bright = broker_count[right]
                    replicas = [b.id for b in p.replicas]
                    if bid in replicas and bright[0] not in replicas:
                        index = replicas.index(bid)
                        p.swap_replicas(p.replicas[index], self.cluster.brokers[bright[0]])
                        adjusted_count -= 1
                        bleft[1] -= 1
                        bright[1] += 1
                left += 1

    def _unskew_topic_partition_leader_in_rack(self, topic, partitions):
        """Level the leader count of one topic across the brokers of each rack.

        The first replica of a partition is treated as its leader. An
        overloaded broker hands a leader replica to an underloaded broker of
        the same rack; to keep the replica counts level, one follower replica
        is then moved back from the receiver to the donor when such a
        partition exists.

        Args:
            topic: Topic name (not used by the algorithm itself).
            partitions: Partitions of the topic; their replica lists are
                modified in place through ``swap_replicas``.
        """
        # rack id -> {broker id -> leader count}; brokers holding only
        # followers are present with a count of 0 so they can receive leaders.
        rack_broker_leader_count_map, broker_partition = {}, {}
        for p in partitions:
            for position, broker in enumerate(p.replicas):
                per_rack = rack_broker_leader_count_map.setdefault(broker.rack, {})
                per_rack.setdefault(broker.id, 0)
                if position == 0:
                    per_rack[broker.id] += 1
                broker_partition.setdefault(broker.id, set()).add(p)

        for rack_id in rack_broker_leader_count_map:
            broker_count = self.__sorted_broker_counts(rack_broker_leader_count_map[rack_id])
            count = self.__calculate_max_possible_count(broker_count)

            left, right = 0, len(broker_count) - 1
            while left < right:
                bleft = broker_count[left]
                bid = bleft[0]
                adjusted_count = bleft[1] - count
                if adjusted_count <= 0:
                    left += 1
                    continue

                for p in broker_partition[bid]:
                    # Same receiver re-selection as in the replica pass.
                    right = self.__skip_full_receivers(broker_count, left, right, count)
                    if left >= right or adjusted_count == 0:
                        break

                    bright = broker_count[right]
                    replicas = [b.id for b in p.replicas]
                    if not (bid in replicas and bright[0] not in replicas and replicas.index(bid) == 0):
                        continue

                    # Hand the leader replica from the left broker to the right broker
                    p.swap_replicas(p.replicas[0], self.cluster.brokers[bright[0]])
                    adjusted_count -= 1
                    bleft[1] -= 1
                    bright[1] += 1

                    # Move one follower replica back so replica counts stay level
                    for p2 in broker_partition[bright[0]]:
                        replicas = [b.id for b in p2.replicas]
                        if bright[0] in replicas and replicas.index(bright[0]) != 0 and bid not in replicas:
                            index2 = replicas.index(bright[0])
                            p2.swap_replicas(p2.replicas[index2], self.cluster.brokers[bid])
                            break
                left += 1

    def process_cluster(self):
        """Balance replicas and leaders of every non-excluded topic, rack by rack.

        Raises:
            BalanceException: At least one broker has no rack set.
        """
        log.info("Starting {} module".format(self.name))

        # Every broker needs rack information for the per-rack grouping
        for broker in self.cluster.brokers.values():
            if not broker.rack:
                raise BalanceException("Cannot balance cluster by rack as it has no rack information")

        for topic in self.cluster.topics:
            if topic in self.args.exclude_topics:
                log.debug("Skipping topic {0} as it is explicitly excluded".format(topic))
                continue

            topic_partitions = self.cluster.topics[topic].partitions

            before_rearrange = self.__create_count_map(topic_partitions)
            self._unskew_topic_partition_in_rack(topic, topic_partitions)

            after_rearrange = self.__create_count_map(topic_partitions)
            self._unskew_topic_partition_leader_in_rack(topic, topic_partitions)

            after_leader_rearrange = self.__create_count_map(topic_partitions)

            self.__stats(
                topic,
                before_rearrange,
                after_rearrange,
                after_leader_rearrange
            )

    def __stats(self, topic, before, after, after_leader):
        """Log a table of leader/follower counts per broker for the three snapshots.

        Rows are produced for the brokers present in ``before``; a broker
        missing from a later snapshot is reported with zero counts.
        """
        table = PrettyTable()
        table.field_names = [
            "broker_id", "rack_id", "before(leader + follower = total)",
            "after(leader + follower = total)", "after_leader(leader + follower = total)"
        ]

        fmt = "{} + {} = {}"
        empty = {"leader": 0, "follower": 0}

        def describe(snapshot, broker_id):
            counts = snapshot.get(broker_id, empty)
            return fmt.format(
                counts["leader"], counts["follower"],
                counts["leader"] + counts["follower"]
            )

        for _id in before:
            table.add_row([
                _id, self.cluster.brokers[_id].rack,
                describe(before, _id),
                describe(after, _id),
                describe(after_leader, _id)
            ])
        log.info("\n" + table.get_string(sortby="rack_id"))

    def __create_count_map(self, partitions):
        """Create broker, leader & follower count dict."""
        count_map = {}
        for p in partitions:
            for position, replica in enumerate(p.replicas):
                counts = count_map.setdefault(replica.id, {"leader": 0, "follower": 0})
                # The first replica of a partition is counted as its leader
                counts["leader" if position == 0 else "follower"] += 1
        return count_map
# --- kafka/tools/assigner/actions/execute_plan.py ---
class ActionExecutePlan(ActionModule):
    """Action that replaces the cluster's assignment with one loaded from a plan file.

    The plan file is JSON: a non-empty list of dicts, each providing
    ``topic``, ``partition`` and ``replicas`` (a list of broker ids).
    """

    name = "execute_plan"
    helpstr = "Execute plan from given path"
    needs_sizes = False

    def __init__(self, args, cluster):
        super(ActionExecutePlan, self).__init__(args, cluster)

    @classmethod
    def _add_args(cls, parser):
        parser.add_argument(
            '--plan-file-path',
            required=True,
            help="File path where kafka re-assign plan is stored in json format",
            type=file_path_checker
        )

    def __check_plan_format(self, plan):
        """Validate the whole plan before any cluster state is touched.

        Raises:
            InvalidPlanFormatException: The plan is not a non-empty list, or
                an entry is not a dict with ``topic``, ``partition`` and a
                list under ``replicas``.
            UnknownBrokerException: A replica refers to a broker id that is
                not part of the cluster.
            KeyError: An entry refers to a topic that is not part of the
                cluster.
        """
        if not (isinstance(plan, list) and len(plan) > 0):
            raise InvalidPlanFormatException()

        # Every entry is checked, not just the first one, so that malformed
        # entries are reported as a format problem rather than a KeyError.
        brokers_in_plan = set()
        for partition_plan in plan:
            if not isinstance(partition_plan, dict):
                raise InvalidPlanFormatException()
            if any(key not in partition_plan for key in ("topic", "partition", "replicas")):
                raise InvalidPlanFormatException()
            if not isinstance(partition_plan["replicas"], list):
                raise InvalidPlanFormatException()
            brokers_in_plan.update(partition_plan["replicas"])

        # All broker ids in the plan must be present in the cluster
        missing = list(brokers_in_plan - set(self.cluster.brokers.keys()))
        if missing:
            raise UnknownBrokerException(
                "Broker ids = {} in plan not present in cluster".format(str(missing))
            )

        # Unknown topics are detected here so that process_cluster does not
        # fail after it has already reset the partitions of earlier topics.
        for partition_plan in plan:
            if partition_plan["topic"] not in self.cluster.topics:
                raise KeyError(
                    "Topic {} in plan not present in cluster".format(partition_plan["topic"])
                )

    def process_cluster(self):
        """Load the plan file and rebuild the partitions of every topic it mentions.

        The partition list of each topic named in the plan is emptied and
        rebuilt from the plan entries only. Topics not mentioned in the plan
        are left untouched.
        """
        with open(self.args.plan_file_path) as plan_file:
            plan = json.load(plan_file)
        self.__check_plan_format(plan)

        reset_topics = set()
        for partition_plan in plan:
            topic_name = partition_plan["topic"]
            topic = self.cluster.topics[topic_name]

            # First entry for a topic: drop its current partition list
            if topic_name not in reset_topics:
                topic.partitions = []
                reset_topics.add(topic_name)

            partition = Partition(topic, partition_plan["partition"])
            topic.partitions.append(partition)

            for replica in partition_plan["replicas"]:
                partition.add_replica(self.cluster.brokers[replica])


# --- kafka/tools/assigner/arguments.py ---
def file_path_checker(path):
    """Return ``path`` unchanged if it names an existing regular file.

    Used as an argparse ``type=`` callable for file-valued options.

    Raises:
        IOError: ``path`` does not exist or is not a regular file.
    """
    # NOTE(review): argparse only converts ArgumentTypeError/TypeError/ValueError
    # into a usage error, so this IOError reaches the user as a traceback.
    # Confirm whether that is intended.
    if not os.path.isfile(path):
        raise IOError("File path: {} not exists or its not file".format(path))
    return path
sizer)", required=False, default=[], action='append') @@ -57,6 +66,12 @@ def set_up_arguments(action_map, sizer_map, plugins): aparser.add_argument('--ple-wait', help="Time in seconds to wait between preferred leader elections", required=False, default=120, type=int) aparser.add_argument('--tools-path', help="Path to Kafka admin utilities, overriding PATH env var", required=False) aparser.add_argument('--output-json', help="Output JSON-formatted cluster information to stdout", default=False, action='store_true') + aparser.add_argument('--throttle', help="The movement of partitions between brokers will be throttled to this value (bytes/sec)", default=25000000, type=int) + aparser.add_argument('--throttle-limit-file-path', + help="Similar to throttle argument but here taking input from file. If throttle & throttle-limit-file-path is given then throttle-limit-file-path will take precedence", + required=False, type=file_path_checker + ) + aparser.add_argument('--save-plan-path', help="Save the generated plan at given path", required=False, type=str) # Call action module arg setup subparsers = aparser.add_subparsers(help='Select manipulation module to use') diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index 2c5cb15..1699736 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -44,22 +44,23 @@ def dict_for_reassignment(self): reassignment['partitions'].append(partition.dict_for_reassignment()) return reassignment - def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True): + def execute(self, num, total, zookeeper, tools_path, throttle, plugins=[], dry_run=True): for plugin in plugins: plugin.before_execute_batch(num) if not dry_run: - self._execute(num, total, zookeeper, tools_path) + self._execute(num, total, zookeeper, tools_path, throttle) for plugin in plugins: plugin.after_execute_batch(num) - def _execute(self, num, total, 
class InvalidPlanFormatException(KafkaToolsException):
    """Error for a reassignment plan file whose content is not a list of dicts."""

    errstr = "There was an error while parsing plan for given file path. Expecting plan should be list of dict"
# --- tests/tools/assigner/actions/balancemodules/test_topic_partition_in_rack.py ---
class ActionBalanceTopicPartitionInRackTests(unittest.TestCase):
    """Tests for the topic_partition_in_rack balance module on the 9-broker fixture."""

    def setUp(self):
        self.cluster = set_up_cluster_9broker()
        self.parser, self.subparser = set_up_subparser()
        self.args = Namespace(exclude_topics=[])

    def _replica_and_leader_counts(self, topic_name):
        """Return ``{broker_id: [replica_count, leader_count]}`` for one topic."""
        counts = {}
        for p in self.cluster.topics[topic_name].partitions:
            for position, b in enumerate(p.replicas):
                entry = counts.setdefault(b.id, [0, 0])
                entry[0] += 1
                if position == 0:
                    entry[1] += 1
        return counts

    def test_configure_args(self):
        ActionBalance.configure_args(self.subparser)
        # Explicit argument list: the global sys.argv is left untouched
        parsed_args = self.parser.parse_args(['balance', '-t', 'topic_partition_in_rack'])
        self.assertEqual(parsed_args.action, 'balance')

    def test_create_class(self):
        action = ActionBalanceTopicPartitionInRack(self.args, self.cluster)
        self.assertIsInstance(action, ActionBalanceTopicPartitionInRack)

    def test_process_cluster_partition_unskew_leader_skew(self):
        self.args.exclude_topics = ["topic2"]
        action = ActionBalanceTopicPartitionInRack(self.args, self.cluster)

        # topic1 replicas per broker in rack 1: 3/2/1 before, 2/2/2 expected after
        expected_replicas_topic1 = {1: 2, 2: 2, 3: 2}

        action.process_cluster()

        counts = self._replica_and_leader_counts("topic1")
        for broker_id, expected in expected_replicas_topic1.items():
            self.assertEqual(expected, counts[broker_id][0])

    def test_process_cluster_partition_unskew_leader_unskew(self):
        self.args.exclude_topics = ["topic1"]
        action = ActionBalanceTopicPartitionInRack(self.args, self.cluster)

        # topic2 layout the original author traced (partition: replicas):
        #   before:               0:[7,8,1] 1:[7,3,6] 2:[7,9,5] 3:[5,2,1] 4:[5,1,8]
        #   after replica pass:   0:[9,8,2] 1:[7,3,6] 2:[7,9,6] 3:[5,2,1] 4:[5,1,8]
        #   after leader pass:    0:[9,7,2] 1:[8,3,5] 2:[7,9,6] 3:[6,2,1] 4:[5,1,8]
        # Only the per-broker counts are asserted below.

        # replicas per broker: before 1:3 5:3 6:1 7:3 8:2 9:1
        expected_replicas_topic2 = {1: 2, 5: 2, 6: 2, 7: 2, 8: 2, 9: 2}
        # leaders per broker: before 1:0 5:2 6:0 7:3 8:0 9:0
        expected_leaders_topic2 = {1: 0, 5: 1, 6: 1, 7: 1, 8: 1, 9: 1}

        action.process_cluster()

        counts = self._replica_and_leader_counts("topic2")
        for broker_id in expected_replicas_topic2:
            self.assertEqual(expected_leaders_topic2[broker_id], counts[broker_id][1])
            self.assertEqual(expected_replicas_topic2[broker_id], counts[broker_id][0])


# --- tests/tools/assigner/actions/fixtures.py ---
def set_up_cluster_9broker():
    """Build a 9-broker cluster fixture: brokers 1-3 in rack "1", 4-6 in "2", 7-9 in "3".

    Two topics are added: ``topic1`` (4 partitions) and ``topic2``
    (5 partitions), each partition with three replicas.
    """
    topics = {
        "topic1": {
            "partitions": {
                "0": [1, 2, 4],
                "1": [1, 5, 3],
                "2": [2, 6, 8],
                "3": [1, 9, 7]
            }
        },
        "topic2": {
            "partitions": {
                "0": [7, 8, 1],
                "1": [7, 3, 6],
                "2": [7, 9, 5],
                "3": [5, 2, 1],
                "4": [5, 1, 8]
            }
        }
    }

    cluster = Cluster(retention=3)
    for broker_id in range(1, 10):
        broker_data = {
            "id": broker_id,
            # three consecutive broker ids share one rack
            "rack": str((broker_id - 1) // 3 + 1),
            "jmx_port": 9999,
            "host": "localhost{}".format(broker_id),
            "timestamp": "1631715232017",
            "port": 9092,
            "version": 4
        }
        cluster.add_broker(Broker.create_from_json(broker_id, json.dumps(broker_data)))

    for topic, topic_data in topics.items():
        add_topic_with_replicas(cluster, topic, topic_data)
    return cluster


# --- tests/tools/assigner/actions/test_execute_plan.py ---
class ActionExecutePlanTests(unittest.TestCase):
    """Tests for the execute_plan action using plan files written to temp files."""

    def setUp(self):
        self.cluster = set_up_cluster_9broker()
        self.parser, self.subparser = set_up_subparser()
        self.args = Namespace()

        # A plan must be a list of dicts; a bare dict is the malformed case
        self.wrong_plan_file = self._write_plan(
            {"topic": "topic1", "partition": 0, "replicas": [1, 2, 8]}
        )
        self.correct_plan_file = self._write_plan(
            [
                {"topic": "topic1", "partition": 0, "replicas": [1, 2, 8]},
                {"topic": "topic1", "partition": 1, "replicas": [2, 7, 8]}
            ]
        )

    def _write_plan(self, plan):
        """Write ``plan`` as JSON to a temp file that is removed after the test."""
        plan_file = NamedTemporaryFile(mode="w")
        # addCleanup also runs when a later step of setUp fails
        self.addCleanup(plan_file.close)
        plan_file.write(json.dumps(plan))
        plan_file.flush()
        return plan_file

    def test_create_class(self):
        action = ActionExecutePlan(self.args, self.cluster)
        self.assertIsInstance(action, ActionExecutePlan)

    @patch("os.path.isfile")
    def test_configure_args(self, mock_isfile):
        ActionExecutePlan.configure_args(self.subparser)
        # Explicit argument list: the global sys.argv is left untouched
        args = self.parser.parse_args(["execute_plan", "--plan-file-path", "/tmp/path"])
        mock_isfile.assert_called_once_with("/tmp/path")
        self.assertEqual(args.action, "execute_plan")

    def test_wrong_plan_format(self):
        self.args.plan_file_path = self.wrong_plan_file.name
        action = ActionExecutePlan(self.args, self.cluster)
        self.assertRaises(InvalidPlanFormatException, action.process_cluster)

    def test_correct_plan_format(self):
        self.args.plan_file_path = self.correct_plan_file.name
        action = ActionExecutePlan(self.args, self.cluster)
        action.process_cluster()
        partition = [p for p in self.cluster.topics["topic1"].partitions if int(p.num) == 0]
        self.assertEqual(len(partition), 1)
        self.assertListEqual(
            [b.id for b in partition[0].replicas],
            [1, 2, 8]
        )
test_reassignment_execute_real(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False) - mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 25000000, plugins=[self.null_plugin], dry_run=False) + mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', 25000000) @patch.object(Reassignment, '_execute') def test_reassignment_execute_dryrun(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=True) + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 25000000, plugins=[self.null_plugin], dry_run=True) mock_exec.assert_not_called() @patch('kafka.tools.assigner.models.reassignment.subprocess.Popen', new_callable=MockPopen) @@ -55,9 +55,9 @@ def test_reassignment_internal_execute(self, mock_check, mock_popen): mock_popen.set_default() mock_check.side_effect = [10, 5, 0] - self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools', 25000000) - compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY], + compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY, '--throttle', ANY], stderr=ANY, stdout=ANY), call.Popen_instance.wait()], mock_popen.mock.method_calls) assert len(mock_check.mock_calls) == 3 diff --git a/tests/tools/assigner/test_main.py b/tests/tools/assigner/test_main.py index 665a6c6..b785176 100644 --- a/tests/tools/assigner/test_main.py +++ b/tests/tools/assigner/test_main.py @@ -3,7 +3,7 @@ from mock import call, patch -from kafka.tools.assigner.__main__ import main, get_plugins_list, check_and_get_sizes, run_preferred_replica_elections, run_plugins_at_step, is_dry_run +from 
def test_throttle_limit(self):
    """get_throttle_limit prefers the file value and falls back to the CLI value."""
    # No limit file configured: the CLI value is used
    args = argparse.Namespace(throttle_limit_file_path=None, throttle=30000)
    assert get_throttle_limit(args) == 30000

    # Limit file with a valid integer: the file value wins
    with NamedTemporaryFile(mode='w') as f:
        f.write("25000")
        f.flush()

        args = argparse.Namespace(throttle_limit_file_path=f.name, throttle=30000)
        assert get_throttle_limit(args) == 25000

    # Limit file with unparsable content: the CLI value is kept
    with NamedTemporaryFile(mode='w') as f:
        f.write("not-a-number")
        f.flush()

        args = argparse.Namespace(throttle_limit_file_path=f.name, throttle=30000)
        assert get_throttle_limit(args) == 30000