Skip to content

Commit fd6d1b3

Browse files
[HttpParser] Memory view compliant, Zero copies (#1067)
* Remove usage of `tobytes`
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Fix chunk parser
* [pre-commit.ci] auto fixes from pre-commit.com hooks
  for more information, see https://pre-commit.ci
* Lint fixes

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent f48aac4 commit fd6d1b3

File tree

14 files changed

+310
-268
lines changed

14 files changed

+310
-268
lines changed

proxy/http/handler.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -268,9 +268,7 @@ def _discover_plugin_klass(self, protocol: int) -> Optional[Type['HttpProtocolHa
268268
def _parse_first_request(self, data: memoryview) -> bool:
269269
# Parse http request
270270
try:
271-
# TODO(abhinavsingh): Remove .tobytes after parser is
272-
# memoryview compliant
273-
self.request.parse(data.tobytes())
271+
self.request.parse(data)
274272
except HttpProtocolException as e: # noqa: WPS329
275273
self.work.queue(BAD_REQUEST_RESPONSE_PKT)
276274
raise e

proxy/http/parser/chunk.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,13 @@ def __init__(self) -> None:
3434
# Expected size of next following chunk
3535
self.size: Optional[int] = None
3636

37-
def parse(self, raw: bytes) -> bytes:
37+
def parse(self, raw: memoryview) -> memoryview:
3838
more = len(raw) > 0
3939
while more and self.state != chunkParserStates.COMPLETE:
40-
more, raw = self.process(raw)
40+
more, raw = self.process(raw.tobytes())
4141
return raw
4242

43-
def process(self, raw: bytes) -> Tuple[bool, bytes]:
43+
def process(self, raw: bytes) -> Tuple[bool, memoryview]:
4444
if self.state == chunkParserStates.WAITING_FOR_SIZE:
4545
# Consume prior chunk in buffer
4646
# in case chunk size without CRLF was received
@@ -69,7 +69,7 @@ def process(self, raw: bytes) -> Tuple[bool, bytes]:
6969
self.state = chunkParserStates.WAITING_FOR_SIZE
7070
self.chunk = b''
7171
self.size = None
72-
return len(raw) > 0, raw
72+
return len(raw) > 0, memoryview(raw)
7373

7474
@staticmethod
7575
def to_chunks(raw: bytes, chunk_size: int = DEFAULT_BUFFER_SIZE) -> bytes:

proxy/http/parser/parser.py

Lines changed: 38 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ def __init__(
7777
# Total size of raw bytes passed for parsing
7878
self.total_size: int = 0
7979
# Buffer to hold unprocessed bytes
80-
self.buffer: bytes = b''
80+
self.buffer: Optional[memoryview] = None
8181
# Internal headers data structure:
8282
# - Keys are lower case header names.
8383
# - Values are 2-tuple containing original
@@ -102,13 +102,13 @@ def request(
102102
httpParserTypes.REQUEST_PARSER,
103103
enable_proxy_protocol=enable_proxy_protocol,
104104
)
105-
parser.parse(raw)
105+
parser.parse(memoryview(raw))
106106
return parser
107107

108108
@classmethod
109109
def response(cls: Type[T], raw: bytes) -> T:
110110
parser = cls(httpParserTypes.RESPONSE_PARSER)
111-
parser.parse(raw)
111+
parser.parse(memoryview(raw))
112112
return parser
113113

114114
def header(self, key: bytes) -> bytes:
@@ -206,14 +206,21 @@ def body_expected(self) -> bool:
206206
"""Returns true if content or chunked response is expected."""
207207
return self._content_expected or self._is_chunked_encoded
208208

209-
def parse(self, raw: bytes, allowed_url_schemes: Optional[List[bytes]] = None) -> None:
209+
def parse(
210+
self,
211+
raw: memoryview,
212+
allowed_url_schemes: Optional[List[bytes]] = None,
213+
) -> None:
210214
"""Parses HTTP request out of raw bytes.
211215
212216
Check for `HttpParser.state` after `parse` has successfully returned."""
213217
size = len(raw)
214218
self.total_size += size
215-
raw = self.buffer + raw
216-
self.buffer, more = b'', size > 0
219+
if self.buffer:
220+
# TODO(abhinavsingh): Instead of tobytes our parser
221+
# must be capable of working with arrays of memoryview
222+
raw = memoryview(self.buffer.tobytes() + raw.tobytes())
223+
self.buffer, more = None, size > 0
217224
while more and self.state != httpParserStates.COMPLETE:
218225
# gte with HEADERS_COMPLETE also encapsulated RCVING_BODY state
219226
if self.state >= httpParserStates.HEADERS_COMPLETE:
@@ -237,7 +244,7 @@ def parse(self, raw: bytes, allowed_url_schemes: Optional[List[bytes]] = None) -
237244
not (self._content_expected or self._is_chunked_encoded) and \
238245
raw == b'':
239246
self.state = httpParserStates.COMPLETE
240-
self.buffer = raw
247+
self.buffer = None if raw == b'' else raw
241248

242249
def build(self, disable_headers: Optional[List[bytes]] = None, for_proxy: bool = False) -> bytes:
243250
"""Rebuild the request object."""
@@ -278,7 +285,7 @@ def build_response(self) -> bytes:
278285
body=self._get_body_or_chunks(),
279286
)
280287

281-
def _process_body(self, raw: bytes) -> Tuple[bool, bytes]:
288+
def _process_body(self, raw: memoryview) -> Tuple[bool, memoryview]:
282289
# Ref: http://www.ietf.org/rfc/rfc2616.txt
283290
# 3.If a Content-Length header field (section 14.13) is present, its
284291
# decimal value in OCTETs represents both the entity-length and the
@@ -297,7 +304,8 @@ def _process_body(self, raw: bytes) -> Tuple[bool, bytes]:
297304
self.body = self.chunk.body
298305
self.state = httpParserStates.COMPLETE
299306
more = False
300-
elif self._content_expected:
307+
return more, raw
308+
if self._content_expected:
301309
self.state = httpParserStates.RCVING_BODY
302310
if self.body is None:
303311
self.body = b''
@@ -307,23 +315,21 @@ def _process_body(self, raw: bytes) -> Tuple[bool, bytes]:
307315
if self.body and \
308316
len(self.body) == int(self.header(b'content-length')):
309317
self.state = httpParserStates.COMPLETE
310-
more, raw = len(raw) > 0, raw[total_size - received_size:]
311-
else:
312-
self.state = httpParserStates.RCVING_BODY
313-
# Received a packet without content-length header
314-
# and no transfer-encoding specified.
315-
#
316-
# This can happen for both HTTP/1.0 and HTTP/1.1 scenarios.
317-
# Currently, we consume the remaining buffer as body.
318-
#
319-
# Ref https://github.com/abhinavsingh/proxy.py/issues/398
320-
#
321-
# See TestHttpParser.test_issue_398 scenario
322-
self.body = raw
323-
more, raw = False, b''
324-
return more, raw
325-
326-
def _process_headers(self, raw: bytes) -> Tuple[bool, bytes]:
318+
return len(raw) > 0, raw[total_size - received_size:]
319+
# Received a packet without content-length header
320+
# and no transfer-encoding specified.
321+
#
322+
# This can happen for both HTTP/1.0 and HTTP/1.1 scenarios.
323+
# Currently, we consume the remaining buffer as body.
324+
#
325+
# Ref https://github.com/abhinavsingh/proxy.py/issues/398
326+
#
327+
# See TestHttpParser.test_issue_398 scenario
328+
self.state = httpParserStates.RCVING_BODY
329+
self.body = raw
330+
return False, memoryview(b'')
331+
332+
def _process_headers(self, raw: memoryview) -> Tuple[bool, memoryview]:
327333
"""Returns False when no CRLF could be found in received bytes.
328334
329335
TODO: We should not return until parser reaches headers complete
@@ -334,10 +340,10 @@ def _process_headers(self, raw: bytes) -> Tuple[bool, bytes]:
334340
This will also help make the parser even more stateless.
335341
"""
336342
while True:
337-
parts = raw.split(CRLF, 1)
343+
parts = raw.tobytes().split(CRLF, 1)
338344
if len(parts) == 1:
339345
return False, raw
340-
line, raw = parts[0], parts[1]
346+
line, raw = parts[0], memoryview(parts[1])
341347
if self.state in (httpParserStates.LINE_RCVD, httpParserStates.RCVING_HEADERS):
342348
if line == b'' or line.strip() == b'': # Blank line received.
343349
self.state = httpParserStates.HEADERS_COMPLETE
@@ -352,14 +358,14 @@ def _process_headers(self, raw: bytes) -> Tuple[bool, bytes]:
352358

353359
def _process_line(
354360
self,
355-
raw: bytes,
361+
raw: memoryview,
356362
allowed_url_schemes: Optional[List[bytes]] = None,
357-
) -> Tuple[bool, bytes]:
363+
) -> Tuple[bool, memoryview]:
358364
while True:
359-
parts = raw.split(CRLF, 1)
365+
parts = raw.tobytes().split(CRLF, 1)
360366
if len(parts) == 1:
361367
return False, raw
362-
line, raw = parts[0], parts[1]
368+
line, raw = parts[0], memoryview(parts[1])
363369
if self.type == httpParserTypes.REQUEST_PARSER:
364370
if self.protocol is not None and self.protocol.version is None:
365371
# We expect to receive entire proxy protocol v1 line

proxy/http/proxy/server.py

Lines changed: 4 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -276,11 +276,8 @@ async def read_from_descriptors(self, r: Readables) -> bool:
276276
if self.response.is_complete:
277277
self.handle_pipeline_response(raw)
278278
else:
279-
# TODO(abhinavsingh): Remove .tobytes after parser is
280-
# memoryview compliant
281-
chunk = raw.tobytes()
282-
self.response.parse(chunk)
283-
self.emit_response_events(len(chunk))
279+
self.response.parse(raw)
280+
self.emit_response_events(len(raw))
284281
else:
285282
self.response.total_size += len(raw)
286283
# queue raw data for client
@@ -430,7 +427,6 @@ def on_client_data(self, raw: memoryview) -> None:
430427
# must be treated as WebSocket protocol packets.
431428
self.upstream.queue(raw)
432429
return
433-
434430
if self.pipeline_request is None:
435431
# For pipeline requests, we never
436432
# want to use --enable-proxy-protocol flag
@@ -443,10 +439,7 @@ def on_client_data(self, raw: memoryview) -> None:
443439
self.pipeline_request = HttpParser(
444440
httpParserTypes.REQUEST_PARSER,
445441
)
446-
447-
# TODO(abhinavsingh): Remove .tobytes after parser is
448-
# memoryview compliant
449-
self.pipeline_request.parse(raw.tobytes())
442+
self.pipeline_request.parse(raw)
450443
if self.pipeline_request.is_complete:
451444
for plugin in self.plugins.values():
452445
assert self.pipeline_request is not None
@@ -555,9 +548,7 @@ def handle_pipeline_response(self, raw: memoryview) -> None:
555548
self.pipeline_response = HttpParser(
556549
httpParserTypes.RESPONSE_PARSER,
557550
)
558-
# TODO(abhinavsingh): Remove .tobytes after parser is memoryview
559-
# compliant
560-
self.pipeline_response.parse(raw.tobytes())
551+
self.pipeline_response.parse(raw)
561552
if self.pipeline_response.is_complete:
562553
self.pipeline_response = None
563554

proxy/http/server/web.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -201,9 +201,7 @@ def on_client_data(self, raw: memoryview) -> None:
201201
self.pipeline_request = HttpParser(
202202
httpParserTypes.REQUEST_PARSER,
203203
)
204-
# TODO(abhinavsingh): Remove .tobytes after parser is memoryview
205-
# compliant
206-
self.pipeline_request.parse(raw.tobytes())
204+
self.pipeline_request.parse(raw)
207205
if self.pipeline_request.is_complete:
208206
self.route.handle_request(self.pipeline_request)
209207
if not self.pipeline_request.is_http_1_1_keep_alive:

proxy/http/websocket/client.py

Lines changed: 1 addition & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ def upgrade(self) -> None:
7777
),
7878
)
7979
response = HttpParser(httpParserTypes.RESPONSE_PARSER)
80-
response.parse(self.sock.recv(DEFAULT_BUFFER_SIZE))
80+
response.parse(memoryview(self.sock.recv(DEFAULT_BUFFER_SIZE)))
8181
accept = response.header(b'Sec-Websocket-Accept')
8282
assert WebsocketFrame.key_to_accept(key) == accept
8383

