Skip to content

Commit e8ce81b

Browse files
authored
fix issue with msgpack str (#38)
Signed-off-by: James Elias Sigurdarson <jamiees2@gmail.com>
1 parent 488f367 commit e8ce81b

File tree

2 files changed

+71
-1
lines changed

2 files changed

+71
-1
lines changed

src/protocol.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -716,7 +716,36 @@ export const encodeMessage = (
716716
export const decodeClientStream = (
717717
dataStream: Readable
718718
): AsyncIterable<ClientMessage> => {
719-
return decoder().decodeStream(dataStream) as AsyncIterable<ClientMessage>;
719+
// This is a hack to avoid messagepack utf8-ization of the msgpack str format, since it mangles data.
720+
// Fluentd, when using the out_forward plugin, will pass the data in PackedForward mode using a str to represent the packed Forward messages.
721+
// This would normally end up getting decoded as utf8 by @msgpack/msgpack, and turned into complete garbage. This short circuits that function in the parser,
722+
const streamDecoder = decoder() as any;
723+
streamDecoder._decodeUtf8String = streamDecoder.decodeUtf8String;
724+
streamDecoder.decodeUtf8String = function (
725+
this: typeof streamDecoder,
726+
byteLength: number,
727+
headerOffset: number
728+
) {
729+
if (this.bytes.byteLength < this.pos + headerOffset + byteLength) {
730+
// Defer to the error handling inside the normal function, if we don't have enough data to parse
731+
return this._decodeUtf8String(byteLength, headerOffset);
732+
}
733+
const offset = this.pos + headerOffset;
734+
// If the first byte is 0x92 (fixarr of size 2), this represents a msgpack str encoded entry
735+
// Also catch 0xdc and 0xdd, which represents arrays. This should never be passed, fixarr is more efficient, but just to cover all bases.
736+
// If the first byte is 0x1f, then assume it is compressed
737+
if (
738+
this.bytes[offset] === 0x92 ||
739+
this.bytes[offset] === 0x1f ||
740+
this.bytes[offset] === 0xdc ||
741+
this.bytes[offset] === 0xdd
742+
) {
743+
return this.decodeBinary(byteLength, headerOffset);
744+
} else {
745+
return this._decodeUtf8String(byteLength, headerOffset);
746+
}
747+
}.bind(streamDecoder);
748+
return streamDecoder.decodeStream(dataStream) as AsyncIterable<ClientMessage>;
720749
};
721750

722751
/**

test/test.protocol.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
DecodeError,
88
SharedKeyMismatchError,
99
} from "../src/error";
10+
import {Readable} from "stream";
1011

1112
describe("Protocol", () => {
1213
describe("isHelo", () => {
@@ -647,4 +648,44 @@ describe("Protocol", () => {
647648
);
648649
});
649650
});
651+
describe("decodeClientStream", () => {
652+
it("should parse msgpack str encoded records", async () => {
653+
const entries = [
654+
protocol.generateEntry(0, {abc: "def"}),
655+
protocol.generateEntry(1, {ghi: "jkl"}),
656+
];
657+
const packedEntries = entries.map(protocol.packEntry);
658+
const packedEntryLength = packedEntries.reduce((r, v) => r + v.length, 0);
659+
const message = protocol.generatePackedForwardMode(
660+
"test",
661+
packedEntries,
662+
packedEntryLength
663+
);
664+
const msg = protocol.encodeMessage(message);
665+
// 1 byte for the array byte (0x93, 1 byte for the fixstr containing "test"), then one byte for each of the str
666+
// Change the 0xc4 (bin8) representing the message to a 0xd9 (str8), this is the same format as FluentD sends data as
667+
msg[1 + 1 + message[0].length] = 0xd9;
668+
669+
const s = new Readable();
670+
s.push(msg);
671+
s.push(null);
672+
673+
const iterable = protocol.decodeClientStream(s);
674+
let parsedMessage: protocol.ClientMessage | undefined;
675+
for await (const msg of iterable) {
676+
parsedMessage = msg;
677+
}
678+
679+
expect(parsedMessage).to.not.be.undefined;
680+
expect(protocol.isClientTransportMessage(parsedMessage)).to.be.true;
681+
if (!parsedMessage || !protocol.isClientTransportMessage(parsedMessage)) {
682+
return;
683+
}
684+
685+
const originalMessage = protocol.parseTransport(parsedMessage);
686+
console.log(originalMessage);
687+
expect(originalMessage.entries[0][0]).to.equal(entries[0][0]);
688+
expect(originalMessage.entries[1][0]).to.equal(entries[1][0]);
689+
});
690+
});
650691
});

0 commit comments

Comments
 (0)