
Conversation

@gaogaotiantian (Contributor)

What changes were proposed in this pull request?

Use the modern itertools module to implement the _batched function in BatchedSerializer, making the code cleaner and faster.

The new code is roughly 2.7× as fast as the original implementation (about 170% faster).

Results with the benchmark script below:
Batching batch_original took 0.3086 seconds
Batching batch_after took 0.1159 seconds
import itertools
import time

def batch_original(iterator, batch_size):
    items = []
    count = 0
    for item in iterator:
        items.append(item)
        count += 1
        if count == batch_size:
            yield items
            items = []
            count = 0
    if items:
        yield items

def batch_list(iterator, batch_size):
    n = len(iterator)
    for i in range(0, n, batch_size):
        yield iterator[i : i + batch_size]

def batch_after(iterator, batch_size):
    it = iter(iterator)
    while batch := list(itertools.islice(it, batch_size)):
        yield batch


def do_test(iterator, batch):
    result = []
    start = time.perf_counter_ns()
    for b in batch(iterator, 10000):
        result.append(b)
    end = time.perf_counter_ns()
    print(f"Batching {batch.__name__} took {(end - start)/1e9:.4f} seconds")
    return result

if __name__ == "__main__":
    data = range(10000005)

    result_original = do_test(data, batch_original)
    result_after = do_test(data, batch_after)

    assert result_original == result_after

    data = list(range(10000005))
    result_list = do_test(data, batch_list)
    result_after = do_test(data, batch_after)
    assert result_list == result_after

Notice that __getslice__ was removed in Python 3.0, so the original optimization for known-size iterables like lists no longer triggers at all. There's no simple way to check whether an iterator supports slicing now; the most straightforward approach is to just try it, e.g. iterator[:1]. I don't know how frequently we deal with lists here — if the input is often a list, we could add that path. The raw [:] slicing (batch_list above) is about 22% faster than this implementation.

I like the simplicity without the try ... except ... block.
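For reference, the probing approach mentioned above could look roughly like the sketch below (the function name batch_with_slice_probe is hypothetical; this variant was considered and rejected in favor of the simpler islice-only version):

```python
import itertools

def batch_with_slice_probe(iterator, batch_size):
    # Probe for slice support; generators and most iterators raise
    # TypeError here, while sequences like lists succeed.
    try:
        iterator[:1]
        sliceable = True
    except TypeError:
        sliceable = False
    if sliceable:
        # Note: slicing preserves the sequence type (a list yields
        # list batches, a tuple yields tuple batches), so the output
        # type can differ from the islice path below.
        for i in range(0, len(iterator), batch_size):
            yield iterator[i : i + batch_size]
    else:
        it = iter(iterator)
        while batch := list(itertools.islice(it, batch_size)):
            yield batch
```

The extra branch buys the ~22% slicing speedup for lists at the cost of the try/except probe and a second code path, which is the complexity the comment above argues against.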

Why are the changes needed?

Most importantly, the code is less verbose. Also it's much faster.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

The script above checks if the result is the same as before. Also we will have CI.

Was this patch authored or co-authored using generative AI tooling?

No

@gaogaotiantian (Contributor, Author)

gaogaotiantian commented Nov 17, 2025

@HyukjinKwon this should be pretty straightforward. Let me know if you have any concerns. The change feels a bit trivial for a JIRA ticket, but if one is needed I can create it.

@HyukjinKwon (Member) left a comment


LGTM let's file a JIRA though

@dongjoon-hyun (Member) left a comment


Thank you. Ya, please add the JIRA information when the ASF JIRA is back to normal, @gaogaotiantian. I'll block this PR to prevent an accidental merging.

@gaogaotiantian gaogaotiantian changed the title [PYTHON] Modernize the _batched method for BatchedSerializer [SPARK-54384][PYTHON] Modernize the _batched method for BatchedSerializer Nov 17, 2025
@gaogaotiantian (Contributor, Author)

The SPARK JIRA is added.

@dongjoon-hyun (Member) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you.

@HyukjinKwon (Member)

Merged to master.

@gaogaotiantian gaogaotiantian deleted the modernize-batch branch November 17, 2025 23:24