Skip to content

Commit 5f1ee8d

Browse files
Jolanrensen authored and asm0dey committed
ref: adds forEachPartition for datasets, takeValues and takeKeys for key/value like datasets, and added property ref based sort function for Datasets (#91)
1 parent 3fde1f1 commit 5f1ee8d

File tree

4 files changed

+123
-1
lines changed
  • kotlin-spark-api
    • 2.4/src
      • main/kotlin/org/jetbrains/kotlinx/spark/api
      • test/kotlin/org/jetbrains/kotlinx/spark/api
    • 3.0/src
      • main/kotlin/org/jetbrains/kotlinx/spark/api
      • test/kotlin/org/jetbrains/kotlinx/spark/api

4 files changed

+123
-1
lines changed

kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,22 @@ inline fun <reified KEY, reified VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduc
180180
reduceGroups(ReduceFunction(func))
181181
.map { t -> t._1 to t._2 }
182182

183+
/**
 * Returns a [Dataset] holding only the first component (`_1`) of every [Tuple2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeKeys` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeKeysTuple2")
inline fun <reified T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> =
    map { tuple -> tuple._1() }
185+
186+
/**
 * Returns a [Dataset] holding only the `first` element of every [Pair] in this [Dataset].
 */
inline fun <reified T1, T2> Dataset<Pair<T1, T2>>.takeKeys(): Dataset<T1> =
    map { pair -> pair.first }
187+
188+
/**
 * Returns a [Dataset] holding only the `_1` component of every [Arity2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeKeys` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeKeysArity2")
inline fun <reified T1, T2> Dataset<Arity2<T1, T2>>.takeKeys(): Dataset<T1> =
    map { arity -> arity._1 }
190+
191+
/**
 * Returns a [Dataset] holding only the second component (`_2`) of every [Tuple2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeValues` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeValuesTuple2")
inline fun <T1, reified T2> Dataset<Tuple2<T1, T2>>.takeValues(): Dataset<T2> =
    map { tuple -> tuple._2() }
193+
194+
/**
 * Returns a [Dataset] holding only the `second` element of every [Pair] in this [Dataset].
 */
inline fun <T1, reified T2> Dataset<Pair<T1, T2>>.takeValues(): Dataset<T2> =
    map { pair -> pair.second }
195+
196+
/**
 * Returns a [Dataset] holding only the `_2` component of every [Arity2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeValues` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeValuesArity2")
inline fun <T1, reified T2> Dataset<Arity2<T1, T2>>.takeValues(): Dataset<T2> =
    map { arity -> arity._2 }
198+
183199
inline fun <K, V, reified U> KeyValueGroupedDataset<K, V>.flatMapGroups(
184200
noinline func: (key: K, values: Iterator<V>) -> Iterator<U>
185201
): Dataset<U> = flatMapGroups(
@@ -238,6 +254,8 @@ inline fun <reified R> Dataset<*>.to(): Dataset<R> = `as`(encoder<R>())
238254

239255
/**
 * Kotlin-friendly variant of [Dataset.foreach]: wraps [func] in a [ForeachFunction]
 * so that a plain Kotlin lambda can be passed.
 */
inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit) =
    foreach(ForeachFunction(func))
240256

257+
/**
 * Kotlin-friendly variant of [Dataset.foreachPartition]: wraps [func] in a
 * [ForeachPartitionFunction] so that a plain Kotlin lambda can be passed.
 * [func] receives an [Iterator] over the elements it should process.
 */
inline fun <reified T> Dataset<T>.forEachPartition(noinline func: (Iterator<T>) -> Unit) =
    foreachPartition(ForeachPartitionFunction(func))
258+
241259
/**
242260
* It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that
243261
*/
@@ -718,6 +736,16 @@ inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T,
718736
*/
719737
inline operator fun <reified T, reified U> Dataset<T>.invoke(column: KProperty1<T, U>): TypedColumn<T, U> = col(column)
720738

739+
/**
 * Sorts this [Dataset] by one or more properties of its element type, given as property references.
 * The names of the referenced properties are passed as column names to [Dataset.sort].
 * ```kotlin
 * val sorted: Dataset<YourClass> = unsorted.sort(YourClass::a)
 * val sorted2: Dataset<YourClass> = unsorted.sort(YourClass::a, YourClass::b)
 * ```
 *
 * @param col the first property to sort on
 * @param cols any further properties to sort on
 * @return the [Dataset] returned by the underlying `sort` call
 */
fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): Dataset<T> {
    val remainingNames = Array(cols.size) { index -> cols[index].name }
    return sort(col.name, *remainingNames)
}
748+
721749
/**
722750
* Alternative to [Dataset.show] which returns source dataset.
723751
* Useful for debug purposes when you need to view content of a dataset as an intermediate operation

kotlin-spark-api/2.4/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -437,11 +437,43 @@ class ApiTest : ShouldSpec({
437437

438438
b.count() shouldBe 1
439439
}
440+
should("Allow simple forEachPartition in datasets") {
    val dataset = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 1),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // Named lambda parameters avoid the shadowed nested `it`.
    dataset.forEachPartition { partition ->
        partition.forEach { element ->
            element.b shouldBe 1
        }
    }
}
451+
should("Have easier access to keys and values for key/value datasets") {
    val source = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 1),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // Both elements share b == 1, so they are grouped under a single key.
    val reducedPerKey = source
        .groupByKey { it.b }
        .reduceGroups(func = { left, right -> SomeClass(left.a + right.a, left.b) })
    val dataset: Dataset<SomeClass> = reducedPerKey.takeValues()

    dataset.count() shouldBe 1
}
462+
should("Be able to sort datasets with property reference") {
    val dataset: Dataset<SomeClass> = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 2),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // sort returns a new Dataset and leaves the receiver unchanged, so the assertions
    // must run on the returned Dataset; asserting on `dataset` would not test sorting.
    val sortedByB = dataset.sort(SomeClass::b)
    sortedByB.takeAsList(1).first().b shouldBe 1

    // Sorting on `a` first puts [1, 2, 3] ahead of [4, 3, 2], whose element has b == 2.
    val sortedByAThenB = dataset.sort(SomeClass::a, SomeClass::b)
    sortedByAThenB.takeAsList(1).first().b shouldBe 2
}
440473
}
441474
}
442475
})
443476

444-
445477
data class DataClassWithTuple<T : Product>(val tuple: T)
446478

447479

kotlin-spark-api/3.0/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,23 @@ inline fun <reified KEY, reified VALUE> KeyValueGroupedDataset<KEY, VALUE>.reduc
176176
reduceGroups(ReduceFunction(func))
177177
.map { t -> t._1 to t._2 }
178178

179+
/**
 * Returns a [Dataset] holding only the first component (`_1`) of every [Tuple2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeKeys` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeKeysTuple2")
inline fun <reified T1, T2> Dataset<Tuple2<T1, T2>>.takeKeys(): Dataset<T1> =
    map { tuple -> tuple._1() }
181+
182+
/**
 * Returns a [Dataset] holding only the `first` element of every [Pair] in this [Dataset].
 */
inline fun <reified T1, T2> Dataset<Pair<T1, T2>>.takeKeys(): Dataset<T1> =
    map { pair -> pair.first }
183+
184+
/**
 * Returns a [Dataset] holding only the `_1` component of every [Arity2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeKeys` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeKeysArity2")
inline fun <reified T1, T2> Dataset<Arity2<T1, T2>>.takeKeys(): Dataset<T1> =
    map { arity -> arity._1 }
186+
187+
/**
 * Returns a [Dataset] holding only the second component (`_2`) of every [Tuple2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeValues` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeValuesTuple2")
inline fun <T1, reified T2> Dataset<Tuple2<T1, T2>>.takeValues(): Dataset<T2> =
    map { tuple -> tuple._2() }
189+
190+
/**
 * Returns a [Dataset] holding only the `second` element of every [Pair] in this [Dataset].
 */
inline fun <T1, reified T2> Dataset<Pair<T1, T2>>.takeValues(): Dataset<T2> =
    map { pair -> pair.second }
191+
192+
/**
 * Returns a [Dataset] holding only the `_2` component of every [Arity2] in this [Dataset].
 *
 * The explicit JVM name distinguishes this overload from the other `takeValues` overloads,
 * whose receivers differ only in their type argument.
 */
@JvmName("takeValuesArity2")
inline fun <T1, reified T2> Dataset<Arity2<T1, T2>>.takeValues(): Dataset<T2> =
    map { arity -> arity._2 }
