Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@
<scala.version>2.12.17</scala.version>
<scala.mayor.version>2.12</scala.mayor.version>
<spark.version>3.4.4</spark.version>
<sedona.version>1.6.1</sedona.version>
<flink.version>1.20.0</flink.version>
<calcite.version>1.39.0</calcite.version>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@ import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
import org.apache.wayang.core.optimizer.costs.LoadProfileEstimator
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate}
import org.apache.wayang.core.platform.Platform
import org.apache.wayang.core.util.{Tuple => WayangTuple}
import org.apache.wayang.basic.data.{Record, Tuple2 => WayangTuple2}
import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel};
import org.apache.wayang.basic.model.{DLModel, LogisticRegressionModel,DecisionTreeRegressionModel}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import com.google.protobuf.ByteString;
import com.google.protobuf.ByteString
import org.apache.wayang.api.python.function._
import org.tensorflow.ndarray.NdArray

Expand Down Expand Up @@ -632,6 +633,81 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
joinOperator
}

/**
* Applies a spatial filter to this instance.
*
* @param keySelector UDF to extract spatial geometry from data quanta
* @param predicateType the spatial predicate type
* @param filterGeometry the geometry to filter against
* @param columnName optional SQL column name for database pushdown
* @return a new instance representing the filtered output
*/
def spatialFilter(keySelector: Out => SpatialGeometry,
predicateType: SpatialPredicate,
filterGeometry: SpatialGeometry,
columnName: String = null): DataQuanta[Out] =
spatialFilterJava(toSerializableFunction(keySelector), predicateType, filterGeometry, columnName)

/**
* Applies a spatial filter to this instance.
*
* @param keySelector UDF to extract spatial geometry from data quanta
* @param predicateType the spatial predicate type
* @param filterGeometry the geometry to filter against
* @param columnName optional SQL column name for database pushdown
* @return a new instance representing the filtered output
*/
def spatialFilterJava(keySelector: SerializableFunction[Out, _ <: SpatialGeometry],
predicateType: SpatialPredicate,
filterGeometry: SpatialGeometry,
columnName: String = null): DataQuanta[Out] = {
val op = new SpatialFilterOperator(predicateType, keySelector, dataSetType[Out], filterGeometry)
if (columnName != null) op.getKeyDescriptor.withSqlImplementation(null, columnName)
this.connectTo(op, 0)
wrap[Out](op)
}

/**
* Feeds this and a further instance into a [[SpatialJoinOperator]].
*
* @param thisKeyUdf UDF to extract spatial geometry from this instance's elements
* @param that the other instance
* @param thatKeyUdf UDF to extract spatial geometry from `that` instance's elements
* @param predicateType the spatial predicate type for the join
* @return a new instance representing the SpatialJoinOperator's output
*/
def spatialJoin[ThatOut: ClassTag](
thisKeyUdf: Out => SpatialGeometry,
that: DataQuanta[ThatOut],
thatKeyUdf: ThatOut => SpatialGeometry,
predicateType: SpatialPredicate): DataQuanta[WayangTuple2[Out, ThatOut]] =
spatialJoinJava(toSerializableFunction(thisKeyUdf), that, toSerializableFunction(thatKeyUdf), predicateType)

/**
* Feeds this and a further instance into a [[SpatialJoinOperator]].
*
* @param thisKeyUdf UDF to extract spatial geometry from this instance's elements
* @param that the other instance
* @param thatKeyUdf UDF to extract spatial geometry from `that` instance's elements
* @param predicateType the spatial predicate type for the join
* @return a new instance representing the SpatialJoinOperator's output
*/
def spatialJoinJava[ThatOut: ClassTag](
thisKeyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
that: DataQuanta[ThatOut],
thatKeyUdf: SerializableFunction[ThatOut, _ <: SpatialGeometry],
predicateType: SpatialPredicate): DataQuanta[WayangTuple2[Out, ThatOut]] = {
require(this.planBuilder eq that.planBuilder, s"$this and $that must use the same plan builders.")
val op = new SpatialJoinOperator(
new TransformationDescriptor(thisKeyUdf.asInstanceOf[SerializableFunction[Out, SpatialGeometry]], basicDataUnitType[Out], basicDataUnitType[SpatialGeometry]),
new TransformationDescriptor(thatKeyUdf.asInstanceOf[SerializableFunction[ThatOut, SpatialGeometry]], basicDataUnitType[ThatOut], basicDataUnitType[SpatialGeometry]),
predicateType
)
this.connectTo(op, 0)
that.connectTo(op, 1)
wrap[WayangTuple2[Out, ThatOut]](op)
}

