From beedcee68741a107c764cfe85c69da8da6dbbcc0 Mon Sep 17 00:00:00 2001 From: Dede Indrapurna Date: Mon, 7 Sep 2015 16:50:17 +0700 Subject: [PATCH] - Added support for RECORD type column, nested RECORD, and RECORD inside array. - Added "underscore-deep-extend" dependency. - Updated tests for the new functions. - Added .idea & *.iml in gitignore. --- .gitignore | 4 + lib/gcp.js | 129 +++++++++++++++++++++++++------- package.json | 3 +- test/gcp_test.js | 189 ++++++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 287 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index f59d45b..68dd8c2 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,7 @@ node_modules # Working directory temp + +# IDE files +.idea +*.iml \ No newline at end of file diff --git a/lib/gcp.js b/lib/gcp.js index b51388b..f11ae5d 100644 --- a/lib/gcp.js +++ b/lib/gcp.js @@ -3,11 +3,14 @@ var util = require('util'); var path = require('path'); var _ = require('underscore'); +var underscoreDeepExtend = require('underscore-deep-extend'); var gcloud = require('gcloud'); var moment = require('moment'); var ObjectID = require('mongodb').ObjectID; var through2 = require('through2'); +_.mixin({deepExtend: underscoreDeepExtend(_)}); + function GCP(opts) { this.projectSettings = { projectId: opts.project @@ -154,6 +157,8 @@ GCP.BigQueryTable = { return item !== null; }); } + } else if(v === Object(v)) { + return v; } else { return JSON.stringify(v); } @@ -166,8 +171,12 @@ GCP.BigQueryTable = { return 'FLOAT'; } else if (typeof v === 'boolean') { return 'BOOLEAN'; + } else if(v instanceof ObjectID) { + return 'STRING'; } else if (util.isArray(v) && v[0]) { - return this.type(v[0]); + return GCP.BigQueryTable.type(v[0]); + } else if (v === Object(v)) { + return 'RECORD'; } return 'STRING'; }, @@ -181,44 +190,101 @@ GCP.BigQueryTable = { }, column: function (k, v) { - return { - name: GCP.safeName(k), - type: GCP.BigQueryTable.type(v), - mode: GCP.BigQueryTable.mode(v) - }; + var colType = GCP.BigQueryTable.type(v); + + if(colType === 'RECORD') + return { + name: GCP.safeName(k), + type: colType, + mode: GCP.BigQueryTable.mode(v), + fields: {} + }; + else + return { + name: GCP.safeName(k), + type: colType, + mode: GCP.BigQueryTable.mode(v) + }; + }, createJSONStream: function (fields, autoSchemaDetection) { var schema = {}; return through2.obj(function (data, encoding, next) { - data.id = data._id; - delete data._id; - - var out = {}; - var keys = Object.keys(data); - var len = keys.length; - var i, k, v, col; - - for (i = 0; i < len; ++i) { - k = keys[i]; - v = data[k]; - - if (v !== null) { - col = GCP.safeName(k); - out[col] = GCP.BigQueryTable.value(v); - if (autoSchemaDetection && !schema[col]) { - schema[col] = GCP.BigQueryTable.column(k, v); - } - } - } - this.push(JSON.stringify(out) + '\n'); + var out = GCP.BigQueryTable.parseJSONLine(Object.keys(data), data, autoSchemaDetection); + schema = _.deepExtend(schema, out.schema); + this.push(JSON.stringify(out.record) + '\n'); next(); }).on('end', function () { GCP.BigQueryTable.convertSchemaFields(schema, fields); }); }, + parseJSONLine: function(keys, data, autoSchemaDetection){ + var record = {}; + var schema = {}; + var k, v, col; + + for(var i=0; i