@@ -100,8 +100,6 @@ def run_once(self) -> bool:
100100
self.closed = True
101101
return True
102102
frame = WebsocketFrame()
103-
# TODO(abhinavsingh): Remove .tobytes after parser is
104-
# memoryview compliant
105103
frame.parse(raw.tobytes())
106104
self.on_message(frame)
107105
elif mask & selectors.EVENT_WRITE:

proxy/plugin/modify_chunk_response.py

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
3232
def handle_upstream_chunk(self, chunk: memoryview) -> Optional[memoryview]:
3333
# Parse the response.
3434
# Note that these chunks also include headers
35-
self.response.parse(chunk.tobytes())
35+
self.response.parse(chunk)
3636
# If response is complete, modify and dispatch to client
3737
if self.response.is_complete:
3838
# Avoid setting a body for responses where a body is not expected.

tests/http/parser/test_chunk_parser.py

Lines changed: 19 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -20,16 +20,18 @@ def setUp(self) -> None:
2020

2121
def test_chunk_parse_basic(self) -> None:
2222
self.parser.parse(
23-
b''.join([
24-
b'4\r\n',
25-
b'Wiki\r\n',
26-
b'5\r\n',
27-
b'pedia\r\n',
28-
b'E\r\n',
29-
b' in\r\n\r\nchunks.\r\n',
30-
b'0\r\n',
31-
b'\r\n',
32-
]),
23+
memoryview(
24+
b''.join([
25+
b'4\r\n',
26+
b'Wiki\r\n',
27+
b'5\r\n',
28+
b'pedia\r\n',
29+
b'E\r\n',
30+
b' in\r\n\r\nchunks.\r\n',
31+
b'0\r\n',
32+
b'\r\n',
33+
]),
34+
),
3335
)
3436
self.assertEqual(self.parser.chunk, b'')
3537
self.assertEqual(self.parser.size, None)
@@ -38,55 +40,55 @@ def test_chunk_parse_basic(self) -> None:
3840

