7
7
import okhttp3 .OkHttpClient ;
8
8
import org .testng .Assert ;
9
9
import org .testng .annotations .BeforeTest ;
10
+ import org .testng .annotations .DataProvider ;
10
11
import org .testng .annotations .Test ;
11
12
12
13
import java .io .*;
16
17
import java .sql .ResultSet ;
17
18
import java .sql .SQLException ;
18
19
import java .sql .Statement ;
20
+ import java .util .UUID ;
19
21
import java .util .logging .Level ;
20
22
import java .util .logging .Logger ;
21
23
@@ -53,7 +55,7 @@ private String generateRandomCSV(int lines) throws IOException {
53
55
return "" ;
54
56
}
55
57
String tmpDir = System .getProperty ("java.io.tmpdir" );
56
- String csvPath = tmpDir + "/test.csv" ;
58
+ String csvPath = tmpDir + "/" + UUID . randomUUID () ;
57
59
try (FileWriter writer = new FileWriter (csvPath )) {
58
60
for (int i = 0 ; i < lines ; i ++) {
59
61
int num = (int ) (Math .random () * 1000 );
@@ -114,6 +116,7 @@ public void testFileTransfer()
114
116
databendConnection .uploadStream (stageName , "jdbc/test/" , fileInputStream , "test.csv" , f .length (), false );
115
117
downloaded = databendConnection .downloadStream (stageName , "jdbc/test/test.csv" , false );
116
118
byte [] arr = streamToByteArray (downloaded );
119
+ System .out .println ("download size = " + arr .length );
117
120
Assert .assertEquals (arr .length , f .length ());
118
121
} finally {
119
122
if (downloaded != null ) {
@@ -124,14 +127,14 @@ public void testFileTransfer()
124
127
125
128
@ Test (groups = {"IT" })
126
129
public void testFileTransferThroughAPI () throws SQLException , IOException {
127
- String filePath = generateRandomCSV (100000 );
130
+ String filePath = generateRandomCSV (10000 );
128
131
File f = new File (filePath );
129
132
try (InputStream fileInputStream = Files .newInputStream (f .toPath ());
130
133
Connection connection = Utils .createConnectionWithPresignedUrlDisable ()) {
131
134
Logger .getLogger (OkHttpClient .class .getName ()).setLevel (Level .ALL );
132
135
133
136
134
- String stageName = "test_stage " ;
137
+ String stageName = "test_stage_np " ;
135
138
DatabendConnection databendConnection = connection .unwrap (DatabendConnection .class );
136
139
PresignContext .createStageIfNotExists (databendConnection , stageName );
137
140
databendConnection .uploadStream (stageName , "jdbc/test/" , fileInputStream , "test.csv" , f .length (), false );
@@ -165,17 +168,16 @@ public void testCopyInto() throws IOException, SQLException {
165
168
}
166
169
}
167
170
168
- @ Test (groups = {"IT" })
169
- public void testLoadStreamToTableWithStage () throws SQLException , IOException {
170
- testLoadStreamToTableInner ("stage" );
171
- }
172
-
173
- @ Test (groups = {"IT" })
174
- public void testLoadStreamToTableWithStreaming () throws SQLException , IOException {
175
- testLoadStreamToTableInner ("streaming" );
171
+ @ DataProvider (name = "streamingLoad" )
172
+ private Object [][] provideTestData () {
173
+ return new Object [][] {
174
+ {"streaming" },
175
+ {"stage" }
176
+ };
176
177
}
177
178
178
- public void testLoadStreamToTableInner (String method ) throws IOException , SQLException {
179
+ @ Test (groups = "IT" , dataProvider = "streamingLoad" )
180
+ public void testLoadStreamToTable (String method ) throws IOException , SQLException {
179
181
if (!Compatibility .driverCapability .streamingLoad ) {
180
182
System .out .println ("Skip testLoadStreamToTableInner: driver version too low" );
181
183
return ;
@@ -190,8 +192,9 @@ public void testLoadStreamToTableInner(String method) throws IOException, SQLExc
190
192
try (FileInputStream fileInputStream = new FileInputStream (f );
191
193
Connection connection = Utils .createConnectionWithPresignedUrlDisable ();
192
194
Statement statement = connection .createStatement ()) {
193
- statement .execute ("create or replace database test_load" );
194
- statement .execute ("use test_load" );
195
+ String dbName = "test_load_stream_" + method ;
196
+ statement .execute (String .format ("create or replace database %s" , dbName ));
197
+ statement .execute (String .format ("use %s" , dbName ));
195
198
statement .execute ("create or replace table test_load(i int, a Variant, b string)" );
196
199
DatabendConnection databendConnection = connection .unwrap (DatabendConnection .class );
197
200
String sql = "insert into test_load from @_databend_load file_format=(type=csv)" ;
0 commit comments