CORE-1998 Write schemaless Parquet files to S3#1

Open
jcipar wants to merge 16 commits into project/datalake from jcipar/parquet-arrow-writing-consumer

Conversation


@jcipar jcipar commented Apr 11, 2024

Backports Required

  • none - not a bug fix
  • none - this is a backport
  • none - issue does not exist in previous branches
  • none - papercut/not impactful enough to backport
  • v23.3.x
  • v23.2.x

Release Notes

jcipar added 16 commits March 28, 2024 12:08
This is the first step towards outputting Parquet files to support
Iceberg. It adds a class, `arrow_writing_consumer`, that can be used
with `record_batch_reader::consume` to write a log to a Parquet file.

This change also adds a call to arrow_writing_consumer from
ntp_archiver_service so that any time we write a segment to tiered
storage, we also output a local Parquet file.
This implements a simple filter based on the topic name. Topics prefixed
with `experimental_datalake_` will be written to Parquet, others will
not.

This introduces some tech debt in the interest of getting a
proof-of-concept working quickly. Eventually we want to make this a
configuration option and not a magic prefix.
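The magic-prefix filter amounts to a one-line predicate (a stdlib Python sketch; the function name is illustrative, not from the patch):

```python
DATALAKE_PREFIX = "experimental_datalake_"

def should_write_parquet(topic: str) -> bool:
    # Proof-of-concept filter: only topics carrying the magic
    # prefix get a Parquet copy. The commit notes this should
    # eventually become a configuration option instead.
    return topic.startswith(DATALAKE_PREFIX)

wants_parquet = should_write_parquet("experimental_datalake_test_topic")
plain_topic = should_write_parquet("orders")
```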
Before this change, Parquet files had paths like this:
/tmp/parquet_files/0730b620/kafka/experimental_datalake_test_topic/0_22/83-86-372-3-v1.log.3

To read an entire topic we could try
`pandas.read_parquet("/tmp/parquet_files/")`,
but that would read all data for all topics. Alternatively we could
iterate through the subdirectories, but that is painful.

After this change, the topic name comes first:
/tmp/parquet_files/experimental_datalake_test_topic/0730b620/kafka/experimental_datalake_test_topic/0_22/83-86-372-3-v1.log.3

So reading a single topic is simply:
`pandas.read_parquet("/tmp/parquet_files/topic_name")`

Tech Debt:
I believe the prefix directories existed to load balance the s3 bucket.
By putting everything for a topic in a single prefix we're breaking the
load balancing in s3. The addition of Iceberg metadata will allow us
to read a single topic regardless of file layout, and we can undo this
change, but this is useful for near-term experimentation.
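The before/after layout can be sketched as a small path builder (a stdlib sketch; the helper name and flag are hypothetical, the path components come from the examples above):

```python
def segment_path(hash_prefix: str, ns: str, topic: str,
                 partition: str, name: str,
                 topic_first: bool) -> str:
    # Before the change the hash prefix leads, scattering one
    # topic's files across many prefixes. After, the topic name
    # leads, so pandas.read_parquet can target one topic directory.
    parts = [hash_prefix, ns, topic, partition, name]
    if topic_first:
        parts.insert(0, topic)
    return "/".join(parts)

old = segment_path("0730b620", "kafka", "experimental_datalake_test_topic",
                   "0_22", "83-86-372-3-v1.log.3", topic_first=False)
new = segment_path("0730b620", "kafka", "experimental_datalake_test_topic",
                   "0_22", "83-86-372-3-v1.log.3", topic_first=True)
```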
DO NOT MERGE: Fix the lingering FIXMEs and TODOs first. Most
importantly, ensure that it doesn't crash when the file doesn't exist!

This adds a new method, `put_parquet_file` that uploads the parquet file
to the appropriate location in S3. New parquet files are uploaded to s3
in a location prefixed by the topic name. This allows them to be read
with a single command like `pandas.read_parquet`.

Tech Debt:
This introduces some FIXMEs and TODOs that should be removed before merging.

Additionally, like the commits higher in this stack, it doesn't
handle memory well. The entire Parquet file is read into an iobuf in
memory and then written to s3. We should be able to pipe the input
stream directly to the s3 client.

Also, it does not support retries or anything. I need to understand
the code better to know what happens in the event of a crash. Ideally
we should try uploading any segments that aren't there yet. This may
already be the behavior: since we upload the Parquet file *before* the
segment file, if we fail while uploading parquet the segment won't be
uploaded. The NTP archiver will try again to upload the segment, and
this code will be called again.

If we fail in between writing Parquet and uploading the segment, we
will upload the Parquet file again, which is inefficient, but should not
be incorrect.
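The retry reasoning above can be made concrete with a toy simulation (stdlib Python; the function and a dict standing in for S3 are hypothetical): upload the Parquet object first, and rely on the archiver retrying the whole sequence if the segment upload never happens.

```python
def upload_segment_with_parquet(store: dict,
                                parquet_key: str, parquet_data: bytes,
                                segment_key: str, segment_data: bytes,
                                fail_after_parquet: bool = False) -> None:
    # Mirror the ordering in the commit message: Parquet before
    # segment. A crash in between leaves the segment missing, so
    # the NTP archiver retries; the Parquet object is rewritten,
    # which is wasteful but not incorrect.
    store[parquet_key] = parquet_data
    if fail_after_parquet:
        raise RuntimeError("crash between parquet and segment upload")
    store[segment_key] = segment_data

s3 = {}
try:
    upload_segment_with_parquet(s3, "t/p.parquet", b"pq", "t/seg", b"seg",
                                fail_after_parquet=True)
except RuntimeError:
    pass  # segment absent -> archiver will retry the upload
upload_segment_with_parquet(s3, "t/p.parquet", b"pq", "t/seg", b"seg")
```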
This is currently getting the internal offset. I need to figure out
how to translate this to the Kafka offset.
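Conceptually, the translation subtracts the non-data (e.g. configuration) batches that occupy internal offsets but are invisible to Kafka clients. A stdlib sketch under that assumption (the real translation in the codebase is more involved):

```python
def to_kafka_offset(internal_offset: int,
                    non_data_offsets: list) -> int:
    # Hypothetical sketch: Kafka offset = internal (raft) offset
    # minus the count of non-data batches at or below it.
    return internal_offset - sum(
        1 for o in non_data_offsets if o <= internal_offset)

kafka_offset = to_kafka_offset(10, [0, 3])
```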
I had been mistakenly limiting it to 4 KiB. This increases the limit to
10 GiB so it will read the whole batch.

Tech Debt:
This contributes to the ongoing problem: I should be streaming from the
batch to Parquet, and not reading everything at once.
…ipar/redpanda into jcipar/parquet-arrow-writing-consumer
This factors out the IO code that actually writes the Parquet file.
Now the arrow_writing_consumer returns an arrow::Table which can be
inspected by a test. The IO code to write the file remains untested, but
is pretty simple: there is a method that takes a table and writes it to
the given filename.
…ipar/redpanda into jcipar/parquet-arrow-writing-consumer
…ipar/redpanda into jcipar/parquet-arrow-writing-consumer
jcipar pushed a commit that referenced this pull request Sep 26, 2024
=================================================================
==524==ERROR: AddressSanitizer: container-overflow on address 0x52100028a100 at pc 0x7f29c0bf51fd bp 0x7ffda75397d0 sp 0x7ffda75397c8
READ of size 4 at 0x52100028a100 thread T0
    #0 0x7f29c0bf51fc in util::mem_tracker::pretty_print_ascii() const /var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-024f8676eb7b139e9-1/redpanda/vtools/src/v/utils/tracking_allocator.cc:75:31
    #1 0x55be1ac7eb7b in mem_tracker_pretty_printing::test_method() /var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-024f8676eb7b139e9-1/redpanda/vtools/src/v/utils/tests/tracking_allocator_tests.cc:82:5
    #2 0x55be1ac7d651 in mem_tracker_pretty_printing_invoker() /var/lib/buildkite-agent/builds/buildkite-amd64-builders-i-024f8676eb7b139e9-1/redpanda/vtools/src/v/utils/tests/tracking_allocator_tests.cc:50:1

Signed-off-by: Noah Watkins <noahwatkins@gmail.com>
