Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions lib/transaction.dart
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,20 @@ class Transaction<T> {
header: header, maxLen: maxLen));
}

/// Create a transaction that uses MagicHeaderFixedLengthByteTransformer
///
/// ```dart
/// Transaction.magicHeaderFixedLength(p.inputStream, Uint8List.fromList([65,65,65]), len: 10); // expects magic header AAA and fixed length 10.
/// ```
static Transaction<Uint8List> magicHeaderFixedLength(
Stream<Uint8List> stream, List<int> header, int len,
{int maxLen = 1024}) {
return Transaction<Uint8List>(
stream,
MagicHeaderFixedLengthByteTransformer.broadcast(
header: header, len: len, maxLen: maxLen));
}

/// Create a transaction that transforms the incoming stream into
/// events delimited by 'terminator', returning Strings.
///
Expand Down
63 changes: 63 additions & 0 deletions lib/transformers.dart
Original file line number Diff line number Diff line change
Expand Up @@ -385,3 +385,66 @@ class MagicHeaderAndLengthByteTransformer
_controller.close();
}
}

class MagicHeaderFixedLengthByteTransformer
extends MagicHeaderAndLengthByteTransformer {
final int len;

MagicHeaderFixedLengthByteTransformer(
{bool sync = false,
bool? cancelOnError,
List<int?>? header,
required this.len,
int maxLen = 1024,
Duration clearTimeout = const Duration(seconds: 1)})
: super(
sync: sync,
cancelOnError: cancelOnError,
header: header,
maxLen: maxLen,
clearTimeout: clearTimeout);

MagicHeaderFixedLengthByteTransformer.broadcast(
{bool sync = false,
bool? cancelOnError,
List<int?>? header,
required this.len,
int maxLen = 1024,
Duration clearTimeout = const Duration(seconds: 1)})
: super.broadcast(
sync: sync,
cancelOnError: cancelOnError,
header: header,
maxLen: maxLen,
clearTimeout: clearTimeout);

@override
void onData(Uint8List data) {
_dataSinceLastTick = true;
if (_partial.length > maxLen) {
_partial = _partial.sublist(_partial.length - maxLen);
}

_partial.addAll(data);

while (_partial.length > 0) {
int index = wildcardFind(header, _partial);
if (index < 0) {
return;
}

if (index > 0) {
_partial = _partial.sublist(index);
}

if (_partial.length < header!.length + len + 1) {
// not completely arrived yet.
return;
}

_controller
.add(Uint8List.fromList(_partial.sublist(0, len + header!.length + 1)));
_partial = _partial.sublist(len + header!.length + 1);
}
}
}