
Commit f16b56a

update example for common spark config parameters
1 parent 47ac068 commit f16b56a

File tree

3 files changed (+82 −21 lines)

scala/datastax-v4/aws-glue/export-to-s3/README.md

Lines changed: 58 additions & 16 deletions
@@ -11,11 +11,45 @@ The following example exports data to S3 using the spark-cassandra-connector. Th
 ```
+import com.amazonaws.services.glue.GlueContext
+import com.amazonaws.services.glue.util.GlueArgParser
+import com.amazonaws.services.glue.util.Job
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.SaveMode
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.from_json
+import org.apache.spark.sql.streaming.Trigger
+import scala.collection.JavaConverters._
+import com.datastax.spark.connector._
+import org.apache.spark.sql.cassandra._
+import org.apache.spark.sql.SaveMode._
+
+
 object GlueApp {
 
   def main(sysArgs: Array[String]) {
-    val spark: SparkContext = new SparkContext()
+    val conf = new SparkConf()
+      .setAll(
+        Seq(
+          ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
+          ("spark.cassandra.query.retry.count", "100"),
+          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
+          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
+          ("spark.cassandra.concurrent.reads", "512"),
+          ("spark.cassandra.output.concurrent.writes", "5"),
+          ("spark.cassandra.output.batch.grouping.key", "none"),
+          ("spark.cassandra.output.batch.size.rows", "1")
+        ))
+
+    val spark: SparkContext = new SparkContext(conf)
     val glueContext: GlueContext = new GlueContext(spark)
     val sparkSession: SparkSession = glueContext.getSparkSession
@@ -57,32 +91,40 @@ The following configuration for connecting to Amazon Keyspaces with the spark-ca
 Using the RateLimitingRequestThrottler we can ensure that requests do not exceed the configured Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set for 1000 requests per second. With this configuration and the G.1X DPU you will achieve 1000 requests per second per Glue worker. Adjust max-requests-per-second accordingly to fit your workload. Increase the number of workers to scale throughput to a table (a quick sizing sketch follows this file's diff).
 
 ```
+
 datastax-java-driver {
-  basic.request.consistency = "LOCAL_QUORUM"
+  basic.request.consistency = "LOCAL_ONE"
+  basic.request.default-idempotence = true
   basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142"]
-
-   advanced.reconnect-on-init = true
+  advanced.reconnect-on-init = true
 
   basic.load-balancing-policy {
     local-datacenter = "us-east-1"
-    }
-   advanced.auth-provider = {
+  }
+
+  advanced.auth-provider = {
     class = PlainTextAuthProvider
     username = "user-at-sample"
-    password = "S@MPLE=PASSWORD="
+    password = "SAMPLE#PASSWORD"
   }
+
   advanced.throttler = {
-      class = RateLimitingRequestThrottler
-      max-requests-per-second = 1000
-      max-queue-size = 50000
-      drain-interval = 1 millisecond
+    class = RateLimitingRequestThrottler
+    max-requests-per-second = 1000
+    max-queue-size = 50000
+    drain-interval = 1 millisecond
   }
-  advanced.ssl-engine-factory {
+
+  advanced.ssl-engine-factory {
     class = DefaultSslEngineFactory
     hostname-validation = false
   }
-  advanced.connection.pool.local.size = 1
+
+  advanced.connection.pool.local.size = 2
+  advanced.resolve-contact-points = false
+
 }
+
 ```
 
 ## Create S3 bucket to store job artifacts
@@ -93,7 +135,7 @@ aws s3 mb s3://amazon-keyspaces-backups
 
 ## Upload job artifacts to S3
 The job will require
-* The spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 of the spark-cassandra-connector or above. 
+* The spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 of the spark-cassandra-connector or above.
 * application.conf containing the cassandra driver configuration for Keyspaces access
 * export-sample.scala script containing the export code.
 

@@ -102,7 +144,7 @@ curl -L -O https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-con
 
 aws s3api put-object --bucket amazon-keyspaces-backups --key jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar --body spark-cassandra-connector-assembly_2.11-2.5.2.jar
 
-aws s3api put-object --bucket amazon-keyspaces-backups --key conf/application.conf --body application.conf
+aws s3api put-object --bucket amazon-keyspaces-backups --key conf/cassandra-application.conf --body cassandra-application.conf
 
 aws s3api put-object --bucket amazon-keyspaces-backups --key scripts/export-sample.scala --body export-sample.scala
 
@@ -124,9 +166,9 @@ aws glue create-job \
         "--KEYSPACE_NAME":"my_keyspace",
         "--TABLE_NAME":"my_table",
         "--S3_URI":"s3://amazon-keyspaces-backups/snap-shots/",
+        "--DRIVER_CONF":"cassandra-application.conf",
         "--extra-jars":"s3://amazon-keyspaces-backups/jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar",
         "--extra-files":"s3://amazon-keyspaces-backups/conf/application.conf",
-        "--conf":"spark.cassandra.connection.config.profile.path=application.conf",
         "--class":"GlueApp"
     }'
 ```
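
As a back-of-the-envelope check on the throttler paragraph above: aggregate throughput is simply max-requests-per-second times the number of Glue workers. A minimal sketch — the worker count of 10 is an illustrative assumption, not part of this commit:

```
// Sizing sketch: per-worker throttle × worker count = aggregate request rate.
// maxRequestsPerSecond mirrors advanced.throttler.max-requests-per-second;
// glueWorkers is a hypothetical job size chosen for illustration.
val maxRequestsPerSecond = 1000
val glueWorkers = 10
val aggregateRequestsPerSecond = maxRequestsPerSecond * glueWorkers
println(s"~$aggregateRequestsPerSecond requests/second against the table")
// => ~10000 requests/second; size the table's capacity mode accordingly
```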

scala/datastax-v4/aws-glue/export-to-s3/application.conf renamed to scala/datastax-v4/aws-glue/export-to-s3/cassandra-application.conf

Lines changed: 6 additions & 4 deletions
@@ -12,13 +12,14 @@ datastax-java-driver {
   advanced.auth-provider = {
     class = PlainTextAuthProvider
     username = "user-at-sample"
-    password = "S@MPLE=PASSWORD="
+    password = "SAMPLE#PASSWORD"
   }
 
   advanced.throttler = {
-      class = RateLimitingRequestThrottler
-      max-requests-per-second = 1000
-      drain-interval = 10 milliseconds
+    class = RateLimitingRequestThrottler
+    max-requests-per-second = 1000
+    max-queue-size = 50000
+    drain-interval = 1 millisecond
   }
 
   advanced.ssl-engine-factory {
@@ -27,5 +28,6 @@ datastax-java-driver {
   }
 
   advanced.connection.pool.local.size = 2
+  advanced.resolve-contact-points = false
 
 }
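
The renamed profile is plain java-driver 4.x HOCON, so it can be sanity-checked outside of Glue. A minimal sketch, assuming valid credentials in the file and the DataStax java driver on the classpath — this standalone check is not part of the commit:

```
import java.io.File
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.config.DriverConfigLoader

// Load the same HOCON profile the spark-cassandra-connector is pointed at
// and run a trivial query to confirm auth, TLS, and contact-point settings.
val session = CqlSession.builder()
  .withConfigLoader(DriverConfigLoader.fromFile(new File("cassandra-application.conf")))
  .build()
println(session.execute("SELECT release_version FROM system.local").one().getString(0))
session.close()
```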

scala/datastax-v4/aws-glue/export-to-s3/export-sample.scala

Lines changed: 18 additions & 1 deletion
@@ -2,6 +2,7 @@ import com.amazonaws.services.glue.GlueContext
 import com.amazonaws.services.glue.util.GlueArgParser
 import com.amazonaws.services.glue.util.Job
 import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.SaveMode
@@ -19,7 +20,23 @@ object GlueApp {
 
   def main(sysArgs: Array[String]) {
 
-    val spark: SparkContext = new SparkContext()
+    val conf = new SparkConf()
+      .setAll(
+        Seq(
+          ("spark.cassandra.connection.config.profile.path", "cassandra-application.conf"),
+          ("spark.cassandra.query.retry.count", "100"),
+          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
+          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
+          ("spark.cassandra.concurrent.reads", "512"),
+          ("spark.cassandra.output.concurrent.writes", "5"),
+          ("spark.cassandra.output.batch.grouping.key", "none"),
+          ("spark.cassandra.output.batch.size.rows", "1")
+        ))
+
+    val spark: SparkContext = new SparkContext(conf)
     val glueContext: GlueContext = new GlueContext(spark)
     val sparkSession: SparkSession = glueContext.getSparkSession
 
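The job JSON above now passes --DRIVER_CONF, although export-sample.scala still hard-codes the profile name in its SparkConf. A minimal sketch of how the body of main could resolve the argument instead and run the export path these settings tune — the parquet output format is an illustrative assumption, since the commit does not show the write step:

```
import com.amazonaws.services.glue.util.GlueArgParser
import org.apache.spark.SparkConf
import org.apache.spark.sql.{SaveMode, SparkSession}

// Inside main(sysArgs): resolve job arguments, including the new
// --DRIVER_CONF, instead of hard-coding "cassandra-application.conf".
val args = GlueArgParser.getResolvedOptions(sysArgs,
  Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "S3_URI", "DRIVER_CONF").toArray)

val conf = new SparkConf()
  .set("spark.cassandra.connection.config.profile.path", args("DRIVER_CONF"))

val sparkSession = SparkSession.builder.config(conf).getOrCreate()

// Full-table read through the connector, then export to the S3 URI.
val df = sparkSession.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> args("KEYSPACE_NAME"), "table" -> args("TABLE_NAME")))
  .load()

df.write.mode(SaveMode.ErrorIfExists).parquet(args("S3_URI"))
```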