Commit 43fa221

Replace iceberg table data with new parquet file
1 parent 334ebc6 commit 43fa221

2 files changed: +242 -0 lines

src/schemas.js

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
/**
 * @import {AvroRecord} from '../src/types.js'
 */

/** @type {AvroRecord} */
export const manifestSchema = {
  type: 'record',
  name: 'ManifestEntry',
  fields: [
    { 'field-id': 500, name: 'manifest_path', type: 'string' },
    { 'field-id': 501, name: 'manifest_length', type: 'long' },
    { 'field-id': 502, name: 'partition_spec_id', type: 'int' },
    { 'field-id': 517, name: 'content', type: 'int' }, // 0=data, 1=deletes
    { 'field-id': 515, name: 'sequence_number', type: ['null', 'long'], default: null },
    { 'field-id': 516, name: 'min_sequence_number', type: ['null', 'long'], default: null },
    { 'field-id': 503, name: 'added_snapshot_id', type: 'long' },
    { 'field-id': 504, name: 'added_data_files_count', type: 'int' },
    { 'field-id': 505, name: 'existing_data_files_count', type: 'int' },
    { 'field-id': 506, name: 'deleted_data_files_count', type: 'int' },
    { 'field-id': 512, name: 'added_rows_count', type: 'long' },
    { 'field-id': 513, name: 'existing_rows_count', type: 'long' },
    { 'field-id': 514, name: 'deleted_rows_count', type: 'long' },
    {
      'field-id': 507,
      name: 'partitions',
      type: [
        'null',
        {
          type: 'array',
          items: {
            type: 'record',
            name: 'FieldSummary',
            fields: [
              { 'field-id': 509, name: 'contains-null', type: 'boolean' },
              { 'field-id': 518, name: 'contains-nan', type: ['null', 'boolean'], default: null },
              { 'field-id': 510, name: 'lower-bound', type: ['null', 'string'], default: null },
              { 'field-id': 511, name: 'upper-bound', type: ['null', 'string'], default: null },
            ],
          },
        },
      ],
      default: null,
    },
    { 'field-id': 519, name: 'key_metadata', type: ['null', 'bytes'], default: null },
    { 'field-id': 520, name: 'first_row_id', type: ['null', 'long'], default: null },
  ],
}

/** @type {AvroRecord} */
export const dataFileSchema = {
  type: 'record',
  name: 'data_file',
  fields: [
    { 'field-id': 134, name: 'content', type: 'int' },
    { 'field-id': 100, name: 'file_path', type: 'string' },
    { 'field-id': 101, name: 'file_format', type: 'string' },
    { 'field-id': 102, name: 'partition', type: { type: 'record', name: 'void', fields: [] } },
    { 'field-id': 103, name: 'record_count', type: 'long' },
    { 'field-id': 104, name: 'file_size_in_bytes', type: 'long' },
    { 'field-id': 132, name: 'split_offsets', type: ['null', { type: 'array', items: 'long' }], default: null },
  ],
}

/** @type {AvroRecord} */
export const manifestEntrySchema = {
  type: 'record',
  name: 'manifest_entry',
  fields: [
    { 'field-id': 0, name: 'status', type: 'int' },
    { 'field-id': 1, name: 'snapshot_id', type: ['null', 'long'], default: null },
    { 'field-id': 3, name: 'sequence_number', type: ['null', 'long'], default: null },
    { 'field-id': 4, name: 'file_sequence_number', type: ['null', 'long'], default: null },
    { 'field-id': 2, name: 'data_file', type: dataFileSchema },
  ],
}
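For reference, a minimal sketch of how manifestEntrySchema pairs with the avroWrite call used by src/write.js below. The writerFactory helper and the s3:// paths are placeholders standing in for whatever storage-backed Writer the caller supplies (they are not part of this commit); 'long' fields are written as bigints.

import { avroWrite } from './avro/avro.write.js'
import { manifestEntrySchema } from './schemas.js'

// writerFactory: hypothetical caller-supplied helper returning a hyparquet-writer Writer
const writer = writerFactory('s3://bucket/table/metadata/example-manifest.avro')

avroWrite({
  writer,
  schema: manifestEntrySchema,
  records: [{
    status: 1, // 1=added
    snapshot_id: undefined, // inherit from manifest list
    sequence_number: undefined, // inherit
    file_sequence_number: undefined, // inherit
    data_file: {
      content: 0, // 0=data
      file_path: 's3://bucket/table/data/0000-example.parquet',
      file_format: 'parquet',
      partition: {}, // unpartitioned
      record_count: 100n, // 'long' fields are bigints
      file_size_in_bytes: 1024n,
      split_offsets: undefined,
    },
  }],
})
writer.finish()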

src/write.js

Lines changed: 167 additions & 0 deletions
@@ -0,0 +1,167 @@
/**
 * @import {Writer} from 'hyparquet-writer/src/types.js'
 * @import {Manifest, ManifestEntry, TableMetadata} from '../src/types.js'
 */

import { parquetMetadata } from 'hyparquet'
import { avroWrite } from './avro/avro.write.js'
import { icebergLatestVersion, icebergMetadata } from './metadata.js'
import { manifestEntrySchema, manifestSchema } from './schemas.js'
import { uuid4 } from './utils.js'

/**
 * Replace all data in an Iceberg table with a new parquet file.
 *
 * @param {object} options
 * @param {string} options.tableUrl - Base S3 URL of the table.
 * @param {(file: string) => Writer} options.writerFactory - Function to create writers for files in storage.
 * @param {ArrayBuffer} options.parquetBuffer - Buffer containing the new parquet data to write.
 * @returns {Promise<void>} Resolves when the replacement is complete.
 */
export async function icebergReplace({ tableUrl, writerFactory, parquetBuffer }) {
  if (!tableUrl) throw new Error('tableUrl is required')

  // get latest metadata version (TODO: allow it to be passed in)
  const latestVersion = await icebergLatestVersion({ tableUrl })
  const latestVersionNumber = Number(latestVersion.replace('v', ''))
  if (isNaN(latestVersionNumber)) {
    throw new Error(`expected version to be a number, got ${latestVersion}`)
  }
  // read the current metadata, then bump to the next version for the new metadata file
  const previousMetadata = await icebergMetadata({ tableUrl, metadataFileName: `v${latestVersionNumber}.metadata.json` })
  const metadataFileName = `v${latestVersionNumber + 1}.metadata.json`
  const nextSequenceNumber = previousMetadata['last-sequence-number'] + 1
  const uuid = uuid4()
  const snapshotId = newId()

  const deleted_files_count = Number(previousMetadata.snapshots?.[0]?.summary['total-data-files'] ?? 0)
  const deleted_rows_count = BigInt(previousMetadata.snapshots?.[0]?.summary['total-records'] ?? 0)

  // TODO: check schema compatibility
  const sourceMetadata = parquetMetadata(parquetBuffer)

  // write data file to 0000-<uuid>.parquet
  const file_path = `${tableUrl}/data/0000-${uuid}.parquet`
  const dataWriter = writerFactory(file_path)
  dataWriter.appendBuffer(parquetBuffer)
  dataWriter.finish()

  // write manifest file to metadata/<snapshotId>.avro
  /** @type {ManifestEntry[]} */
  const manifestEntries = [{
    status: 1, // 1=added
    snapshot_id: undefined, // inherit
    sequence_number: undefined, // inherit
    file_sequence_number: undefined, // inherit
    data_file: {
      content: 0, // 0=data
      file_path,
      file_format: 'parquet',
      partition: {}, // unpartitioned
      record_count: BigInt(sourceMetadata.num_rows),
      file_size_in_bytes: BigInt(parquetBuffer.byteLength),
      split_offsets: undefined,
    },
  }]
  const manifest_path = `metadata/${snapshotId}.avro`
  const manifestWriter = writerFactory(`${tableUrl}/${manifest_path}`)
  avroWrite({ writer: manifestWriter, schema: manifestEntrySchema, records: manifestEntries })
  manifestWriter.finish()
  const manifest_length = BigInt(manifestWriter.offset)

  // write manifest list to snap-<snapshotId>.avro
  const manifestListPath = `metadata/snap-${snapshotId}.avro`
  const manifestListUrl = `${tableUrl}/${manifestListPath}`
  const manifestListWriter = writerFactory(manifestListUrl)
  /** @type {Manifest[]} */
  const records = [{
    manifest_path,
    manifest_length,
    partition_spec_id: previousMetadata['default-spec-id'],
    content: 0, // 0=data
    sequence_number: BigInt(nextSequenceNumber),
    min_sequence_number: BigInt(nextSequenceNumber),
    added_snapshot_id: snapshotId,
    added_files_count: 1,
    existing_files_count: 0,
    deleted_files_count,
    added_rows_count: sourceMetadata.num_rows,
    existing_rows_count: 0n,
    deleted_rows_count,
    partitions: undefined, // unpartitioned
    first_row_id: undefined,
  }]
  avroWrite({
    writer: manifestListWriter,
    schema: manifestSchema,
    records,
  })

  // write metadata file to vN.metadata.json
  const metadataUrl = `${tableUrl}/metadata/${metadataFileName}`
  const metadataWriter = writerFactory(metadataUrl)
  /** @type {TableMetadata} */
  const metadata = {
    'format-version': 2,
    'table-uuid': previousMetadata['table-uuid'],
    location: tableUrl,
    'last-sequence-number': nextSequenceNumber,
    'last-updated-ms': Date.now(),
    'last-column-id': previousMetadata['last-column-id'],
    'current-schema-id': previousMetadata['current-schema-id'], // TODO: update schema if needed
    schemas: previousMetadata.schemas, // TODO: update schemas if needed
    'default-spec-id': previousMetadata['default-spec-id'],
    'partition-specs': previousMetadata['partition-specs'],
    'last-partition-id': previousMetadata['last-partition-id'],
    // properties: { 'write.parquet.compression-codec': 'snappy' }, // TODO: add properties
    'current-snapshot-id': Number(snapshotId),
    snapshots: [
      ...previousMetadata.snapshots ?? [],
      {
        'snapshot-id': Number(snapshotId), // can't write bigint to json (TODO: use string)
        'timestamp-ms': Date.now(),
        'sequence-number': nextSequenceNumber,
        'manifest-list': manifestListPath,
        summary: {
          operation: 'overwrite',
          'added-data-files': '1',
          'added-records': sourceMetadata.num_rows.toString(),
          'added-files-size': parquetBuffer.byteLength.toString(),
          'changed-partition-count': '0',
          'total-records': sourceMetadata.num_rows.toString(),
          'total-files-size': parquetBuffer.byteLength.toString(),
          'total-data-files': '1',
          'total-delete-files': '0',
          'total-position-deletes': '0',
          'total-equality-deletes': '0',
        },
      },
    ],
    'sort-orders': previousMetadata['sort-orders'],
    'default-sort-order-id': previousMetadata['default-sort-order-id'],
    statistics: [],
  }
  // write metadata as json
  const metadataBytes = new TextEncoder().encode(JSON.stringify(metadata, null, 2))
  metadataWriter.appendBytes(metadataBytes)
  metadataWriter.finish()

  // update version-hint.text
  const versionHintUrl = `${tableUrl}/version-hint.text`
  const versionHintWriter = writerFactory(versionHintUrl)
  const versionHintBytes = new TextEncoder().encode(String(latestVersionNumber + 1))
  versionHintWriter.appendBytes(versionHintBytes)
  versionHintWriter.finish()

  // TODO: commit to catalog (update metadata_location)
}

/**
 * new snapshot-id: microsecond wall-clock, then 12 random bits
 * @returns {bigint} new snapshot id
 */
export function newId() {
  const micros = BigInt(Date.now()) * 1000n
  const rand = BigInt(Math.floor(Math.random() * 4096)) // 12 bits
  return micros << 12n | rand
}
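To show how this new entry point might be wired up, here is a hedged usage sketch. fetchParquetBuffer and makeS3Writer are hypothetical helpers for the caller's I/O and are not part of this commit; the only contract icebergReplace places on them is that writerFactory(file) returns a hyparquet-writer Writer for that URL.

import { icebergReplace } from './write.js'

// hypothetical helpers: load the replacement parquet bytes, and create a
// hyparquet-writer Writer that persists to the given storage URL
const parquetBuffer = await fetchParquetBuffer('s3://bucket/new-data.parquet')

await icebergReplace({
  tableUrl: 's3://bucket/warehouse/db/table',
  writerFactory: file => makeS3Writer(file), // one writer per output file
  parquetBuffer,
})
// on resolve, the table has a new data file, manifest, manifest list,
// a v(N+1).metadata.json snapshot, and an updated version-hint.text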
