
Commit fa7caac

Avro writer
1 parent 8df02f0 commit fa7caac

7 files changed: +302 −10 lines

package.json

Lines changed: 2 additions & 1 deletion

@@ -45,7 +45,8 @@
   },
   "dependencies": {
     "hyparquet": "1.12.1",
-    "hyparquet-compressors": "1.1.1"
+    "hyparquet-compressors": "1.1.1",
+    "hyparquet-writer": "0.3.5"
   },
   "devDependencies": {
     "@types/node": "22.14.1",

src/avro.data.js

Lines changed: 11 additions & 6 deletions

@@ -6,7 +6,6 @@ import { parseDecimal } from 'hyparquet/src/convert.js'
  * Read avro data blocks.
  * Should be called after avroMetadata.
  *
- * @import {DataReader} from 'hyparquet/src/types.js'
  * @param {Object} options
  * @param {DataReader} options.reader
  * @param {Record<string, any>} options.metadata
@@ -24,7 +23,8 @@ export function avroData({ reader, metadata, syncMarker }) {
     recordCount = -recordCount
   }
   const blockSize = readZigZag(reader)
-  let data = new Uint8Array(reader.view.buffer, reader.offset, blockSize)
+  let data = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, blockSize)
+  console.log('read block', data)
   reader.offset += blockSize

   // Read and verify sync marker for the block
@@ -47,12 +47,15 @@ export function avroData({ reader, metadata, syncMarker }) {
   // Decode according to binary or json encoding
   // Loop through metadata['avro.schema'] to parse the block
   const { fields } = metadata['avro.schema']
-  const dataReader = { view: new DataView(data.buffer), offset: 0 }
+  const view = new DataView(data.buffer, data.byteOffset, data.byteLength)
+  const dataReader = { view, offset: 0 }
   for (let i = 0; i < recordCount; i++) {
     /** @type {Record<string, any>} */
     const obj = {}
     for (const field of fields) {
+      console.log('read field', field.name, field.type, dataReader.offset)
       const value = readType(dataReader, field.type)
+      console.log('read field value', value)
       obj[field.name] = value
     }
     blocks.push(obj)
@@ -62,12 +65,14 @@ export function avroData({ reader, metadata, syncMarker }) {
 }

 /**
+ * @import {DataReader} from 'hyparquet/src/types.js'
  * @import {AvroType} from '../src/types.js'
  * @param {DataReader} reader
  * @param {AvroType} type
  * @returns {any}
  */
 function readType(reader, type) {
+  console.log('readType', type)
   if (type === 'null') {
     return undefined
   } else if (Array.isArray(type)) {
@@ -77,7 +82,6 @@ function readType(reader, type) {
     // Read recursively
     /** @type {Record<string, any>} */
     const obj = {}
-    // assert(Array.isArray(type.fields))
     for (const subField of type.fields) {
       obj[subField.name] = readType(reader, subField.type)
     }
@@ -133,12 +137,13 @@ function readType(reader, type) {
     return value
   } else if (type === 'bytes') {
     const length = readZigZag(reader)
-    const bytes = new Uint8Array(reader.view.buffer, reader.offset, length)
+    const bytes = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, length)
     reader.offset += length
     return bytes
   } else if (type === 'string') {
     const length = readZigZag(reader)
-    const text = new TextDecoder().decode(new Uint8Array(reader.view.buffer, reader.offset, length))
+    const bytes = new Uint8Array(reader.view.buffer, reader.view.byteOffset + reader.offset, length)
+    const text = new TextDecoder().decode(bytes)
     reader.offset += length
     return text
   } else {
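
The repeated `reader.view.byteOffset + reader.offset` fix above matters whenever a DataView is a window into a larger ArrayBuffer: the Uint8Array constructor offsets from the start of the underlying buffer, not from the view. A minimal standalone sketch of the pitfall (illustrative, not from this repo):

// A DataView starting 4 bytes into a shared buffer
const buffer = new Uint8Array([0, 0, 0, 0, 10, 20, 30]).buffer
const view = new DataView(buffer, 4)
// Wrong: offsets from the buffer start, yields [0, 0]
console.log(new Uint8Array(view.buffer, 0, 2))
// Right: add view.byteOffset first, yields [10, 20]
console.log(new Uint8Array(view.buffer, view.byteOffset + 0, 2))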

src/avro.metadata.js

Lines changed: 2 additions & 2 deletions

@@ -45,7 +45,7 @@ function readAvroString(reader) {
   const len = readZigZag(reader)
   const bytes = new Uint8Array(reader.view.buffer, reader.offset, len)
   reader.offset += len
-  return new TextDecoder('utf-8').decode(bytes)
+  return new TextDecoder().decode(bytes)
 }

 /**
@@ -57,7 +57,7 @@ function readAvroString(reader) {
 export function avroMetadata(reader) {
   // Check avro magic bytes "Obj\x01"
   if (reader.view.getUint32(reader.offset) !== 0x4f626a01) {
-    throw new Error('avro file invalid magic bytes')
+    throw new Error('avro invalid magic bytes')
   }
   reader.offset += 4
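
For reference, the magic constant checked above is the ASCII bytes "Obj" plus the format version byte 0x01, read as a big-endian uint32. A standalone check (illustrative, not from this repo):

// "Obj" + version byte 0x01, read back as a big-endian uint32
const magic = new TextEncoder().encode('Obj\x01')
console.log(new DataView(magic.buffer).getUint32(0).toString(16)) // '4f626a01'

The writer below appends the same four bytes as the little-endian constant 0x016a624f, assuming hyparquet-writer's appendUint32 writes little-endian, which the round-trip tests imply.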

src/avro.write.js

Lines changed: 193 additions & 0 deletions

@@ -0,0 +1,193 @@
+/**
+ * @import {Writer} from 'hyparquet-writer/src/types.js'
+ * @import {AvroRecord, AvroType} from '../src/types.js'
+ */
+
+import { ByteWriter } from 'hyparquet-writer'
+
+/**
+ * @param {Object} options
+ * @param {Writer} options.writer
+ * @param {AvroRecord} options.schema
+ * @param {Record<string, any>[]} options.records
+ * @param {number} [options.blockSize]
+ */
+export function avroWrite({ writer, schema, records, blockSize = 512 }) {
+  writer.appendUint32(0x016a624f) // Obj\x01
+
+  const meta = {
+    'avro.schema': typeof schema === 'string' ? schema : JSON.stringify(schema),
+    'avro.codec': 'null',
+  }
+  appendZigZag(writer, Object.keys(meta).length)
+  for (const [key, value] of Object.entries(meta)) {
+    const kb = new TextEncoder().encode(key)
+    appendZigZag(writer, kb.length)
+    writer.appendBytes(kb)
+    const vb = new TextEncoder().encode(value)
+    appendZigZag(writer, vb.length)
+    writer.appendBytes(vb)
+  }
+  writer.appendVarInt(0) // end of metadata map
+
+  const sync = new Uint8Array(16)
+  for (let i = 0; i < 16; i++) sync[i] = Math.random() * 256 | 0
+  writer.appendBytes(sync)
+
+  for (let i = 0; i < records.length; i += blockSize) {
+    const block = records.slice(i, i + blockSize)
+    appendZigZag(writer, block.length) // record count
+    const blockWriter = new ByteWriter()
+    for (const record of block) {
+      for (const { name, type } of schema.fields) {
+        writeType(blockWriter, type, record[name])
+      }
+    }
+    appendZigZag(writer, blockWriter.offset) // block size
+    writer.appendBuffer(blockWriter.getBuffer())
+    writer.appendBytes(sync)
+  }
+
+  if (writer.finish) writer.finish()
+}
+
+/**
+ * @param {Writer} writer
+ * @param {AvroType} schema
+ * @param {*} value
+ */
+function writeType(writer, schema, value) {
+  console.log('writeType', schema, value)
+  if (Array.isArray(schema)) {
+    // find matching union branch
+    const idx = schema.findIndex(s => {
+      if (Array.isArray(s)) throw new Error('nested unions not supported')
+
+      // normalise branch to a tag string we can test against
+      const tag = typeof s === 'string' ? s : 'logicalType' in s ? s.logicalType : s.type
+
+      if (value === null) return tag === 'null'
+      if (tag === 'boolean') return typeof value === 'boolean'
+      if (tag === 'int') return typeof value === 'number' && Number.isInteger(value)
+      if (tag === 'long') return typeof value === 'bigint' || typeof value === 'number'
+      if (tag === 'float' || tag === 'double') return typeof value === 'number'
+      if (tag === 'string') return typeof value === 'string'
+      if (tag === 'bytes') return value instanceof Uint8Array
+      if (tag === 'record') return typeof value === 'object' && value !== null
+      if (tag === 'array') return Array.isArray(value)
+      return false
+    })
+
+    if (idx === -1) throw new Error('union branch not found')
+    appendZigZag(writer, idx) // union branch index is a zigzag long
+    writeType(writer, schema[idx], value)
+  } else if (typeof schema === 'string') {
+    // primitive type
+    if (schema === 'null') {
+      // no-op
+    } else if (schema === 'boolean') {
+      writer.appendUint8(value ? 1 : 0)
+    } else if (schema === 'int') {
+      appendZigZag(writer, value)
+    } else if (schema === 'long') {
+      if (typeof value === 'bigint') {
+        appendZigZag64(writer, value)
+      } else {
+        appendZigZag(writer, value)
+      }
+    } else if (schema === 'float') {
+      writer.appendFloat32(value)
+    } else if (schema === 'double') {
+      writer.appendFloat64(value)
+    } else if (schema === 'bytes') {
+      const b = value instanceof Uint8Array ? value : new TextEncoder().encode(value)
+      appendZigZag(writer, b.length)
+      writer.appendBytes(b)
+    } else if (schema === 'string') {
+      const b = new TextEncoder().encode(String(value))
+      appendZigZag(writer, b.length)
+      writer.appendBytes(b)
+    }
+  } else if ('logicalType' in schema) {
+    if (schema.logicalType === 'date') {
+      appendZigZag(writer, value instanceof Date ? Math.floor(value.getTime() / 86400000) : value)
+    } else if (schema.logicalType === 'timestamp-millis') {
+      appendZigZag64(writer, value instanceof Date ? BigInt(value.getTime()) : BigInt(value))
+    } else if (schema.logicalType === 'timestamp-micros') {
+      appendZigZag64(
+        writer,
+        value instanceof Date ? BigInt(value.getTime()) * 1000n : BigInt(value)
+      )
+    } else if (schema.logicalType === 'decimal') {
+      const scale = 'scale' in schema ? schema.scale ?? 0 : 0
+      let u
+      if (typeof value === 'bigint') {
+        u = value
+      } else if (typeof value === 'number') {
+        u = BigInt(Math.round(value * 10 ** scale))
+      } else {
+        throw new Error('decimal value must be bigint or number')
+      }
+      const b = bigIntToBytes(u)
+      appendZigZag(writer, b.length) // bytes length is a zigzag long
+      writer.appendBytes(b)
+    } else {
+      throw new Error(`unknown logical type ${schema.logicalType}`)
+    }
+  } else if (schema.type === 'record') {
+    for (const f of schema.fields) {
+      writeType(writer, f.type, value[f.name])
+    }
+  } else if (schema.type === 'array') {
+    if (value.length) {
+      appendZigZag(writer, value.length) // block item count is a zigzag long
+      for (const it of value) writeType(writer, schema.items, it)
+    }
+    writer.appendVarInt(0) // zero-length block terminates the array
+  } else {
+    throw new Error(`unknown schema type ${JSON.stringify(schema)}`)
+  }
+}
+
+/**
+ * @param {Writer} writer
+ * @param {number} v
+ */
+function appendZigZag(writer, v) {
+  writer.appendVarInt(v << 1 ^ v >> 31)
+}
+
+/**
+ * @param {Writer} writer
+ * @param {bigint} v
+ */
+function appendZigZag64(writer, v) {
+  writer.appendVarBigInt(v << 1n ^ v >> 63n)
+}
+
+/**
+ * Convert a signed BigInt into two's-complement big-endian bytes.
+ * @param {bigint} value
+ * @returns {Uint8Array}
+ */
+function bigIntToBytes(value) {
+  const neg = value < 0n
+  let abs = neg ? -value : value
+  const out = []
+  while (abs > 0n) { out.unshift(Number(abs & 0xffn)); abs >>= 8n }
+  if (out.length === 0) out.push(0)
+
+  if (neg) {
+    for (let i = 0; i < out.length; i++) out[i] ^= 0xff
+    for (let i = out.length - 1; i >= 0; i--) {
+      out[i] = out[i] + 1 & 0xff
+      if (out[i]) break
+    }
+    if ((out[0] & 0x80) === 0) out.unshift(0xff)
+  } else if ((out[0] & 0x80) !== 0) {
+    out.unshift(0)
+  }

+  return Uint8Array.from(out)
+}
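
The appendZigZag helpers implement Avro's zigzag mapping: a non-negative n encodes as 2n and a negative n as -2n - 1, so values near zero become short varints. A standalone sketch with its inverse (the inverse mirrors what hyparquet's readZigZag is assumed to compute):

// n >= 0 maps to 2n; n < 0 maps to -2n - 1
function zigzag32(n) { return (n << 1 ^ n >> 31) >>> 0 }
function unzigzag32(z) { return z >>> 1 ^ -(z & 1) }
console.log(zigzag32(0), zigzag32(-1), zigzag32(1), zigzag32(-2)) // 0 1 2 3
console.log(unzigzag32(zigzag32(-150))) // -150

Similarly, bigIntToBytes produces the minimal two's-complement big-endian encoding that the decimal logical type stores, e.g. 255n becomes [0x00, 0xff] and -1n becomes [0xff].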

src/types.d.ts

Lines changed: 1 addition & 0 deletions

@@ -263,4 +263,5 @@ type AvroGenericLogicalType = {

 type AvroLogicalType = AvroDate | AvroDecimal | AvroTimestampMillis | AvroTimestampMicros | AvroGenericLogicalType

+// Avro complex types (records, arrays, unions supported)
 type AvroComplexType = AvroRecord | AvroArray | AvroUnion

test/avro.roundtrip.test.js

Lines changed: 91 additions & 0 deletions

@@ -0,0 +1,91 @@
+import { describe, expect, it } from 'vitest'
+import { ByteWriter } from 'hyparquet-writer'
+import { avroWrite } from '../src/avro.write.js'
+import { avroData, avroMetadata } from '../src/iceberg.js'
+
+/**
+ * @import {AvroType} from '../src/types.js'
+ */
+
+describe('Avro round-trip', () => {
+  it('primitive record (int, string)', () => {
+    /** @type {AvroType} */
+    const schema = {
+      type: 'record',
+      name: 'User',
+      fields: [
+        { name: 'id', type: 'int' },
+        { name: 'name', type: 'string' },
+      ],
+    }
+
+    const records = [
+      { id: 1, name: 'alice' },
+      { id: 2, name: 'bob' },
+    ]
+
+    const writer = new ByteWriter()
+    avroWrite({ writer, schema, records })
+
+    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
+    const { metadata, syncMarker } = avroMetadata(reader)
+    const rows = avroData({ reader, metadata, syncMarker })
+
+    expect(rows).toEqual(records)
+  })
+
+  it('logical timestamp-millis', () => {
+    /** @type {AvroType} */
+    const schema = {
+      type: 'record',
+      name: 'Event',
+      fields: [
+        {
+          name: 'ts',
+          type: { type: 'long', logicalType: 'timestamp-millis' },
+        },
+      ],
+    }
+
+    const now = Date.now()
+    const original = [{ ts: new Date(now) }]
+
+    const writer = new ByteWriter()
+    avroWrite({ writer, schema, records: original })
+
+    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
+    const { metadata, syncMarker } = avroMetadata(reader)
+    const round = avroData({ reader, metadata, syncMarker })
+
+    expect(round[0].ts.getTime()).toBe(original[0].ts.getTime())
+  })
+
+  it('array + union round-trip', () => {
+    /** @type {AvroType} */
+    const schema = {
+      type: 'record',
+      name: 'Complex',
+      fields: [
+        { name: 'tags', type: { type: 'array', items: 'string' } },
+        { name: 'metrics', type: ['null', { type: 'record', name: 'Metrics', fields: [
+          { name: 'x', type: 'int' },
+          { name: 'y', type: 'int' },
+        ] }] },
+      ],
+    }
+
+    const recs = [
+      { tags: ['a', 'b'], metrics: { x: 1, y: 2 } },
+      { tags: [], metrics: { x: 0, y: 0 } },
+    ]
+
+    const writer = new ByteWriter()
+    avroWrite({ writer, schema, records: recs })
+
+    const reader = { view: new DataView(writer.getBuffer()), offset: 0 }
+    const { metadata, syncMarker } = avroMetadata(reader)
+    const got = avroData({ reader, metadata, syncMarker })
+
+    expect(got).toEqual(recs)
+  })
+})
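
For an end-to-end write to disk, a minimal sketch following the same API the tests exercise; the output filename is arbitrary, and getBuffer() returning an ArrayBuffer is inferred from the tests above:

import { writeFileSync } from 'node:fs'
import { ByteWriter } from 'hyparquet-writer'
import { avroWrite } from '../src/avro.write.js'

const writer = new ByteWriter()
avroWrite({
  writer,
  schema: { type: 'record', name: 'User', fields: [{ name: 'id', type: 'int' }] },
  records: [{ id: 1 }, { id: 2 }],
})
// Wrap the ArrayBuffer for Node's fs
writeFileSync('users.avro', Buffer.from(writer.getBuffer()))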
