|
19 | 19 | Demonstrates admission control in Python streaming data sources. |
20 | 20 |
|
21 | 21 | This example implements a simple blockchain-like streaming source that generates |
22 | | -sequential blocks and shows how to use admission control to limit batch sizes. |
| 22 | +sequential blocks and shows how to use admission control to limit batch sizes. # noqa: E501 |
23 | 23 |
|
24 | 24 | Usage: structured_blockchain_admission_control.py [<max-blocks-per-batch>] |
25 | | - <max-blocks-per-batch> Maximum number of blocks to process per microbatch (default: 10) |
| 25 | + <max-blocks-per-batch> Maximum number of blocks to process per microbatch |
| 26 | + (default: 10) |
26 | 27 |
|
27 | 28 | Run the example: |
28 | 29 | `$ bin/spark-submit examples/src/main/python/sql/streaming/\\ |
29 | 30 | structured_blockchain_admission_control.py 5` |
30 | 31 |
|
31 | | -The example will process blocks in controlled batches of 5, demonstrating admission control. |
| 32 | +The example will process blocks in controlled batches of 5, |
| 33 | +demonstrating admission control. |
32 | 34 | """ |
33 | 35 | import sys |
34 | 36 | import time |
35 | 37 |
|
36 | 38 | from pyspark.sql import SparkSession |
37 | | -from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition |
| 39 | +from pyspark.sql.datasource import ( |
| 40 | + DataSource, |
| 41 | + DataSourceStreamReader, |
| 42 | + InputPartition, |
| 43 | +) |
38 | 44 |
|
39 | 45 |
|
40 | 46 | class SimpleBlockchainReader(DataSourceStreamReader): |
41 | | - """A simple streaming source that generates sequential blockchain blocks.""" |
| 47 | + """A simple streaming source that generates sequential blockchain blocks.""" # noqa: E501 |
42 | 48 |
|
43 | 49 | def __init__(self, max_block=1000): |
44 | 50 | self.max_block = max_block |
@@ -71,8 +77,9 @@ def latestOffset(self, start=None, limit=None): |
71 | 77 | # Cap at the configured limit |
72 | 78 | end_block = min(start_block + max_blocks, latest_available) |
73 | 79 | print( |
74 | | - f" [Admission Control] Start: {start_block}, Available: {latest_available}, " |
75 | | - f"Capped: {end_block} (limit: {max_blocks})" |
| 80 | + f" [Admission Control] Start: {start_block}, " |
| 81 | + f"Available: {latest_available}, Capped: {end_block} " |
| 82 | + f"(limit: {max_blocks})" |
76 | 83 | ) |
77 | 84 | # Return tuple: (capped_offset, true_latest_offset) |
78 | 85 | return ({"block": end_block}, {"block": latest_available}) |
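
The hunk above is the heart of the admission-control contract: `latestOffset` returns a pair of offsets, the capped offset the engine should plan the next microbatch up to, and the true latest offset the source has available. A minimal sketch of the capping arithmetic, using hypothetical values (the variable names mirror the hunk; the concrete numbers are illustrative only):

    # Hypothetical values to illustrate the capping logic above.
    start_block = 0          # offset where the previous microbatch ended
    latest_available = 100   # newest block the source has produced
    max_blocks = 5           # admission-control limit per microbatch

    end_block = min(start_block + max_blocks, latest_available)  # -> 5

    # The reader then reports both offsets:
    #   ({"block": 5}, {"block": 100})
    # i.e. "plan this batch up to block 5, but block 100 already exists",
    # so the engine keeps scheduling microbatches until it catches up.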
@@ -139,10 +146,9 @@ def streamReader(self, schema): |
139 | 146 | ================================================================= |
140 | 147 | """ |
141 | 148 | ) |
142 | | - |
143 | | - spark = ( |
144 | | - SparkSession.builder.appName("StructuredBlockchainAdmissionControl").getOrCreate() |
145 | | - ) |
| 149 | + # fmt: off |
| 150 | + spark = SparkSession.builder.appName("StructuredBlockchainAdmissionControl").getOrCreate() # noqa: E501 |
| 151 | + # fmt: on |
146 | 152 |
|
147 | 153 | # Register the custom data source |
148 | 154 | spark.dataSource.register(SimpleBlockchainSource) |
|
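For context, a query against the source registered above would typically be started along the lines of the sketch below. This is an assumption for illustration, not part of this diff: the format short name "blockchain" and the console sink are hypothetical, and only standard PySpark Structured Streaming calls are used.

    # Sketch only: the short name "blockchain" is assumed, not shown in the diff.
    df = spark.readStream.format("blockchain").load()

    query = (
        df.writeStream
        .format("console")                    # print each microbatch to stdout
        .trigger(processingTime="5 seconds")  # poll for new blocks periodically
        .start()
    )
    query.awaitTermination()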