From 01f990d59244f715ab4f96255e76bacdd1152a94 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 10:50:08 +0100 Subject: [PATCH 1/8] Handle EOFError in RecordAdapter When stdin is empty, raise EOFError and log this as a warning. fixes #202 --- flow/record/base.py | 2 ++ flow/record/stream.py | 9 +++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/flow/record/base.py b/flow/record/base.py index 7b574ee4..80acf344 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -893,6 +893,8 @@ def RecordAdapter( "entering record text, rather than a record stream? This can be fixed by using " "'rdump -w -' to write a record stream to stdout." ) + elif not peek_data: + raise EOFError("Empty input stream") raise RecordAdapterNotFound("Could not find adapter for file-like object") # Now that we found an adapter, we will fall back into the same code path as when a URL is given. As the url diff --git a/flow/record/stream.py b/flow/record/stream.py index 07892238..0fae0fb1 100644 --- a/flow/record/stream.py +++ b/flow/record/stream.py @@ -164,11 +164,13 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R print("[reading from stdin]", file=sys.stderr) # Initial value for reader, in case of exception message - reader = "RecordReader" + reader: str | AbstractReader = "RecordReader" try: reader = RecordReader(src, selector=selector) yield from reader - reader.close() + except EOFError as e: + # End of file reached, likely no records in source + log.warning("%s(%r): %s", reader, src, e) except IOError as e: if len(sources) == 1: raise @@ -184,6 +186,9 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R else: log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e)) continue + finally: + if isinstance(reader, AbstractReader): + reader.close() class PathTemplateWriter: From ba25045a43d8e39b130f2da17b161fd24267591f Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 11:19:44 +0100 Subject: [PATCH 2/8] Add tests --- tests/record/test_adapter.py | 8 ++++++++ tests/tools/test_rdump.py | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+) diff --git a/tests/record/test_adapter.py b/tests/record/test_adapter.py index a11c8470..6d49d311 100644 --- a/tests/record/test_adapter.py +++ b/tests/record/test_adapter.py @@ -499,3 +499,11 @@ def test_file_like_writer_reader() -> None: assert len(read_records) == 10 for idx, record in enumerate(read_records): assert record == test_records[idx] + + +def test_empty_stdin(monkeypatch: pytest.MonkeyPatch) -> None: + # Mock stdin to be empty + monkeypatch.setattr(sys, "stdin", BytesIO(b"")) + + with pytest.raises(EOFError, match="Empty input stream"): + RecordAdapter() diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 0877a25e..67b767dd 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -797,3 +797,39 @@ def test_rdump_catch_sigpipe(tmp_path: Path) -> None: assert "test/record count=0" in stdout assert "test/record count=1" in stdout assert len(stdout.splitlines()) == 2 + + +def test_rdump_empty_records_pipe(tmp_path: Path) -> None: + """Test that rdump handles empty records as input gracefully.""" + + # create an empty records file + path = tmp_path / "empty.records" + with RecordWriter(path): + pass + + # rdump empty.records | rdump -l + p1 = subprocess.Popen(["rdump", str(path)], stdout=subprocess.PIPE) + p2 = subprocess.Popen( + ["rdump", "-l"], + stdin=p1.stdout, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = p2.communicate() + assert b"RecordReader('-'): Empty input stream" in stderr.strip() + assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() + + +def test_rdump_empty_stdin_pipe(tmp_path: Path) -> None: + """Test that rdump handles empty stdin as input gracefully.""" + + # rdump empty.records | rdump -l + pipe = subprocess.Popen( + ["rdump", "-", "-l"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = pipe.communicate(input=None) + assert b"RecordReader('-'): Empty input stream" in stderr.strip() + assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() From e9793ffd740ceed199a41708e6c8acc1a36a2d3e Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 11:23:52 +0100 Subject: [PATCH 3/8] Improve test --- tests/tools/test_rdump.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 67b767dd..13dd47b2 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -807,6 +807,10 @@ def test_rdump_empty_records_pipe(tmp_path: Path) -> None: with RecordWriter(path): pass + # although the records file is empty, it should exist and have a RECORDSTREAM header + assert path.exists() + assert b"RECORDSTREAM" in path.read_bytes() + # rdump empty.records | rdump -l p1 = subprocess.Popen(["rdump", str(path)], stdout=subprocess.PIPE) p2 = subprocess.Popen( From 37c5c02266b1ab4d22d12579b3419490ea826079 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 11:31:08 +0100 Subject: [PATCH 4/8] Fix issues found by colpilot review --- tests/tools/test_rdump.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 13dd47b2..a1fa9191 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -824,12 +824,12 @@ def test_rdump_empty_records_pipe(tmp_path: Path) -> None: assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() -def test_rdump_empty_stdin_pipe(tmp_path: Path) -> None: +def test_rdump_empty_stdin_pipe() -> None: """Test that rdump handles empty stdin as input gracefully.""" - # rdump empty.records | rdump -l + # rdump -l (with empty stdin) pipe = subprocess.Popen( - ["rdump", "-", "-l"], + ["rdump", "-l"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, From 0e3963d7e8e5bdcf7741fd969a9018f0c8294175 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 11:39:02 +0100 Subject: [PATCH 5/8] Also check return code --- tests/tools/test_rdump.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index a1fa9191..2808ae5f 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -820,6 +820,7 @@ def test_rdump_empty_records_pipe(tmp_path: Path) -> None: stderr=subprocess.PIPE, ) stdout, stderr = p2.communicate() + assert p2.returncode == 0 assert b"RecordReader('-'): Empty input stream" in stderr.strip() assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() @@ -835,5 +836,6 @@ def test_rdump_empty_stdin_pipe() -> None: stderr=subprocess.PIPE, ) stdout, stderr = pipe.communicate(input=None) + assert pipe.returncode == 0 assert b"RecordReader('-'): Empty input stream" in stderr.strip() assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() From a53b428191bd46bef09af190cca4435665e8e8d8 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 11:52:37 +0100 Subject: [PATCH 6/8] Linting --- flow/record/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/record/base.py b/flow/record/base.py index 80acf344..44740e80 100644 --- a/flow/record/base.py +++ b/flow/record/base.py @@ -893,7 +893,7 @@ def RecordAdapter( "entering record text, rather than a record stream? This can be fixed by using " "'rdump -w -' to write a record stream to stdout." ) - elif not peek_data: + if not peek_data: raise EOFError("Empty input stream") raise RecordAdapterNotFound("Could not find adapter for file-like object") From ca1c69ea6e847ec0f3084ec8315a31701247bf1f Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 12:12:08 +0100 Subject: [PATCH 7/8] Add test with malformed input and test exit code --- tests/tools/test_rdump.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 2808ae5f..af67ab05 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -821,8 +821,8 @@ def test_rdump_empty_records_pipe(tmp_path: Path) -> None: ) stdout, stderr = p2.communicate() assert p2.returncode == 0 - assert b"RecordReader('-'): Empty input stream" in stderr.strip() - assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() + assert b"RecordReader('-'): Empty input stream" in stderr + assert b"Processed 0 records (matched=0, unmatched=0)" in stdout def test_rdump_empty_stdin_pipe() -> None: @@ -837,5 +837,21 @@ def test_rdump_empty_stdin_pipe() -> None: ) stdout, stderr = pipe.communicate(input=None) assert pipe.returncode == 0 - assert b"RecordReader('-'): Empty input stream" in stderr.strip() - assert b"Processed 0 records (matched=0, unmatched=0)" in stdout.strip() + assert b"RecordReader('-'): Empty input stream" in stderr + assert b"Processed 0 records (matched=0, unmatched=0)" in stdout + + +def test_rdump_invalid_stdin_pipe() -> None: + """Test that rdump handles invalid stdin as input gracefully.""" + + # rdump -l (with invalid stdin) + pipe = subprocess.Popen( + ["rdump", "-l"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + stdout, stderr = pipe.communicate(input=b"this is not a valid record stream") + assert pipe.returncode == 1, "rdump should exit with error code 1 on invalid input" + assert b"rdump encountered a fatal error: Could not find adapter for file-like object" in stderr + assert b"Processed 0 records (matched=0, unmatched=0)" in stdout From faa60893bebd455904b003be47b7095aeddde2c6 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 19 Jan 2026 12:43:30 +0100 Subject: [PATCH 8/8] Parameterize some tests just to be extra sure --- tests/tools/test_rdump.py | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index af67ab05..9f4df6b9 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -825,7 +825,14 @@ def test_rdump_empty_records_pipe(tmp_path: Path) -> None: assert b"Processed 0 records (matched=0, unmatched=0)" in stdout -def test_rdump_empty_stdin_pipe() -> None: +@pytest.mark.parametrize( + "stdin_bytes", + [ + b"", + None, + ], +) +def test_rdump_empty_stdin_pipe(stdin_bytes: bytes | None) -> None: """Test that rdump handles empty stdin as input gracefully.""" # rdump -l (with empty stdin) @@ -841,8 +848,16 @@ def test_rdump_empty_stdin_pipe() -> None: assert b"Processed 0 records (matched=0, unmatched=0)" in stdout -def test_rdump_invalid_stdin_pipe() -> None: - """Test that rdump handles invalid stdin as input gracefully.""" +@pytest.mark.parametrize( + "stdin_bytes", + [ + b"\n", + b"this is not a valid record stream", + b"RANDOMDATA", + ], +) +def test_rdump_invalid_stdin_pipe(stdin_bytes: bytes) -> None: + """Test that rdump handles invalid stdin as an error""" # rdump -l (with invalid stdin) pipe = subprocess.Popen( @@ -851,7 +866,7 @@ def test_rdump_invalid_stdin_pipe() -> None: stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) - stdout, stderr = pipe.communicate(input=b"this is not a valid record stream") + stdout, stderr = pipe.communicate(input=stdin_bytes) assert pipe.returncode == 1, "rdump should exit with error code 1 on invalid input" assert b"rdump encountered a fatal error: Could not find adapter for file-like object" in stderr assert b"Processed 0 records (matched=0, unmatched=0)" in stdout