From 142e1eda1db343e6a12a61f6e5842d00f05ff44f Mon Sep 17 00:00:00 2001 From: ltagliamonte Date: Fri, 7 Sep 2018 15:40:12 -0700 Subject: [PATCH 1/4] introducing support for sshuser and sshkey in ssh sizer module --- kafka/tools/assigner/sizers/ssh.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/kafka/tools/assigner/sizers/ssh.py b/kafka/tools/assigner/sizers/ssh.py index 537420a..a2eea91 100644 --- a/kafka/tools/assigner/sizers/ssh.py +++ b/kafka/tools/assigner/sizers/ssh.py @@ -36,13 +36,32 @@ def get_partition_sizes(self): # Get broker partition sizes FNULL = open(os.devnull, 'w') + for broker_id, broker in self.cluster.brokers.items(): if broker.hostname is None: raise UnknownBrokerException("Cannot get sizes for broker ID {0} which has no hostname. " "Remove the broker from the cluster before balance".format(broker_id)) - log.info("Getting partition sizes via SSH for {0}".format(broker.hostname)) - proc = subprocess.Popen(['ssh', broker.hostname, 'du -sk {0}/*'.format(self.properties['datadir'])], - stdout=subprocess.PIPE, stderr=FNULL) + if 'sshuser' in self.properties: + connection_endpoint = self.properties['sshuser']+'@'+broker.hostname + else: + connection_endpoint = broker.hostname + + if 'sshkey' in self.properties: + key = self.properties['sshkey'] + else: + key = None + + if key is None: + log.info("Getting partition sizes via SSH for {0}".format(broker.hostname)) + proc = subprocess.Popen(['ssh', connection_endpoint, 'du -sk {0}/*'.format(self.properties['datadir'])], + stdout=subprocess.PIPE, stderr=FNULL) + else: + key = self.properties['sshkey'] + log.info("Getting partition sizes via SSH using key: {0} for {1}".format(key, broker.hostname)) + proc = subprocess.Popen(['ssh','-i', key, connection_endpoint, + 'du -sk {0}/*'.format(self.properties['datadir'])], + stdout=subprocess.PIPE, stderr=FNULL) + for line in proc.stdout: self.process_df_match(self.size_re.match(line.decode()), broker_id) From 519253c7ca74f0a9187938296ca44e3a7fe310c5 Mon Sep 17 00:00:00 2001 From: ltagliamonte Date: Fri, 7 Sep 2018 15:46:19 -0700 Subject: [PATCH 2/4] remove empty line --- kafka/tools/assigner/sizers/ssh.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/tools/assigner/sizers/ssh.py b/kafka/tools/assigner/sizers/ssh.py index a2eea91..99e3611 100644 --- a/kafka/tools/assigner/sizers/ssh.py +++ b/kafka/tools/assigner/sizers/ssh.py @@ -36,7 +36,6 @@ def get_partition_sizes(self): # Get broker partition sizes FNULL = open(os.devnull, 'w') - for broker_id, broker in self.cluster.brokers.items(): if broker.hostname is None: raise UnknownBrokerException("Cannot get sizes for broker ID {0} which has no hostname. " From db0a822325f25266cd6018befc6d1c4b99675ca9 Mon Sep 17 00:00:00 2001 From: ltagliamonte Date: Fri, 7 Sep 2018 15:49:40 -0700 Subject: [PATCH 3/4] remove useless assignment --- kafka/tools/assigner/sizers/ssh.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/tools/assigner/sizers/ssh.py b/kafka/tools/assigner/sizers/ssh.py index 99e3611..8560440 100644 --- a/kafka/tools/assigner/sizers/ssh.py +++ b/kafka/tools/assigner/sizers/ssh.py @@ -56,7 +56,6 @@ def get_partition_sizes(self): proc = subprocess.Popen(['ssh', connection_endpoint, 'du -sk {0}/*'.format(self.properties['datadir'])], stdout=subprocess.PIPE, stderr=FNULL) else: - key = self.properties['sshkey'] log.info("Getting partition sizes via SSH using key: {0} for {1}".format(key, broker.hostname)) proc = subprocess.Popen(['ssh','-i', key, connection_endpoint, 'du -sk {0}/*'.format(self.properties['datadir'])], From 99fee36ef2b27a5403efa765a0fb5ab68d86b321 Mon Sep 17 00:00:00 2001 From: ltagliamonte Date: Fri, 7 Sep 2018 16:22:12 -0700 Subject: [PATCH 4/4] fix E231 missing whitespace after , --- kafka/tools/assigner/sizers/ssh.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/tools/assigner/sizers/ssh.py b/kafka/tools/assigner/sizers/ssh.py index 8560440..4cb9164 100644 --- a/kafka/tools/assigner/sizers/ssh.py +++ b/kafka/tools/assigner/sizers/ssh.py @@ -57,7 +57,7 @@ def get_partition_sizes(self): stdout=subprocess.PIPE, stderr=FNULL) else: log.info("Getting partition sizes via SSH using key: {0} for {1}".format(key, broker.hostname)) - proc = subprocess.Popen(['ssh','-i', key, connection_endpoint, + proc = subprocess.Popen(['ssh', '-i', key, connection_endpoint, 'du -sk {0}/*'.format(self.properties['datadir'])], stdout=subprocess.PIPE, stderr=FNULL)