67 lines
1.6 KiB
TypeScript
67 lines
1.6 KiB
TypeScript
import type { Readable } from "node:stream";
|
|
import { StringDecoder } from "node:string_decoder";
|
|
|
|
/**
|
|
* Serialize a single strict JSONL record.
|
|
*
|
|
* Framing is LF-only. Payload strings may contain other Unicode separators such as
|
|
* U+2028 and U+2029. Clients must split records on `\n` only.
|
|
*/
|
|
export function serializeJsonLine(value: unknown): string {
|
|
return `${JSON.stringify(value)}\n`;
|
|
}
|
|
|
|
/**
|
|
* Attach an LF-only JSONL reader to a stream.
|
|
*
|
|
* This intentionally does not use Node readline. Readline splits on additional
|
|
* Unicode separators that are valid inside JSON strings and therefore does not
|
|
* implement strict JSONL framing.
|
|
*/
|
|
export function attachJsonlLineReader(
|
|
stream: Readable,
|
|
onLine: (line: string) => void,
|
|
): () => void {
|
|
const decoder = new StringDecoder("utf8");
|
|
let buffer = "";
|
|
|
|
const emitLine = (line: string) => {
|
|
onLine(line.endsWith("\r") ? line.slice(0, -1) : line);
|
|
};
|
|
|
|
const onData = (chunk: string | Buffer) => {
|
|
buffer += typeof chunk === "string" ? chunk : decoder.write(chunk);
|
|
|
|
while (true) {
|
|
const newlineIndex = buffer.indexOf("\n");
|
|
if (newlineIndex === -1) {
|
|
return;
|
|
}
|
|
|
|
emitLine(buffer.slice(0, newlineIndex));
|
|
buffer = buffer.slice(newlineIndex + 1);
|
|
}
|
|
};
|
|
|
|
const onEnd = () => {
|
|
buffer += decoder.end();
|
|
if (buffer.length > 0) {
|
|
emitLine(buffer);
|
|
buffer = "";
|
|
}
|
|
};
|
|
|
|
const onError = (_err: Error) => {
|
|
// Stream errors are non-fatal for JSONL reading
|
|
};
|
|
|
|
stream.on("data", onData);
|
|
stream.on("end", onEnd);
|
|
stream.on("error", onError);
|
|
|
|
return () => {
|
|
stream.off("data", onData);
|
|
stream.off("end", onEnd);
|
|
stream.off("error", onError);
|
|
};
|
|
}
|