@@ -18,6 +18,7 @@ def initialize(task, schema, fields = nil)
           @schema = schema
           reset_fields(fields) if fields
           @project = @task['project']
+          @destination_project = @task['destination_project']
           @dataset = @task['dataset']
           @location = @task['location']
           @location_for_log = @location.nil? ? 'us/eu' : @location
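The new `@destination_project` is read straight from the task config here, and the remaining hunks swap it in wherever a dataset or table is addressed, while jobs themselves keep running under `@project`. A minimal sketch of that split, using hypothetical values and an assumed fallback to `project` when `destination_project` is not set (that fallback is not shown in this diff):

```ruby
# Standalone sketch, not code from this repository.
task = {
  'project'             => 'billing-project', # project that runs and is billed for the jobs
  'destination_project' => 'data-project',    # project that owns the dataset and tables
  'dataset'             => 'my_dataset',
}

project             = task['project']
destination_project = task['destination_project'] || project # assumed default

# Jobs would be inserted against `project`, while table references and log
# messages point at `destination_project`, e.g. "data-project:my_dataset.my_table".
puts "#{destination_project}:#{task['dataset']}.my_table"
```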
@@ -80,7 +81,7 @@ def load_from_gcs(object_uris, table)
               # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
               # we should generate job_id in client code, otherwise, retrying would cause duplication
               job_id = "embulk_load_job_#{SecureRandom.uuid}"
-              Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+              Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{object_uris} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }

               body = {
                 job_reference: {
@@ -90,7 +91,7 @@ def load_from_gcs(object_uris, table)
                 configuration: {
                   load: {
                     destination_table: {
-                      project_id: @project,
+                      project_id: @destination_project,
                       dataset_id: @dataset,
                       table_id: table,
                     },
@@ -130,7 +131,7 @@ def load_from_gcs(object_uris, table)
               Embulk.logger.error {
                 "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
               }
-              raise Error, "failed to load #{object_uris} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+              raise Error, "failed to load #{object_uris} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
             end
           end
         end
@@ -171,7 +172,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
                 # As https://cloud.google.com/bigquery/docs/managing_jobs_datasets_projects#managingjobs says,
                 # we should generate job_id in client code, otherwise, retrying would cause duplication
                 job_id = "embulk_load_job_#{SecureRandom.uuid}"
-                Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@project}:#{@dataset}.#{table} in #{@location_for_log}" }
+                Embulk.logger.info { "embulk-output-bigquery: Load job starting... job_id:[#{job_id}] #{path} => #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}" }
               else
                 Embulk.logger.info { "embulk-output-bigquery: Load job starting... #{path} does not exist, skipped" }
                 return
@@ -185,7 +186,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
                 configuration: {
                   load: {
                     destination_table: {
-                      project_id: @project,
+                      project_id: @destination_project,
                       dataset_id: @dataset,
                       table_id: table,
                     },
@@ -232,7 +233,7 @@ def load(path, table, write_disposition: 'WRITE_APPEND')
               Embulk.logger.error {
                 "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
               }
-              raise Error, "failed to load #{path} to #{@project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
+              raise Error, "failed to load #{path} to #{@destination_project}:#{@dataset}.#{table} in #{@location_for_log}, response:#{response}"
             end
           end
         end
@@ -245,7 +246,7 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo

               Embulk.logger.info {
                 "embulk-output-bigquery: Copy job starting... job_id:[#{job_id}] " \
-                "#{@project}:#{@dataset}.#{source_table} => #{@project}:#{destination_dataset}.#{destination_table}"
+                "#{@destination_project}:#{@dataset}.#{source_table} => #{@destination_project}:#{destination_dataset}.#{destination_table}"
               }

               body = {
@@ -258,12 +259,12 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo
                     create_disposition: 'CREATE_IF_NEEDED',
                     write_disposition: write_disposition,
                     source_table: {
-                      project_id: @project,
+                      project_id: @destination_project,
                       dataset_id: @dataset,
                       table_id: source_table,
                     },
                     destination_table: {
-                      project_id: @project,
+                      project_id: @destination_project,
                       dataset_id: destination_dataset,
                       table_id: destination_table,
                     },
@@ -284,8 +285,8 @@ def copy(source_table, destination_table, destination_dataset = nil, write_dispo
               Embulk.logger.error {
                 "embulk-output-bigquery: insert_job(#{@project}, #{body}, #{opts}), response:#{response}"
               }
-              raise Error, "failed to copy #{@project}:#{@dataset}.#{source_table} " \
-                "to #{@project}:#{destination_dataset}.#{destination_table}, response:#{response}"
+              raise Error, "failed to copy #{@destination_project}:#{@dataset}.#{source_table} " \
+                "to #{@destination_project}:#{destination_dataset}.#{destination_table}, response:#{response}"
             end
           end
         end
@@ -354,7 +355,7 @@ def wait_load(kind, response)
         def create_dataset(dataset = nil, reference: nil)
           dataset ||= @dataset
           begin
-            Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@project}:#{dataset} in #{@location_for_log}" }
+            Embulk.logger.info { "embulk-output-bigquery: Create dataset... #{@destination_project}:#{dataset} in #{@location_for_log}" }
             hint = {}
             if reference
               response = get_dataset(reference)
@@ -382,25 +383,25 @@ def create_dataset(dataset = nil, reference: nil)
             Embulk.logger.error {
               "embulk-output-bigquery: insert_dataset(#{@project}, #{body}, #{opts}), response:#{response}"
             }
-            raise Error, "failed to create dataset #{@project}:#{dataset} in #{@location_for_log}, response:#{response}"
+            raise Error, "failed to create dataset #{@destination_project}:#{dataset} in #{@location_for_log}, response:#{response}"
           end
         end

         def get_dataset(dataset = nil)
           dataset ||= @dataset
           begin
-            Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@project}:#{dataset}" }
-            with_network_retry { client.get_dataset(@project, dataset) }
+            Embulk.logger.info { "embulk-output-bigquery: Get dataset... #{@destination_project}:#{dataset}" }
+            with_network_retry { client.get_dataset(@destination_project, dataset) }
           rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
             if e.status_code == 404
-              raise NotFoundError, "Dataset #{@project}:#{dataset} is not found"
+              raise NotFoundError, "Dataset #{@destination_project}:#{dataset} is not found"
             end

             response = {status_code: e.status_code, message: e.message, error_class: e.class}
             Embulk.logger.error {
-              "embulk-output-bigquery: get_dataset(#{@project}, #{dataset}), response:#{response}"
+              "embulk-output-bigquery: get_dataset(#{@destination_project}, #{dataset}), response:#{response}"
             }
-            raise Error, "failed to get dataset #{@project}:#{dataset}, response:#{response}"
+            raise Error, "failed to get dataset #{@destination_project}:#{dataset}, response:#{response}"
           end
         end

@@ -414,7 +415,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
              table = Helper.chomp_partition_decorator(table)
            end

-           Embulk.logger.info { "embulk-output-bigquery: Create table... #{@project}:#{dataset}.#{table}" }
+           Embulk.logger.info { "embulk-output-bigquery: Create table... #{@destination_project}:#{dataset}.#{table}" }
            body = {
              table_reference: {
                table_id: table,
@@ -452,7 +453,7 @@ def create_table_if_not_exists(table, dataset: nil, options: nil)
            Embulk.logger.error {
              "embulk-output-bigquery: insert_table(#{@project}, #{dataset}, #{@location_for_log}, #{body}, #{opts}), response:#{response}"
            }
-           raise Error, "failed to create table #{@project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
+           raise Error, "failed to create table #{@destination_project}:#{dataset}.#{table} in #{@location_for_log}, response:#{response}"
          end
        end

@@ -469,8 +470,8 @@ def delete_partition(table, dataset: nil)
        def delete_table_or_partition(table, dataset: nil)
          begin
            dataset ||= @dataset
-           Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@project}:#{dataset}.#{table}" }
-           with_network_retry { client.delete_table(@project, dataset, table) }
+           Embulk.logger.info { "embulk-output-bigquery: Delete table... #{@destination_project}:#{dataset}.#{table}" }
+           with_network_retry { client.delete_table(@destination_project, dataset, table) }
          rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
            if e.status_code == 404 && /Not found:/ =~ e.message
              # ignore 'Not Found' error
@@ -479,9 +480,9 @@ def delete_table_or_partition(table, dataset: nil)

            response = {status_code: e.status_code, message: e.message, error_class: e.class}
            Embulk.logger.error {
-             "embulk-output-bigquery: delete_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+             "embulk-output-bigquery: delete_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
            }
-           raise Error, "failed to delete table #{@project}:#{dataset}.#{table}, response:#{response}"
+           raise Error, "failed to delete table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
          end
        end

@@ -497,18 +498,18 @@ def get_partition(table, dataset: nil)
        def get_table_or_partition(table, dataset: nil)
          begin
            dataset ||= @dataset
-           Embulk.logger.info { "embulk-output-bigquery: Get table... #{@project}:#{dataset}.#{table}" }
-           with_network_retry { client.get_table(@project, dataset, table) }
+           Embulk.logger.info { "embulk-output-bigquery: Get table... #{@destination_project}:#{dataset}.#{table}" }
+           with_network_retry { client.get_table(@destination_project, dataset, table) }
          rescue Google::Apis::ServerError, Google::Apis::ClientError, Google::Apis::AuthorizationError => e
            if e.status_code == 404
-             raise NotFoundError, "Table #{@project}:#{dataset}.#{table} is not found"
+             raise NotFoundError, "Table #{@destination_project}:#{dataset}.#{table} is not found"
            end

            response = {status_code: e.status_code, message: e.message, error_class: e.class}
            Embulk.logger.error {
-             "embulk-output-bigquery: get_table(#{@project}, #{dataset}, #{table}), response:#{response}"
+             "embulk-output-bigquery: get_table(#{@destination_project}, #{dataset}, #{table}), response:#{response}"
            }
-           raise Error, "failed to get table #{@project}:#{dataset}.#{table}, response:#{response}"
+           raise Error, "failed to get table #{@destination_project}:#{dataset}.#{table}, response:#{response}"
          end
        end
      end
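Taken together, the hunks apply one rule: `insert_job`, `insert_dataset`, and `insert_table` (and the error logs that echo those calls) still run against `@project`, while table and dataset references, the `get_*`/`delete_*` lookups, and user-facing messages switch to `@destination_project`. A standalone sketch of that rule for a copy job body, with hypothetical values rather than code from this repository:

```ruby
require 'securerandom'

# Hypothetical stand-ins for the instance variables used in the diff above.
project             = 'billing-project' # where the job is inserted and billed
destination_project = 'data-project'    # where both tables actually live
dataset             = 'my_dataset'

body = {
  job_reference: {
    project_id: project, # the job itself runs under the billing project
    job_id: "embulk_copy_job_#{SecureRandom.uuid}",
  },
  configuration: {
    copy: {
      create_disposition: 'CREATE_IF_NEEDED',
      write_disposition: 'WRITE_APPEND',
      source_table: {
        project_id: destination_project, # tables are addressed in the destination project
        dataset_id: dataset,
        table_id: 'source_table',
      },
      destination_table: {
        project_id: destination_project,
        dataset_id: dataset,
        table_id: 'destination_table',
      },
    },
  },
}

# In the plugin this hash would be handed to the BigQuery client together with
# the billing project; here we just inspect the structure.
p body
```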