# Migration to Amazon Keyspaces from Apache Cassandra
This example provides Scala scripts for migrating a Cassandra workload to Amazon Keyspaces (for Apache Cassandra) using AWS Glue.
This allows you to migrate data from a Cassandra cluster to Amazon Keyspaces without setting up and provisioning a Spark cluster.

## Prerequisites
* Cassandra source table
* Amazon Keyspaces target table to replicate the workload
* Amazon S3 bucket to store intermediate parquet files with incremental data changes
* Amazon S3 bucket to store job configuration and scripts

## Getting started
### Create a target keyspace and table in the Amazon Keyspaces console

` CREATE KEYSPACE target_keyspace WITH replication = {'class': 'SingleRegionStrategy'} `
` CREATE TABLE target_keyspace.target_table (
    userid text,
    level text,
    gameid int,
    description text,
    nickname text,
    zip text,
    email text,
    updatetime text,
    PRIMARY KEY (userid, level, gameid)
) WITH default_time_to_live = 0 AND CUSTOM_PROPERTIES =
    {'capacity_mode':{ 'throughput_mode':'PROVISIONED',
    'write_capacity_units':76388,
    'read_capacity_units':3612 }}
    AND CLUSTERING ORDER BY (level ASC, gameid ASC)`

After the table is created, consider switching it to on-demand mode to avoid unnecessary charges.

The following script will update the throughput mode.

` ALTER TABLE target_keyspace.target_table WITH CUSTOM_PROPERTIES =
    {'capacity_mode':{ 'throughput_mode':'PAY_PER_REQUEST' }
}`

### Export the Cassandra workload to S3 in parquet format
The following example offloads data to S3 using the spark-cassandra-connector.
The script takes five parameters: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.

```scala
object GlueApp {
  def main(sysArgs: Array[String]) {

    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    import sparkSession.implicits._

    // @params: [JOB_NAME, KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, S3_URI_NEW_CHANGE]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "S3_URI_FULL_CHANGE", "S3_URI_CURRENT_CHANGE", "S3_URI_NEW_CHANGE").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    def checkS3(path: String): Boolean = {
      FileSystem.get(URI.create(path), spark.hadoopConfiguration).exists(new Path(path))
    }

    val keyspaceName = args("KEYSPACE_NAME")
    val tableName = args("TABLE_NAME")
    val fullDataset = args("S3_URI_FULL_CHANGE")
    val incrementalCurrentDataset = args("S3_URI_CURRENT_CHANGE")
    val incrementalNewDataset = args("S3_URI_NEW_CHANGE")

    val dfSourceAsIs = spark.cassandraTable(keyspaceName, tableName)
      .select("userid","level","gameid","description","nickname","zip","email","updatetime", WriteTime("email") as "writeTime")

    // Cast to Spark datatypes, for example, all UDTs to String
    val dfSourceWithCastDataTypes = dfSourceAsIs.keyBy(row => (row.getString("userid"),
      row.getString("level"),
      row.getInt("gameid"),
      row.getString("description"),
      row.getString("nickname"),
      row.getString("zip"),
      row.getString("email"),
      row.getString("updatetime"),
      row.getStringOption("writeTime")))
      .map(x => x._1)
      .toDF("userid","level","gameid","description","nickname","zip","email","updatetime","writeTime")

    // Persist full dataset in parquet format to S3
    dfSourceWithCastDataTypes.drop("writeTime")
      .write
      .mode(SaveMode.Overwrite)
      .parquet(fullDataset)

    // Persist primary keys and column writetimes to S3:
    // promote the previous "new" snapshot to "current" before refreshing "new"
    if (checkS3(incrementalCurrentDataset) && checkS3(incrementalNewDataset))
    {
      sparkSession.read.parquet(incrementalNewDataset)
        .write.mode(SaveMode.Overwrite).parquet(incrementalCurrentDataset)
    }

    if (!checkS3(incrementalCurrentDataset))
    {
      // First run: seed the "current" snapshot
      dfSourceWithCastDataTypes.select("userid","level","gameid","writeTime")
        .write.mode(SaveMode.Overwrite).parquet(incrementalCurrentDataset)
    }
    else
    {
      dfSourceWithCastDataTypes.select("userid","level","gameid","writeTime")
        .write.mode(SaveMode.Overwrite).parquet(incrementalNewDataset)
    }

    Job.commit()
  }
}
```
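The snapshot handling in the export job can be modeled as a small state machine. The sketch below is a hypothetical illustration of that logic (`SnapshotRotation` is not part of the migration scripts): the first run seeds the "current" snapshot, the second run writes "new", and later runs promote "new" to "current" before refreshing it.

```scala
// Hypothetical sketch: where the export job writes the key/writetime snapshot,
// based on which incremental datasets already exist in S3.
object SnapshotRotation {
  // Returns (promoteNewToCurrent, targetDataset)
  def plan(currentExists: Boolean, newExists: Boolean): (Boolean, String) =
    (currentExists, newExists) match {
      case (false, _)    => (false, "current") // first run: seed the "current" snapshot
      case (true, false) => (false, "new")     // second run: write the "new" snapshot
      case (true, true)  => (true,  "new")     // later runs: promote new -> current, refresh "new"
    }
}

println(SnapshotRotation.plan(false, false)) // (false,current)
println(SnapshotRotation.plan(true, true))   // (true,new)
```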

### Import the incremental workload to Amazon Keyspaces
The following example reads the incremental parquet files from S3.
The script takes five parameters: KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.

```scala
object S3ToKeyspaces {

  def main(sysArgs: Array[String]) {

    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    val sparkSession: SparkSession = glueContext.getSparkSession
    import com.datastax.spark.connector._
    import org.apache.spark.sql.cassandra._
    import sparkSession.implicits._

    // @params: [JOB_NAME, KEYSPACE_NAME, TABLE_NAME, S3_URI_FULL_CHANGE, S3_URI_CURRENT_CHANGE, S3_URI_NEW_CHANGE]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "KEYSPACE_NAME", "TABLE_NAME", "S3_URI_FULL_CHANGE", "S3_URI_CURRENT_CHANGE", "S3_URI_NEW_CHANGE").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    def checkS3(path: String): Boolean = {
      FileSystem.get(URI.create(path), spark.hadoopConfiguration).exists(new Path(path))
    }

    val keyspaceName = args("KEYSPACE_NAME")
    val tableName = args("TABLE_NAME")
    val fullDataset = args("S3_URI_FULL_CHANGE")
    val incrementalCurrentDataset = args("S3_URI_CURRENT_CHANGE")
    val incrementalNewDataset = args("S3_URI_NEW_CHANGE")
    val fullDf = sparkSession.read.parquet(fullDataset)

    // Initial load: only the "current" snapshot exists yet
    if (checkS3(incrementalCurrentDataset) && !checkS3(incrementalNewDataset))
    {
      fullDf.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", keyspaceName).option("table", tableName).save()
    }

    if (checkS3(incrementalCurrentDataset) && checkS3(incrementalNewDataset))
    {
      val shortDataframeT1 = sparkSession.read.parquet(incrementalNewDataset)
      val shortDataframeT0 = sparkSession.read.parquet(incrementalCurrentDataset)

      // Keys present in T1 but not in T0 are new rows
      val inserts = shortDataframeT1.as("T1").join(shortDataframeT0.as("T0"),
        $"T1.userid" === $"T0.userid" &&
        $"T1.level" === $"T0.level" &&
        $"T1.gameid" === $"T0.gameid", "leftanti")
      val finalInserts = inserts.as("INSERTED").join(fullDf.as("ORIGINAL"),
        $"INSERTED.userid" === $"ORIGINAL.userid" &&
        $"INSERTED.level" === $"ORIGINAL.level" &&
        $"INSERTED.gameid" === $"ORIGINAL.gameid", "inner").selectExpr("ORIGINAL.*").drop("writeTime")
      finalInserts.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", keyspaceName).option("table", tableName).save()

      // Keys present in both snapshots with a newer writetime are updated rows
      val updates = shortDataframeT0.as("T0").join(shortDataframeT1.as("T1"),
        $"T1.userid" === $"T0.userid" &&
        $"T1.level" === $"T0.level" &&
        $"T1.gameid" === $"T0.gameid", "inner")
        .filter($"T1.writeTime" > $"T0.writeTime").select($"T1.userid", $"T1.level", $"T1.gameid", $"T1.writeTime")
      val finalUpdates = updates.as("UPDATED").join(fullDf.as("ORIGINAL"),
        $"UPDATED.userid" === $"ORIGINAL.userid" &&
        $"UPDATED.level" === $"ORIGINAL.level" &&
        $"UPDATED.gameid" === $"ORIGINAL.gameid", "inner").selectExpr("ORIGINAL.*").drop("writeTime")
      finalUpdates.write.format("org.apache.spark.sql.cassandra").mode("append").option("keyspace", keyspaceName).option("table", tableName).save()

      // Keys present in T0 but not in T1 were deleted at the source
      val finalDeletes = shortDataframeT0.as("T0").join(shortDataframeT1.as("T1"),
        $"T1.userid" === $"T0.userid" &&
        $"T1.level" === $"T0.level" &&
        $"T1.gameid" === $"T0.gameid", "leftanti").drop("writeTime")

      finalDeletes.rdd.foreach(d =>
      {
        val userid = d.get(0).toString
        val level = d.get(1).toString
        val gameid = d.get(2).toString
        // Delete each row from the target table by its full primary key
        CassandraConnector(spark.getConf).withSessionDo { session =>
          session.execute(s"DELETE FROM $keyspaceName.$tableName WHERE userid='$userid' AND level='$level' AND gameid=$gameid")
        }
      })
    }

    Job.commit()
  }
}
```
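The join logic in S3ToKeyspaces can be illustrated with plain Scala collections. This is a simplified sketch of the technique (`IncrementalDiff` and the sample data are hypothetical, independent of Spark and Glue): keys present only in the new snapshot are inserts, keys in both snapshots with a newer writetime are updates, and keys present only in the old snapshot are deletes.

```scala
// Simplified model of the T0/T1 snapshot diff performed by S3ToKeyspaces.
// A snapshot maps a primary key (userid, level, gameid) to its writeTime.
object IncrementalDiff {
  type Key = (String, String, Int)

  def diff(t0: Map[Key, Long], t1: Map[Key, Long]): (Set[Key], Set[Key], Set[Key]) = {
    val inserts = t1.keySet.diff(t0.keySet)                                 // T1 leftanti T0
    val updates = t1.keySet.intersect(t0.keySet).filter(k => t1(k) > t0(k)) // newer writeTime wins
    val deletes = t0.keySet.diff(t1.keySet)                                 // T0 leftanti T1
    (inserts, updates, deletes)
  }
}

val t0 = Map(("alice", "1", 10) -> 100L, ("bob", "2", 20) -> 100L, ("carol", "3", 30) -> 100L)
val t1 = Map(("alice", "1", 10) -> 200L, ("bob", "2", 20) -> 100L, ("dave", "4", 40) -> 100L)
val (inserts, updates, deletes) = IncrementalDiff.diff(t0, t1)
// inserts: dave (only in T1); updates: alice (newer writeTime); deletes: carol (only in T0)
```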
## Create a network connection for AWS Glue
Create a new Glue connection to connect to the Cassandra cluster.

```shell
aws glue create-connection --connection-input '{
    "Name":"conn-cassandra-custom",
    "Description":"Connection to Cassandra cluster",
    "ConnectionType":"NETWORK",
    "PhysicalConnectionRequirements":{
        "SubnetId":"subnet-xxxxxxxx",
        "SecurityGroupIdList":["sg-xxxxxxxx"],
        "AvailabilityZone":"us-east-1a"}
}'
```

## Cassandra driver configuration to connect to Amazon Keyspaces
The following configuration is used to connect to Amazon Keyspaces with the spark-cassandra-connector.
Using the RateLimitingRequestThrottler, we can ensure that requests do not exceed the configured Keyspaces capacity. The G.1X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set to 1000 requests per second.
With this configuration and the G.1X DPU, you will achieve 1000 requests per second per Glue worker. Adjust max-requests-per-second accordingly to fit your workload. Increase the number of workers to scale throughput to a table.

```yaml
datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
  advanced.reconnect-on-init = true
  basic.load-balancing-policy {
    local-datacenter = "us-east-1"
  }
  advanced.auth-provider = {
    class = PlainTextAuthProvider
    username = "user-at-sample"
    password = "SAMPLE#PASSWORD"
  }
  advanced.throttler = {
    class = RateLimitingRequestThrottler
    max-requests-per-second = 1000
    max-queue-size = 50000
    drain-interval = 1 millisecond
  }
  advanced.ssl-engine-factory {
    class = DefaultSslEngineFactory
    truststore-path = "./cassandra_truststore.jks"
    truststore-password = "amazon"
  }
}
```
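The scaling arithmetic above can be made concrete with a small helper (a hypothetical sketch, not part of the migration scripts; it assumes one throttled executor per G.1X worker):

```scala
// Estimate aggregate request rate for a Glue job, assuming one executor
// per G.1X worker, each throttled to maxRequestsPerSecond.
object ThroughputEstimate {
  def aggregateRps(workers: Int, maxRequestsPerSecond: Int): Int =
    workers * maxRequestsPerSecond

  // Workers needed to drive a target request rate, e.g. to match provisioned capacity.
  def workersNeeded(targetRps: Int, maxRequestsPerSecond: Int): Int =
    math.ceil(targetRps.toDouble / maxRequestsPerSecond).toInt
}

println(ThroughputEstimate.aggregateRps(10, 1000))     // 10000 requests per second
println(ThroughputEstimate.workersNeeded(76388, 1000)) // 77 workers to match the table's WCUs
```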
## Create an IAM role for AWS Glue
Create a new AWS service role named 'GlueKeyspacesMigration' with AWS Glue as a trusted entity.

Included is a sample permissions policy for executing the Glue job. It grants read access to the S3 bucket containing the spark-cassandra-connector jar and the configuration files for Amazon Keyspaces and Cassandra, and read and write access to the S3 bucket containing the intermediate data.
```shell
sed 's/amazon-keyspaces-migration-bucket/'$MIGRATION_BUCKET'/g' permissions-policy-template.json > permissions-policy.json
```

## Create the AWS Glue job
The job is created with the spark-cassandra-connector configuration profile and the GlueApp entry class:

```shell
aws glue create-job \
    ...
    "--conf":"spark.cassandra.connection.config.profile.path=KeyspacesConnector.conf",
    "--class":"GlueApp"
    }'
```