194+
195+
179196
inline fun <K, V, reified U> KeyValueGroupedDataset<K, V>.flatMapGroups(
180197
noinline func: (key: K, values: Iterator<V>) -> Iterator<U>
181198
): Dataset<U> = flatMapGroups(
@@ -234,6 +251,8 @@ inline fun <reified R> Dataset<*>.to(): Dataset<R> = `as`(encoder<R>())
234251

235252
/**
 * Kotlin-friendly variant of [Dataset.foreach]: wraps [func] in a [ForeachFunction]
 * so that a plain Kotlin lambda can be passed.
 */
inline fun <reified T> Dataset<T>.forEach(noinline func: (T) -> Unit) =
    foreach(ForeachFunction(func))
236253

254+
/**
 * Kotlin-friendly variant of [Dataset.foreachPartition]: wraps [func] in a
 * [ForeachPartitionFunction] so that a plain Kotlin lambda can be passed.
 * [func] receives an [Iterator] over the elements it should process.
 */
inline fun <reified T> Dataset<T>.forEachPartition(noinline func: (Iterator<T>) -> Unit) =
    foreachPartition(ForeachPartitionFunction(func))
255+
237256
/**
238257
* It's hard to call `Dataset.debugCodegen` from kotlin, so here is utility for that
239258
*/
@@ -714,6 +733,16 @@ inline fun <reified T, reified U> col(column: KProperty1<T, U>): TypedColumn<T,
714733
*/
715734
inline operator fun <reified T, reified U> Dataset<T>.invoke(column: KProperty1<T, U>): TypedColumn<T, U> = col(column)
716735

736+
/**
 * Sorts this [Dataset] by one or more properties of its element type, given as property references.
 * The names of the referenced properties are passed as column names to [Dataset.sort].
 * ```kotlin
 * val sorted: Dataset<YourClass> = unsorted.sort(YourClass::a)
 * val sorted2: Dataset<YourClass> = unsorted.sort(YourClass::a, YourClass::b)
 * ```
 *
 * @param col the first property to sort on
 * @param cols any further properties to sort on
 * @return the [Dataset] returned by the underlying `sort` call
 */
fun <T> Dataset<T>.sort(col: KProperty1<T, *>, vararg cols: KProperty1<T, *>): Dataset<T> {
    val remainingNames = Array(cols.size) { index -> cols[index].name }
    return sort(col.name, *remainingNames)
}
745+
717746
/**
718747
* Alternative to [Dataset.show] which returns source dataset.
719748
* Useful for debug purposes when you need to view content of a dataset as an intermediate operation

kotlin-spark-api/3.0/src/test/kotlin/org/jetbrains/kotlinx/spark/api/ApiTest.kt

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -477,6 +477,39 @@ class ApiTest : ShouldSpec({
477477
)
478478
dataset.show()
479479
}
480+
should("Allow simple forEachPartition in datasets") {
    val dataset = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 1),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // Named lambda parameters avoid the shadowed nested `it`.
    dataset.forEachPartition { partition ->
        partition.forEach { element ->
            element.b shouldBe 1
        }
    }
}
491+
should("Have easier access to keys and values for key/value datasets") {
    val source = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 1),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // Both elements share b == 1, so they are grouped under a single key.
    val reducedPerKey = source
        .groupByKey { it.b }
        .reduceGroups(func = { left, right -> SomeClass(left.a + right.a, left.b) })
    val dataset: Dataset<SomeClass> = reducedPerKey.takeValues()

    dataset.count() shouldBe 1
}
502+
should("Be able to sort datasets with property reference") {
    val dataset: Dataset<SomeClass> = dsOf(
        SomeClass(intArrayOf(1, 2, 3), 2),
        SomeClass(intArrayOf(4, 3, 2), 1),
    )
    // sort returns a new Dataset and leaves the receiver unchanged, so the assertions
    // must run on the returned Dataset; asserting on `dataset` would not test sorting.
    val sortedByB = dataset.sort(SomeClass::b)
    sortedByB.takeAsList(1).first().b shouldBe 1

    // Sorting on `a` first puts [1, 2, 3] ahead of [4, 3, 2], whose element has b == 2.
    val sortedByAThenB = dataset.sort(SomeClass::a, SomeClass::b)
    sortedByAThenB.takeAsList(1).first().b shouldBe 2
}
480513
}
481514
}
482515
})

0 commit comments

Comments
 (0)