
Commit 5ea31c4

chore: fixes imports, reformats

1 parent 5490104 commit 5ea31c4

16 files changed: +256 −181 lines changed

core/2.4/src/main/scala/org/apache/spark/sql/catalyst/KotlinReflection.scala

Lines changed: 9 additions & 7 deletions

@@ -17,11 +17,6 @@
 
 package org.apache.spark.sql.catalyst
 
-import java.beans.{Introspector, PropertyDescriptor}
-import java.lang.reflect.Type
-import java.lang.{Iterable => JIterable}
-import java.time.LocalDate
-import java.util.{Iterator => JIterator, List => JList, Map => JMap}
 import com.google.common.reflect.TypeToken
 import org.apache.spark.sql.catalyst.analysis.{GetColumnByOrdinal, UnresolvedAttribute, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.expressions._
@@ -31,6 +26,11 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.{ComplexWrapper, DataTypeWithClass, KDataTypeWrapper, KStructField}
 import org.apache.spark.unsafe.types.UTF8String
 
+import java.beans.{Introspector, PropertyDescriptor}
+import java.lang.reflect.Type
+import java.lang.{Iterable => JIterable}
+import java.time.LocalDate
+import java.util.{Iterator => JIterator, List => JList, Map => JMap}
 import scala.language.existentials
 
 /**
@@ -276,7 +276,9 @@ object KotlinReflection {
       }.getOrElse {
         Invoke(
           MapObjects(
-            p => {deserializerFor(typeToken.getComponentType, Some(p), maybeType.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper]))},
+            p => {
+              deserializerFor(typeToken.getComponentType, Some(p), maybeType.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper]))
+            },
             getPath,
             maybeType.filter(_.isInstanceOf[ComplexWrapper]).map(_.asInstanceOf[ComplexWrapper].dt).getOrElse(inferDataType(elementType)._1)
           ),
@@ -398,7 +400,7 @@ object KotlinReflection {
           getPath,
           customCollectionCls = Some(predefinedDt.get.cls))
 
-        case StructType(elementType: Array[StructField]) =>
+      case StructType(elementType: Array[StructField]) =>
         val cls = t.cls
 
         val arguments = elementType.map { field =>
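Note: the same import grouping recurs throughout this commit — `java.*` imports move below the third-party block rather than leading the file. A minimal Kotlin sketch of the resulting order (the specific names are illustrative, not taken from this commit):

    // Illustrative grouping only, assumed from the hunks above:
    // third-party imports first, then java.*, then scala.* / kotlin.*.
    import org.apache.spark.sql.Dataset            // third-party
    import java.util.concurrent.ConcurrentHashMap  // java.* after third-party
    import scala.collection.Seq                    // scala.* next
    import kotlin.reflect.KClass                   // kotlin.* last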

core/2.4/src/main/scala/org/jetbrains/kotlinx/spark/extensions/KSparkExtensions.scala

Lines changed: 1 addition & 2 deletions

@@ -19,11 +19,10 @@
  */
 package org.jetbrains.kotlinx.spark.extensions
 
-import java.util
-
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 
+import java.util
 import scala.collection.JavaConverters._
 
 object KSparkExtensions {

core/3.0/src/main/scala/org/apache/spark/sql/KotlinReflection.scala

Lines changed: 19 additions & 19 deletions

@@ -441,28 +441,28 @@ object KotlinReflection extends KotlinReflection {
 
         UnresolvedMapObjects(mapFunction, path, customCollectionCls = Some(t.cls))
 
-        case StructType(elementType: Array[StructField]) =>
+      case StructType(elementType: Array[StructField]) =>
         val cls = t.cls
 
         val arguments = elementType.map { field =>
-            val dataType = field.dataType.asInstanceOf[DataTypeWithClass]
-            val nullable = dataType.nullable
-            val clsName = getClassNameFromType(getType(dataType.cls))
-            val newTypePath = walkedTypePath.recordField(clsName, field.name)
-
-            // For tuples, we based grab the inner fields by ordinal instead of name.
-            val newPath = deserializerFor(
-                getType(dataType.cls),
-                addToPath(path, field.name, dataType.dt, newTypePath),
-                newTypePath,
-                Some(dataType).filter(_.isInstanceOf[ComplexWrapper])
-            )
-            expressionWithNullSafety(
-                newPath,
-                nullable = nullable,
-                newTypePath
-            )
-            }
+          val dataType = field.dataType.asInstanceOf[DataTypeWithClass]
+          val nullable = dataType.nullable
+          val clsName = getClassNameFromType(getType(dataType.cls))
+          val newTypePath = walkedTypePath.recordField(clsName, field.name)
+
+          // For tuples, we based grab the inner fields by ordinal instead of name.
+          val newPath = deserializerFor(
+            getType(dataType.cls),
+            addToPath(path, field.name, dataType.dt, newTypePath),
+            newTypePath,
+            Some(dataType).filter(_.isInstanceOf[ComplexWrapper])
+          )
+          expressionWithNullSafety(
+            newPath,
+            nullable = nullable,
+            newTypePath
+          )
+        }
 val newInstance = NewInstance(cls, arguments, ObjectType(cls), propagateNull = false)
 
 org.apache.spark.sql.catalyst.expressions.If(

core/3.0/src/main/scala/org/jetbrains/kotinx/spark/extensions/KSparkExtensions.scala

Lines changed: 1 addition & 2 deletions

@@ -19,11 +19,10 @@
  */
 package org.jetbrains.kotinx.spark.extensions
 
-import java.util
-
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 
+import java.util
 import scala.collection.JavaConverters
 
 object KSparkExtensions {

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Collect.kt

Lines changed: 15 additions & 15 deletions

@@ -27,22 +27,22 @@ fun main() {
         val sd = dsOf(1, 2, 3)
         sd.createOrReplaceTempView("ds")
         spark.sql("select * from ds")
-                .withCached {
-                    println("asList: ${toList<Int>()}")
-                    println("asArray: ${toArray<Int>().contentToString()}")
-                    this
-                }
-                .to<Int>()
-                .withCached {
-                    println("typed collect: " + (collect() as Array<Int>).contentToString())
-                    println("type collectAsList: " + collectAsList())
-                }
+            .withCached {
+                println("asList: ${toList<Int>()}")
+                println("asArray: ${toArray<Int>().contentToString()}")
+                this
+            }
+            .to<Int>()
+            .withCached {
+                println("typed collect: " + (collect() as Array<Int>).contentToString())
+                println("type collectAsList: " + collectAsList())
+            }
 
         dsOf(1, 2, 3)
-                .map { c(it, it + 1, it + 2) }
-                .to<Row>()
-                .select("_1")
-                .collectAsList()
-                .forEach { println(it) }
+            .map { c(it, it + 1, it + 2) }
+            .to<Row>()
+            .select("_1")
+            .collectAsList()
+            .forEach { println(it) }
     }
 }
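Note: `withCached` caches the receiver dataset for the duration of the block (with the API unpersisting it afterwards — an assumption about its semantics, not shown in this diff), so running several actions avoids recomputation. A minimal sketch reusing only names from the example above:

    // Both actions should read from the cache rather than recompute the dataset.
    dsOf(1, 2, 3).withCached {
        println("asList: ${toList<Int>()}")                      // first action materialises the cache
        println("asArray: ${toArray<Int>().contentToString()}")  // second action reuses it
    }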

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Join.kt

Lines changed: 5 additions & 5 deletions

@@ -32,11 +32,11 @@ fun main() {
         val first = dsOf(Left(1, "a"), Left(2, "b"))
         val second = dsOf(Right(1, 100), Right(3, 300))
         first
-                .leftJoin(second, first.col("id").eq(second.col("id")))
-                .debugCodegen()
-                .also { it.show() }
-                .map { c(it.first.id, it.first.name, it.second?.value) }
-                .show()
+            .leftJoin(second, first.col("id").eq(second.col("id")))
+            .debugCodegen()
+            .also { it.show() }
+            .map { c(it.first.id, it.first.name, it.second?.value) }
+            .show()
 
     }
 }
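Note: `leftJoin` pairs each left row with a nullable right row, hence the safe call `it.second?.value`. The data classes themselves are not part of this diff; the declarations below are an inferred sketch reconstructed from the usage (`Left(1, "a")`, `it.first.id`, `Right(1, 100)`, `it.second?.value`):

    // Inferred sketch of the example's data classes (hypothetical, from usage):
    data class Left(val id: Int, val name: String)
    data class Right(val id: Int, val value: Int)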

examples/src/main/kotlin/org/jetbrains/kotlinx/spark/examples/Main.kt

Lines changed: 20 additions & 20 deletions

@@ -29,34 +29,34 @@ object Main {
     fun main(args: Array<String>) {
 
         val spark = SparkSession
-                .builder()
-                .master("local[2]")
-                .appName("Simple Application").orCreate
+            .builder()
+            .master("local[2]")
+            .appName("Simple Application").orCreate
 
         val triples = spark
-                .toDS(listOf(Q(1, 1 to null), Q(2, 2 to "22"), Q(3, 3 to "333")))
-                .map { (a, b) -> a + b.first to b.second?.length }
-                .map { it to 1 }
-                .map { (a, b) -> Triple(a.first, a.second, b) }
+            .toDS(listOf(Q(1, 1 to null), Q(2, 2 to "22"), Q(3, 3 to "333")))
+            .map { (a, b) -> a + b.first to b.second?.length }
+            .map { it to 1 }
+            .map { (a, b) -> Triple(a.first, a.second, b) }
 
 
         val pairs = spark
-                .toDS(listOf(2 to "hell", 4 to "moon", 6 to "berry"))
+            .toDS(listOf(2 to "hell", 4 to "moon", 6 to "berry"))
 
         triples
-                .leftJoin(pairs, triples.col("first").multiply(2).eq(pairs.col("first")))
+            .leftJoin(pairs, triples.col("first").multiply(2).eq(pairs.col("first")))
 //                .also { it.printSchema() }
-                .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
-                .groupByKey { it.a }
-                .reduceGroups(ReduceFunction { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) })
-                .map { it._2 }
-                .repartition(1)
-                .withCached {
-                    write()
-                            .also { it.csv("csvpath") }
-                            .also { it.orc("orcpath") }
-                    showDS()
-                }
+            .map { (triple, pair) -> Five(triple.first, triple.second, triple.third, pair?.first, pair?.second) }
+            .groupByKey { it.a }
+            .reduceGroups(ReduceFunction { v1, v2 -> v1.copy(a = v1.a + v2.a, b = v1.a + v2.a) })
+            .map { it._2 }
+            .repartition(1)
+            .withCached {
+                write()
+                    .also { it.csv("csvpath") }
+                    .also { it.orc("orcpath") }
+                showDS()
+            }
 
 
 
kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/ApiV1.kt

Lines changed: 33 additions & 1 deletion

@@ -36,7 +36,8 @@ import org.apache.spark.sql.streaming.GroupStateTimeout
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.*
 import org.jetbrains.kotlinx.spark.extensions.KSparkExtensions
-import scala.*
+import scala.Product
+import scala.Tuple2
 import scala.collection.Seq
 import scala.reflect.`ClassTag$`
 import java.beans.PropertyDescriptor
@@ -47,10 +48,41 @@ import java.time.Instant
 import java.time.LocalDate
 import java.util.*
 import java.util.concurrent.ConcurrentHashMap
+import kotlin.Any
+import kotlin.Array
+import kotlin.Boolean
+import kotlin.BooleanArray
+import kotlin.Byte
+import kotlin.ByteArray
+import kotlin.Deprecated
+import kotlin.DeprecationLevel
+import kotlin.Double
+import kotlin.DoubleArray
+import kotlin.ExperimentalStdlibApi
+import kotlin.Float
+import kotlin.FloatArray
+import kotlin.IllegalArgumentException
+import kotlin.Int
+import kotlin.IntArray
+import kotlin.Long
+import kotlin.LongArray
+import kotlin.OptIn
+import kotlin.Pair
+import kotlin.ReplaceWith
+import kotlin.Short
+import kotlin.ShortArray
+import kotlin.String
+import kotlin.Suppress
+import kotlin.Triple
+import kotlin.Unit
+import kotlin.also
+import kotlin.apply
+import kotlin.invoke
 import kotlin.reflect.*
 import kotlin.reflect.full.findAnnotation
 import kotlin.reflect.full.isSubclassOf
 import kotlin.reflect.full.primaryConstructor
+import kotlin.to
 
 @JvmField
 val ENCODERS = mapOf<KClass<*>, Encoder<*>>(
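Note: narrowing `import scala.*` to `scala.Product` and `scala.Tuple2` (and spelling out the `kotlin.*` defaults) keeps name resolution explicit — a star import of `scala` pulls every top-level Scala name into scope, where names like `Long` or `Option` can shadow or clash with Kotlin's own. A minimal interop sketch with the explicit import (the `toTuple` helper is hypothetical, not part of this commit, and assumes scala-library on the classpath):

    import scala.Tuple2

    // Hypothetical helper: Kotlin Pair -> Scala Tuple2, via Tuple2's constructor.
    fun <A, B> Pair<A, B>.toTuple(): Tuple2<A, B> = Tuple2(first, second)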

kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/Conversions.kt

Lines changed: 6 additions & 3 deletions

@@ -78,7 +78,8 @@ fun <A, B> Map<A, B>.asScalaMap(): ScalaMap<A, B> = JavaConversions.mapAsScalaMap
 /**
  * @see JavaConversions.mapAsScalaConcurrentMap for more information.
  */
-fun <A, B> ConcurrentMap<A, B>.asScalaConcurrentMap(): ScalaConcurrentMap<A, B> = JavaConversions.mapAsScalaConcurrentMap<A, B>(this)
+fun <A, B> ConcurrentMap<A, B>.asScalaConcurrentMap(): ScalaConcurrentMap<A, B> =
+    JavaConversions.mapAsScalaConcurrentMap<A, B>(this)
 
 /**
  * @see JavaConversions.dictionaryAsScalaMap for more information.
@@ -139,7 +140,8 @@ fun <A> ScalaSet<A>.asKotlinSet(): Set<A> = JavaConversions.setAsJavaSet<A>(this)
 /**
  * @see JavaConversions.mutableMapAsJavaMap for more information.
  */
-fun <A, B> ScalaMutableMap<A, B>.asKotlinMutableMap(): MutableMap<A, B> = JavaConversions.mutableMapAsJavaMap<A, B>(this)
+fun <A, B> ScalaMutableMap<A, B>.asKotlinMutableMap(): MutableMap<A, B> =
+    JavaConversions.mutableMapAsJavaMap<A, B>(this)
 
 /**
  * @see JavaConversions.asJavaDictionary for more information.
@@ -154,5 +156,6 @@ fun <A, B> ScalaMap<A, B>.asKotlinMap(): Map<A, B> = JavaConversions.mapAsJavaMap
 /**
  * @see JavaConversions.mapAsJavaConcurrentMap for more information.
  */
-fun <A, B> ScalaConcurrentMap<A, B>.asKotlinConcurrentMap(): ConcurrentMap<A, B> = JavaConversions.mapAsJavaConcurrentMap<A, B>(this)
+fun <A, B> ScalaConcurrentMap<A, B>.asKotlinConcurrentMap(): ConcurrentMap<A, B> =
+    JavaConversions.mapAsJavaConcurrentMap<A, B>(this)
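Note: the rewrapped signatures are behavior-preserving. As a usage sketch (assuming these extension functions are in scope), the `JavaConversions` converters return views over the same underlying map rather than copies:

    import java.util.concurrent.ConcurrentHashMap

    fun demo() {
        val javaMap = ConcurrentHashMap<String, Int>().apply { put("a", 1) }
        val scalaView = javaMap.asScalaConcurrentMap()     // Java -> Scala view
        val roundTrip = scalaView.asKotlinConcurrentMap()  // back to a Java view
        println(roundTrip["a"]) // 1 — same underlying map, no copy
    }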
kotlin-spark-api/2.4/src/main/kotlin/org/jetbrains/kotlinx/spark/api/SparkHelper.kt

Lines changed: 26 additions & 20 deletions

@@ -31,37 +31,43 @@ import org.jetbrains.kotlinx.spark.api.SparkLogLevel.ERROR
  * @param func function which will be executed in context of [KSparkSession] (it means that `this` inside block will point to [KSparkSession])
  */
 @JvmOverloads
-inline fun withSpark(props: Map<String, Any> = emptyMap(), master: String = "local[*]", appName: String = "Kotlin Spark Sample", logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) {
+inline fun withSpark(
+    props: Map<String, Any> = emptyMap(),
+    master: String = "local[*]",
+    appName: String = "Kotlin Spark Sample",
+    logLevel: SparkLogLevel = ERROR,
+    func: KSparkSession.() -> Unit,
+) {
     val builder = SparkSession
-            .builder()
-            .master(master)
-            .appName(appName)
-            .apply {
-                props.forEach {
-                    when (val value = it.value) {
-                        is String -> config(it.key, value)
-                        is Boolean -> config(it.key, value)
-                        is Long -> config(it.key, value)
-                        is Double -> config(it.key, value)
-                        else -> throw IllegalArgumentException("Cannot set property ${it.key} because value $value of unsupported type ${value::class}")
-                    }
+        .builder()
+        .master(master)
+        .appName(appName)
+        .apply {
+            props.forEach {
+                when (val value = it.value) {
+                    is String -> config(it.key, value)
+                    is Boolean -> config(it.key, value)
+                    is Long -> config(it.key, value)
+                    is Double -> config(it.key, value)
+                    else -> throw IllegalArgumentException("Cannot set property ${it.key} because value $value of unsupported type ${value::class}")
                 }
             }
+        }
     withSpark(builder, logLevel, func)
 
 }
 
 @JvmOverloads
 inline fun withSpark(builder: Builder, logLevel: SparkLogLevel = ERROR, func: KSparkSession.() -> Unit) {
     builder
-            .orCreate
-            .apply {
-                KSparkSession(this).apply {
-                    sparkContext.setLogLevel(logLevel)
-                    func()
-                }
+        .orCreate
+        .apply {
+            KSparkSession(this).apply {
+                sparkContext.setLogLevel(logLevel)
+                func()
             }
-            .also { it.stop() }
+        }
+        .also { it.stop() }
 }
 
 /**
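Note: with one parameter per line, call sites of `withSpark` read naturally with named arguments. A minimal usage sketch (assuming `dsOf` from this API, as in the examples above; values in `props` must be String, Boolean, Long, or Double, or the `else` branch above throws):

    fun main() = withSpark(
        props = mapOf("spark.sql.shuffle.partitions" to 2L),
        appName = "withSpark demo",
    ) {
        dsOf(1, 2, 3).show()  // runs inside KSparkSession scope
    }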
