|
13 | 13 | import java.sql.SQLException; |
14 | 14 | import java.sql.Statement; |
15 | 15 | import java.util.ArrayList; |
| 16 | +import java.util.Arrays; |
16 | 17 | import java.util.List; |
17 | 18 | import java.util.Optional; |
18 | 19 | import java.util.Properties; |
@@ -519,6 +520,171 @@ public void testRuntimeInsertStringTable() throws IOException { |
519 | 520 | })); |
520 | 521 | } |
521 | 522 |
|
| 523 | + @Test |
| 524 | + public void testRuntimeMergeTable() throws IOException { |
| 525 | + File in = testFolder.newFile(SnowflakeUtils.randomString(8) + ".csv"); |
| 526 | + List<String> lines = |
| 527 | + Stream.of("c0:double,c1:double,c2:string", "1,1,aaa", "1,2,bbb", "1,3,ccc") |
| 528 | + .collect(Collectors.toList()); |
| 529 | + Files.write(in.toPath(), lines); |
| 530 | + |
| 531 | + String targetTableName = generateTemporaryTableName(); |
| 532 | + String targetTableFullName = |
| 533 | + String.format( |
| 534 | + "\"%s\".\"%s\".\"%s\"", TEST_SNOWFLAKE_DB, TEST_SNOWFLAKE_SCHEMA, targetTableName); |
| 535 | + runQuery( |
| 536 | + String.format( |
| 537 | + "create table %s (\"c0\" NUMBER, \"c1\" NUMBER, \"c2\" STRING)", targetTableFullName), |
| 538 | + foreachResult(rs_ -> {})); |
| 539 | + runQuery( |
| 540 | + String.format("insert into %s values (1, 1, 'ddd'), (1, 4, 'eee');", targetTableFullName), |
| 541 | + foreachResult(rs_ -> {})); |
| 542 | + |
| 543 | + final ConfigSource config = |
| 544 | + CONFIG_MAPPER_FACTORY |
| 545 | + .newConfigSource() |
| 546 | + .set("type", "snowflake") |
| 547 | + .set("user", TEST_SNOWFLAKE_USER) |
| 548 | + .set("password", TEST_SNOWFLAKE_PASSWORD) |
| 549 | + .set("host", TEST_SNOWFLAKE_HOST) |
| 550 | + .set("database", TEST_SNOWFLAKE_DB) |
| 551 | + .set("warehouse", TEST_SNOWFLAKE_WAREHOUSE) |
| 552 | + .set("schema", TEST_SNOWFLAKE_SCHEMA) |
| 553 | + .set("mode", "merge") |
| 554 | + .set("merge_keys", new ArrayList<String>(Arrays.asList("c0", "c1"))) |
| 555 | + .set("table", targetTableName); |
| 556 | + embulk.runOutput(config, in.toPath()); |
| 557 | + |
| 558 | + runQuery( |
| 559 | + String.format("select count(*) from %s;", targetTableFullName), |
| 560 | + foreachResult( |
| 561 | + rs -> { |
| 562 | + assertEquals(4, rs.getInt(1)); |
| 563 | + })); |
| 564 | + List<String> results = new ArrayList(); |
| 565 | + runQuery( |
| 566 | + "select \"c0\",\"c1\",\"c2\" from " + targetTableFullName + " order by 1, 2", |
| 567 | + foreachResult( |
| 568 | + rs -> { |
| 569 | + results.add(rs.getString(1) + "," + rs.getString(2) + "," + rs.getString(3)); |
| 570 | + })); |
| 571 | + List<String> expected = |
| 572 | + Stream.of("1,1,aaa", "1,2,bbb", "1,3,ccc", "1,4,eee").collect(Collectors.toList()); |
| 573 | + for (int i = 0; i < results.size(); i++) { |
| 574 | + assertEquals(expected.get(i), results.get(i)); |
| 575 | + } |
| 576 | + } |
| 577 | + |
| 578 | + @Test |
| 579 | + public void testRuntimeMergeTableWithMergeRule() throws IOException { |
| 580 | + File in = testFolder.newFile(SnowflakeUtils.randomString(8) + ".csv"); |
| 581 | + List<String> lines = |
| 582 | + Stream.of("c0:double,c1:string", "0.0,aaa", "0.1,bbb", "1.2,ccc") |
| 583 | + .collect(Collectors.toList()); |
| 584 | + Files.write(in.toPath(), lines); |
| 585 | + |
| 586 | + String targetTableName = generateTemporaryTableName(); |
| 587 | + String targetTableFullName = |
| 588 | + String.format( |
| 589 | + "\"%s\".\"%s\".\"%s\"", TEST_SNOWFLAKE_DB, TEST_SNOWFLAKE_SCHEMA, targetTableName); |
| 590 | + runQuery( |
| 591 | + String.format("create table %s (\"c0\" FLOAT, \"c1\" STRING)", targetTableFullName), |
| 592 | + foreachResult(rs_ -> {})); |
| 593 | + runQuery( |
| 594 | + String.format("insert into %s values (0.0, 'ddd'), (1.3, 'eee');", targetTableFullName), |
| 595 | + foreachResult(rs_ -> {})); |
| 596 | + |
| 597 | + final ConfigSource config = |
| 598 | + CONFIG_MAPPER_FACTORY |
| 599 | + .newConfigSource() |
| 600 | + .set("type", "snowflake") |
| 601 | + .set("user", TEST_SNOWFLAKE_USER) |
| 602 | + .set("password", TEST_SNOWFLAKE_PASSWORD) |
| 603 | + .set("host", TEST_SNOWFLAKE_HOST) |
| 604 | + .set("database", TEST_SNOWFLAKE_DB) |
| 605 | + .set("warehouse", TEST_SNOWFLAKE_WAREHOUSE) |
| 606 | + .set("schema", TEST_SNOWFLAKE_SCHEMA) |
| 607 | + .set("mode", "merge") |
| 608 | + .set("merge_keys", new ArrayList<String>(Arrays.asList("c0"))) |
| 609 | + .set( |
| 610 | + "merge_rule", new ArrayList<String>(Arrays.asList("\"c1\" = T.\"c1\" || S.\"c1\""))) |
| 611 | + .set("table", targetTableName); |
| 612 | + embulk.runOutput(config, in.toPath()); |
| 613 | + |
| 614 | + runQuery( |
| 615 | + String.format("select count(*) from %s;", targetTableFullName), |
| 616 | + foreachResult( |
| 617 | + rs -> { |
| 618 | + assertEquals(4, rs.getInt(1)); |
| 619 | + })); |
| 620 | + List<String> results = new ArrayList(); |
| 621 | + runQuery( |
| 622 | + "select \"c0\",\"c1\" from " + targetTableFullName + " order by 1", |
| 623 | + foreachResult( |
| 624 | + rs -> { |
| 625 | + results.add(rs.getString(1) + "," + rs.getString(2)); |
| 626 | + })); |
| 627 | + List<String> expected = |
| 628 | + Stream.of("0.0,dddaaa", "0.1,bbb", "1.2,ccc", "1.3,eee").collect(Collectors.toList()); |
| 629 | + for (int i = 0; i < results.size(); i++) { |
| 630 | + assertEquals(expected.get(i), results.get(i)); |
| 631 | + } |
| 632 | + } |
| 633 | + |
| 634 | + @Test |
| 635 | + public void testRuntimeMergeTableWithoutMergeKey() throws IOException { |
| 636 | + File in = testFolder.newFile(SnowflakeUtils.randomString(8) + ".csv"); |
| 637 | + List<String> lines = |
| 638 | + Stream.of("c0:double,c1:string", "0.0,aaa", "0.1,bbb", "1.2,ccc") |
| 639 | + .collect(Collectors.toList()); |
| 640 | + Files.write(in.toPath(), lines); |
| 641 | + |
| 642 | + String targetTableName = generateTemporaryTableName(); |
| 643 | + String targetTableFullName = |
| 644 | + String.format( |
| 645 | + "\"%s\".\"%s\".\"%s\"", TEST_SNOWFLAKE_DB, TEST_SNOWFLAKE_SCHEMA, targetTableName); |
| 646 | + runQuery( |
| 647 | + String.format( |
| 648 | + "create table %s (\"c0\" FLOAT PRIMARY KEY, \"c1\" STRING)", targetTableFullName), |
| 649 | + foreachResult(rs_ -> {})); |
| 650 | + runQuery( |
| 651 | + String.format("insert into %s values (0.0, 'ddd'), (1.3, 'eee');", targetTableFullName), |
| 652 | + foreachResult(rs_ -> {})); |
| 653 | + |
| 654 | + final ConfigSource config = |
| 655 | + CONFIG_MAPPER_FACTORY |
| 656 | + .newConfigSource() |
| 657 | + .set("type", "snowflake") |
| 658 | + .set("user", TEST_SNOWFLAKE_USER) |
| 659 | + .set("password", TEST_SNOWFLAKE_PASSWORD) |
| 660 | + .set("host", TEST_SNOWFLAKE_HOST) |
| 661 | + .set("database", TEST_SNOWFLAKE_DB) |
| 662 | + .set("warehouse", TEST_SNOWFLAKE_WAREHOUSE) |
| 663 | + .set("schema", TEST_SNOWFLAKE_SCHEMA) |
| 664 | + .set("mode", "merge") |
| 665 | + .set("table", targetTableName); |
| 666 | + embulk.runOutput(config, in.toPath()); |
| 667 | + |
| 668 | + runQuery( |
| 669 | + String.format("select count(*) from %s;", targetTableFullName), |
| 670 | + foreachResult( |
| 671 | + rs -> { |
| 672 | + assertEquals(4, rs.getInt(1)); |
| 673 | + })); |
| 674 | + List<String> results = new ArrayList(); |
| 675 | + runQuery( |
| 676 | + "select \"c0\",\"c1\" from " + targetTableFullName + " order by 1", |
| 677 | + foreachResult( |
| 678 | + rs -> { |
| 679 | + results.add(rs.getString(1) + "," + rs.getString(2)); |
| 680 | + })); |
| 681 | + List<String> expected = |
| 682 | + Stream.of("0.0,aaa", "0.1,bbb", "1.2,ccc", "1.3,eee").collect(Collectors.toList()); |
| 683 | + for (int i = 0; i < results.size(); i++) { |
| 684 | + assertEquals(expected.get(i), results.get(i)); |
| 685 | + } |
| 686 | + } |
| 687 | + |
522 | 688 | @Ignore( |
523 | 689 | "This test takes so long time because it needs to create more than 1000 tables, so ignored...") |
524 | 690 | @Test(expected = Test.None.class /* no exception expected */) |
|
0 commit comments