Open
Description
What happened?
I am running a simple Beam pipeline on a local Flink cluster. When I call ReadFromKafka() with multiple topics, the worker fails with the error below; with a single topic, everything works. From searching online, this seems to be related to an issue in the dill package that was fixed in a later version (3.3.0): uqfoundation/dill#318
However, Beam currently pins dill to a very narrow version range, so I'm unable to upgrade the dill package. Any idea how I can work around this issue? Thanks!
taskmanager_1 | Traceback (most recent call last):
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 287, in _execute
taskmanager_1 | response = task()
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 360, in <lambda>
taskmanager_1 | lambda: self.create_worker().do_instruction(request), request)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 596, in do_instruction
taskmanager_1 | return getattr(self, request_type)(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 627, in process_bundle
taskmanager_1 | bundle_processor = self.bundle_processor_cache.get(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/sdk_worker.py", line 459, in get
taskmanager_1 | processor = bundle_processor.BundleProcessor(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 871, in __init__
taskmanager_1 | self.ops = self.create_execution_tree(self.process_bundle_descriptor)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 924, in create_execution_tree
taskmanager_1 | return collections.OrderedDict([(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 927, in <listcomp>
taskmanager_1 | get_operation(transform_id))) for transform_id in sorted(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 814, in wrapper
taskmanager_1 | result = cache[args] = func(*args)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
taskmanager_1 | transform_consumers = {
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <dictcomp>
taskmanager_1 | tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <listcomp>
taskmanager_1 | tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 814, in wrapper
taskmanager_1 | result = cache[args] = func(*args)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 906, in get_operation
taskmanager_1 | transform_consumers = {
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <dictcomp>
taskmanager_1 | tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 907, in <listcomp>
taskmanager_1 | tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 814, in wrapper
taskmanager_1 | result = cache[args] = func(*args)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 911, in get_operation
taskmanager_1 | return transform_factory.create_operation(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1206, in create_operation
taskmanager_1 | return creator(self, transform_id, transform_proto, payload, consumers)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1553, in create_par_do
taskmanager_1 | return _create_pardo_operation(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1596, in _create_pardo_operation
taskmanager_1 | dofn_data = pickler.loads(serialized_fn)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/pickler.py", line 51, in loads
taskmanager_1 | return desired_pickle_lib.loads(
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/apache_beam/internal/dill_pickler.py", line 289, in loads
taskmanager_1 | return dill.loads(s)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 275, in loads
taskmanager_1 | return load(file, ignore, **kwds)
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 270, in load
taskmanager_1 | return Unpickler(file, ignore=ignore, **kwds).load()
taskmanager_1 | File "/usr/local/lib/python3.8/site-packages/dill/_dill.py", line 472, in load
taskmanager_1 | obj = StockUnpickler.load(self)
taskmanager_1 | TypeError: an integer is required (got type bytes)
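Since the failure happens inside dill.loads on the worker, it may help to compare the interpreter and package versions in the environment that submits the pipeline with those inside the taskmanager container. This kind of TypeError during unpickling is often reported when the two sides differ, but that is an assumption here, not something the traceback confirms. Below is a minimal sketch using only the standard library; the helper name environment_report is hypothetical and not part of Beam or dill.

```python
import sys
from importlib import metadata  # standard library, Python 3.8+


def environment_report(packages=("apache-beam", "dill")):
    """Return the Python version and the installed versions of the given packages.

    Hypothetical helper for comparing the submission environment with the
    worker environment; it is not part of Beam or dill.
    """
    report = {"python": "%d.%d.%d" % sys.version_info[:3]}
    for name in packages:
        try:
            report[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            report[name] = None  # package is not installed in this environment
    return report


if __name__ == "__main__":
    # Run this once where the pipeline is submitted and once inside the
    # taskmanager container, then compare the two outputs line by line.
    for key, value in environment_report().items():
        print(f"{key}: {value}")
```

If the two reports differ in the Python minor version or in the dill version, aligning them is a reasonable first thing to try before looking for a dill upgrade path.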
Issue Priority
Priority: 3 (minor)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner