diff --git a/README.rst b/README.rst
index 0b1f11e..30db0b2 100644
--- a/README.rst
+++ b/README.rst
@@ -45,6 +45,21 @@ Get a row by key:
             row = table.get('00001')
             print(row)
         exit()
 
+Get multiple rows by keys:
+
+.. code-block:: python
+
+    import hbase
+
+    zk = 'sis3.ustcdm.org:2181,sis4.ustcdm.org:2181'
+
+    if __name__ == '__main__':
+        with hbase.ConnectionPool(zk).connect() as conn:
+            table = conn['mytest']['videos']
+            row_kv = table.mget(['00001', '00002', '00003'])
+            print(row_kv)
+        exit()
+
 Scan a table:
 
diff --git a/hbase/client/client.py b/hbase/client/client.py
index 307e6f5..5d95d51 100644
--- a/hbase/client/client.py
+++ b/hbase/client/client.py
@@ -7,12 +7,14 @@
 
 import collections
 import time
-
+from concurrent import futures
+from concurrent.futures.thread import ThreadPoolExecutor
 from . import filters
 from . import region as _region
 from .. import protobuf
 from .. import services
 from ..exceptions import *
+from ..conf import thread_pool_size, fail_task_retry
 
 
 DEFAULT_FAMILY = 'cf'
@@ -144,6 +146,8 @@ def __init__(self, zkquorum, zk_master_path=None, zk_region_path=None):
 
         self._master_service = services.MasterService(zkquorum, zk_master_path)
         self._region_manager = _region.RegionManager(zkquorum, zk_region_path)
+        self.fail_retrys = fail_task_retry
+        self.pool = ThreadPoolExecutor(max_workers=thread_pool_size)
 
     def __enter__(self):
         return self
@@ -162,6 +166,8 @@ def close(self):
             self._region_manager.close()
             self._region_manager = None
 
+        self.pool.shutdown()
+
     def namespaces(self):
         """List all namespaces.
 
@@ -723,4 +729,104 @@ def _wait_for_proc(self, proc_id, sleep):
             else:
                 break
 
+    def mget(self, table, keys, columns=None, filter_=None):
+        """Query to get row objects for multiple row keys.
+
+        One Get request is built per distinct key and all requests are sent
+        concurrently through ``self.pool``. A key whose request fails is
+        re-sent up to ``self.fail_retrys`` times; errors raised while sending
+        are not propagated to the caller.
+
+        Args:
+            table (str): Table name.
+            keys (tuple[str]|list[str]): Multiple row keys.
+            columns (tuple[str]|list[str]): Columns to fetch.
+            filter_ (filters.Filter): Filter object.
+
+        Returns:
+            dict: Maps each successfully fetched row key to its row object.
+                Keys that still fail after all retries are omitted.
+
+        Raises:
+            RegionError
+            RequestError
+
+            TransportError
+            ZookeeperProtocolError
+            ServiceProtocolError
+            NoSuchZookeeperNodeError
+
+        """
+        # The column selection is identical for every key, so parse it once.
+        qualifier_dict = collections.defaultdict(list)
+        if columns is not None:
+            for column in columns:
+                try:
+                    family, qualifier = column.split(':')
+                except (ValueError, AttributeError):
+                    raise RequestError(
+                        'Invalid column name. {family}:{qualifier} expected, got %s.' % column
+                    )
+                qualifier_dict[family.encode()].append(qualifier.encode())
+
+        pb_reqs = {}
+        for key in keys:
+            if key in pb_reqs:
+                # Duplicate key: its request has already been built.
+                continue
+            region = self._region_manager.get_region(table, key)
+            region_service = self._region_manager.get_service(region)
+            pb_req = protobuf.GetRequest()
+
+            pb_req.region.type = 1
+            pb_req.region.value = region.name.encode()
+
+            pb_get = pb_req.get
+            pb_get.row = key.encode()
+
+            for family, qualifiers in qualifier_dict.items():
+                pb_column = pb_get.column.add()
+                pb_column.family = family
+                pb_column.qualifier.extend(qualifiers)
+
+            if filter_ is not None:
+                pb_filter = pb_get.filter
+                pb_filter.name = filter_.name
+                pb_filter.serialized_filter = filter_.serialize()
+            pb_reqs[key] = (region_service, pb_req)
+
+        def _request(region_service, pb_req):
+            pb_resp = region_service.request(pb_req)
+            return self._cells_to_row(pb_resp.result.cell)
+
+        def _send_all(reqs):
+            """Send every request in ``reqs``; return (results, failed keys)."""
+            results = {}
+            fails = []
+            tasks = {
+                self.pool.submit(_request, region_service, pb_req): key
+                for key, (region_service, pb_req) in reqs.items()
+            }
+            for f in futures.as_completed(tasks):
+                key = tasks[f]
+                try:
+                    results[key] = f.result()
+                except Exception:
+                    # Best effort: remember the key so that it can be retried.
+                    fails.append(key)
+            return results, fails
+
+        results, fails = _send_all(pb_reqs)
+
+        # Retry the failed keys a bounded number of times, pausing before each
+        # attempt. NOTE: the region/service resolved above is reused; region
+        # information is not refreshed between attempts.
+        for _ in range(self.fail_retrys):
+            if not fails:
+                break
+            time.sleep(3)
+            retried, fails = _send_all({key: pb_reqs[key] for key in fails})
+            results.update(retried)
+        return results
+
     def get(self,
diff --git a/hbase/client/region.py b/hbase/client/region.py
index 49fd95e..5864d51 100644
--- a/hbase/client/region.py
+++ b/hbase/client/region.py
@@ -245,15 +245,16 @@ def _remove_from_cache(self, region_or_meta_key):
             pass
 
     def _region_lookup(self, meta_key):
+        # Look up hbase:meta with a reversed one-row scan instead of a Get with closest_row_before; see https://github.com/3601314/hbase-python/issues/3
         column = protobuf.Column()
         column.family = b'info'
-        req = protobuf.GetRequest()
-        req.get.row = meta_key.encode()
-        req.get.column.extend([column])
-        req.get.closest_row_before = True
+        req = protobuf.ScanRequest()
+        req.scan.column.extend([column])
+        req.scan.start_row = meta_key.encode()
+        req.scan.reversed = True
         req.region.type = 1
         req.region.value = b'hbase:meta,,1'
-
+        req.number_of_rows = 1
         try:
             resp = self._meta_service.request(req)
         except exceptions.RegionError:
@@ -264,7 +265,10 @@ def _region_lookup(self, meta_key):
                     break
                 except exceptions.RegionError:
                     continue
-        cells = resp.result.cell
+        cells = []
+        for result in resp.results:
+            cells = result.cell
+            break
         if len(cells) == 0:
             return None
 
diff --git a/hbase/conf.py b/hbase/conf.py
index 89fed91..03e7bc9 100644
--- a/hbase/conf.py
+++ b/hbase/conf.py
@@ -9,6 +9,8 @@
 
 num_threads_per_conn = 5
 num_tasks_per_conn = 100
+thread_pool_size = 10  # max_workers of the thread pool each Client uses for mget
+fail_task_retry = 3  # maximum number of retry rounds for failed mget requests
 
 # Modify This Configuration on use zk path
 class Conf: