Skip to content

Commit 1839413

Browse files
committed
Avro writer
1 parent c9293ed commit 1839413

File tree

5 files changed

+319
-3
lines changed

5 files changed

+319
-3
lines changed

eslint.config.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export default [
2828
functions: 'never',
2929
}],
3030
'eol-last': 'error',
31-
eqeqeq: 'error',
31+
eqeqeq: ['error', 'smart'],
3232
'func-style': ['error', 'declaration'],
3333
indent: ['error', 2],
3434
'jsdoc/check-param-names': 'error',

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@
4545
},
4646
"dependencies": {
4747
"hyparquet": "1.13.0",
48-
"hyparquet-compressors": "1.1.1"
48+
"hyparquet-compressors": "1.1.1",
49+
"hyparquet-writer": "0.3.5"
4950
},
5051
"devDependencies": {
5152
"@types/node": "22.15.3",

src/avro.write.js

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import { ByteWriter } from 'hyparquet-writer'
2+
3+
/**
 * Write records as an Avro object container file: header (magic, metadata
 * map, sync marker) followed by data blocks.
 *
 * @param {Object} options
 * @param {Writer} options.writer output writer, flushed via finish() if present
 * @param {AvroRecord} options.schema record schema (object, or pre-stringified JSON)
 * @param {Record<string, any>[]} options.records rows to serialize
 * @param {number} [options.blockSize] records per data block (default 512)
 */
export function avroWrite({ writer, schema, records, blockSize = 512 }) {
  writer.appendUint32(0x016a624f) // magic Obj\x01 (little-endian)

  // schema may arrive pre-stringified; parse so .fields is usable below
  const parsed = typeof schema === 'string' ? JSON.parse(schema) : schema

  // file metadata map: schema JSON and codec name
  const encoder = new TextEncoder() // hoisted: one encoder for all entries
  const meta = {
    'avro.schema': typeof schema === 'string' ? schema : JSON.stringify(schema),
    'avro.codec': 'null',
  }
  appendZigZag(writer, Object.keys(meta).length)
  for (const [key, value] of Object.entries(meta)) {
    const kb = encoder.encode(key)
    appendZigZag(writer, kb.length)
    writer.appendBytes(kb)
    const vb = encoder.encode(value)
    appendZigZag(writer, vb.length)
    writer.appendBytes(vb)
  }
  writer.appendVarInt(0) // end of metadata map

  // 16-byte sync marker; prefer a cryptographically strong source when present
  const sync = new Uint8Array(16)
  if (globalThis.crypto?.getRandomValues) {
    globalThis.crypto.getRandomValues(sync)
  } else {
    for (let i = 0; i < 16; i++) sync[i] = Math.random() * 256 | 0
  }
  writer.appendBytes(sync)

  // data blocks: record count, byte length, serialized records, sync marker
  for (let i = 0; i < records.length; i += blockSize) {
    const block = records.slice(i, i + blockSize)
    appendZigZag(writer, block.length) // record count
    const blockWriter = new ByteWriter()
    for (const record of block) {
      for (const { name, type } of parsed.fields) {
        writeType(blockWriter, type, record[name])
      }
    }
    appendZigZag(writer, blockWriter.offset) // block size in bytes
    writer.appendBuffer(blockWriter.getBuffer())
    writer.appendBytes(sync)
  }

  if (writer.finish) writer.finish()
}
48+
49+
/**
 * Append a single value in Avro binary encoding.
 *
 * Supports unions (arrays of branches), primitive type names, logical types
 * (date, timestamp-millis, timestamp-micros, decimal), nested records, and
 * arrays.
 *
 * @import {Writer} from 'hyparquet-writer/src/types.js'
 * @import {AvroRecord, AvroType} from '../src/types.js'
 * @param {Writer} writer
 * @param {AvroType} schema
 * @param {*} value
 * @throws {Error} when no union branch matches, the value has the wrong
 *   runtime type, or the schema is unknown
 */
function writeType(writer, schema, value) {
  if (Array.isArray(schema)) {
    // union: pick the first branch whose tag matches the runtime value
    const unionIndex = schema.findIndex(s => {
      if (Array.isArray(s)) throw new Error('nested unions not supported')

      // normalise branch to a tag string we can test against
      const tag = typeof s === 'string' ? s : 'logicalType' in s ? s.logicalType : s.type

      if (value == null) return tag === 'null'
      if (tag === 'boolean') return typeof value === 'boolean'
      if (tag === 'int') return typeof value === 'number' && Number.isInteger(value)
      if (tag === 'long') return typeof value === 'bigint' || typeof value === 'number'
      if (tag === 'float' || tag === 'double') return typeof value === 'number'
      if (tag === 'string') return typeof value === 'string'
      if (tag === 'bytes') return value instanceof Uint8Array
      if (tag === 'record') return typeof value === 'object' && value !== null
      if (tag === 'array') return Array.isArray(value)
      // logical-type tags: previously unmatched, so unions such as
      // ['null', {type:'long', logicalType:'timestamp-millis'}] always threw
      if (tag === 'date') return value instanceof Date || typeof value === 'number'
      if (tag === 'timestamp-millis' || tag === 'timestamp-micros') {
        return value instanceof Date || typeof value === 'bigint' || typeof value === 'number'
      }
      if (tag === 'decimal') return typeof value === 'bigint' || typeof value === 'number'
      return false
    })

    if (unionIndex === -1) throw new Error('union branch not found')
    appendZigZag(writer, unionIndex) // branch index
    writeType(writer, schema[unionIndex], value)
  } else if (typeof schema === 'string') {
    // primitive type
    if (schema === 'null') {
      // no-op: null occupies zero bytes
    } else if (schema === 'boolean') {
      writer.appendUint8(value ? 1 : 0)
    } else if (schema === 'int') {
      if (typeof value !== 'number' || !Number.isInteger(value)) {
        throw new Error('expected integer value')
      }
      appendZigZag(writer, value)
    } else if (schema === 'long') {
      // accept plain integers too, consistent with the union matcher above
      if (typeof value === 'bigint') {
        appendZigZag64(writer, value)
      } else if (typeof value === 'number' && Number.isSafeInteger(value)) {
        appendZigZag64(writer, BigInt(value))
      } else {
        throw new Error('expected bigint value')
      }
    } else if (schema === 'float') {
      if (typeof value !== 'number') throw new Error('expected number value')
      writer.appendFloat32(value)
    } else if (schema === 'double') {
      if (typeof value !== 'number') throw new Error('expected number value')
      writer.appendFloat64(value)
    } else if (schema === 'bytes') {
      if (!(value instanceof Uint8Array)) throw new Error('expected Uint8Array value')
      appendZigZag(writer, value.length)
      writer.appendBytes(value)
    } else if (schema === 'string') {
      if (typeof value !== 'string') throw new Error('expected string value')
      const b = new TextEncoder().encode(value)
      appendZigZag(writer, b.length)
      writer.appendBytes(b)
    } else {
      // previously an unknown name fell through silently, corrupting the block
      throw new Error(`unknown primitive type ${schema}`)
    }
  } else if ('logicalType' in schema) {
    if (schema.logicalType === 'date') {
      // days since the Unix epoch
      appendZigZag(writer, value instanceof Date ? Math.floor(value.getTime() / 86400000) : value)
    } else if (schema.logicalType === 'timestamp-millis') {
      appendZigZag64(writer, value instanceof Date ? BigInt(value.getTime()) : BigInt(value))
    } else if (schema.logicalType === 'timestamp-micros') {
      appendZigZag64(
        writer,
        value instanceof Date ? BigInt(value.getTime()) * 1000n : BigInt(value)
      )
    } else if (schema.logicalType === 'decimal') {
      const scale = 'scale' in schema ? schema.scale ?? 0 : 0
      let u
      if (typeof value === 'bigint') {
        u = value
      } else if (typeof value === 'number') {
        u = BigInt(Math.round(value * 10 ** scale))
      } else {
        throw new Error('decimal value must be bigint or number')
      }
      const b = bigIntToBytes(u)
      // the length prefix is an Avro long and must be zig-zag encoded, like
      // the 'bytes' branch above (appendVarInt wrote the raw varint, which
      // readers decode as the wrong length)
      appendZigZag(writer, b.length)
      writer.appendBytes(b)
    } else {
      throw new Error(`unknown logical type ${schema.logicalType}`)
    }
  } else if (schema.type === 'record') {
    // nested record: fields in schema order, no framing
    for (const f of schema.fields) {
      writeType(writer, f.type, value[f.name])
    }
  } else if (schema.type === 'array') {
    // arrays are item blocks terminated by a zero count
    if (value.length) {
      appendZigZag(writer, value.length)
      for (const it of value) {
        writeType(writer, schema.items, it)
      }
    }
    writer.appendVarInt(0)
  } else {
    throw new Error(`unknown schema type ${JSON.stringify(schema)}`)
  }
}
152+
153+
/**
 * Zig-zag encode a signed 32-bit integer and append it as a varint,
 * so that small magnitudes (positive or negative) encode to few bytes.
 * @param {Writer} writer
 * @param {number} v
 */
function appendZigZag(writer, v) {
  const zigzag = (v >> 31) ^ (v << 1)
  writer.appendVarInt(zigzag)
}
160+
161+
/**
 * Zig-zag encode a signed 64-bit BigInt and append it as a var-length BigInt.
 * @param {Writer} writer
 * @param {bigint} v
 */
function appendZigZag64(writer, v) {
  const zigzag = (v >> 63n) ^ (v << 1n)
  writer.appendVarBigInt(zigzag)
}
168+
169+
/**
 * Convert a signed BigInt into minimal two's-complement big-endian bytes.
 * @param {bigint} value
 * @returns {Uint8Array}
 */
function bigIntToBytes(value) {
  // find the smallest byte width whose signed range contains the value
  let width = 1
  while (value < -(1n << BigInt(8 * width - 1)) || value >= 1n << BigInt(8 * width - 1)) {
    width++
  }
  // reinterpret as unsigned at that width, then emit big-endian bytes
  let u = BigInt.asUintN(8 * width, value)
  const bytes = new Uint8Array(width)
  for (let i = width - 1; i >= 0; i--) {
    bytes[i] = Number(u & 0xffn)
    u >>= 8n
  }
  return bytes
}

test/avro.roundtrip.test.js

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
import { describe, expect, it } from 'vitest'
2+
import { ByteWriter } from 'hyparquet-writer'
3+
import { avroWrite } from '../src/avro.write.js'
4+
import { avroData, avroMetadata } from '../src/iceberg.js'
5+
6+
/**
7+
* @import {AvroType} from '../src/types.js'
8+
*/
9+
10+
// Round-trip tests: serialize with avroWrite, then decode with the project's
// own Avro reader (avroMetadata / avroData) and compare against the input.
describe('Avro round-trip', () => {
  it('primitive records', () => {
    /** @type {AvroType} */
    const schema = {
      type: 'record',
      name: 'User',
      fields: [
        { name: 'id', type: 'long' },
        { name: 'name', type: 'string' },
        { name: 'age', type: 'int' },
        { name: 'alive', type: 'boolean' },
        { name: 'weight', type: 'float' },
        { name: 'height', type: 'double' },
      ],
    }

    // float values chosen to be exactly representable in float32 so the
    // round-trip comparison is exact
    const records = [
      { id: 1n, name: 'alice', age: 30, alive: true, weight: 65.5, height: 1.75 },
      { id: 2n, name: 'bob', age: 25, alive: false, weight: 70.0, height: 1.80 },
    ]

    const writer = new ByteWriter()
    avroWrite({ writer, schema, records })

    // minimal reader over the written buffer: { view, offset }
    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
    const { metadata, syncMarker } = avroMetadata(reader)
    const rows = avroData({ reader, metadata, syncMarker })

    expect(rows).toEqual(records)
  })

  it('nullable round-trip', () => {
    /** @type {AvroType} */
    const schema = {
      type: 'record',
      name: 'Found',
      fields: [
        { name: 'nullable', type: ['null', 'string'] },
      ],
    }

    // undefined exercises the 'null' union branch
    const recs = [
      { nullable: 'meaning' },
      { nullable: undefined },
    ]

    const writer = new ByteWriter()
    avroWrite({ writer, schema, records: recs })

    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
    const { metadata, syncMarker } = avroMetadata(reader)
    const got = avroData({ reader, metadata, syncMarker })

    expect(got).toEqual(recs)
  })

  it('logical timestamp-millis', () => {
    /** @type {AvroType} */
    const schema = {
      type: 'record',
      name: 'Event',
      fields: [
        {
          name: 'ts',
          type: { type: 'long', logicalType: 'timestamp-millis' },
        },
      ],
    }

    const now = Date.now()
    const original = [{ ts: new Date(now) }]

    const writer = new ByteWriter()
    avroWrite({ writer, schema, records: original })

    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
    const { metadata, syncMarker } = avroMetadata(reader)
    const round = avroData({ reader, metadata, syncMarker })

    // millisecond precision must survive the write/read cycle
    expect(round[0].ts.getTime()).toBe(original[0].ts.getTime())
  })

  it('array + map round-trip', () => {
    /** @type {AvroType} */
    const schema = {
      type: 'record',
      name: 'Complex',
      fields: [
        { name: 'tags', type: { type: 'array', items: 'string' } },
        { name: 'metrics', type: ['null', { type: 'record', name: 'Metrics', fields: [
          { name: 'x', type: 'int' },
          { name: 'y', type: ['null', 'int'] },
        ] }] },
      ],
    }

    // covers empty arrays, nested nullable fields, and a null nested record
    const recs = [
      { tags: ['a', 'b'], metrics: { x: 1, y: 2 } },
      { tags: [], metrics: { x: -1, y: undefined } },
      { tags: [], metrics: undefined },
    ]

    const writer = new ByteWriter()
    avroWrite({ writer, schema, records: recs })

    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
    const { metadata, syncMarker } = avroMetadata(reader)
    const got = avroData({ reader, metadata, syncMarker })

    expect(got).toEqual(recs)
  })
})

test/package.test.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ describe('package.json', () => {
1414
expect(packageJson.license).toBe('MIT')
1515
})
1616

17-
it('should have at most 2 dependencies', () => {
17+
it('should have at most 3 dependencies', () => {
1818
const { dependencies } = packageJson
1919
expect(Object.keys(dependencies)).toEqual([
2020
'hyparquet',
2121
'hyparquet-compressors',
22+
'hyparquet-writer',
2223
])
2324
})
2425

0 commit comments

Comments
 (0)