def predict[ThatOut: ClassTag](
that: DataQuanta[ThatOut],
inputType: Class[_ <: Any],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.wayang.basic.data.{Record, Tuple2 => RT2}
import org.apache.wayang.basic.model.{DLModel, Model, LogisticRegressionModel,DecisionTreeRegressionModel}
import org.apache.wayang.basic.operators.{DLTrainingOperator, GlobalReduceOperator, LocalCallbackSink, MapOperator, SampleOperator, LogisticRegressionOperator,DecisionTreeRegressionOperator, LinearSVCOperator}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.spatial.{SpatialGeometry, SpatialPredicate}
import org.apache.wayang.core.function.FunctionDescriptor.{SerializableBiFunction, SerializableBinaryOperator, SerializableFunction, SerializableIntUnaryOperator, SerializablePredicate}
import org.apache.wayang.core.optimizer.ProbabilisticDoubleInterval
import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator
Expand Down Expand Up @@ -281,6 +282,57 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
thatKeyUdf: SerializableFunction[ThatOut, Key]) =
new JoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf)

/**
* Feed the built [[DataQuanta]] into a spatial filter operator.
* Requires the wayang-spatial plugin to be loaded.
*
* @param keyUdf function to extract geometry from elements
* @param predicate the spatial predicate type
* @param filterGeometry the geometry to filter against
* @return a [[DataQuantaBuilder]] representing the filtered output
*/
def spatialFilter(
keyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
predicate: SpatialPredicate,
filterGeometry: SpatialGeometry
): SpatialFilterDataQuantaBuilder[Out] =
new SpatialFilterDataQuantaBuilder(this, keyUdf, predicate, filterGeometry)

/**
* Feed the built [[DataQuanta]] into a spatial filter operator with SQL pushdown support.
*
* @param keyUdf function to extract geometry from elements
* @param predicate the spatial predicate type
* @param filterGeometry the geometry to filter against
* @param sqlGeometryColumn the name of the geometry column in the database for SQL pushdown
* @return a [[SpatialFilterDataQuantaBuilder]] representing the filtered output
*/
def spatialFilter(
keyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
predicate: SpatialPredicate,
filterGeometry: SpatialGeometry,
sqlGeometryColumn: String
): SpatialFilterDataQuantaBuilder[Out] =
new SpatialFilterDataQuantaBuilder(this, keyUdf, predicate, filterGeometry)
.withSqlGeometryColumnName(sqlGeometryColumn)

/**
* Feed the built [[DataQuanta]] of this and the given instance into a spatial join operator.
*
* @param thisKeyUdf function to extract geometry from this instance's elements
* @param that the other [[DataQuantaBuilder]] to join with
* @param thatKeyUdf function to extract geometry from `that` instance's elements
* @param predicate the spatial predicate type
* @return a [[SpatialJoinDataQuantaBuilder]] representing the joined output as Tuple2
*/
def spatialJoin[ThatOut](
thisKeyUdf: SerializableFunction[Out, _ <: SpatialGeometry],
that: DataQuantaBuilder[_, ThatOut],
thatKeyUdf: SerializableFunction[ThatOut, _ <: SpatialGeometry],
predicate: SpatialPredicate
): SpatialJoinDataQuantaBuilder[Out, ThatOut] =
new SpatialJoinDataQuantaBuilder(this, that, thisKeyUdf, thatKeyUdf, predicate)

/**
* Feed the built [[DataQuanta]] of this and the given instance into a
* [[org.apache.wayang.basic.operators.DLTrainingOperator]].
Expand Down Expand Up @@ -510,12 +562,12 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
* @param catalog Iceberg Catalog
* @param schema Iceberg Schema of the table to create
* @param tableIdentifier Iceberg Table Identifier of the table to create
* @param outputFileFormat File format of the output data files
* @param outputFileFormat File format of the output data files
* @return the collected data quanta
*/

