diff --git a/README.md b/README.md index 287c397..a2206c2 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ Check [the complete schedule](https://www.gridgain.com/products/services/trainin ``` 2. (optionally), open the project in your favourite IDE such as IntelliJ or Eclipse, or just use a simple text editor -and command-line instructions prepared for all the samples. +and command-line instructions prepared for all the samples. ## Sign up for GridGain's Nebula service @@ -51,28 +51,28 @@ Start a two-node Ignite cluster: 8. Click `Continue` 9. Click `Attach` 10. Initialise the cluster by clicking the `Initialise` button at the top-right of the screen - + ## Creating Media Store Schema and Loading Data Now you need to create a Media Store schema and load the cluster with sample data. Use SQLLine tool to achieve that: 1. Open a terminal window and navigate to the root directory of this project. 2. Load the media store database: - + a. Start the Command Line Interface (CLI) - + ```bash - docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./config/media_store.sql:/opt/ignite/downloads/media_store.sql --rm --network ignite3_default -it apacheignite/ignite:3.0.0 cli + docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./config/media_store.sql:/opt/ignite/downloads/media_store.sql --rm --network ignite3_default -it apacheignite/ignite:3.1.0 cli ``` - + b. Connect to the cluster. ```bash connect http://node1:10300 ``` - + c. Execute SQL command to load the sample data. - + ```bash sql --file=/opt/ignite/downloads/media_store.sql ``` @@ -88,7 +88,7 @@ With the Media Store database loaded, you can check how Ignite distributed the r ## Affinity Co-location - Optimizing Complex SQL Queries With JOINs -Ignite supports SQL for data processing including distributed joins, grouping and sorting. In this section, you're +Ignite supports SQL for data processing including distributed joins, grouping and sorting. 
In this section, you're going to run basic SQL operations as well as more advanced ones. ### Querying Single Table @@ -121,36 +121,77 @@ JOIN with the `Artist` table: 3. Examine the output. Your instructor will give hints for what to look for. It will look something like this: ```bash - Limit(fetch=[20]): rowcount = 20.0, cumulative cost = IgniteCost [rowCount=15318.06, cpu=77499.96615043783, memory=33461.76, io=178134.0, network=101068.0], id = 35293 + Sort + collation: [DURATION DESC] + fetch: 20 + est: (rows=1) + ColocatedHashAggregate + fieldNames: [TRACKID, TRACK_NAME, GENRE, ARTIST, DURATION] + group: [TRACKID, TRACK_NAME, GENRE, ARTIST] + aggregation: [MAX($f4)] + est: (rows=1) + Project + fieldNames: [TRACKID, TRACK_NAME, GENRE, ARTIST, $f4] + projection: [TRACKID, NAME, NAME$1, NAME$0, /(MILLISECONDS, *(1000, 60))] + est: (rows=1) + HashJoin + predicate: =(GENREID, GENREID$0) + fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS, ARTISTID$0, NAME$0, GENREID$0, NAME$1] + type: inner + est: (rows=1) + Exchange + distribution: single + est: (rows=1) + HashJoin + predicate: =(ARTISTID, ARTISTID$0) + fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS, ARTISTID$0, NAME$0] + type: left + est: (rows=1) + TableScan + table: PUBLIC.TRACK + predicate: <(GENREID, 17) + fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS] + est: (rows=1) + TableScan + table: PUBLIC.ARTIST + fieldNames: [ARTISTID, NAME] + est: (rows=1) + Exchange + distribution: single + est: (rows=1) + TableScan + table: PUBLIC.GENRE + fieldNames: [GENREID, NAME] + est: (rows=1) ``` ## Running Co-located Compute Tasks Run `training.ComputeApp` that uses Apache Ignite compute capabilities for a calculation of top-5 paying customers. 
-The compute task executes on every cluster node, iterates through local records and responds to the application that +The compute task executes on every cluster node, iterates through local records and responds to the application that merges partial results. 1. Build an executable JAR with the applications' classes (or just start the app with IntelliJ IDEA or Eclipse): ```bash - mvn clean package + mvn clean package ``` 2. Load the code into your cluster: a. Start the CLI. ```bash - docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./target/ignite-essentials-developer-training-1.0-SNAPSHOT.jar:/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar --rm --network ignite3_default -it apacheignite/ignite:3.0.0 cli + docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./target/ignite-essentials-developer-training-1.0-SNAPSHOT.jar:/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar --rm --network ignite3_default -it apacheignite/ignite:3.1.0 cli ``` b. Connect to the cluster. - + ```bash connect http://node1:10300 ``` - + c. Deploy the code to the cluster. 
- + ```bash cluster unit deploy --version 1.0.0 --path=/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar essentialsCompute ``` diff --git a/docker-compose.yml b/docker-compose.yml index 4d55a94..21c470d 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -17,7 +17,7 @@ name: ignite3 x-ignite-def: &ignite-def - image: apacheignite/ignite:3.0.0 + image: apacheignite/ignite:3.1.0 environment: - JVM_MAX_MEM=3g - JVM_MIN_MEM=3g diff --git a/pom.xml b/pom.xml index 83b60fd..0c20fe5 100644 --- a/pom.xml +++ b/pom.xml @@ -12,8 +12,9 @@ 11 11 org.apache.ignite - 3.0.0 + 3.1.0 UTF-8 + false diff --git a/src/main/java/training/ByteArrayMarshaller.java b/src/main/java/training/ByteArrayMarshaller.java new file mode 100644 index 0000000..a366f3f --- /dev/null +++ b/src/main/java/training/ByteArrayMarshaller.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package training;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import org.apache.ignite.marshalling.Marshaller;
+import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+
+public class ByteArrayMarshaller<T> implements Marshaller<T, byte[]> {
+
+    static <T> ByteArrayMarshaller<T> create() {
+        return new ByteArrayMarshaller<>();
+    }
+
+    @Override
+    public byte[] marshal(T object) {
+        if (object == null) {
+            return null;
+        }
+
+        if (object instanceof Serializable) {
+            try (var baos = new ByteArrayOutputStream(); var out = new ObjectOutputStream(baos)) {
+                out.writeObject(object);
+                out.flush();
+
+                return baos.toByteArray();
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }
+
+        throw new UnsupportedObjectTypeMarshallingException(object.getClass());
+    }
+
+    @Override
+    public T unmarshal(byte[] raw) {
+        if (raw == null) {
+            return null;
+        }
+
+        try (var bais = new ByteArrayInputStream(raw); var ois = new ObjectInputStream(bais)) {
+            return (T) ois.readObject();
+        } catch (IOException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/src/main/java/training/ComputeApp.java b/src/main/java/training/ComputeApp.java
index e2ef5a5..63be13a 100644
--- a/src/main/java/training/ComputeApp.java
+++ b/src/main/java/training/ComputeApp.java
@@ -30,10 +30,10 @@
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.deployment.DeploymentUnit;
-import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.Ignite;
+import training.model.CustomerPrice;
 import training.model.TopCustomer;
 
 /**
@@ -74,7 +74,7 @@ private static void calculateTopPayingCustomers(Ignite ignite) {
/** * Task that is executed on every cluster node and calculates top-5 local paying customers stored on a node. */ - private static class TopPayingCustomersTask implements MapReduceTask { + private static class TopPayingCustomersTask implements MapReduceTask { public TopPayingCustomersTask() { } @@ -87,12 +87,12 @@ public Marshaller reduceJobResultMarshaller() { } @Override - public CompletableFuture>> splitAsync(TaskExecutionContext taskExecutionContext, Integer customersCount) { + public CompletableFuture>> splitAsync(TaskExecutionContext taskExecutionContext, Integer customersCount) { this.customerCount = customersCount; return taskExecutionContext.ignite().tables().table("InvoiceLine").partitionManager().primaryReplicasAsync() .thenApply(x -> x.entrySet().stream() .map(jobParameter -> - MapReduceJob.builder() + MapReduceJob.builder() .nodes(List.of(jobParameter.getValue())) .args(Tuple.create() // FIXME: don't use hashCode once API is finalised @@ -101,6 +101,7 @@ public CompletableFuture>> splitAsync(TaskExecut .jobDescriptor( JobDescriptor.builder(TopPayingCustomersJob.class) .units(deploymentUnit) + .resultMarshaller(ByteArrayMarshaller.create()) .build() ) .build()) @@ -108,14 +109,19 @@ public CompletableFuture>> splitAsync(TaskExecut ); } - private static class TopPayingCustomersJob implements ComputeJob { + private static class TopPayingCustomersJob implements ComputeJob { // Verify results with: select customerid, sum(quantity * unitprice) as price from invoiceline group by customerid order by price desc limit 5 private static String sql = "select customerid, quantity * unitprice as price from invoiceline where \"__part\" = ?"; private int customerCount = 5; @Override - public CompletableFuture executeAsync(JobExecutionContext jobExecutionContext, Tuple parameters) { + public Marshaller resultMarshaller() { + return ByteArrayMarshaller.create(); + } + + @Override + public CompletableFuture executeAsync(JobExecutionContext jobExecutionContext, Tuple 
parameters) { final HashMap customerPurchases = new HashMap<>(); customerCount = parameters.intValue("count"); @@ -131,27 +137,21 @@ public CompletableFuture executeAsync(JobExecutionContext jobExecutionCon r.sort(Map.Entry.comparingByValue()); Collections.reverse(r); - var results = Tuple.create(); + var results = new ArrayList(); for (var p = 0; p < r.size() && p < customerCount; p++) { - results.set(r.get(p).getKey().toString(), r.get(p).getValue()); + results.add(new CustomerPrice(r.get(p).getKey(), r.get(p).getValue())); } - return CompletableFuture.completedFuture(results); + return CompletableFuture.completedFuture(results.toArray(new CustomerPrice[customerCount])); } } @Override - public CompletableFuture reduceAsync(TaskExecutionContext taskExecutionContext, Map map) { - var r = new HashMap(); + public CompletableFuture reduceAsync(TaskExecutionContext taskExecutionContext, Map map) { + var orderedResults = new ArrayList(); for (var result : map.values()) { - for (var e = 0; e < result.columnCount(); e++) { - // FIXME: where do the quotes come from? 
- var customerId = result.columnName(e).replaceAll("\"", ""); - var price = result.value(customerId); - r.put(Integer.valueOf(customerId), price); - } + orderedResults.addAll(Arrays.stream(result).filter(Objects::nonNull).collect(Collectors.toCollection(LinkedList::new))); } - var orderedResults = new ArrayList<>(r.entrySet()); - orderedResults.sort(Map.Entry.comparingByValue()); + orderedResults.sort(Comparator.comparing(CustomerPrice::getPrice)); Collections.reverse(orderedResults); var customersCache = taskExecutionContext.ignite().tables().table("Customer").recordView(); @@ -159,7 +159,7 @@ public CompletableFuture reduceAsync(TaskExecutionContext taskExe for (var p = 0; p < orderedResults.size() && p < customerCount; p++) { var newRecord = Tuple.create(); - var key = orderedResults.get(p).getKey(); + var key = orderedResults.get(p).getCustomerId(); var customerRecord = customersCache.get(null, Tuple.create().set("customerId", key)); @@ -175,9 +175,9 @@ public CompletableFuture reduceAsync(TaskExecutionContext taskExe .set("city", "unknown") .set("country", "unknown"); } - newRecord.set("price", orderedResults.get(p).getValue()); + newRecord.set("price", orderedResults.get(p).getPrice()); - var val = new TopCustomer(key, orderedResults.get(p).getValue()); + var val = new TopCustomer(key, orderedResults.get(p).getPrice()); val.setFullName(customerRecord.stringValue("firstName") + " " + customerRecord.stringValue("lastName")); val.setCity(customerRecord.stringValue("city")); val.setCountry(customerRecord.stringValue("country")); diff --git a/src/main/java/training/model/CustomerPrice.java b/src/main/java/training/model/CustomerPrice.java new file mode 100644 index 0000000..c4a952f --- /dev/null +++ b/src/main/java/training/model/CustomerPrice.java @@ -0,0 +1,36 @@ +package training.model; + +import java.io.Serializable; +import java.math.BigDecimal; +import java.util.Comparator; + +public class CustomerPrice implements Serializable, Comparator { + private final 
Integer customerId; + private final BigDecimal price; + + public CustomerPrice(Integer customerId, BigDecimal price) { + this.customerId = customerId; + this.price = price; + } + + public Integer getCustomerId() { + return customerId; + } + + public BigDecimal getPrice() { + return price; + } + + @Override + public int compare(CustomerPrice o1, CustomerPrice o2) { + return o1.getPrice().compareTo(o2.getPrice()); + } + + @Override + public String toString() { + return "CustomerPrice{" + + "customerId=" + customerId + + ", price=" + price + + '}'; + } +}