From 30874dd6c469c4efe8ed896c6fa5ff7ca20be884 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Sun, 28 Nov 2021 01:37:52 +0530 Subject: [PATCH 1/6] Added throttle, save plan & execute saved plan support --- kafka/tools/assigner/__main__.py | 13 ++++++ kafka/tools/assigner/actions/execute_plan.py | 48 ++++++++++++++++++++ kafka/tools/assigner/arguments.py | 9 ++++ kafka/tools/assigner/models/reassignment.py | 3 +- kafka/tools/exceptions.py | 3 ++ 5 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 kafka/tools/assigner/actions/execute_plan.py diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index cb84419..21cb88c 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -94,6 +94,18 @@ def is_dry_run(args): return True return False +def save_plan(args, move_partitions): + if not args.existing_plan_path: + return + + folder, filename = args.save_plan_path.rsplit("/", 1) + if os.path.isdir(folder) and (not os.path.isdir(args.save_plan_path)): + plans_as_json_list = [p.dict_for_reassignment() for p in move_partitions] + with open(args.save_plan_path, "w") as f: + f.write(json.dumps(plans_as_json_list, indent=4)) + log.info("Saved plan at location={}".format(args.save_plan_path)) + else: + raise FileExistsError("Given file path={} is a directory not File".format(args.save_plan_path)) def main(): # Start by loading all the modules @@ -124,6 +136,7 @@ def main(): print_leadership("after", newcluster, args.leadership) move_partitions = cluster.changed_partitions(action_to_run.cluster) + save_plan(args, move_partitions) batches = split_partitions_into_batches(move_partitions, batch_size=args.moves, use_class=Reassignment) run_plugins_at_step(plugins, 'set_batches', batches) diff --git a/kafka/tools/assigner/actions/execute_plan.py b/kafka/tools/assigner/actions/execute_plan.py new file mode 100644 index 0000000..33b2acb --- /dev/null +++ b/kafka/tools/assigner/actions/execute_plan.py @@ -0,0 +1,48 @@ +import json +from kafka.tools.assigner.actions import ActionModule +from kafka.tools.assigner import file_path_checker +from kafka.tools.models.partition import Partition +from kafka.tools.exceptions import InvalidPlanFormatException + + +class ActionExecutePlan(ActionModule): + name = "execute_plan" + helpstr = "Execute plan from given path" + needs_sizes = False + + def __init__(self, args, cluster): + super(ActionExecutePlan, self).__init__(args, cluster) + + @classmethod + def _add_args(cls, parser): + parser.add_argument( + '--plan-file-path', + required=True, + help="File path where kafka re-assign plan is stored in json format", + type=file_path_checker + ) + + def __check_plan_format(self, plan): + """ + Expecting plan will of type list of dict + """ + if not (isinstance(list, plan) and len(plan) > 0 and isinstance(dict, plan[0])): + raise InvalidPlanFormatException() + + def process_cluster(self): + plan = json.load(open(args.plan_file_path)) + self.__check_plan_format(plan) + + topic_track = {} + + for partition_plan in plan: + if partition_plan['topic'] not in topic_track: + self.cluster.topics[partition_plan["topic"]].partitions = [] + topic_track[partition_plan['topic']] = True + + topic = self.cluster.topics[partition_plan["topic"]] + partition = Partition(topic, partition_plan["partition"]) + topic.partitions.append(partition) + + for replica in partition_plan["replicas"]: + partition.add_replica(self.cluster.brokers[replica]) diff --git a/kafka/tools/assigner/arguments.py b/kafka/tools/assigner/arguments.py index bb06b25..e00502a 100644 --- a/kafka/tools/assigner/arguments.py +++ b/kafka/tools/assigner/arguments.py @@ -15,6 +15,7 @@ # specific language governing permissions and limitations # under the License. +import os import argparse import pkg_resources @@ -35,6 +36,12 @@ def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, val_array) +def file_path_checker(path): + """Check is given file path exist or not""" + if not os.path.isfile(path): + raise FileNotFoundError("File path: {} not exists or its not file".format(path)) + return path + # action_map is a map of names to ActionModule children - the top level actions that can be called # sizer_map is a map of names to SizerModule children - the modules to get partition sizes def set_up_arguments(action_map, sizer_map, plugins): @@ -57,6 +64,8 @@ def set_up_arguments(action_map, sizer_map, plugins): aparser.add_argument('--ple-wait', help="Time in seconds to wait between preferred leader elections", required=False, default=120, type=int) aparser.add_argument('--tools-path', help="Path to Kafka admin utilities, overriding PATH env var", required=False) aparser.add_argument('--output-json', help="Output JSON-formatted cluster information to stdout", default=False, action='store_true') + aparser.add_argument('--throttle', help="The movement of partitions between brokers will be throttled to this value (bytes/sec)", default=25000000, type=int) + aparser.add_argument('--save-plan-path', help="Save the generated plan at given path", required=False, type=str) # Call action module arg setup subparsers = aparser.add_subparsers(help='Select manipulation module to use') diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index 2c5cb15..a05d5f2 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -59,7 +59,8 @@ def _execute(self, num, total, zookeeper, tools_path): FNULL = open(os.devnull, 'w') proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', '--zookeeper', zookeeper, - '--reassignment-json-file', assignfile.name], + '--reassignment-json-file', assignfile.name, + '--throttle', str(throttle)], stdout=FNULL, stderr=FNULL) proc.wait() diff --git a/kafka/tools/exceptions.py b/kafka/tools/exceptions.py index cebd421..737853c 100644 --- a/kafka/tools/exceptions.py +++ b/kafka/tools/exceptions.py @@ -91,3 +91,6 @@ class OffsetError(ClientError): class ConfigurationError(ClientError): errstr = "There was an error configuring the client" + +class InvalidPlanFormatException(KafkaToolsException): + errstr = "There was an error while parsing plan for given file path. Expecting plan should be list of dict" From 7a4932b4ebacb37cc13f68c3057f9b568f2ce933 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Tue, 30 Nov 2021 17:27:34 +0530 Subject: [PATCH 2/6] Fix issues, added topic partition rack based balance as balance module, added test cases --- kafka/tools/assigner/__main__.py | 13 +- kafka/tools/assigner/actions/balance.py | 15 +- .../balancemodules/topic_partition_in_rack.py | 245 ++++++++++++++++++ kafka/tools/assigner/actions/execute_plan.py | 70 ++--- kafka/tools/assigner/arguments.py | 2 + kafka/tools/assigner/models/reassignment.py | 6 +- .../test_topic_partition_in_rack.py | 121 +++++++++ tests/tools/assigner/actions/fixtures.py | 145 ++++++++++- .../assigner/actions/test_execute_plan.py | 71 +++++ .../assigner/models/test_reassignment.py | 10 +- tests/tools/assigner/test_main.py | 6 +- 11 files changed, 655 insertions(+), 49 deletions(-) create mode 100644 kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py create mode 100644 tests/tools/assigner/actions/balancemodules/test_topic_partition_in_rack.py create mode 100644 tests/tools/assigner/actions/test_execute_plan.py diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index 21cb88c..6f98967 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -95,7 +95,7 @@ def is_dry_run(args): return False def save_plan(args, move_partitions): - if not args.existing_plan_path: + if not args.save_plan_path: return folder, filename = args.save_plan_path.rsplit("/", 1) @@ -115,6 +115,15 @@ def main(): # Set up and parse all CLI arguments args = set_up_arguments(action_map, sizer_map, plugins) + cluster = Cluster.create_from_zookeeper(args.zookeeper, getattr(args, 'default_retention', 1)) + + # if include_topics list is empty it means we have to include all topics. + # if include_topics is non-empty then except include topics all will be in exclude_topics list + if args.include_topics and len(args.include_topics) > 0: + for topic in cluster.topics.keys(): + if topic not in args.include_topics: + args.exclude_topics.append(topic) + run_plugins_at_step(plugins, 'set_arguments', args) tools_path = get_tools_path(args.tools_path) @@ -146,7 +155,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, args.throttle, plugins, dry_run) run_plugins_at_step(plugins, 'before_ple') diff --git a/kafka/tools/assigner/actions/balance.py b/kafka/tools/assigner/actions/balance.py index d57afc9..a16d70c 100644 --- a/kafka/tools/assigner/actions/balance.py +++ b/kafka/tools/assigner/actions/balance.py @@ -27,7 +27,20 @@ class ActionBalance(ActionModule): needs_sizes = True def __init__(self, args, cluster): - if "rackaware" in args.types and args.types[len(args.types)-1] != "rackaware": + + + if "topic_partition_in_rack" in args.types: + if args.types[len(args.types)-1] != "topic_partition_in_rack": + raise BalanceException( + "In order to work properly, topic_partition_in_rack must always be the last module specified" + ) + + elif "rackaware" in args.types and args.types[len(args.types)-2] != "rackaware": + raise BalanceException( + "In order to work properly, if you specify topic_partition_in_rack and " + + "rackaware both module then rackaware must always be in 2nd last & topic_partition_in_rack always be last") + + elif "rackaware" in args.types and args.types[len(args.types)-1] != "rackaware": raise BalanceException("In order to work properly, rackaware must always be the last module specified") super(ActionBalance, self).__init__(args, cluster) diff --git a/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py new file mode 100644 index 0000000..9275b7f --- /dev/null +++ b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py @@ -0,0 +1,245 @@ +import math +from kafka.tools import log +from prettytable import PrettyTable +from kafka.tools.exceptions import BalanceException +from kafka.tools.assigner.actions import ActionBalanceModule + + +class ActionBalanceTopicPartitionInRack(ActionBalanceModule): + name = "topic_partition_in_rack" + helpstr = "Reassign topic partition replicas to assure they are almost equal distributed among brokers in a racks" + + def __init__(self, args, cluster): + super(ActionBalanceTopicPartitionInRack, self).__init__(args, cluster) + + def __calculate_max_possible_count(self, broker_count): + total_count = sum(map(lambda x: x[1], broker_count)) + count = total_count * 1.0 / len(broker_count) + + if (total_count % len(broker_count)) == 0: + count = int(count) + else: + count = int(math.floor(count)) + 1 + return count + + def _unskew_topic_partition_in_rack(self, topic, partitions): + """ + For given partitions of a topic as params applying following algorithm + + 1. Grouping partitions based on rack id (getting rack id from broker). + 2. In a rack, sorting (reverse) brokers based on number of partitions present. + 3. Balancing partitions on brokers by moving partitions from left to right + """ + + # Creating a map of rack id, broker and count of partition on broker + rack_broker_count, broker_partition = {}, {} + for p in partitions: + for broker in p.replicas: + if broker.rack not in rack_broker_count: + rack_broker_count[broker.rack] = {} + + if broker.id not in rack_broker_count[broker.rack]: + rack_broker_count[broker.rack][broker.id] = 0 + + if broker.id not in broker_partition: + broker_partition[broker.id] = set() + + broker_partition[broker.id].add(p) + rack_broker_count[broker.rack][broker.id] += 1 + # For a rack, balancing partitions + for rack_id in rack_broker_count: + broker_count = sorted( + map( + lambda x: list(x), + rack_broker_count[rack_id].items() + ), + key=lambda x: x[1], + reverse=True + ) + + # calculating max possible number of partitions in a broker + count = self.__calculate_max_possible_count(broker_count) + + # On sorted broker_count list, + # balancing number of partitions from left to right + left, right, adjusted_count = 0, len(broker_count) - 1, 0 + while left < right: + bleft = broker_count[left] + bid, bcount = bleft[0], bleft[1] + adjusted_count = bcount - count + if adjusted_count <= 0: + left += 1 + continue + + for p in broker_partition[bid]: + bright = broker_count[right] + while right > 0 and bright[1] >= count: + right -= 1 + + if left >= right or adjusted_count == 0: + break + + replicas = [b.id for b in p.replicas] + if bright[0] not in replicas: + index = replicas.index(bid) + p.swap_replicas(p.replicas[index], self.cluster.brokers[bright[0]]) + adjusted_count -= 1 + bleft[1] -= 1 + bright[1] += 1 + left += 1 + + + def _unskew_topic_partition_leader_in_rack(self, topic, partitions): + + # Creating a map of rack id, broker and count of partition leader on broker + rack_broker_leader_count_map, broker_partition = {}, {} + for p in partitions: + for broker in p.replicas: + if broker.rack not in rack_broker_leader_count_map: + rack_broker_leader_count_map[broker.rack] = {} + + if broker.id not in rack_broker_leader_count_map[broker.rack]: + rack_broker_leader_count_map[broker.rack][broker.id] = 0 + + if p.replicas.index(broker) == 0: + rack_broker_leader_count_map[broker.rack][broker.id] += 1 + + if broker.id not in broker_partition: + broker_partition[broker.id] = set() + + broker_partition[broker.id].add(p) + + # For a rack, balancing partitions leader + for rack_id in rack_broker_leader_count_map: + broker_count = sorted( + map( + lambda x: list(x), + rack_broker_leader_count_map[rack_id].items() + ), + key=lambda x:x[1], + reverse=True + ) + + count = self.__calculate_max_possible_count(broker_count) + left, right, adjusted_count = 0, len(broker_count) - 1, 0 + while left < right: + bleft = broker_count[left] + bid, bcount = bleft[0], bleft[1] + adjusted_count = bcount - count + if adjusted_count <= 0: + left += 1 + continue + + for p in broker_partition[bid]: + bright = broker_count[right] + while right > 0 and bright[1] >= count: + right -= 1 + + if left >= right or adjusted_count == 0: + break + + replicas = [b.id for b in p.replicas] + if not(bright[0] not in replicas and replicas.index(bid) == 0): + continue + + # Swaping leader from left side broker and non leader from right side broker + p.swap_replicas(p.replicas[0], self.cluster.brokers[bright[0]]) + adjusted_count -= 1 + bleft[1] -= 1 + bright[1] += 1 + + for p2 in broker_partition[bright[0]]: + replicas = [b.id for b in p2.replicas] + if replicas.index(bright[0]) != 0 and bid not in replicas: + index2 = replicas.index(bright[0]) + p2.swap_replicas(p2.replicas[index2], self.cluster.brokers[bid]) + break + left += 1 + + def process_cluster(self): + log.info("Starting {} module".format(self.name)) + + # Check if rack information is set for the cluster + for broker in self.cluster.brokers.values(): + if broker.rack: + continue + raise BalanceException("Cannot balance cluster by rack as it has no rack information") + + for topic in self.cluster.topics: + if topic in self.args.exclude_topics: + log.debug("Skipping topic {0} as it is explicitly excluded".format(topic)) + continue + + before_rearrange = self.__create_count_map( + self.cluster.topics[topic].partitions + ) + self._unskew_topic_partition_in_rack( + topic, self.cluster.topics[topic].partitions) + + after_rearrange = self.__create_count_map( + self.cluster.topics[topic].partitions + ) + self._unskew_topic_partition_leader_in_rack( + topic, self.cluster.topics[topic].partitions) + + after_leader_rearrange = self.__create_count_map( + self.cluster.topics[topic].partitions + ) + + self.__stats( + topic, + before_rearrange, + after_rearrange, + after_leader_rearrange + ) + + def __stats(self, topic, before, after, after_leader): + table = PrettyTable() + table.field_names = [ + "broker_id", "rack_id", "before(leader + follower = total)", + "after(leader + follower = total)", "after_leader(leader + follower = total)" + ] + + fmt = "{} + {} = {}" + + for _id in before: + table.add_row([ + _id, self.cluster.brokers[_id].rack, + fmt.format( + before[_id]["leader"], before[_id]["follower"], + before[_id]["leader"] + before[_id]["follower"] + ), + fmt.format( + after[_id]["leader"], after[_id]["follower"], + after[_id]["leader"] + after[_id]["follower"] + ), + fmt.format( + after_leader[_id]["leader"], after_leader[_id]["follower"], + after_leader[_id]["leader"] + after_leader[_id]["follower"] + ) + ]) + log.info("\n" + table.get_string()) + + + def __create_count_map(self, partitions): + """Create broker, leader & follower count dict.""" + count_map = {} + for p in partitions: + for i in range(len(p.replicas)): + broker_id = p.replicas[i].id + if broker_id not in count_map: + count_map[broker_id] = {"leader": 0, "follower": 0} + if i == 0: + count_map[broker_id]["leader"] += 1 + else: + count_map[broker_id]["follower"] += 1 + return dict(sorted(count_map.items(), key= lambda x: x[0])) + + + + + +# broker_id__rack_id=3__2 old(leader=4 follower=1) rplan(leader=4 follower=3) new(leader=4 follower=3) + + +# diff --git a/kafka/tools/assigner/actions/execute_plan.py b/kafka/tools/assigner/actions/execute_plan.py index 33b2acb..d1b88c4 100644 --- a/kafka/tools/assigner/actions/execute_plan.py +++ b/kafka/tools/assigner/actions/execute_plan.py @@ -1,48 +1,48 @@ import json from kafka.tools.assigner.actions import ActionModule -from kafka.tools.assigner import file_path_checker +from kafka.tools.assigner.arguments import file_path_checker from kafka.tools.models.partition import Partition from kafka.tools.exceptions import InvalidPlanFormatException class ActionExecutePlan(ActionModule): - name = "execute_plan" - helpstr = "Execute plan from given path" - needs_sizes = False - - def __init__(self, args, cluster): - super(ActionExecutePlan, self).__init__(args, cluster) - - @classmethod - def _add_args(cls, parser): - parser.add_argument( - '--plan-file-path', - required=True, - help="File path where kafka re-assign plan is stored in json format", - type=file_path_checker - ) - - def __check_plan_format(self, plan): - """ - Expecting plan will of type list of dict - """ - if not (isinstance(list, plan) and len(plan) > 0 and isinstance(dict, plan[0])): - raise InvalidPlanFormatException() + name = "execute_plan" + helpstr = "Execute plan from given path" + needs_sizes = False + + def __init__(self, args, cluster): + super(ActionExecutePlan, self).__init__(args, cluster) + + @classmethod + def _add_args(cls, parser): + parser.add_argument( + '--plan-file-path', + required=True, + help="File path where kafka re-assign plan is stored in json format", + type=file_path_checker + ) + + def __check_plan_format(self, plan): + """ + Expecting plan will of type list of dict + """ + if not (isinstance(plan, list) and len(plan) > 0 and isinstance(plan[0], dict)): + raise InvalidPlanFormatException() def process_cluster(self): - plan = json.load(open(args.plan_file_path)) - self.__check_plan_format(plan) + plan = json.load(open(self.args.plan_file_path)) + self.__check_plan_format(plan) - topic_track = {} + topic_track = {} - for partition_plan in plan: - if partition_plan['topic'] not in topic_track: - self.cluster.topics[partition_plan["topic"]].partitions = [] - topic_track[partition_plan['topic']] = True + for partition_plan in plan: + if partition_plan['topic'] not in topic_track: + self.cluster.topics[partition_plan["topic"]].partitions = [] + topic_track[partition_plan['topic']] = True - topic = self.cluster.topics[partition_plan["topic"]] - partition = Partition(topic, partition_plan["partition"]) - topic.partitions.append(partition) + topic = self.cluster.topics[partition_plan["topic"]] + partition = Partition(topic, partition_plan["partition"]) + topic.partitions.append(partition) - for replica in partition_plan["replicas"]: - partition.add_replica(self.cluster.brokers[replica]) + for replica in partition_plan["replicas"]: + partition.add_replica(self.cluster.brokers[replica]) diff --git a/kafka/tools/assigner/arguments.py b/kafka/tools/assigner/arguments.py index e00502a..e3e780b 100644 --- a/kafka/tools/assigner/arguments.py +++ b/kafka/tools/assigner/arguments.py @@ -55,6 +55,8 @@ def set_up_arguments(action_map, sizer_map, plugins): aparser.add_argument('-e', '--execute', help="Execute partition reassignment", action='store_true') aparser.add_argument('-m', '--moves', help="Max number of moves per step", required=False, default=10, type=int) aparser.add_argument('-x', '--exclude-topics', help="Comma-separated list of topics to skip when performing actions", action=CSVAction, default=[]) + aparser.add_argument('-i', '--include-topics', help="Comma-separated list of topics to include when performing actions. Default will be empty list means include all topics", + action=CSVAction, default=[]) aparser.add_argument('--sizer', help="Select module to use to get partition sizes", required=False, default='ssh', choices=sizer_map.keys()) aparser.add_argument('-p', '--property', help="Property of the form 'key=value' to be passed to modules (i.e. sizer)", required=False, default=[], action='append') diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index a05d5f2..1699736 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -44,15 +44,15 @@ def dict_for_reassignment(self): reassignment['partitions'].append(partition.dict_for_reassignment()) return reassignment - def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True): + def execute(self, num, total, zookeeper, tools_path, throttle, plugins=[], dry_run=True): for plugin in plugins: plugin.before_execute_batch(num) if not dry_run: - self._execute(num, total, zookeeper, tools_path) + self._execute(num, total, zookeeper, tools_path, throttle) for plugin in plugins: plugin.after_execute_batch(num) - def _execute(self, num, total, zookeeper, tools_path): + def _execute(self, num, total, zookeeper, tools_path, throttle): with NamedTemporaryFile(mode='w') as assignfile: json.dump(self.dict_for_reassignment(), assignfile) assignfile.flush() diff --git a/tests/tools/assigner/actions/balancemodules/test_topic_partition_in_rack.py b/tests/tools/assigner/actions/balancemodules/test_topic_partition_in_rack.py new file mode 100644 index 0000000..e18b6b5 --- /dev/null +++ b/tests/tools/assigner/actions/balancemodules/test_topic_partition_in_rack.py @@ -0,0 +1,121 @@ +import sys +import unittest +from argparse import Namespace +from ..fixtures import set_up_subparser, set_up_cluster_9broker +from kafka.tools.assigner.actions.balance import ActionBalance +from kafka.tools.assigner.actions.balancemodules.topic_partition_in_rack import ActionBalanceTopicPartitionInRack + + +class ActionBalanceTopicPartitionInRackTests(unittest.TestCase): + def setUp(self): + self.cluster = set_up_cluster_9broker() + self.parser, self.subparser = set_up_subparser() + self.args = Namespace(exclude_topics=[]) + + def test_configure_args(self): + ActionBalance.configure_args(self.subparser) + sys.argv = ['kafka-assigner', 'balance', '-t', 'topic_partition_in_rack'] + parsed_args = self.parser.parse_args() + assert parsed_args.action == 'balance' + + def test_create_class(self): + action = ActionBalanceTopicPartitionInRack(self.args, self.cluster) + assert isinstance(action, ActionBalanceTopicPartitionInRack) + + def test_process_cluster_partition_unskew_leader_skew(self): + self.args.exclude_topics = ["topic2"] + action = ActionBalanceTopicPartitionInRack(self.args, self.cluster) + + + broker_partitions_before_and_after_in_topic1 = { + 1: [3,2], + 2: [2,2], + 3: [1,2] + } + + action.process_cluster() + broker_partition_count = {} + for p in self.cluster.topics["topic1"].partitions: + for i, b in enumerate(p.replicas): + if b.id not in broker_partition_count: + broker_partition_count[b.id] = 0 + broker_partition_count[b.id] += 1 + + for i in broker_partitions_before_and_after_in_topic1: + assert (broker_partitions_before_and_after_in_topic1[i][1] ==\ + broker_partition_count[i]) + + def test_process_cluster_partition_unskew_leader_unskew(self): + self.args.exclude_topics = ["topic1"] + action = ActionBalanceTopicPartitionInRack(self.args, self.cluster) + """ + BEFORE + {'partitions': + { + 0: {'size': 0, 'replicas': [7, 8, 1]}, + 1: {'size': 0, 'replicas': [7, 3, 6]}, + 2: {'size': 0, 'replicas': [7, 9, 5]}, + 3: {'size': 0, 'replicas': [5, 2, 1]}, + 4: {'size': 0, 'replicas': [5, 1, 8]}} + } + + AFTER RE-ARRANGE + {'partitions': + { + 0: {'size': 0, 'replicas': [9, 8, 2]}, + 1: {'size': 0, 'replicas': [7, 3, 6]}, + 2: {'size': 0, 'replicas': [7, 9, 6]}, + 3: {'size': 0, 'replicas': [5, 2, 1]}, + 4: {'size': 0, 'replicas': [5, 1, 8]}} + } + + AFTER LEADER RE-ARRANGE + {'partitions': + { + 0: {'size': 0, 'replicas': [9, 7, 2]}, + 1: {'size': 0, 'replicas': [8, 3, 5]}, + 2: {'size': 0, 'replicas': [7, 9, 6]}, + 3: {'size': 0, 'replicas': [6, 2, 1]}, + 4: {'size': 0, 'replicas': [5, 1, 8]}} + } + """ + + # input & expected output for topic: topic2 + broker_partitions_before_and_after_in_topic2 = { + 1: [3,2], + 5: [3,2], + 6: [1,2], + 7: [3,2], + 8: [2,2], + 9: [1,2] + } + + broker_partitions_leader_before_and_after_in_topic2 = { + 1: [0,0], + 5: [2,1], + 6: [0,1], + 7: [3,1], + 8: [0,1], + 9: [0,1] + } + + action.process_cluster() + + broker_partition_count_leader_count = {} + for p in self.cluster.topics["topic2"].partitions: + for i, b in enumerate(p.replicas): + if b.id not in broker_partition_count_leader_count: + broker_partition_count_leader_count[b.id] = [0, 0] + + if i == 0: + broker_partition_count_leader_count[b.id][1] += 1 + broker_partition_count_leader_count[b.id][0] += 1 + + for i in broker_partitions_before_and_after_in_topic2: + assert (broker_partitions_leader_before_and_after_in_topic2[i][1] ==\ + broker_partition_count_leader_count[i][1]) + assert (broker_partitions_before_and_after_in_topic2[i][1] ==\ + broker_partition_count_leader_count[i][0]) + + def tearDown(self): + pass diff --git a/tests/tools/assigner/actions/fixtures.py b/tests/tools/assigner/actions/fixtures.py index b8327c0..f8a262b 100644 --- a/tests/tools/assigner/actions/fixtures.py +++ b/tests/tools/assigner/actions/fixtures.py @@ -1,7 +1,7 @@ import argparse - +import json from kafka.tools.models.broker import Broker -from kafka.tools.models.cluster import Cluster +from kafka.tools.models.cluster import Cluster, add_topic_with_replicas from kafka.tools.models.topic import Topic @@ -85,3 +85,144 @@ def set_up_subparser(): aparser = argparse.ArgumentParser(prog='kafka-assigner', description='Rejigger Kafka cluster partitions') subparsers = aparser.add_subparsers(help='Select manipulation module to use') return (aparser, subparsers) + + +def set_up_cluster_9broker(): + cluster_dict = { + "brokers": [ + { + "1": { + "id": 1, + "rack": "1", + "jmx_port": 9999, + "host": "localhost1", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "2": { + "id": 2, + "rack": "1", + "jmx_port": 9999, + "host": "localhost2", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "3": { + "id": 3, + "rack": "1", + "jmx_port": 9999, + "host": "localhost3", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + + }, + { + "4": { + "id": 4, + "rack": "2", + "jmx_port": 9999, + "host": "localhost4", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "5": { + "id": 5, + "rack": "2", + "jmx_port": 9999, + "host": "localhost5", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "6": { + "id": 6, + "rack": "2", + "jmx_port": 9999, + "host": "localhost6", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "7": { + "id": 7, + "rack": "3", + "jmx_port": 9999, + "host": "localhost7", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + + }, + { + "8": { + "id": 8, + "rack": "3", + "jmx_port": 9999, + "host": "localhost8", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + }, + { + "9": { + "id": 9, + "rack": "3", + "jmx_port": 9999, + "host": "localhost9", + "timestamp": "1631715232017", + "port": 9092, + "version": 4 + } + } + ], + "topics": { + "topic1": { + "partitions": { + "0": [1, 2, 4], + "1": [1, 5, 3], + "2": [2, 6, 8], + "3": [1, 9, 7] + } + }, + "topic2": { + "partitions": { + "0": [7, 8, 1], + "1": [7, 3, 6], + "2": [7, 9, 5], + "3": [5, 2, 1], + "4": [5, 1, 8] + } + } + } + } + + + cluster = Cluster(retention=3) + for broker in cluster_dict["brokers"]: + cluster.add_broker( + Broker.create_from_json( + int(list(broker.keys())[0]), + json.dumps(list(broker.values())[0]) + ) + ) + + for topic, topic_data in cluster_dict["topics"].items(): + add_topic_with_replicas(cluster, topic, topic_data) + return cluster diff --git a/tests/tools/assigner/actions/test_execute_plan.py b/tests/tools/assigner/actions/test_execute_plan.py new file mode 100644 index 0000000..07fcec0 --- /dev/null +++ b/tests/tools/assigner/actions/test_execute_plan.py @@ -0,0 +1,71 @@ +import sys +import json +import unittest +from mock import patch +from argparse import Namespace +from tempfile import NamedTemporaryFile +from .fixtures import set_up_cluster_9broker, set_up_subparser +from kafka.tools.assigner.actions.execute_plan import ActionExecutePlan +from kafka.tools.exceptions import InvalidPlanFormatException + + +class ActionExecutePlanTests(unittest.TestCase): + def setUp(self): + self.cluster = set_up_cluster_9broker() + self.parser, self.subparser = set_up_subparser() + self.args = Namespace() + self.wrong_plan_file = NamedTemporaryFile(mode="w") + self.correct_plan_file = NamedTemporaryFile(mode="w") + + self.__add_wrong_plan() + self.__add_correct_plan() + + def __add_wrong_plan(self): + self.wrong_plan_file.write( + json.dumps({"topic": "topic1", "partition": 0, "replicas": [1,2,8]}) + ) + self.wrong_plan_file.flush() + + def __add_correct_plan(self): + self.correct_plan_file.write( + json.dumps( + [ + {"topic": "topic1", "partition": 0,"replicas": [1,2,8]}, + {"topic": "topic1", "partition": 1, "replicas": [2,7,8]} + ] + ) + ) + self.correct_plan_file.flush() + + def test_create_class(self): + action = ActionExecutePlan(self.args, self.cluster) + assert isinstance(action, ActionExecutePlan) + + @patch("os.path.isfile") + def test_configure_args(self, mock_isfile): + ActionExecutePlan.configure_args(self.subparser) + sys.argv = ["kafka-assigner", "execute_plan", "--plan-file-path", "/tmp/path"] + args = self.parser.parse_args() + mock_isfile.assert_called_once_with("/tmp/path") + assert args.action == "execute_plan" + + def test_wrong_plan_format(self): + self.args.plan_file_path = self.wrong_plan_file.name + action = ActionExecutePlan(self.args, self.cluster) + self.assertRaises(InvalidPlanFormatException, action.process_cluster) + + def test_correct_plan_format(self): + self.args.plan_file_path = self.correct_plan_file.name + action = ActionExecutePlan(self.args, self.cluster) + action.process_cluster() + partition = list(filter(lambda x: int(x.num) == 0, self.cluster.topics["topic1"].partitions)) + assert len(partition) == 1 + self.assertListEqual( + [b.id for b in partition[0].replicas], + [1,2,8] + ) + + def tearDown(self): + self.wrong_plan_file.close() + self.correct_plan_file.close() + diff --git a/tests/tools/assigner/models/test_reassignment.py b/tests/tools/assigner/models/test_reassignment.py index e6491d1..6923c89 100644 --- a/tests/tools/assigner/models/test_reassignment.py +++ b/tests/tools/assigner/models/test_reassignment.py @@ -41,12 +41,12 @@ def test_reassignment_repr(self): @patch.object(Reassignment, '_execute') def test_reassignment_execute_real(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False) - mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 25000000, plugins=[self.null_plugin], dry_run=False) + mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', 25000000) @patch.object(Reassignment, '_execute') def test_reassignment_execute_dryrun(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=True) + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 25000000, plugins=[self.null_plugin], dry_run=True) mock_exec.assert_not_called() @patch('kafka.tools.assigner.models.reassignment.subprocess.Popen', new_callable=MockPopen) @@ -55,9 +55,9 @@ def test_reassignment_internal_execute(self, mock_check, mock_popen): mock_popen.set_default() mock_check.side_effect = [10, 5, 0] - self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools', 25000000) - compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY], + compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY, '--throttle', ANY], stderr=ANY, stdout=ANY), call.Popen_instance.wait()], mock_popen.mock.method_calls) assert len(mock_check.mock_calls) == 3 diff --git a/tests/tools/assigner/test_main.py b/tests/tools/assigner/test_main.py index 665a6c6..a66b73a 100644 --- a/tests/tools/assigner/test_main.py +++ b/tests/tools/assigner/test_main.py @@ -74,6 +74,8 @@ def test_main(self, mock_plugins, mock_sizes): moves=10, execute=False, exclude_topics=[], + include_topics=[], + existing_plan_path="/tmp/qwerty", generate=False, size=False, skip_ple=False, @@ -81,6 +83,8 @@ def test_main(self, mock_plugins, mock_sizes): ple_wait=120, sizer='ssh', leadership=True, + save_plan_path="/tmp/save_plan_path", + throttle=2500000, output_json=True) assert main() == 0 @@ -116,7 +120,7 @@ def test_get_sizes(self, mock_sizes): @patch.object(ReplicaElection, 'execute') def test_ple(self, mock_execute, mock_sleep): cluster = set_up_cluster() - args = argparse.Namespace(ple_wait=0, zookeeper='zkconnect', tools_path='/path/to/tools') + args = argparse.Namespace(ple_wait=0, zookeeper='zkconnect', tools_path='/path/to/tools', throttle=2500000) batches = [ReplicaElection(cluster.brokers[1].partitions, args.ple_wait), ReplicaElection(cluster.brokers[2].partitions, args.ple_wait)] run_preferred_replica_elections(batches, args, args.tools_path, [], False) From 3d940779a46bad6b32e004e167d2dc3324aed90c Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Wed, 1 Dec 2021 03:24:30 +0530 Subject: [PATCH 3/6] Fix issues --- .../balancemodules/topic_partition_in_rack.py | 23 ++++++------------- kafka/tools/assigner/actions/execute_plan.py | 20 ++++++++++++---- setup.py | 6 +++-- 3 files changed, 27 insertions(+), 22 deletions(-) diff --git a/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py index 9275b7f..aec6a53 100644 --- a/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py +++ b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py @@ -73,14 +73,14 @@ def _unskew_topic_partition_in_rack(self, topic, partitions): for p in broker_partition[bid]: bright = broker_count[right] - while right > 0 and bright[1] >= count: + if bright[1] >= count: right -= 1 if left >= right or adjusted_count == 0: break replicas = [b.id for b in p.replicas] - if bright[0] not in replicas: + if bid in replicas and bright[0] not in replicas: index = replicas.index(bid) p.swap_replicas(p.replicas[index], self.cluster.brokers[bright[0]]) adjusted_count -= 1 @@ -132,14 +132,14 @@ def _unskew_topic_partition_leader_in_rack(self, topic, partitions): for p in broker_partition[bid]: bright = broker_count[right] - while right > 0 and bright[1] >= count: + if bright[1] >= count: right -= 1 if left >= right or adjusted_count == 0: break replicas = [b.id for b in p.replicas] - if not(bright[0] not in replicas and replicas.index(bid) == 0): + if not(bid in replicas and bright[0] not in replicas and replicas.index(bid) == 0): continue # Swaping leader from left side broker and non leader from right side broker @@ -150,7 +150,7 @@ def _unskew_topic_partition_leader_in_rack(self, topic, partitions): for p2 in broker_partition[bright[0]]: replicas = [b.id for b in p2.replicas] - if replicas.index(bright[0]) != 0 and bid not in replicas: + if bright[0] in replicas and replicas.index(bright[0]) != 0 and bid not in replicas: index2 = replicas.index(bright[0]) p2.swap_replicas(p2.replicas[index2], self.cluster.brokers[bid]) break @@ -218,7 +218,7 @@ def __stats(self, topic, before, after, after_leader): after_leader[_id]["leader"] + after_leader[_id]["follower"] ) ]) - log.info("\n" + table.get_string()) + log.info("\n" + table.get_string(sortby="rack_id")) def __create_count_map(self, partitions): @@ -233,13 +233,4 @@ def __create_count_map(self, partitions): count_map[broker_id]["leader"] += 1 else: count_map[broker_id]["follower"] += 1 - return dict(sorted(count_map.items(), key= lambda x: x[0])) - - - - - -# broker_id__rack_id=3__2 old(leader=4 follower=1) rplan(leader=4 follower=3) new(leader=4 follower=3) - - -# + return count_map diff --git a/kafka/tools/assigner/actions/execute_plan.py b/kafka/tools/assigner/actions/execute_plan.py index d1b88c4..3887ef7 100644 --- a/kafka/tools/assigner/actions/execute_plan.py +++ b/kafka/tools/assigner/actions/execute_plan.py @@ -2,7 +2,7 @@ from kafka.tools.assigner.actions import ActionModule from kafka.tools.assigner.arguments import file_path_checker from kafka.tools.models.partition import Partition -from kafka.tools.exceptions import InvalidPlanFormatException +from kafka.tools.exceptions import InvalidPlanFormatException, UnknownBrokerException class ActionExecutePlan(ActionModule): @@ -23,16 +23,28 @@ def _add_args(cls, parser): ) def __check_plan_format(self, plan): - """ - Expecting plan will of type list of dict - """ + """Expecting plan will of type list of dict""" if not (isinstance(plan, list) and len(plan) > 0 and isinstance(plan[0], dict)): raise InvalidPlanFormatException() + # Check all broker id in plan must be present in cluster + brokers_in_plan, brokers_in_cluster = set(), set(self.cluster.brokers.keys()) + + for partition_plan in plan: + brokers_in_plan.update(partition_plan["replicas"]) + + missing = list(brokers_in_plan - brokers_in_cluster) + if missing: + raise UnknownBrokerException( + "Broker ids = {} in plan not present in cluster".format(str(missing)) + ) + def process_cluster(self): plan = json.load(open(self.args.plan_file_path)) self.__check_plan_format(plan) + # check broker exist in cluster or not in plan + topic_track = {} for partition_plan in plan: diff --git a/setup.py b/setup.py index 299c3ce..2dffd86 100755 --- a/setup.py +++ b/setup.py @@ -16,6 +16,7 @@ 'JPype1', 'kazoo', 'pex', + 'prettytable' ] @@ -100,12 +101,12 @@ def run(self): long_description = f.read() setup( - name='kafka-tools', + name='kafka-tools-fk', # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see # https://packaging.python.org/en/latest/single_source_version.html - version='0.1.9', + version='0.1.10', author='Todd Palino', author_email='tpalino@linkedin.com', @@ -141,6 +142,7 @@ def run(self): 'pytest', 'testfixtures', 'timeout-decorator', + 'prettytable' ], setup_requires=[ 'pytest-runner', From 20288d20591f41d80b9bcb3c8e115a392452b9b9 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Wed, 1 Dec 2021 23:19:56 +0530 Subject: [PATCH 4/6] replace math.floor and other calculation with math.ceil --- .../actions/balancemodules/topic_partition_in_rack.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py index aec6a53..e88faad 100644 --- a/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py +++ b/kafka/tools/assigner/actions/balancemodules/topic_partition_in_rack.py @@ -14,13 +14,7 @@ def __init__(self, args, cluster): def __calculate_max_possible_count(self, broker_count): total_count = sum(map(lambda x: x[1], broker_count)) - count = total_count * 1.0 / len(broker_count) - - if (total_count % len(broker_count)) == 0: - count = int(count) - else: - count = int(math.floor(count)) + 1 - return count + return int(math.ceil(total_count * 1.0 / len(broker_count))) def _unskew_topic_partition_in_rack(self, topic, partitions): """ From 3a7f1e9670b421255e905a6d59905343735634fe Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Tue, 7 Dec 2021 11:05:21 +0530 Subject: [PATCH 5/6] reverted package name --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 2dffd86..62b931f 100755 --- a/setup.py +++ b/setup.py @@ -101,7 +101,7 @@ def run(self): long_description = f.read() setup( - name='kafka-tools-fk', + name='kafka-tools', # Versions should comply with PEP440. For a discussion on single-sourcing # the version across setup.py and the project code, see From 1f51a9be28f9132bb6eacdad15ae9b3faca51709 Mon Sep 17 00:00:00 2001 From: Md Mehrab Alam Date: Thu, 9 Dec 2021 14:12:17 +0530 Subject: [PATCH 6/6] added throttle limit file also --- kafka/tools/assigner/__main__.py | 12 +++++++++++- kafka/tools/assigner/arguments.py | 6 +++++- tests/tools/assigner/test_main.py | 15 ++++++++++++++- 3 files changed, 30 insertions(+), 3 deletions(-) diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index 6f98967..6beec8f 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -107,6 +107,16 @@ def save_plan(args, move_partitions): else: raise FileExistsError("Given file path={} is a directory not File".format(args.save_plan_path)) +def get_throttle_limit(args): + throttle = args.throttle + if args.throttle_limit_file_path: + with open(args.throttle_limit_file_path) as f: + try: + throttle = int(f.read()) + except Exception as e: + log.error("Error while getting throttle limit from file {}".format(e)) + return throttle + def main(): # Start by loading all the modules action_map = get_module_map(kafka.tools.assigner.actions, kafka.tools.assigner.actions.ActionModule) @@ -155,7 +165,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, args.throttle, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, get_throttle_limit(args), plugins, dry_run) run_plugins_at_step(plugins, 'before_ple') diff --git a/kafka/tools/assigner/arguments.py b/kafka/tools/assigner/arguments.py index e3e780b..ccc7242 100644 --- a/kafka/tools/assigner/arguments.py +++ b/kafka/tools/assigner/arguments.py @@ -39,7 +39,7 @@ def __call__(self, parser, namespace, values, option_string=None): def file_path_checker(path): """Check is given file path exist or not""" if not os.path.isfile(path): - raise FileNotFoundError("File path: {} not exists or its not file".format(path)) + raise IOError("File path: {} not exists or its not file".format(path)) return path # action_map is a map of names to ActionModule children - the top level actions that can be called @@ -67,6 +67,10 @@ def set_up_arguments(action_map, sizer_map, plugins): aparser.add_argument('--tools-path', help="Path to Kafka admin utilities, overriding PATH env var", required=False) aparser.add_argument('--output-json', help="Output JSON-formatted cluster information to stdout", default=False, action='store_true') aparser.add_argument('--throttle', help="The movement of partitions between brokers will be throttled to this value (bytes/sec)", default=25000000, type=int) + aparser.add_argument('--throttle-limit-file-path', + help="Similar to throttle argument but here taking input from file. If throttle & throttle-limit-file-path is given then throttle-limit-file-path will take precedence", + required=False, type=file_path_checker + ) aparser.add_argument('--save-plan-path', help="Save the generated plan at given path", required=False, type=str) # Call action module arg setup diff --git a/tests/tools/assigner/test_main.py b/tests/tools/assigner/test_main.py index a66b73a..b785176 100644 --- a/tests/tools/assigner/test_main.py +++ b/tests/tools/assigner/test_main.py @@ -3,7 +3,7 @@ from mock import call, patch -from kafka.tools.assigner.__main__ import main, get_plugins_list, check_and_get_sizes, run_preferred_replica_elections, run_plugins_at_step, is_dry_run +from kafka.tools.assigner.__main__ import main, get_plugins_list, check_and_get_sizes, run_preferred_replica_elections, run_plugins_at_step, is_dry_run, get_throttle_limit from kafka.tools.exceptions import ProgrammingException from kafka.tools.assigner.actions.balance import ActionBalance from kafka.tools.models.broker import Broker @@ -12,6 +12,7 @@ from kafka.tools.assigner.models.replica_election import ReplicaElection from kafka.tools.assigner.plugins import PluginModule from kafka.tools.assigner.sizers.ssh import SizerSSH +from tempfile import NamedTemporaryFile def set_up_cluster(): @@ -85,6 +86,7 @@ def test_main(self, mock_plugins, mock_sizes): leadership=True, save_plan_path="/tmp/save_plan_path", throttle=2500000, + throttle_limit_file_path=None, output_json=True) assert main() == 0 @@ -128,3 +130,14 @@ def test_ple(self, mock_execute, mock_sleep): mock_sleep.assert_called_once_with(0) mock_execute.assert_has_calls([call(1, 2, 'zkconnect', '/path/to/tools', [], False), call(2, 2, 'zkconnect', '/path/to/tools', [], False)]) + + def test_throttle_limit(self): + args = argparse.Namespace(throttle_limit_file_path=None, throttle=30000) + assert get_throttle_limit(args) == 30000 + + with NamedTemporaryFile(mode='w') as f: + f.write("25000") + f.flush() + + args = argparse.Namespace(throttle_limit_file_path=f.name, throttle=30000) + assert get_throttle_limit(args) == 25000