Merged
73 changes: 57 additions & 16 deletions README.md
@@ -19,7 +19,7 @@ Check [the complete schedule](https://www.gridgain.com/products/services/trainin
```

2. (Optional) Open the project in your favourite IDE, such as IntelliJ IDEA or Eclipse, or just use a simple text editor
   and the command-line instructions prepared for all the samples.

## Sign up for GridGain's Nebula service

@@ -51,28 +51,28 @@ Start a two-node Ignite cluster:
8. Click `Continue`
9. Click `Attach`
10. Initialise the cluster by clicking the `Initialise` button at the top-right of the screen

## Creating Media Store Schema and Loading Data

Now you need to create the Media Store schema and load the cluster with sample data. Use the Ignite CLI to do that:

1. Open a terminal window and navigate to the root directory of this project.
2. Load the media store database:

a. Start the Command Line Interface (CLI)

```bash
-docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./config/media_store.sql:/opt/ignite/downloads/media_store.sql --rm --network ignite3_default -it apacheignite/ignite:3.0.0 cli
+docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./config/media_store.sql:/opt/ignite/downloads/media_store.sql --rm --network ignite3_default -it apacheignite/ignite:3.1.0 cli
```

b. Connect to the cluster.

```bash
connect http://node1:10300
```

c. Run the SQL script that loads the sample data.

```bash
sql --file=/opt/ignite/downloads/media_store.sql
```
@@ -88,7 +88,7 @@ With the Media Store database loaded, you can check how Ignite distributed the r

## Affinity Co-location - Optimizing Complex SQL Queries With JOINs

Ignite supports SQL for data processing, including distributed joins, grouping, and sorting. In this section, you're
going to run basic SQL operations as well as more advanced ones.

### Querying a Single Table
@@ -121,36 +121,77 @@ JOIN with the `Artist` table:
3. Examine the output. Your instructor will give hints for what to look for. It will look something like this:

```bash
Limit(fetch=[20]): rowcount = 20.0, cumulative cost = IgniteCost [rowCount=15318.06, cpu=77499.96615043783, memory=33461.76, io=178134.0, network=101068.0], id = 35293
Sort
    collation: [DURATION DESC]
    fetch: 20
    est: (rows=1)
  ColocatedHashAggregate
      fieldNames: [TRACKID, TRACK_NAME, GENRE, ARTIST, DURATION]
      group: [TRACKID, TRACK_NAME, GENRE, ARTIST]
      aggregation: [MAX($f4)]
      est: (rows=1)
    Project
        fieldNames: [TRACKID, TRACK_NAME, GENRE, ARTIST, $f4]
        projection: [TRACKID, NAME, NAME$1, NAME$0, /(MILLISECONDS, *(1000, 60))]
        est: (rows=1)
      HashJoin
          predicate: =(GENREID, GENREID$0)
          fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS, ARTISTID$0, NAME$0, GENREID$0, NAME$1]
          type: inner
          est: (rows=1)
        Exchange
            distribution: single
            est: (rows=1)
          HashJoin
              predicate: =(ARTISTID, ARTISTID$0)
              fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS, ARTISTID$0, NAME$0]
              type: left
              est: (rows=1)
            TableScan
                table: PUBLIC.TRACK
                predicate: <(GENREID, 17)
                fieldNames: [TRACKID, NAME, ARTISTID, GENREID, MILLISECONDS]
                est: (rows=1)
            TableScan
                table: PUBLIC.ARTIST
                fieldNames: [ARTISTID, NAME]
                est: (rows=1)
        Exchange
            distribution: single
            est: (rows=1)
          TableScan
              table: PUBLIC.GENRE
              fieldNames: [GENREID, NAME]
              est: (rows=1)
```

## Running Co-located Compute Tasks

Run `training.ComputeApp`, which uses Apache Ignite compute capabilities to calculate the top-5 paying customers.
The compute task executes on every cluster node, iterates through the local records, and responds to the application,
which merges the partial results.
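The flow just described can be sketched in plain Java, independent of the Ignite APIs: each "node" computes the top-k totals over its co-located records (the map step), and the reducer concatenates the partial lists, re-sorts, and keeps the global top-k. `TopCustomersSketch` and its members are hypothetical names used only for illustration; the real implementation is `TopPayingCustomersTask` in `ComputeApp`.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TopCustomersSketch {

    record Total(int customerId, double amount) {}

    // Map step: local top-k. Merging local top-k lists is only globally
    // correct because all of a customer's records live on one node
    // (affinity co-location), so no customer's total is split across nodes.
    static List<Total> localTopK(Map<Integer, Double> localSums, int k) {
        return localSums.entrySet().stream()
                .map(e -> new Total(e.getKey(), e.getValue()))
                .sorted(Comparator.comparingDouble(Total::amount).reversed())
                .limit(k)
                .collect(Collectors.toList());
    }

    // Reduce step: concatenate partial results, re-sort, keep the global top-k.
    static List<Total> merge(List<List<Total>> partials, int k) {
        return partials.stream()
                .flatMap(List::stream)
                .sorted(Comparator.comparingDouble(Total::amount).reversed())
                .limit(k)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        var node1 = localTopK(Map.of(1, 10.0, 2, 50.0), 2);
        var node2 = localTopK(Map.of(3, 75.0, 4, 5.0), 2);
        var top2 = merge(List.of(node1, node2), 2);
        if (top2.get(0).customerId() != 3 || top2.get(1).customerId() != 2) {
            throw new AssertionError("unexpected top-2");
        }
        System.out.println("ok");
    }
}
```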

1. Build an executable JAR with the application's classes (or just start the app with IntelliJ IDEA or Eclipse):

```bash
mvn clean package
```
2. Load the code into your cluster:

a. Start the CLI.

```bash
-docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./target/ignite-essentials-developer-training-1.0-SNAPSHOT.jar:/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar --rm --network ignite3_default -it apacheignite/ignite:3.0.0 cli
+docker run -e LANG=C.UTF-8 -e LC_ALL=C.UTF-8 -v ./target/ignite-essentials-developer-training-1.0-SNAPSHOT.jar:/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar --rm --network ignite3_default -it apacheignite/ignite:3.1.0 cli
```

b. Connect to the cluster.

```bash
connect http://node1:10300
```

c. Deploy the code to the cluster.

```bash
cluster unit deploy --version 1.0.0 --path=/opt/ignite/downloads/ignite-essentials-developer-training-1.0-SNAPSHOT.jar essentialsCompute
```
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -17,7 +17,7 @@ name: ignite3

x-ignite-def:
&ignite-def
-image: apacheignite/ignite:3.0.0
+image: apacheignite/ignite:3.1.0
environment:
- JVM_MAX_MEM=3g
- JVM_MIN_MEM=3g
3 changes: 2 additions & 1 deletion pom.xml
@@ -12,8 +12,9 @@
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<ignite.project>org.apache.ignite</ignite.project>
-<ignite.version>3.0.0</ignite.version>
+<ignite.version>3.1.0</ignite.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<exec.cleanupDaemonThreads>false</exec.cleanupDaemonThreads>
</properties>

<dependencies>
66 changes: 66 additions & 0 deletions src/main/java/training/ByteArrayMarshaller.java
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package training;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;

public class ByteArrayMarshaller<T> implements Marshaller<T, byte[]> {

    static <T> ByteArrayMarshaller<T> create() {
        return new ByteArrayMarshaller<>();
    }

    @Override
    public byte[] marshal(T object) {
        if (object == null) {
            return null;
        }

        if (object instanceof Serializable) {
            try (var baos = new ByteArrayOutputStream(); var out = new ObjectOutputStream(baos)) {
                out.writeObject(object);
                out.flush();

                return baos.toByteArray();
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        throw new UnsupportedObjectTypeMarshallingException(object.getClass());
    }

    @Override
    @SuppressWarnings("unchecked") // caller guarantees T matches the serialized type
    public T unmarshal(byte[] raw) {
        if (raw == null) {
            return null;
        }

        try (var bais = new ByteArrayInputStream(raw); var ois = new ObjectInputStream(bais)) {
            return (T) ois.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
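`ByteArrayMarshaller` is a thin wrapper over standard Java serialization. The round trip it performs can be sketched without any Ignite dependency; `RoundTripDemo` and its methods below are illustrative names, not part of the project.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

public class RoundTripDemo {

    // Serialize a Serializable object to a byte array.
    static byte[] marshal(Object object) {
        try (var baos = new ByteArrayOutputStream(); var out = new ObjectOutputStream(baos)) {
            out.writeObject(object);
            out.flush();
            return baos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Read the object back from the byte array.
    @SuppressWarnings("unchecked")
    static <T> T unmarshal(byte[] raw) {
        try (var bais = new ByteArrayInputStream(raw); var ois = new ObjectInputStream(bais)) {
            return (T) ois.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        String original = "top-5 customers";
        String copy = unmarshal(marshal(original));
        if (!original.equals(copy)) {
            throw new AssertionError("round trip failed");
        }
        System.out.println("ok");
    }
}
```

The same mechanism is why the compute job's inputs and results must be `Serializable`; anything else makes the real marshaller throw `UnsupportedObjectTypeMarshallingException`.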
44 changes: 22 additions & 22 deletions src/main/java/training/ComputeApp.java
Expand Up @@ -30,10 +30,10 @@
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.Ignite;
import training.model.CustomerPrice;
import training.model.TopCustomer;

/**
@@ -74,7 +74,7 @@ private static void calculateTopPayingCustomers(Ignite ignite) {
/**
* Task that is executed on every cluster node and calculates top-5 local paying customers stored on a node.
*/
private static class TopPayingCustomersTask implements MapReduceTask<Integer, Tuple, Tuple, TopCustomer[]> {
private static class TopPayingCustomersTask implements MapReduceTask<Integer, Tuple, CustomerPrice[], TopCustomer[]> {

public TopPayingCustomersTask() {
}
@@ -87,12 +87,12 @@ public Marshaller<TopCustomer[], byte[]> reduceJobResultMarshaller() {
}

@Override
public CompletableFuture<List<MapReduceJob<Tuple, Tuple>>> splitAsync(TaskExecutionContext taskExecutionContext, Integer customersCount) {
public CompletableFuture<List<MapReduceJob<Tuple, CustomerPrice[]>>> splitAsync(TaskExecutionContext taskExecutionContext, Integer customersCount) {
this.customerCount = customersCount;
return taskExecutionContext.ignite().tables().table("InvoiceLine").partitionManager().primaryReplicasAsync()
.thenApply(x -> x.entrySet().stream()
.map(jobParameter ->
MapReduceJob.<Tuple,Tuple>builder()
MapReduceJob.<Tuple,CustomerPrice[]>builder()
.nodes(List.of(jobParameter.getValue()))
.args(Tuple.create()
// FIXME: don't use hashCode once API is finalised
@@ -101,21 +101,27 @@ public CompletableFuture<List<MapReduceJob<Tuple, Tuple>>> splitAsync(TaskExecut
.jobDescriptor(
JobDescriptor.builder(TopPayingCustomersJob.class)
.units(deploymentUnit)
.resultMarshaller(ByteArrayMarshaller.create())
.build()
)
.build())
.collect(Collectors.toList())
);
}

private static class TopPayingCustomersJob implements ComputeJob<Tuple, Tuple> {
private static class TopPayingCustomersJob implements ComputeJob<Tuple, CustomerPrice[]> {
// Verify results with: select customerid, sum(quantity * unitprice) as price from invoiceline group by customerid order by price desc limit 5
private static String sql = "select customerid, quantity * unitprice as price from invoiceline where \"__part\" = ?";

private int customerCount = 5;

@Override
public CompletableFuture<Tuple> executeAsync(JobExecutionContext jobExecutionContext, Tuple parameters) {
public Marshaller<CustomerPrice[], byte[]> resultMarshaller() {
return ByteArrayMarshaller.create();
}

@Override
public CompletableFuture<CustomerPrice[]> executeAsync(JobExecutionContext jobExecutionContext, Tuple parameters) {
final HashMap<Integer, BigDecimal> customerPurchases = new HashMap<>();

customerCount = parameters.intValue("count");
@@ -131,35 +131,29 @@ public CompletableFuture<Tuple> executeAsync(JobExecutionContext jobExecutionCon
r.sort(Map.Entry.comparingByValue());
Collections.reverse(r);

var results = Tuple.create();
var results = new ArrayList<CustomerPrice>();
for (var p = 0; p < r.size() && p < customerCount; p++) {
results.set(r.get(p).getKey().toString(), r.get(p).getValue());
results.add(new CustomerPrice(r.get(p).getKey(), r.get(p).getValue()));
}
return CompletableFuture.completedFuture(results);
return CompletableFuture.completedFuture(results.toArray(new CustomerPrice[0]));
}
}

@Override
public CompletableFuture<TopCustomer[]> reduceAsync(TaskExecutionContext taskExecutionContext, Map<UUID, Tuple> map) {
var r = new HashMap<Integer,BigDecimal>();
public CompletableFuture<TopCustomer[]> reduceAsync(TaskExecutionContext taskExecutionContext, Map<UUID, CustomerPrice[]> map) {
var orderedResults = new ArrayList<CustomerPrice>();
for (var result : map.values()) {
for (var e = 0; e < result.columnCount(); e++) {
// FIXME: where do the quotes come from?
var customerId = result.columnName(e).replaceAll("\"", "");
var price = result.<BigDecimal>value(customerId);
r.put(Integer.valueOf(customerId), price);
}
orderedResults.addAll(Arrays.stream(result).filter(Objects::nonNull).collect(Collectors.toCollection(LinkedList::new)));
}
var orderedResults = new ArrayList<>(r.entrySet());
orderedResults.sort(Map.Entry.comparingByValue());
orderedResults.sort(Comparator.comparing(CustomerPrice::getPrice));
Collections.reverse(orderedResults);

var customersCache = taskExecutionContext.ignite().tables().table("Customer").recordView();
var results = new ArrayList<TopCustomer>();
for (var p = 0; p < orderedResults.size() && p < customerCount; p++) {
var newRecord = Tuple.create();

var key = orderedResults.get(p).getKey();
var key = orderedResults.get(p).getCustomerId();

var customerRecord = customersCache.get(null, Tuple.create().set("customerId", key));

@@ -175,9 +175,9 @@ public CompletableFuture<TopCustomer[]> reduceAsync(TaskExecutionContext taskExe
.set("city", "unknown")
.set("country", "unknown");
}
newRecord.set("price", orderedResults.get(p).getValue());
newRecord.set("price", orderedResults.get(p).getPrice());

var val = new TopCustomer(key, orderedResults.get(p).getValue());
var val = new TopCustomer(key, orderedResults.get(p).getPrice());
val.setFullName(customerRecord.stringValue("firstName") + " " + customerRecord.stringValue("lastName"));
val.setCity(customerRecord.stringValue("city"));
val.setCountry(customerRecord.stringValue("country"));
36 changes: 36 additions & 0 deletions src/main/java/training/model/CustomerPrice.java
@@ -0,0 +1,36 @@
package training.model;

import java.io.Serializable;
import java.math.BigDecimal;

public class CustomerPrice implements Serializable, Comparable<CustomerPrice> {
    private final Integer customerId;
    private final BigDecimal price;

    public CustomerPrice(Integer customerId, BigDecimal price) {
        this.customerId = customerId;
        this.price = price;
    }

    public Integer getCustomerId() {
        return customerId;
    }

    public BigDecimal getPrice() {
        return price;
    }

    @Override
    public int compareTo(CustomerPrice other) {
        return price.compareTo(other.price);
    }

@Override
public String toString() {
return "CustomerPrice{" +
"customerId=" + customerId +
", price=" + price +
'}';
}
}