diff --git a/kafka/tools/assigner/sizers/ssh.py b/kafka/tools/assigner/sizers/ssh.py index 537420a..4cb9164 100644 --- a/kafka/tools/assigner/sizers/ssh.py +++ b/kafka/tools/assigner/sizers/ssh.py @@ -41,8 +41,25 @@ def get_partition_sizes(self): raise UnknownBrokerException("Cannot get sizes for broker ID {0} which has no hostname. " "Remove the broker from the cluster before balance".format(broker_id)) - log.info("Getting partition sizes via SSH for {0}".format(broker.hostname)) - proc = subprocess.Popen(['ssh', broker.hostname, 'du -sk {0}/*'.format(self.properties['datadir'])], - stdout=subprocess.PIPE, stderr=FNULL) + if 'sshuser' in self.properties: + connection_endpoint = self.properties['sshuser']+'@'+broker.hostname + else: + connection_endpoint = broker.hostname + + if 'sshkey' in self.properties: + key = self.properties['sshkey'] + else: + key = None + + if key is None: + log.info("Getting partition sizes via SSH for {0}".format(broker.hostname)) + proc = subprocess.Popen(['ssh', connection_endpoint, 'du -sk {0}/*'.format(self.properties['datadir'])], + stdout=subprocess.PIPE, stderr=FNULL) + else: + log.info("Getting partition sizes via SSH using key: {0} for {1}".format(key, broker.hostname)) + proc = subprocess.Popen(['ssh', '-i', key, connection_endpoint, + 'du -sk {0}/*'.format(self.properties['datadir'])], + stdout=subprocess.PIPE, stderr=FNULL) + for line in proc.stdout: self.process_df_match(self.size_re.match(line.decode()), broker_id)