From 717d04cf99d2394025a86ecda22f99720fb1570f Mon Sep 17 00:00:00 2001 From: Fred Yang Date: Fri, 18 Oct 2019 15:37:21 -0400 Subject: [PATCH] Add throttle feature into reassignment execution --- kafka/tools/assigner/__main__.py | 2 +- kafka/tools/assigner/actions/balance.py | 1 + kafka/tools/assigner/models/reassignment.py | 12 +++++++----- tests/tools/assigner/models/test_reassignment.py | 10 +++++----- tests/tools/assigner/test_main.py | 1 + 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/kafka/tools/assigner/__main__.py b/kafka/tools/assigner/__main__.py index cb84419..1430e9e 100644 --- a/kafka/tools/assigner/__main__.py +++ b/kafka/tools/assigner/__main__.py @@ -133,7 +133,7 @@ def main(): for i, batch in enumerate(batches): log.info("Executing partition reassignment {0}/{1}: {2}".format(i + 1, len(batches), repr(batch))) - batch.execute(i + 1, len(batches), args.zookeeper, tools_path, plugins, dry_run) + batch.execute(i + 1, len(batches), args.zookeeper, tools_path, args.throttle, plugins, dry_run) run_plugins_at_step(plugins, 'before_ple') diff --git a/kafka/tools/assigner/actions/balance.py b/kafka/tools/assigner/actions/balance.py index d57afc9..a7598f9 100644 --- a/kafka/tools/assigner/actions/balance.py +++ b/kafka/tools/assigner/actions/balance.py @@ -46,6 +46,7 @@ def _add_args(cls, parser): parser.add_argument('-t', '--types', help="Balance types to perform. Multiple may be specified and they will be run in order", required=True, choices=[klass.name for klass in balance_actions], nargs='*') parser.add_argument('--default-retention', help="Default cluster retention, in ms", required=False, type=int, default=345600000) + parser.add_argument('--throttle', help="Bytes/s to be throttled at during partition rebalance", required=False, type=int, default=-1) def process_cluster(self): for bmodule in self.modules: diff --git a/kafka/tools/assigner/models/reassignment.py b/kafka/tools/assigner/models/reassignment.py index 2c5cb15..419c48a 100644 --- a/kafka/tools/assigner/models/reassignment.py +++ b/kafka/tools/assigner/models/reassignment.py @@ -44,22 +44,23 @@ def dict_for_reassignment(self): reassignment['partitions'].append(partition.dict_for_reassignment()) return reassignment - def execute(self, num, total, zookeeper, tools_path, plugins=[], dry_run=True): + def execute(self, num, total, zookeeper, tools_path, throttle, plugins=[], dry_run=True): for plugin in plugins: plugin.before_execute_batch(num) if not dry_run: - self._execute(num, total, zookeeper, tools_path) + self._execute(num, total, zookeeper, tools_path, throttle) for plugin in plugins: plugin.after_execute_batch(num) - def _execute(self, num, total, zookeeper, tools_path): + def _execute(self, num, total, zookeeper, tools_path, throttle): with NamedTemporaryFile(mode='w') as assignfile: json.dump(self.dict_for_reassignment(), assignfile) assignfile.flush() FNULL = open(os.devnull, 'w') proc = subprocess.Popen(['{0}/kafka-reassign-partitions.sh'.format(tools_path), '--execute', '--zookeeper', zookeeper, - '--reassignment-json-file', assignfile.name], + '--reassignment-json-file', assignfile.name, + '--throttle', str(throttle)], stdout=FNULL, stderr=FNULL) proc.wait() @@ -69,10 +70,11 @@ def _execute(self, num, total, zookeeper, tools_path): if remaining_partitions == 0: break - log.info('Partition reassignment {0}/{1} in progress [ {2}/{3} partitions remain ]. Sleeping {4} seconds'.format(num, + log.info('Partition reassignment {0}/{1} in progress [ {2}/{3} partitions remain ], throttled at {4} bytes/s. Sleeping {5} seconds'.format(num, total, remaining_partitions, len(self.partitions), + throttle, self.pause_time)) time.sleep(self.pause_time) diff --git a/tests/tools/assigner/models/test_reassignment.py b/tests/tools/assigner/models/test_reassignment.py index e6491d1..4480556 100644 --- a/tests/tools/assigner/models/test_reassignment.py +++ b/tests/tools/assigner/models/test_reassignment.py @@ -41,12 +41,12 @@ def test_reassignment_repr(self): @patch.object(Reassignment, '_execute') def test_reassignment_execute_real(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=False) - mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 100000000, plugins=[self.null_plugin], dry_run=False) + mock_exec.assert_called_once_with(1, 1, 'zkconnect', '/path/to/tools', 100000000) @patch.object(Reassignment, '_execute') def test_reassignment_execute_dryrun(self, mock_exec): - self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', plugins=[self.null_plugin], dry_run=True) + self.reassignment.execute(1, 1, 'zkconnect', '/path/to/tools', 100000000, plugins=[self.null_plugin], dry_run=True) mock_exec.assert_not_called() @patch('kafka.tools.assigner.models.reassignment.subprocess.Popen', new_callable=MockPopen) @@ -55,9 +55,9 @@ def test_reassignment_internal_execute(self, mock_check, mock_popen): mock_popen.set_default() mock_check.side_effect = [10, 5, 0] - self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools') + self.reassignment._execute(1, 1, 'zkconnect', '/path/to/tools', 100000000) - compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY], + compare([call.Popen(['/path/to/tools/kafka-reassign-partitions.sh', '--execute', '--zookeeper', 'zkconnect', '--reassignment-json-file', ANY, '--throttle', ANY], stderr=ANY, stdout=ANY), call.Popen_instance.wait()], mock_popen.mock.method_calls) assert len(mock_check.mock_calls) == 3 diff --git a/tests/tools/assigner/test_main.py b/tests/tools/assigner/test_main.py index 665a6c6..0199854 100644 --- a/tests/tools/assigner/test_main.py +++ b/tests/tools/assigner/test_main.py @@ -80,6 +80,7 @@ def test_main(self, mock_plugins, mock_sizes): ple_size=2, ple_wait=120, sizer='ssh', + throttle='100000000', leadership=True, output_json=True) assert main() == 0