3941
def test_chunk_parse_issue_27(self) -> None:
4042
"""Case when data ends with the chunk size but without ending CRLF."""
41-
self.parser.parse(b'3')
43+
self.parser.parse(memoryview(b'3'))
4244
self.assertEqual(self.parser.chunk, b'3')
4345
self.assertEqual(self.parser.size, None)
4446
self.assertEqual(self.parser.body, b'')
4547
self.assertEqual(
4648
self.parser.state,
4749
chunkParserStates.WAITING_FOR_SIZE,
4850
)
49-
self.parser.parse(b'\r\n')
51+
self.parser.parse(memoryview(b'\r\n'))
5052
self.assertEqual(self.parser.chunk, b'')
5153
self.assertEqual(self.parser.size, 3)
5254
self.assertEqual(self.parser.body, b'')
5355
self.assertEqual(
5456
self.parser.state,
5557
chunkParserStates.WAITING_FOR_DATA,
5658
)
57-
self.parser.parse(b'abc')
59+
self.parser.parse(memoryview(b'abc'))
5860
self.assertEqual(self.parser.chunk, b'')
5961
self.assertEqual(self.parser.size, None)
6062
self.assertEqual(self.parser.body, b'abc')
6163
self.assertEqual(
6264
self.parser.state,
6365
chunkParserStates.WAITING_FOR_SIZE,
6466
)
65-
self.parser.parse(b'\r\n')
67+
self.parser.parse(memoryview(b'\r\n'))
6668
self.assertEqual(self.parser.chunk, b'')
6769
self.assertEqual(self.parser.size, None)
6870
self.assertEqual(self.parser.body, b'abc')
6971
self.assertEqual(
7072
self.parser.state,
7173
chunkParserStates.WAITING_FOR_SIZE,
7274
)
73-
self.parser.parse(b'4\r\n')
75+
self.parser.parse(memoryview(b'4\r\n'))
7476
self.assertEqual(self.parser.chunk, b'')
7577
self.assertEqual(self.parser.size, 4)
7678
self.assertEqual(self.parser.body, b'abc')
7779
self.assertEqual(
7880
self.parser.state,
7981
chunkParserStates.WAITING_FOR_DATA,
8082
)
81-
self.parser.parse(b'defg\r\n0')
83+
self.parser.parse(memoryview(b'defg\r\n0'))
8284
self.assertEqual(self.parser.chunk, b'0')
8385
self.assertEqual(self.parser.size, None)
8486
self.assertEqual(self.parser.body, b'abcdefg')
8587
self.assertEqual(
8688
self.parser.state,
8789
chunkParserStates.WAITING_FOR_SIZE,
8890
)
89-
self.parser.parse(b'\r\n\r\n')
91+
self.parser.parse(memoryview(b'\r\n\r\n'))
9092
self.assertEqual(self.parser.chunk, b'')
9193
self.assertEqual(self.parser.size, None)
9294
self.assertEqual(self.parser.body, b'abcdefg')

0 commit comments

Comments
 (0)