def writeIcebergTable(catalog: Catalog,
schema: Schema,
def writeIcebergTable(catalog: Catalog,
schema: Schema,
tableIdentifier: TableIdentifier,
outputFileFormat: FileFormat,
jobName: String): Unit = {
Expand Down Expand Up @@ -1929,3 +1981,41 @@ class KeyedDataQuantaBuilder[Out, Key](private val dataQuantaBuilder: DataQuanta
dataQuantaBuilder.coGroup(this.keyExtractor, that.dataQuantaBuilder, that.keyExtractor)

}

class SpatialFilterDataQuantaBuilder[T](inputDataQuanta: DataQuantaBuilder[_, T],
keySelector: SerializableFunction[T, _ <: SpatialGeometry],
predicateType: SpatialPredicate,
filterGeometry: SpatialGeometry)
(implicit javaPlanBuilder: JavaPlanBuilder)
extends BasicDataQuantaBuilder[SpatialFilterDataQuantaBuilder[T], T] {

private var columnName: String = _

def withSqlGeometryColumnName(columnName: String): SpatialFilterDataQuantaBuilder[T] = {
this.columnName = columnName
this
}

override protected def build: DataQuanta[T] = {
val dq = inputDataQuanta.dataQuanta()
dq.spatialFilterJava(keySelector, predicateType, filterGeometry, this.columnName)
}
}

class SpatialJoinDataQuantaBuilder[In0, In1](inputDataQuanta0: DataQuantaBuilder[_, In0],
inputDataQuanta1: DataQuantaBuilder[_, In1],
keyUdf0: SerializableFunction[In0, _ <: SpatialGeometry],
keyUdf1: SerializableFunction[In1, _ <: SpatialGeometry],
predicateType: SpatialPredicate)
(implicit javaPlanBuilder: JavaPlanBuilder)
extends BasicDataQuantaBuilder[SpatialJoinDataQuantaBuilder[In0, In1], RT2[In0, In1]] {

override protected def build: DataQuanta[RT2[In0, In1]] = {
val dq0 = inputDataQuanta0.dataQuanta()
val dq1 = inputDataQuanta1.dataQuanta()
applyTargetPlatforms(
dq0.spatialJoinJava(keyUdf0, dq1, keyUdf1, predicateType)(inputDataQuanta1.classTag),
this.getTargetPlatforms()
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,22 @@ void testMapReduceBy() {
assertEquals(WayangCollections.asSet(4 + 16, 1 + 9), WayangCollections.asSet(outputCollection));
}

@Test
void testFilter() {
WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
JavaPlanBuilder builder = new JavaPlanBuilder(wayangContext);

final List<Integer> inputValues = Arrays.asList(1, 2, 3, 4, 5, 6);

final Collection<Integer> outputValues = builder
.loadCollection(inputValues).withName("Load input values")
.filter(i -> (i & 1) == 0).withName("Filter even numbers")
.collect();

Set<Integer> expectedValues = WayangCollections.asSet(2, 4, 6);
assertEquals(expectedValues, WayangCollections.asSet(outputValues));
}

@Test
void testBroadcast2() {
WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public static Object convert(RelDataType fieldType, String string) {
} catch (ParseException e) {
return null;
}
case GEOMETRY:
case VARCHAR:
default:
return string;
Expand Down
5 changes: 5 additions & 0 deletions wayang-benchmark/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@
<artifactId>wayang-postgres</artifactId>
<version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-spatial</artifactId>
<version>1.1.2-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.wayang</groupId>
<artifactId>wayang-sqlite3</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.wayang.apps.spatial;

import org.apache.wayang.api.JavaPlanBuilder;
import org.apache.wayang.spatial.data.WayangGeometry;
import org.apache.wayang.core.api.Configuration;
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.api.spatial.SpatialPredicate;
import org.apache.wayang.java.Java;
import org.apache.wayang.spatial.Spatial;

import java.util.Collection;

public class ComplexSpatialFilterSpark {

public static void main(String[] args) {


//// Debugging might be useful, set level to "FINEST" to see actual db query strings
System.out.println( ">>> Test a Filter Operator");

//// Db Connection, local db credentials!
Configuration configuration = new Configuration();
configuration.setProperty("wayang.postgres.jdbc.url", "jdbc:postgresql://localhost:5432/spatialdb"); // Default port 5432
configuration.setProperty("wayang.postgres.jdbc.user", "postgres");
configuration.setProperty("wayang.postgres.jdbc.password", "postgres");

WayangContext wayangContext = new WayangContext(configuration)
.withPlugin(Java.basicPlugin())
.withPlugin(Spatial.javaPlugin())
;
JavaPlanBuilder planBuilder = new JavaPlanBuilder(wayangContext)
.withJobName("Filter Test")
.withUdfJarOf(ComplexSpatialFilterSpark.class);

WayangGeometry queryGeometry = WayangGeometry.fromStringInput(
"POLYGON((12.777099609375 52.219050335542484, 13.991088867187502 52.219050335542484, 13.991088867187502 52.71766191466581, 12.777099609375 52.71766191466581, 12.777099609375 52.219050335542484))"
);

final Collection<Long> outputValues = planBuilder
.readTextFile("file:///sc/home/maximilian.speer/wayang/cemetery.csv")

.spatialFilter(
(input -> {
WayangGeometry geom = WayangGeometry.fromStringInput((input.split("\",")[0]).replace("\"", ""));
return geom;
}),
SpatialPredicate.INTERSECTS,
queryGeometry
).withTargetPlatform(Java.platform())
.withName("Spatial Filter (intersects)")
.count()
.collect();

System.out.println("Spatial Filter (intersects): " + outputValues);

return;

}
}
Loading
Loading