Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flow/record/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -893,6 +893,8 @@ def RecordAdapter(
"entering record text, rather than a record stream? This can be fixed by using "
"'rdump -w -' to write a record stream to stdout."
)
if not peek_data:
raise EOFError("Empty input stream")
raise RecordAdapterNotFound("Could not find adapter for file-like object")

# Now that we found an adapter, we will fall back into the same code path as when a URL is given. As the url
Expand Down
9 changes: 7 additions & 2 deletions flow/record/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,13 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R
print("[reading from stdin]", file=sys.stderr)

# Initial value for reader, in case of exception message
reader = "RecordReader"
reader: str | AbstractReader = "RecordReader"
try:
reader = RecordReader(src, selector=selector)
yield from reader
reader.close()
except EOFError as e:
# End of file reached, likely no records in source
log.warning("%s(%r): %s", reader, src, e)
except IOError as e:
if len(sources) == 1:
raise
Expand All @@ -184,6 +186,9 @@ def record_stream(sources: list[str], selector: str | None = None) -> Iterator[R
else:
log.warning("Exception in %r for %r: %s -- skipping to next reader", reader, src, aRepr.repr(e))
continue
finally:
if isinstance(reader, AbstractReader):
reader.close()


class PathTemplateWriter:
Expand Down
8 changes: 8 additions & 0 deletions tests/record/test_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -499,3 +499,11 @@ def test_file_like_writer_reader() -> None:
assert len(read_records) == 10
for idx, record in enumerate(read_records):
assert record == test_records[idx]


def test_empty_stdin(monkeypatch: pytest.MonkeyPatch) -> None:
# Mock stdin to be empty
monkeypatch.setattr(sys, "stdin", BytesIO(b""))

with pytest.raises(EOFError, match="Empty input stream"):
RecordAdapter()
73 changes: 73 additions & 0 deletions tests/tools/test_rdump.py
Original file line number Diff line number Diff line change
Expand Up @@ -797,3 +797,76 @@ def test_rdump_catch_sigpipe(tmp_path: Path) -> None:
assert "test/record count=0" in stdout
assert "test/record count=1" in stdout
assert len(stdout.splitlines()) == 2


def test_rdump_empty_records_pipe(tmp_path: Path) -> None:
"""Test that rdump handles empty records as input gracefully."""

# create an empty records file
path = tmp_path / "empty.records"
with RecordWriter(path):
pass

# although the records file is empty, it should exist and have a RECORDSTREAM header
assert path.exists()
assert b"RECORDSTREAM" in path.read_bytes()

# rdump empty.records | rdump -l
p1 = subprocess.Popen(["rdump", str(path)], stdout=subprocess.PIPE)
p2 = subprocess.Popen(
["rdump", "-l"],
stdin=p1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = p2.communicate()
assert p2.returncode == 0
assert b"RecordReader('-'): Empty input stream" in stderr
assert b"Processed 0 records (matched=0, unmatched=0)" in stdout


@pytest.mark.parametrize(
"stdin_bytes",
[
b"",
None,
],
)
def test_rdump_empty_stdin_pipe(stdin_bytes: bytes | None) -> None:
"""Test that rdump handles empty stdin as input gracefully."""

# rdump -l (with empty stdin)
pipe = subprocess.Popen(
["rdump", "-l"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = pipe.communicate(input=None)
assert pipe.returncode == 0
assert b"RecordReader('-'): Empty input stream" in stderr
assert b"Processed 0 records (matched=0, unmatched=0)" in stdout


@pytest.mark.parametrize(
"stdin_bytes",
[
b"\n",
b"this is not a valid record stream",
b"RANDOMDATA",
],
)
def test_rdump_invalid_stdin_pipe(stdin_bytes: bytes) -> None:
"""Test that rdump handles invalid stdin as an error"""

# rdump -l (with invalid stdin)
pipe = subprocess.Popen(
["rdump", "-l"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
)
stdout, stderr = pipe.communicate(input=stdin_bytes)
assert pipe.returncode == 1, "rdump should exit with error code 1 on invalid input"
assert b"rdump encountered a fatal error: Could not find adapter for file-like object" in stderr
assert b"Processed 0 records (matched=0, unmatched=0)" in stdout