
Commit da43ddc

Merge pull request #29 from michaelraney/main
Count Rows in Table
2 parents b2b23b6 + 0bccd51 commit da43ddc

6 files changed, +437 −7 lines changed
Lines changed: 189 additions & 0 deletions
## Using Glue Count Example

This example provides a Scala script for counting the number of rows in an Amazon Keyspaces table using AWS Glue. This is a common utility for verifying data in tables during a migration or after a bulk import.

## Prerequisites

* Amazon Keyspaces table to count
* Amazon S3 bucket to store required jars
* Amazon S3 bucket to store job configuration and script
* Amazon S3 bucket to store Glue shuffle data

### Counting the number of rows in Amazon Keyspaces

The following example uses the spark-cassandra-connector. The script takes three parameters: KEYSPACE_NAME, TABLE_NAME, and DRIVER_CONF. DRIVER_CONF is the Java driver's external application.conf file, where all connection information is maintained.

```
import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._

object GlueApp {

  def main(sysArgs: Array[String]) {

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.cassandra.query.retry.count", "100"),

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512"),

          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")

    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableName, "keyspace" -> keyspaceName))
      .load()

    val total = tableDf.toJavaRDD.count()

    val logger = new GlueLogger

    logger.info("Total number of rows in table:" + total)

    Job.commit()
  }
}

```

## Create IAM Role for AWS Glue

Create a new AWS service role named 'GlueKeyspacesRole' with AWS Glue as a trusted entity.

Included is a sample permissions policy for executing the Glue job. You can use the managed policies AWSGlueServiceRole and AmazonKeyspacesReadOnlyAccess, along with read access to the S3 bucket containing the spark-cassandra-connector jar and configuration.

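The following is a minimal sketch of creating the role with the AWS CLI, assuming a locally created trust-policy.json (the file name and inline comments are illustrative, not part of this example's artifacts):

```
# Trust policy that lets AWS Glue assume the role (trust-policy.json is a local file you create)
cat > trust-policy.json <<'EOF'
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": { "Service": "glue.amazonaws.com" },
      "Action": "sts:AssumeRole"
    }
  ]
}
EOF

# Create the service role and attach the managed policies mentioned above
aws iam create-role \
    --role-name GlueKeyspacesRole \
    --assume-role-policy-document file://trust-policy.json

aws iam attach-role-policy \
    --role-name GlueKeyspacesRole \
    --policy-arn arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole

aws iam attach-role-policy \
    --role-name GlueKeyspacesRole \
    --policy-arn arn:aws:iam::aws:policy/AmazonKeyspacesReadOnlyAccess

# The role also needs read access to the S3 bucket holding the connector jar,
# configuration, and script (for example via an additional inline policy).
```
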
## Cassandra driver configuration to connect to Amazon Keyspaces

The following configuration is used to connect to Amazon Keyspaces with the spark-cassandra-connector.

Using the RateLimitingRequestThrottler, we can ensure that requests do not exceed the configured Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1000 requests per second, so with G.1X DPUs you will achieve 1000 requests per second per Glue worker; for example, a job with 5 workers reads at roughly 5000 requests per second in aggregate. Adjust max-requests-per-second to fit your workload, and increase the number of workers to scale throughput to a table.

```

datastax-java-driver {
  basic.request.consistency = "LOCAL_ONE"
  basic.request.default-idempotence = true
  basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142" ]
  advanced.reconnect-on-init = true

  basic.load-balancing-policy {
    local-datacenter = "us-east-1"
  }

  advanced.auth-provider = {
    class = PlainTextAuthProvider
    username = "user-at-sample"
    password = "SAMPLE#PASSWORD"
  }

  advanced.throttler = {
    class = RateLimitingRequestThrottler
    max-requests-per-second = 1000
    max-queue-size = 50000
    drain-interval = 1 millisecond
  }

  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    hostname-validation = false
  }

  advanced.connection.pool.local.size = 2
  advanced.resolve-contact-points = false

}

```

## Create S3 bucket to store job artifacts

The AWS Glue ETL job will need to access the jar dependencies, the driver configuration, and the Scala script. You can use the same bucket to store backups.

```
aws s3 mb s3://amazon-keyspaces-artifacts
```

## Create S3 bucket for shuffle space

With NoSQL it's common to shuffle large sets of data, which can overflow local disk. With AWS Glue, you can use Amazon S3 to store Spark shuffle and spill data. This solution disaggregates compute and storage for your Spark jobs, giving complete elasticity and low-cost shuffle storage, and allowing you to run your most shuffle-intensive workloads reliably.

```
aws s3 mb s3://amazon-keyspaces-glue-shuffle
```

## Upload job artifacts to S3

The job will require:
* the spark-cassandra-connector to allow reads from Amazon Keyspaces. Amazon Keyspaces recommends version 2.5.2 or above of the spark-cassandra-connector.
* creation of the S3 buckets above
* cassandra-application.conf containing the Cassandra driver configuration for Keyspaces access
* count-example.scala script containing the count code

```
curl -L -O https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-assembly_2.11/2.5.2/spark-cassandra-connector-assembly_2.11-2.5.2.jar

aws s3api put-object --bucket amazon-keyspaces-artifacts --key jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar --body spark-cassandra-connector-assembly_2.11-2.5.2.jar

aws s3api put-object --bucket amazon-keyspaces-artifacts --key conf/cassandra-application.conf --body cassandra-application.conf

aws s3api put-object --bucket amazon-keyspaces-artifacts --key scripts/count-example.scala --body count-example.scala
```
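
If you want to confirm the artifacts landed in the bucket, an optional quick check (not part of the original steps):

```
aws s3 ls s3://amazon-keyspaces-artifacts --recursive
```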

### Create AWS Glue ETL Job

You can use the following command to create a Glue job using the script provided in this example. You can also take the parameters and enter them into the AWS Console.
```
aws glue create-job \
    --name "AmazonKeyspacesCount" \
    --role "GlueKeyspacesRole" \
    --description "Count Rows in Amazon Keyspaces table" \
    --glue-version "2.0" \
    --number-of-workers 5 \
    --worker-type "G.1X" \
    --command "Name=glueetl,ScriptLocation=s3://amazon-keyspaces-artifacts/scripts/count-example.scala" \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"my_keyspace",
        "--TABLE_NAME":"my_table",
        "--DRIVER_CONF":"cassandra-application.conf",
        "--extra-jars":"s3://amazon-keyspaces-artifacts/jars/spark-cassandra-connector-assembly_2.11-2.5.2.jar",
        "--extra-files":"s3://amazon-keyspaces-artifacts/conf/cassandra-application.conf",
        "--enable-continuous-cloudwatch-log":"true",
        "--write-shuffle-files-to-s3":"true",
        "--write-shuffle-spills-to-s3":"true",
        "--TempDir":"s3://amazon-keyspaces-glue-shuffle",
        "--class":"GlueApp"
    }'
```
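
After the job is created, you can start a run from the CLI as well. A minimal sketch, reusing the placeholder keyspace and table names from above:

```
aws glue start-job-run \
    --job-name "AmazonKeyspacesCount" \
    --arguments '{
        "--KEYSPACE_NAME":"my_keyspace",
        "--TABLE_NAME":"my_table"
    }'
```

The total row count is written by GlueLogger to the job's CloudWatch logs, which are enabled by the --enable-continuous-cloudwatch-log argument above.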
Lines changed: 33 additions & 0 deletions (cassandra-application.conf)

datastax-java-driver {
  basic.request.consistency = "LOCAL_ONE"
  basic.request.default-idempotence = true
  basic.contact-points = [ "cassandra.us-east-1.amazonaws.com:9142" ]
  advanced.reconnect-on-init = true

  basic.load-balancing-policy {
    local-datacenter = "us-east-1"
  }

  advanced.auth-provider = {
    class = PlainTextAuthProvider
    username = "user-at-sample"
    password = "SAMPLE#PASSWORD"
  }

  advanced.throttler = {
    class = RateLimitingRequestThrottler
    max-requests-per-second = 1000
    max-queue-size = 50000
    drain-interval = 1 millisecond
  }

  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    hostname-validation = false
  }

  advanced.connection.pool.local.size = 2
  advanced.resolve-contact-points = false

}
Lines changed: 69 additions & 0 deletions (count-example.scala)

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.log.GlueLogger
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.Trigger
import scala.collection.JavaConverters._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.SaveMode._

object GlueApp {

  def main(sysArgs: Array[String]) {

    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "DRIVER_CONF").toArray)

    val driverConfFileName = args("DRIVER_CONF")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.cassandra.connection.config.profile.path", driverConfFileName),
          ("spark.cassandra.query.retry.count", "100"),

          ("spark.cassandra.sql.inClauseToJoinConversionThreshold", "0"),
          ("spark.cassandra.sql.inClauseToFullScanConversionThreshold", "0"),
          ("spark.cassandra.concurrent.reads", "512"),

          ("spark.cassandra.output.concurrent.writes", "5"),
          ("spark.cassandra.output.batch.grouping.key", "none"),
          ("spark.cassandra.output.batch.size.rows", "1")
        ))

    val spark: SparkContext = new SparkContext(conf)
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession

    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    import sparkSession.implicits._

    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    val tableName = args("TABLE_NAME")
    val keyspaceName = args("KEYSPACE_NAME")

    val tableDf = sparkSession.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> tableName, "keyspace" -> keyspaceName))
      .load()

    val total = tableDf.toJavaRDD.count()

    val logger = new GlueLogger

    logger.info("Total number of rows in table:" + total)

    Job.commit()
  }
}

0 commit comments
