diff --git a/pom.xml b/pom.xml index ec96b27..61d250e 100644 --- a/pom.xml +++ b/pom.xml @@ -12,7 +12,7 @@ 11 11 org.gridgain - 9.1.8 + 9.1.16 UTF-8 false diff --git a/src/main/java/training/ComputeApp.java b/src/main/java/training/ComputeApp.java index 0446c67..efb894b 100644 --- a/src/main/java/training/ComputeApp.java +++ b/src/main/java/training/ComputeApp.java @@ -90,14 +90,13 @@ public Marshaller reduceJobResultMarshaller() { @Override public CompletableFuture>> splitAsync(TaskExecutionContext taskExecutionContext, Integer customersCount) { this.customerCount = customersCount; - return taskExecutionContext.ignite().tables().table("InvoiceLine").partitionManager().primaryReplicasAsync() + return taskExecutionContext.ignite().tables().table("InvoiceLine").partitionDistribution().primaryReplicasAsync() .thenApply(x -> x.entrySet().stream() .map(jobParameter -> MapReduceJob.builder() .nodes(List.of(jobParameter.getValue())) .args(Tuple.create() - // FIXME: don't use hashCode once API is finalised - .set("partition",jobParameter.getKey().hashCode()) + .set("partition", jobParameter.getKey().id()) .set("count", customersCount)) .jobDescriptor( JobDescriptor.builder(TopPayingCustomersJob.class) @@ -112,7 +111,7 @@ public CompletableFuture>> splitAsync( private static class TopPayingCustomersJob implements ComputeJob { // Verify results with: select customerid, sum(quantity * unitprice) as price from invoiceline group by customerid order by price desc limit 5 - private static String sql = "select customerid, quantity * unitprice as price from invoiceline where \"__part\" = ?"; + private static final String sql = "select customerid, quantity * unitprice as price from invoiceline where \"__partition_id\" = ?"; private int customerCount = 5; @@ -127,7 +126,7 @@ public CompletableFuture executeAsync(JobExecutionContext jobEx customerCount = parameters.intValue("count"); - try (var results = jobExecutionContext.ignite().sql().execute((Transaction) null, sql, parameters.intValue("partition"))) { + try (var results = jobExecutionContext.ignite().sql().execute((Transaction) null, sql, parameters.longValue("partition"))) { while (results.hasNext()) { var row = results.next(); customerPurchases.merge(row.intValue("customerId"), row.value("price"), BigDecimal::add);