diff --git a/singer/messages.py b/singer/messages.py index 3848801..e53c48d 100644 --- a/singer/messages.py +++ b/singer/messages.py @@ -222,22 +222,22 @@ def format_message(message): return json.dumps(message.asdict(), use_decimal=True) -def write_message(message): - sys.stdout.write(format_message(message) + '\n') - sys.stdout.flush() +def write_message(message, out=sys.stdout): + out.write(format_message(message) + '\n') + out.flush() -def write_record(stream_name, record, stream_alias=None, time_extracted=None): +def write_record(stream_name, record, stream_alias=None, time_extracted=None, out=sys.stdout): """Write a single record for the given stream. write_record("users", {"id": 2, "email": "mike@stitchdata.com"}) """ write_message(RecordMessage(stream=(stream_alias or stream_name), record=record, - time_extracted=time_extracted)) + time_extracted=time_extracted), out=out) -def write_records(stream_name, records): +def write_records(stream_name, records, out=sys.stdout): """Write a list of records for the given stream. chris = {"id": 1, "email": "chris@stitchdata.com"} @@ -245,10 +245,11 @@ def write_records(stream_name, records): write_records("users", [chris, mike]) """ for record in records: - write_record(stream_name, record) + write_record(stream_name, record, out=out) -def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None): +def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream_alias=None, + out=sys.stdout): """Write a schema message. stream = 'test' @@ -266,22 +267,22 @@ def write_schema(stream_name, schema, key_properties, bookmark_properties=None, stream=(stream_alias or stream_name), schema=schema, key_properties=key_properties, - bookmark_properties=bookmark_properties)) + bookmark_properties=bookmark_properties), out=out) -def write_state(value): +def write_state(value, out=sys.stdout): """Write a state message. write_state({'last_updated_at': '2017-02-14T09:21:00'}) """ - write_message(StateMessage(value=value)) + write_message(StateMessage(value=value), out=out) -def write_version(stream_name, version): +def write_version(stream_name, version, out=sys.stdout): """Write an activate version message. stream = 'test' version = int(time.time()) write_version(stream, version) """ - write_message(ActivateVersionMessage(stream_name, version)) + write_message(ActivateVersionMessage(stream_name, version), out=out)