Skip to content

Commit b137a15

Browse files
author
Nikolai Kolesnikov
committed
1/ Improved readability added a timeout parameter
2/ Added a comment for min pages per second (11 per second) 3/ processedPage++ moved to whenCompeted 4/ Added JSON ser for persisting a state of the exporting process
1 parent de002fb commit b137a15

File tree

4 files changed

+26
-15
lines changed

4 files changed

+26
-15
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
.idea/
33
*.iml
44
*.iws
5+
*.conf
56

67
# Maven
78
target/

src/main/java/com/amazon/aws/keyspaces/PersistCassandraRowToParquet.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,8 +173,12 @@ void processRowsAsync(AsyncResultSet rs, Throwable error) {
173173

174174
if (rs.hasMorePages()) {
175175
pageLimiter.acquire();
176-
rs.fetchNextPage().whenComplete(this::processRowsAsync);
177-
processedPage++;
176+
//rs.fetchNextPage().whenComplete(this::processRowsAsync);
177+
//processedPage++;
178+
rs.fetchNextPage().whenComplete((thisRs, thisError) ->{
179+
processRowsAsync(thisRs, thisError);
180+
processedPage++;
181+
});
178182
} else {
179183
LOG.info("Completed to read pages");
180184
LOG.info("Starting page was " + startingPage);

src/main/java/com/amazon/aws/keyspaces/Runner.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,22 @@
2121
public class Runner implements Callable<Integer> {
2222

2323
private static final Logger LOG = Logger.getLogger(Runner.class.getName());
24+
// Default timeout is 86,400 seconds
25+
private static final long ONE_DAY = 1*24*60*60;
26+
27+
2428
@CommandLine.Option(names = "--recover", description = "Recovering mode")
2529
boolean recoveringMode = false;
2630
@CommandLine.Parameters(index = "0", description = "Destination of parquet files")
27-
private final String dataLocation = System.getProperty("user.dir") + "/" + "output";
31+
private String dataLocation = System.getProperty("user.dir") + "/" + "output";
2832
@CommandLine.Parameters(index = "1", description = "Query")
2933
private String query;
3034
@CommandLine.Option(names = {"-p", "--page"}, description = "Starting page")
31-
private final int startingPage = 0;
35+
private int startingPage = 0;
3236
@CommandLine.Option(names = {"-r", "--rateLimiter"}, description = "A rate limiter")
33-
private final int rateLimiter = 3000;
37+
private int rateLimiter = 3000;
38+
@CommandLine.Option(names = {"-t", "--timeout"}, description = "Override default timeout 86400 seconds")
39+
private long timeout = ONE_DAY;
3440
private final File configFile = new File(System.getProperty("user.dir") + "/application.conf");
3541

3642
public static void main(String[] args) throws IOException, NoSuchAlgorithmException, InterruptedException {
@@ -53,7 +59,7 @@ public Integer call() throws Exception {
5359
LOG.info("Export in progress...");
5460
LOG.info("Dest:" + dataLocation);
5561
persistCassandraRowToParquet.start();
56-
countDownLatch.await(604800, TimeUnit.SECONDS);
62+
countDownLatch.await(timeout, TimeUnit.SECONDS);
5763
persistCassandraRowToParquet.close();
5864
LOG.info("Amazon Keyspaces Session is closed");
5965
long elapsedTime = System.nanoTime() - startTime;
@@ -76,13 +82,14 @@ public Integer call() throws Exception {
7682
countDownLatch,
7783
configFile,
7884
state.getProcessedPages(),
85+
// 11 pages * 3500 rows per page * 100 bytes per row = 3,759 kilobytes per partition
7986
Math.min(11, (int) Math.round(state.getPageRate()))
8087
);
8188

8289
LOG.info("Export in progress...");
8390
LOG.info("Dest:" + dataLocation);
8491
persistCassandraRowToParquet.start();
85-
countDownLatch.await(604800, TimeUnit.SECONDS);
92+
countDownLatch.await(timeout, TimeUnit.SECONDS);
8693
persistCassandraRowToParquet.close();
8794
LOG.info("Amazon Keyspaces Session is closed");
8895
long elapsedTime = System.nanoTime() - startTime;

src/main/java/com/amazon/aws/keyspaces/Utils.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,20 @@
33

44
package com.amazon.aws.keyspaces;
55

6+
import com.fasterxml.jackson.databind.ObjectMapper;
7+
68
import java.io.FileInputStream;
79
import java.io.FileOutputStream;
8-
import java.io.ObjectInputStream;
9-
import java.io.ObjectOutputStream;
1010
import java.util.logging.Logger;
1111

1212
public class Utils {
1313
private static final Logger LOG = Logger.getLogger(Utils.class.getName());
1414

1515
public static void writeState(State state) {
1616
try {
17+
ObjectMapper mapper = new ObjectMapper();
1718
FileOutputStream myFileOutputStream = new FileOutputStream(System.getProperty("user.dir") + "/state.ser");
18-
ObjectOutputStream myObjectOutputStream = new ObjectOutputStream(myFileOutputStream);
19-
myObjectOutputStream.writeObject(state);
20-
myObjectOutputStream.close();
19+
mapper.writeValue(myFileOutputStream, state);
2120
} catch (Exception e) {
2221
LOG.severe("Error when saving to file." + e.getMessage());
2322
}
@@ -26,13 +25,13 @@ public static void writeState(State state) {
2625
public static State readState() {
2726
State state = new State();
2827
try {
28+
ObjectMapper mapper = new ObjectMapper();
2929
FileInputStream myFileInputStream = new FileInputStream(System.getProperty("user.dir") + "/state.ser");
30-
ObjectInputStream myObjectInputStream = new ObjectInputStream(myFileInputStream);
31-
state = (State) myObjectInputStream.readObject();
32-
myObjectInputStream.close();
30+
state = mapper.readValue(myFileInputStream, State.class);
3331
} catch (Exception e) {
3432
LOG.severe("Error when loading from file." + e.getMessage());
3533
}
3634
return state;
3735
}
36+
3837
}

0 commit comments

Comments
 (0)