From 14fb933b9f2b49f28016f783b919dabbbe7ec510 Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Thu, 28 Sep 2023 15:50:13 +0200
Subject: [PATCH 01/11] RING-44425 - First pass a complete comments

---
 scripts/S3_FSCK/s3_fsck_p0.py | 10 ++++++++++
 scripts/S3_FSCK/s3_fsck_p1.py | 25 +++++++++++++++++++++++--
 scripts/S3_FSCK/s3_fsck_p2.py | 12 ++++++++++++
 scripts/S3_FSCK/s3_fsck_p3.py | 24 ++++++++++++++++++++++--
 scripts/S3_FSCK/s3_fsck_p4.py |  9 +++++++++
 5 files changed, 76 insertions(+), 4 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index 6b67e68..bb77905 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -118,6 +118,7 @@ def sparse(f):
 def check_split(key):
+    """Check if the key is split or not. Return True if split, False if not split, None if error (404, 50X, etc.)"""
     url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
     r = requests.head(url)
     if r.status_code == 200:
@@ -126,8 +127,10 @@ def check_split(key):
 def blob(row):
+    """Return a list of dict with the key, subkey and digkey"""
     key = row._c2
     split = check_split(key)
+    # If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
     if not split['result']:
         return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}]
     if split['is_split']:
@@ -150,21 +153,28 @@ def blob(row):
                     rtlst.append(
                         {"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
                     )
+                # If the key is split and request is OK, return a list of dict with the key, subkey and digkey set to the md5 of the subkey
                 return rtlst
+            # If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
             return [{"key": key, "subkey": "NOK", "digkey": "NOK"}]
         except requests.exceptions.ConnectionError as e:
+            # If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
            return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}]
     if not split['is_split']:
+        # If the key is not split, return a dict with the key, subkey set to SINGLE and digkey set to the md5 of the key
         return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]

 new_path = os.path.join(PATH, RING, "s3-bucketd")
 files = "%s://%s" % (PROTOCOL, new_path)
+# reading without a header, the _c0, _c1, _c2 are the default column names of column 1, 2, 3 for the csv
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
+# repartition the dataframe to have the same number of partitions as the number of executors * cores
 df = df.repartition(PARTITIONS)
 rdd = df.rdd.map(lambda x : blob(x))
 dfnew = rdd.flatMap(lambda x: x).toDF()
 single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
+# write the dataframe to a csv file with a header
 dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)
diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index 8d7b5ac..135fb6c 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,37 +39,58 @@
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
+# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
-#list the ARC SPLIT main chunks
+# list the ARC SPLIT main chunks with service ID 50 from column 2
 df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")
+# Match keys which end in 70 from column 2
 dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
+# Filter out when less than 3 stripe chunks (RING orphans)
 dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")
+
+# dfARCsingle _c1 (column 2) is now the ringkey ???
 dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])
+# filter _c1 (column 2) for specific COS protection
 dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
+
+# count the number of chunks in _c1 (column 2) found for each key
 dfCOSsingle = dfCOSsingle.groupBy("_c1").count()
+# dfCOSsingle _c1 (column 2) is now the ringkey ???
 dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"])
+# ???
 dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# union the ARC and COS single chunks
 dfARCsingle = dfARCsingle.union(dfCOSsingle)
-#list the ARC KEYS
+# list the ARC KEYS with service ID 51
 df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1")
+# Match keys which end in 70 from column 2
 dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$"))
+# Filter out when less than 3 stripe chunks (RING orphans)
 dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3")
+# dfARCSYNC _c1 (column 2) is now the ringkey ???
 dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"])
+# filter _c1 (column 2) for specific COS protection
 dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# filter _c1 (column 2) for specific COS protection
 dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
+# count the number of chunks in _c1 (column 2) found for each key
 dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count()
+# dfCOCSYNC _c1 (column 2) is now the ringkey ???
 dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"])
+# ???
 dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
+# union the ARC and COS SYNC chunks
 dfARCSYNC = dfARCSYNC.union(dfCOCSYNC)
+# union the ARC and COS SYNC and single chunks to get the total list of keys
 dftotal = dfARCSYNC.union(dfARCsingle)
 total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
 dftotal.write.format("csv").mode("overwrite").options(header="true").save(total)
diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py
index 22eeb39..0034c40 100644
--- a/scripts/S3_FSCK/s3_fsck_p2.py
+++ b/scripts/S3_FSCK/s3_fsck_p2.py
@@ -39,16 +39,28 @@
     .getOrCreate()
+# s3keys are read from the verifySproxydKeys.js scripts output
 s3keys = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
+# ringkeys are read from the listkeys.py (or ringsh dump) scripts output
 ringkeys = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
+# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
 dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys)
+# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
 dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)
+# rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
 dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey")
+# inner join the s3keys and ringkeys on the digkey column
+# the result will be a dataframe with the columns ringkey, digkey
+# the leftanti option will remove the rows that are present in both dataframes.
 inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey")
+
+# Create the final dataframe with only the ringkey column
 df_final = inner_join_false.select("ringkey")
+
+# write the final dataframe to a csv file
 allmissing = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
 df_final.write.format("csv").mode("overwrite").options(header="false").save(allmissing)
diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py
index 6243e85..358f75f 100644
--- a/scripts/S3_FSCK/s3_fsck_p3.py
+++ b/scripts/S3_FSCK/s3_fsck_p3.py
@@ -45,34 +45,54 @@
     .config("spark.local.dir", PATH) \
     .getOrCreate()
-
+# Use of the arcindex limits the inspection to a specific ARC protection scheme.
+# If there were more than one cluster with different ARC protection schemes then this would limit the check to a specific scheme.
+# FOOD FOR THOUGHT: limits finding keys which may have been written after a schema change or any bug did not honor the schema.
+# The arcindex is a dictionary that contains the ARC protection scheme and the hex value found in the ringkey
 arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}
+
+# The arcdatakeypattern is a regular expression that matches the ARC data keys
 arcdatakeypattern = re.compile(r'[0-9a-fA-F]{38}70')

 def statkey(row):
+    """ statkey takes a row from the dataframe and returns a tuple with the key, status_code, size"""
     key = row._c0
     try:
         url = "%s/%s" % (SREBUILDD_URL, str(key.zfill(40)))
         r = requests.head(url)
         if r.status_code == 200:
-            if re.search(arcdatakeypattern, key):
+            if re.search(arcdatakeypattern, key):  # Should consider changing this to match any entry in the arcindex
+                # The size of the ARC data key is 12 times the size of the ARC index key.
+                # At this point there is no longer access to the qty of keys found, so
+                # it simply computes based on the presumed schema of 12 chunks per key.
                 size = int(r.headers.get("X-Scal-Size", False))*12
             else:
+                # The size of the ARC index key is the size of the ARC index key plus the size of the ARC data key times the COS protection.
+                # At this point there is no longer access to the qty of keys found, so
+                # it simply computes based on the presumed schema of int(COS) chunks per key.
+                # If there are orphans which are not matching the arcdatakeypattern they will
+                # be computed as if they were COS.
                 size = int(r.headers.get("X-Scal-Size",False)) + int(r.headers.get("X-Scal-Size",False))*int(COS)
             return ( key, r.status_code, size)
         else:
+            # If the key is not found (HTTP code != 200) then return the key, the status code, and 0 for the size
            return ( key, r.status_code, 0)
     except requests.exceptions.ConnectionError as e:
+        # If there is a connection error then return the key, the status code, and 0 for the size
         return ( key, "HTTP_ERROR", 0)

 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
+# Create a dataframe from the csv file not using the header, the columns will be _c0, _c1, _c2
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+# Create a resilient distributed dataset (RDD) from the dataframe (logical partitions of data)
+# The rdd is a collection of tuples returned from statkey (key, status_code, size)
 rdd = df.rdd.map(statkey)
 #rdd1 = rdd.toDF()
+# The size_computed is the sum of the size column in the rdd
 size_computed= rdd.map(lambda x: (2,int(x[2]))).reduceByKey(lambda x,y: x + y).collect()[0][1]
 string = "The total computed size of the not indexed keys is: %d bytes" % size_computed
 banner = '\n' + '-' * len(string) + '\n'
diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py
index df61463..062a4f2 100644
--- a/scripts/S3_FSCK/s3_fsck_p4.py
+++ b/scripts/S3_FSCK/s3_fsck_p4.py
@@ -68,9 +68,18 @@ def deletekey(row):
 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
+
+# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+# rename the column _c0 (column 1) to ringkey
 df = df.withColumnRenamed("_c0","ringkey")
+
+# repartition the dataframe to the number of partitions (executors * cores)
 df = df.repartition(PARTITIONS)
+
+# map the deletekey function to the dataframe
 rdd = df.rdd.map(deletekey).toDF()
+
 deletedorphans = "%s://%s/%s/s3fsck/deleted-s3-orphans.csv" % (PROTOCOL, PATH, RING)
+# write the dataframe to a csv file with the results of the deletekey function
 rdd.write.format("csv").mode("overwrite").options(header="false").save(deletedorphans)
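
Aside on the probe described by the comments in this first patch: check_split() and statkey() both issue an HTTP HEAD request against srebuildd and read Scality headers from the response. The sketch below only illustrates that pattern and is not the project's helper; the endpoint URL is invented, the key is the example value quoted later in the series, and only the X-Scal-Size header (the one s3_fsck_p3.py reads) is used.

    import requests

    # Hypothetical srebuildd endpoint; the real scripts build it from SREBUILDD_IP / SREBUILDD_ARC_PATH.
    SREBUILDD_URL = "http://127.0.0.1:81/rebuild/arcdata"

    def head_key(key):
        """Return (status_code, size_in_bytes) for a RING key, or (None, 0) on connection error."""
        url = "%s/%s" % (SREBUILDD_URL, str(key).zfill(40))
        try:
            r = requests.head(url)
        except requests.exceptions.ConnectionError:
            return (None, 0)
        size = int(r.headers.get("X-Scal-Size", 0)) if r.status_code == 200 else 0
        return (r.status_code, size)

    print(head_key("9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220"))
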
From 5d3ce2d7c02b6b383cc88d47fc2541beef1dfa18 Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Thu, 28 Sep 2023 17:35:18 +0200
Subject: [PATCH 02/11] RING-44425 - More clearly define a digkey, and the actual content of a subkey

---
 scripts/S3_FSCK/s3_fsck_p0.py | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index bb77905..a68a0fc 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -31,6 +31,7 @@
 COS = cfg["cos_protection"]
 PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"])
+# The arcindex is a map between the ARC Schema and the hex value found in the ringkey in the 24 bits preceding the last 8 bits of the key
 arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}
 os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
@@ -128,10 +129,12 @@ def check_split(key):
 def blob(row):
     """Return a list of dict with the key, subkey and digkey"""
+    # set key from row._c2 (column 3) which contains an sproxyd input key
     key = row._c2
+    # use the sproxyd input key to find out if the key is split or not
     split = check_split(key)
-    # If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
     if not split['result']:
+        # If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP
         return [{"key":key, "subkey":"NOK_HTTP", "digkey":"NOK_HTTP"}]
     if split['is_split']:
         try:
@@ -149,7 +152,12 @@ def blob(row):
                 chunks = chunk + chunk
                 chunkshex = chunks.encode("hex")
                 rtlst = []
+                # the k value is the subkey, a subkey is the sproxys input key for each stripe of the split
                 for k in list(set(sparse(chunkshex))):
+                    # "key": key == primary sproxyd input key of a split object
+                    # "subkey": k == subkey sproxyd input key of an individual stripe of a split object
+                    # "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey, which is the unique part of a
+                    # main chunk before service id, arc schema, and class are appended
                     rtlst.append(
                         {"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
                     )

From bc89b56812d4d2e5608efec90721e6c6e66b7fa8 Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Thu, 28 Sep 2023 18:12:12 +0200
Subject: [PATCH 03/11] RING-44425 - additional clarification

---
 scripts/S3_FSCK/s3_fsck_p0.py | 29 ++++++++++++++++++++++-------
 scripts/S3_FSCK/s3_fsck_p2.py | 12 ++++++++----
 2 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index a68a0fc..ba2afe5 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -152,24 +152,39 @@ def blob(row):
                 chunks = chunk + chunk
                 chunkshex = chunks.encode("hex")
                 rtlst = []
-                # the k value is the subkey, a subkey is the sproxys input key for each stripe of the split
+                # the k value is the subkey, a subkey is the sproxyd input key for each stripe of the split
                 for k in list(set(sparse(chunkshex))):
                     # "key": key == primary sproxyd input key of a split object
                     # "subkey": k == subkey sproxyd input key of an individual stripe of a split object
-                    # "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey, which is the unique part of a
-                    # main chunk before service id, arc schema, and class are appended
+                    # "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey
+                    # digkey: the unqiue part of a main chunk before service id,
+                    # arc schema, and class are appended
                     rtlst.append(
                         {"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
                     )
-                # If the key is split and request is OK, return a list of dict with the key, subkey and digkey set to the md5 of the subkey
+                # If the key is split and request is OK:
+                # return a list of dicts with the key (primary sproxyd input key),
+                # subkey (sproxyd input key of a split stripe) and
+                # digkey, (md5 of the subkey)
+                # digkey: the unqiue part of a main chunk before service id,
+                # arc schema, and class are appended
                 return rtlst
-            # If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
+            # If the key is split and request is not OK:
+            # return a dict with the key (primary sproxyd input key)
+            # with both subkey and digkey columns set to NOK
             return [{"key": key, "subkey": "NOK", "digkey": "NOK"}]
         except requests.exceptions.ConnectionError as e:
-            # If the key is split and request is not OK, return a dict with the key, with both subkey and digkey set to NOK
+            # If there is a Connection Error in the HTTP request:
+            # return a dict with the key(primary sproxyd input key),
+            # with both subkey and digkey set to NOK
             return [{"key": key, "subkey": "NOK_HTTP", "digkey": "NOK_HTTP"}]
     if not split['is_split']:
-        # If the key is not split, return a dict with the key, subkey set to SINGLE and digkey set to the md5 of the key
+        # If the key is not split:
+        # return a dict with the key (primary sproxyd input key),
+        # subkey set to SINGLE and
+        # digkey, (md5 of the subkey)
+        # digkey: the unqiue part of a main chunk before service id,
+        # arc schema, and class are appended
         return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]

 new_path = os.path.join(PATH, RING, "s3-bucketd")
diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py
index 0034c40..d64aecf 100644
--- a/scripts/S3_FSCK/s3_fsck_p2.py
+++ b/scripts/S3_FSCK/s3_fsck_p2.py
@@ -50,14 +50,18 @@
 dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)
 # rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
+# digkey: the unqiue part of a main chunk before service id, arc schema, and class are appended
 dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey")
-# inner join the s3keys and ringkeys on the digkey column
-# the result will be a dataframe with the columns ringkey, digkey
-# the leftanti option will remove the rows that are present in both dataframes.
+# inner join the s3keys (sproxyd input key) and ringkeys (the main chunk of the strip or replica)
+# on the digkey column. The result will be a dataframe with the columns ringkey, digkey
+# the inner join leftani will not return rows that are present in both dataframes,
+# eliminating ringkeys (main chunks) that have metadata in s3 (not application orphans).
+# digkey: the unqiue part of a main chunk before service id, arc schema, and class are appended
+# ringkey: the main chunk of the strip or replica
 inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey")

-# Create the final dataframe with only the ringkey column
+# Create the final dataframe with only the ringkey (the main chunk of the strip or replica)
 df_final = inner_join_false.select("ringkey")

 # write the final dataframe to a csv file
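
The leftanti join documented in this patch is the heart of s3_fsck_p2.py. Its semantics can be checked in isolation with a few invented rows (toy digkeys and ringkeys, column names mirroring the scripts): only left-side rows whose digkey has no match on the right side survive.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.appName("leftanti-demo").getOrCreate()

    # left side: ringkeys (main chunks) keyed by digkey
    dfringkeys = spark.createDataFrame(
        [("AAAA00000000000000000000DD", "AAAA00000000000000000000DD0000512040C070"),
         ("BBBB11111111111111111111DD", "BBBB11111111111111111111DD0000512040C070")],
        ["digkey", "ringkey"])
    # right side: digkeys that do have S3 metadata
    dfs3keys = spark.createDataFrame([("AAAA00000000000000000000DD", "SINGLE")],
                                     ["digkey", "subkey"])

    # leftanti: keep only ringkeys whose digkey is absent from dfs3keys -> candidate orphans
    orphans = dfringkeys.join(dfs3keys, ["digkey"], "leftanti") \
                        .withColumn("is_present", F.lit(0)) \
                        .select("ringkey", "is_present", "digkey")
    orphans.show(truncate=False)  # only the BBBB... row remains
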
From 41f1b34905a074f5bcbafc58efec1b91d4237204 Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Thu, 28 Sep 2023 18:34:07 +0200
Subject: [PATCH 04/11] RING-44425 - Apply suggestions from code review

Co-authored-by: scality-fno <84861109+scality-fno@users.noreply.github.com>
---
 scripts/S3_FSCK/s3_fsck_p0.py | 2 +-
 scripts/S3_FSCK/s3_fsck_p1.py | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index ba2afe5..6330b6a 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -128,7 +128,7 @@ def check_split(key):
 def blob(row):
-    """Return a list of dict with the key, subkey and digkey"""
+    """Return a list of dict with the sproxyd input key, its subkey if it exists and digkey"""
     # set key from row._c2 (column 3) which contains an sproxyd input key
     key = row._c2
     # use the sproxyd input key to find out if the key is split or not
     split = check_split(key)
diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index 135fb6c..3ad29ee 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -45,15 +45,15 @@
 # list the ARC SPLIT main chunks with service ID 50 from column 2
 df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")
-# Match keys which end in 70 from column 2
+# Match keys which end in 70 from column 2, to a new dataframe dfARCsingle
 dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
 # Filter out when less than 3 stripe chunks (RING orphans)
 dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")
-# dfARCsingle _c1 (column 2) is now the ringkey ???
+# rename dfARCsingle _c1 (column 2): it is now the "ringkey" (aka. "main chunk") ???
 dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])
-# filter _c1 (column 2) for specific COS protection
+# in df_split, filter _c1 (column 2) RING key main chunk for specific COS protection
 dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
 # count the number of chunks in _c1 (column 2) found for each key

From 4a24dc93869392a5ac35b2e4e454185d372e5bbb Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Mon, 2 Oct 2023 08:39:59 -0700
Subject: [PATCH 05/11] RING-44425 - s3-bucketd data structure in p0

---
 scripts/S3_FSCK/s3_fsck_p0.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index ba2afe5..5da1118 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -190,7 +190,9 @@ def blob(row):
 new_path = os.path.join(PATH, RING, "s3-bucketd")
 files = "%s://%s" % (PROTOCOL, new_path)
-# reading without a header, the _c0, _c1, _c2 are the default column names of column 1, 2, 3 for the csv
+# s3-bucketd structure:
+# { bucket, s3 object, sproxyd input key }
+# e.g. test,2022%2F04%2F23%2Fclients%2F798a98d2367e8d5d4%2Fobject%2F1000.idx,08BC471F14C77A14C2F1A9C78F6BCD59FB7A5B20
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 # repartition the dataframe to have the same number of partitions as the number of executors * cores

From 663a8458167081991d970c43a382565200790fd3 Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Mon, 2 Oct 2023 21:58:37 +0200
Subject: [PATCH 06/11] Update scripts/S3_FSCK/s3_fsck_p0.py

Co-authored-by: scality-fno <84861109+scality-fno@users.noreply.github.com>
---
 scripts/S3_FSCK/s3_fsck_p0.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index 1d2db0d..2e823bd 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -119,7 +119,7 @@ def sparse(f):
 def check_split(key):
-    """Check if the key is split or not. Return True if split, False if not split, None if error (404, 50X, etc.)"""
+    """Check if the RING key is split or not. Return True if split, False if not split, None if error (422, 404, 50X, etc.)"""
     url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
     r = requests.head(url)
     if r.status_code == 200:

From 73f7442c6dacf31b08c9d2be9ecdbe1d8fdedbba Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Mon, 2 Oct 2023 21:59:10 +0200
Subject: [PATCH 07/11] Update scripts/S3_FSCK/s3_fsck_p1.py

---
 scripts/S3_FSCK/s3_fsck_p1.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index 3ad29ee..acaeb21 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,7 +39,10 @@
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
-# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
+# listkeys.csv structure:
+# { RING key, main chunk of the RING key, disk, flag }
+# e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
+# reading listkeys.csv files without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 # list the ARC SPLIT main chunks with service ID 50 from column 2
From 1bfe636f31bc5e5262939d4ab24a6c1590b53c1b Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Mon, 2 Oct 2023 22:00:35 +0200
Subject: [PATCH 08/11] Update scripts/S3_FSCK/s3_fsck_p0.py

---
 scripts/S3_FSCK/s3_fsck_p0.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index 2e823bd..c097759 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -132,6 +132,7 @@ def blob(row):
     # set key from row._c2 (column 3) which contains an sproxyd input key
     key = row._c2
     # use the sproxyd input key to find out if the key is split or not
+    # check_split(key) is used to transform the input key into a RING key, assess if it exists AND whether it is a SPLIT.
     split = check_split(key)
     if not split['result']:
         # If the key is not found, return a dict with the key, subkey and digkey set to NOK_HTTP

From 50c510841b862ea8c3cc4222ecd238eb9e334a92 Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Tue, 10 Oct 2023 17:38:55 +0200
Subject: [PATCH 09/11] RING-44425 - review suggestions

Co-authored-by: scality-fno <84861109+scality-fno@users.noreply.github.com>
---
 scripts/S3_FSCK/s3_fsck_p0.py | 23 ++++++++++++++++-------
 1 file changed, 16 insertions(+), 7 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index c097759..d913434 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -31,7 +31,9 @@
 COS = cfg["cos_protection"]
 PARTITIONS = int(cfg["spark.executor.instances"]) * int(cfg["spark.executor.cores"])
-# The arcindex is a map between the ARC Schema and the hex value found in the ringkey in the 24 bits preceding the last 8 bits of the key
+# The arcindex maps the ARC Schema to the hex value found in the ringkey, in the 24 bits preceding the last 8 bits of the key
+# e.g. FD770A344D6A6D259F92C500000000512040C070
+# FD770A344D6A6D259F92C50000000051XXXXXX70 where XXXXXX : 2040C0
 arcindex = {"4+2": "102060", "8+4": "2040C0", "9+3": "2430C0", "7+5": "1C50C0", "5+7": "1470C0"}
 os.environ["PYSPARK_SUBMIT_ARGS"] = '--packages "org.apache.hadoop:hadoop-aws:2.7.3" pyspark-shell'
@@ -119,7 +121,7 @@ def sparse(f):
 def check_split(key):
-    """Check if the RING key is split or not. Return True if split, False if not split, None if error (422, 404, 50X, etc.)"""
+    """With srebuildd, check if the RING key is split or not. Return True if split, False if not split, None if error (422, 404, 50X, etc.)"""
     url = "http://%s:81/%s/%s" % (SREBUILDD_IP, SREBUILDD_ARC_PATH, str(key.zfill(40)))
     r = requests.head(url)
     if r.status_code == 200:
@@ -130,6 +132,8 @@ def check_split(key):
 def blob(row):
     """Return a list of dict with the sproxyd input key, its subkey if it exists and digkey"""
     # set key from row._c2 (column 3) which contains an sproxyd input key
+    # input structure: (bucket name, s3 object key, sproxyd input key)
+    # FIXME: the naming of the method is terrible
     key = row._c2
     # use the sproxyd input key to find out if the key is split or not
     # check_split(key) is used to transform the input key into a RING key, assess if it exists AND whether it is a SPLIT.
@@ -158,7 +162,7 @@ def blob(row):
                     # "key": key == primary sproxyd input key of a split object
                     # "subkey": k == subkey sproxyd input key of an individual stripe of a split object
                     # "digkey": gen_md5_from_id(k)[:26] == md5 of the subkey
-                    # digkey: the unqiue part of a main chunk before service id,
+                    # digkey: the unique part of a main chunk before service id,
                     # arc schema, and class are appended
                     rtlst.append(
                         {"key": key, "subkey": k, "digkey": gen_md5_from_id(k)[:26]}
@@ -184,23 +188,28 @@ def blob(row):
         # return a dict with the key (primary sproxyd input key),
         # subkey set to SINGLE and
         # digkey, (md5 of the subkey)
-        # digkey: the unqiue part of a main chunk before service id,
+        # digkey: the unique part of a main chunk before service id,
         # arc schema, and class are appended
         return [{"key": key, "subkey": "SINGLE", "digkey": gen_md5_from_id(key)[:26]}]

 new_path = os.path.join(PATH, RING, "s3-bucketd")
 files = "%s://%s" % (PROTOCOL, new_path)
-# s3-bucketd structure:
-# { bucket, s3 object, sproxyd input key }
-# e.g. test,2022%2F04%2F23%2Fclients%2F798a98d2367e8d5d4%2Fobject%2F1000.idx,08BC471F14C77A14C2F1A9C78F6BCD59FB7A5B20
+# reading without a header,
+# columns _c0, _c1, _c2 are the default column names of
+# columns 1, 2, 3 for the csv
+# input structure: (bucket name, s3 object key, sproxyd input key)
+# e.g. test,48K_object.01,9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 # repartition the dataframe to have the same number of partitions as the number of executors * cores
 df = df.repartition(PARTITIONS)
+# Return a new Resilient Distributed Dataset (RDD) by applying a function to each element of this RDD.
 rdd = df.rdd.map(lambda x : blob(x))
+# Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results. Then transform it into a dataframe.
 dfnew = rdd.flatMap(lambda x: x).toDF()
 single = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
 # write the dataframe to a csv file with a header
+# output structure: (digkey, sproxyd input key, subkey if available)
 dfnew.write.format("csv").mode("overwrite").options(header="true").save(single)
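
The comments added in this patch spell out the RDD pipeline of s3_fsck_p0.py: map() turns each CSV row into a list of dicts and flatMap() flattens those lists into one row per dict before toDF(). A minimal sketch of that pattern, with an invented stand-in for blob():

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("flatmap-demo").getOrCreate()

    df = spark.createDataFrame(
        [("bucket1", "obj1", "AAAA0000"), ("bucket1", "obj2", "BBBB1111")],
        ["_c0", "_c1", "_c2"])

    def fake_blob(row):
        # stand-in for blob(): one or more dicts per sproxyd input key found in _c2
        return [{"key": row._c2, "subkey": "SINGLE", "digkey": row._c2[:6]}]

    dfnew = df.rdd.map(fake_blob).flatMap(lambda x: x).toDF()
    dfnew.show()  # one output row per dict returned by fake_blob
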
From 644b54e5235280f27037372b75be2675d0b3ad7c Mon Sep 17 00:00:00 2001
From: trevorbenson
Date: Tue, 10 Oct 2023 11:50:10 -0700
Subject: [PATCH 10/11] RING-44425 - input structures, required fields

---
 scripts/S3_FSCK/s3_fsck_p0.py |  6 ++++--
 scripts/S3_FSCK/s3_fsck_p1.py | 10 +++++++---
 scripts/S3_FSCK/s3_fsck_p2.py | 24 ++++++++++++++++++++----
 scripts/S3_FSCK/s3_fsck_p3.py | 10 +++++++++-
 scripts/S3_FSCK/s3_fsck_p4.py |  8 +++++++-
 5 files changed, 47 insertions(+), 11 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p0.py b/scripts/S3_FSCK/s3_fsck_p0.py
index d913434..159f428 100644
--- a/scripts/S3_FSCK/s3_fsck_p0.py
+++ b/scripts/S3_FSCK/s3_fsck_p0.py
@@ -195,11 +195,13 @@ def blob(row):
 new_path = os.path.join(PATH, RING, "s3-bucketd")
 files = "%s://%s" % (PROTOCOL, new_path)
-# reading without a header,
-# columns _c0, _c1, _c2 are the default column names of
+# reading without a header,
+# columns _c0, _c1, _c2 are the default column names of
 # columns 1, 2, 3 for the csv
 # input structure: (bucket name, s3 object key, sproxyd input key)
 # e.g. test,48K_object.01,9BC9C6080ED24A42C2F1A9C78F6BCD5967F70220
+# Required Fields:
+# - _c2 (sproxyd input key)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 # repartition the dataframe to have the same number of partitions as the number of executors * cores
diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index acaeb21..430c8b3 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,10 +39,14 @@
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
-# listkeys.csv structure:
-# { RING key, main chunk of the RING key, disk, flag }
+# reading without a header,
+# columns _c0, _c1, _c2, _c3 are the default column names of
+# columns 1, 2, 3, 4 for the csv
+# input structure: (RING key, main chunk, disk, flag)
 # e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
-# reading listkeys.csv files without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
+# Required Fields:
+# - _c1 (main chunk)
+# - _c3 (FLAG)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
 # list the ARC SPLIT main chunks with service ID 50 from column 2
diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py
index d64aecf..b4e9fe7 100644
--- a/scripts/S3_FSCK/s3_fsck_p2.py
+++ b/scripts/S3_FSCK/s3_fsck_p2.py
@@ -39,14 +39,30 @@
     .getOrCreate()
-# s3keys are read from the verifySproxydKeys.js scripts output
+# s3keys are generated by verifySproxydKeys.js script and processed by s3_fsck_p0.py
 s3keys = "%s://%s/%s/s3fsck/s3-dig-keys.csv" % (PROTOCOL, PATH, RING)
-# ringkeys are read from the listkeys.py (or ringsh dump) scripts output
+# ringkeys are generated by the listkeys.py (or ringsh dump) script and processed by s3_fsck_p1.py
 ringkeys = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
-# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
+# reading with a header, the columns are named.
+# columns digkey, sproxyd input key, subkey are the actual column names of
+# columns 1, 2, 3 for the csv
+# input structure: (digkey, sproxyd input key, subkey)
+# e.g. 7359114991482315D0A5890000,BDE4B9BBEB45711EC2F1A9C78F6BCD59E02C6220,SINGLE
+# Required Fields:
+# - digkey
+# - sproxyd input key
 dfs3keys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(s3keys)
-# reading with a header, the columns are named. The column _c1 will be whatever column the _c1 header is assigned to
+
+
+# reading with a header, the columns are named.
+# columns _c1, count, ringkey (main chunk) are the actual column names of
+# columns 1, 2, 3 for the csv
+# input structure: (digkey, count, ringkey (main chunk))
+# e.g. 7359114991482315D0A5890000,BDE4B9BBEB45711EC2F1A9C78F6BCD59E02C6220,SINGLE
+# Required Fields:
+# - digkey
+# - ringkey (main chunk)
 dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)
 # rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
diff --git a/scripts/S3_FSCK/s3_fsck_p3.py b/scripts/S3_FSCK/s3_fsck_p3.py
index 358f75f..77712f9 100644
--- a/scripts/S3_FSCK/s3_fsck_p3.py
+++ b/scripts/S3_FSCK/s3_fsck_p3.py
@@ -84,8 +84,16 @@ def statkey(row):
 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
-# Create a dataframe from the csv file not using the header, the columns will be _c0, _c1, _c2
+
+# reading without a header,
+# columns _c0 is the default column names of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
+
 # Create a resilient distributed dataset (RDD) from the dataframe (logical partitions of data)
 # The rdd is a collection of tuples returned from statkey (key, status_code, size)
 rdd = df.rdd.map(statkey)
diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py
index 062a4f2..290ec20 100644
--- a/scripts/S3_FSCK/s3_fsck_p4.py
+++ b/scripts/S3_FSCK/s3_fsck_p4.py
@@ -69,7 +69,13 @@ def deletekey(row):
 files = "%s://%s/%s/s3fsck/s3objects-missing.csv" % (PROTOCOL, PATH, RING)
-# reading without a header, the _c0, _c1, _c2, _c3 are the default column names for column 1, 2, 3, 4
+# reading without a header,
+# columns _c0 is the default column names of
+# column 1 for the csv
+# input structure: _c0 (main chunk)
+# e.g. 998C4DF2FC7389A7C82A9600000000512040C070
+# Required Fields:
+# - _c0 (main chunk)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").load(files)
 # rename the column _c0 (column 1) to ringkey
 df = df.withColumnRenamed("_c0","ringkey")
From 2c8f3de4c4f8dfa70ee183221cb615b5b3bf7ab3 Mon Sep 17 00:00:00 2001
From: illuminatus
Date: Wed, 25 Oct 2023 17:41:34 -0700
Subject: [PATCH 11/11] Apply suggestions from code review

Co-authored-by: scality-fno <84861109+scality-fno@users.noreply.github.com>
---
 scripts/S3_FSCK/s3_fsck_p1.py | 52 +++++++++++++++++++++--------------
 scripts/S3_FSCK/s3_fsck_p2.py |  6 ++--
 scripts/S3_FSCK/s3_fsck_p4.py |  2 +-
 3 files changed, 36 insertions(+), 24 deletions(-)

diff --git a/scripts/S3_FSCK/s3_fsck_p1.py b/scripts/S3_FSCK/s3_fsck_p1.py
index 430c8b3..244d1a3 100644
--- a/scripts/S3_FSCK/s3_fsck_p1.py
+++ b/scripts/S3_FSCK/s3_fsck_p1.py
@@ -39,65 +39,77 @@
 files = "%s://%s/%s/listkeys.csv" % (PROTOCOL, PATH, RING)
+# FIXME skip native column names, rename on the fly.
 # reading without a header,
-# columns _c0, _c1, _c2, _c3 are the default column names of
+# columns _c0, _c1, _c2, _c3 are the default column names of
 # columns 1, 2, 3, 4 for the csv
+# REQUIRED N, Y, N, Y
 # input structure: (RING key, main chunk, disk, flag)
 # e.g. 555555A4948FAA554034E155555555A61470C07A,8000004F3F3A54FFEADF8C00000000511470C070,g1disk1,0
 # Required Fields:
 # - _c1 (main chunk)
-# - _c3 (FLAG)
+# - _c3 (flag)
 df = spark.read.format("csv").option("header", "false").option("inferSchema", "true").option("delimiter", ",").load(files)
-# list the ARC SPLIT main chunks with service ID 50 from column 2
+# list the ARC_REPLICATED main chunks from column 2 with service ID 50 and flag = 0 (new), into a single column (vector) named "_c1"
+# FIXME rename column on the fly
 df_split = df.filter(df["_c1"].rlike(r".*000000..50........$") & df["_c3"].rlike("0")).select("_c1")
-# Match keys which end in 70 from column 2, to a new dataframe dfARCsingle
+# In df_split, match keys which end in 70 from column with litteral name "_c1", to a new dataframe dfARCsingle
+# FIXME explain what we're trying to accomplish by identifying such keys (0x50: ARC_REPLICATED, per UKS --> calls for a COS ending, not an ARC ending). Under what circumstances should such keys exist?
 dfARCsingle = df_split.filter(df["_c1"].rlike(r".*70$"))
-# Filter out when less than 3 stripe chunks (RING orphans)
+# Filter out when strictly less than 4 stripe chunks (RING orphans), creating a new "count" column on the fly
+# dfARCsingle now has two columns ("_c1", "count")
 dfARCsingle = dfARCsingle.groupBy("_c1").count().filter("count > 3")
-# rename dfARCsingle _c1 (column 2): it is now the "ringkey" (aka. "main chunk") ???
+# in dfARCsingle, duplicate column named "_c1" into a new "ringkey" (aka. "main chunk") column
+# dfARCsingle now has three columns ("_c1", "count", "ringkey")
+# FIXME do the renaming some place else, e.g. upon dataframe creation, be consistent about it
 dfARCsingle = dfARCsingle.withColumn("ringkey",dfARCsingle["_c1"])
-# in df_split, filter _c1 (column 2) RING key main chunk for specific COS protection
+# in df_split (a vector of main chunks), filter column named "_c1" RING key main chunk for the configured COS protection
 dfCOSsingle = df_split.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
-# count the number of chunks in _c1 (column 2) found for each key
+# count the number of chunks in column named "_c1" found for each key, creating the "count" column on the fly
+# dfCOSsingle now has two columns ("_c1", "count")
 dfCOSsingle = dfCOSsingle.groupBy("_c1").count()
-# dfCOSsingle _c1 (column 2) is now the ringkey ???
+# in dfCOSsingle, duplicate column named "_c1" into a new "ringkey" (aka. "main chunk") column
+# dfCOSsingle now has three columns ("_c1", "count", "ringkey")
 dfCOSsingle = dfCOSsingle.withColumn("ringkey",dfCOSsingle["_c1"])
-# ???
+# in dfCOSsingle, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
+# FIXME: say why we need those 4 extra characters (about 18% more weight than the 22-char md5 alone)
 dfCOSsingle = dfCOSsingle.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
-# union the ARC and COS single chunks
+# union the dfCOSsingle and dfARCsingle dataframes ("_c1", "count", "ringkey")
 dfARCsingle = dfARCsingle.union(dfCOSsingle)
-# list the ARC KEYS with service ID 51
+# list the ARC_SINGLE keys with service ID 51
+# repeat the same logic as before, with a different initial mask
+# Output is a three-column matrix that will be unioned with the previous dataframe dfARCsingle
 df_sync = df.filter(df["_c1"].rlike(r".*000000..51........$")).select("_c1")
 # Match keys which end in 70 from column 2
 dfARCSYNC = df_sync.filter(df["_c1"].rlike(r".*70$"))
 # Filter out when less than 3 stripe chunks (RING orphans)
 dfARCSYNC = dfARCSYNC.groupBy("_c1").count().filter("count > 3")
-# dfARCSYNC _c1 (column 2) is now the ringkey ???
+# dfARCSYNC "_c1" column is duplicated into a "ringkey" column
 dfARCSYNC = dfARCSYNC.withColumn("ringkey",dfARCSYNC["_c1"])
-# filter _c1 (column 2) for specific COS protection
+# in dfARCSYNC, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
 dfARCSYNC = dfARCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
-# filter _c1 (column 2) for specific COS protection
+# filter "_c1" for configured COS protection
 dfCOCSYNC = df_sync.filter(df["_c1"].rlike(r".*" + str(COS) + "0$"))
-# count the number of chunks in _c1 (column 2) found for each key
+# count the number of chunks in "_c1" found for each key
 dfCOCSYNC = dfCOCSYNC.groupBy("_c1").count()
-# dfCOCSYNC _c1 (column 2) is now the ringkey ???
+# dfCOCSYNC "_c1" column is duplicated into a "ringkey" column
 dfCOCSYNC = dfCOCSYNC.withColumn("ringkey",dfCOCSYNC["_c1"])
-# ???
+# in dfCOCSYNC, do an in-place substring operation on column "_c1": get the 26 first characters of the main chunk (MD5 hash of the input key + 4 extra chars)
 dfCOCSYNC = dfCOCSYNC.withColumn("_c1",F.expr("substring(_c1, 1, length(_c1)-14)"))
-# union the ARC and COS SYNC chunks
+# union the two previous dataframes
 dfARCSYNC = dfARCSYNC.union(dfCOCSYNC)
-# union the ARC and COS SYNC and single chunks to get the total list of keys
+# union again the two outstanding dataframes dfARCSYNC and dfARCSINGLE into a dftotal dataframe
 dftotal = dfARCSYNC.union(dfARCsingle)
 total = "%s://%s/%s/s3fsck/arc-keys.csv" % (PROTOCOL, PATH, RING)
 dftotal.write.format("csv").mode("overwrite").options(header="true").save(total)
diff --git a/scripts/S3_FSCK/s3_fsck_p2.py b/scripts/S3_FSCK/s3_fsck_p2.py
index b4e9fe7..2be5baf 100644
--- a/scripts/S3_FSCK/s3_fsck_p2.py
+++ b/scripts/S3_FSCK/s3_fsck_p2.py
@@ -59,21 +59,21 @@
 # columns _c1, count, ringkey (main chunk) are the actual column names of
 # columns 1, 2, 3 for the csv
 # input structure: (digkey, count, ringkey (main chunk))
-# e.g. 7359114991482315D0A5890000,BDE4B9BBEB45711EC2F1A9C78F6BCD59E02C6220,SINGLE
+# e.g. 907024530554A8DB3167280000,12,907024530554A8DB31672800000000512430C070
 # Required Fields:
 # - digkey
 # - ringkey (main chunk)
 dfringkeys = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(ringkeys)
 # rename the column _c1 to digkey, the next write will output a header that uses digkey instead of _c1
-# digkey: the unqiue part of a main chunk before service id, arc schema, and class are appended
+# digkey: the unique part of a main chunk before service id, arc schema, and class are appended
 dfringkeys = dfringkeys.withColumnRenamed("_c1","digkey")
 # inner join the s3keys (sproxyd input key) and ringkeys (the main chunk of the strip or replica)
 # on the digkey column. The result will be a dataframe with the columns ringkey, digkey
 # the inner join leftani will not return rows that are present in both dataframes,
 # eliminating ringkeys (main chunks) that have metadata in s3 (not application orphans).
-# digkey: the unqiue part of a main chunk before service id, arc schema, and class are appended
+# digkey: the unique part of a main chunk before service id, arc schema, and class are appended
 # ringkey: the main chunk of the strip or replica
 inner_join_false = dfringkeys.join(dfs3keys,["digkey"], "leftanti").withColumn("is_present", F.lit(int(0))).select("ringkey", "is_present", "digkey")
diff --git a/scripts/S3_FSCK/s3_fsck_p4.py b/scripts/S3_FSCK/s3_fsck_p4.py
index 290ec20..f93dd75 100644
--- a/scripts/S3_FSCK/s3_fsck_p4.py
+++ b/scripts/S3_FSCK/s3_fsck_p4.py
@@ -83,7 +83,7 @@ def deletekey(row):
 # repartition the dataframe to the number of partitions (executors * cores)
 df = df.repartition(PARTITIONS)
-# map the deletekey function to the dataframe
+# map the deletekey function to the dataframe: blindly delete keys on the RING.
 rdd = df.rdd.map(deletekey).toDF()

 deletedorphans = "%s://%s/%s/s3fsck/deleted-s3-orphans.csv" % (PROTOCOL, PATH, RING)
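
Several comments in this last patch describe the digkey derivation: drop the trailing 14 characters (service id, ARC schema / COS, class) of a 40-character main chunk and keep its 26-character unique prefix. That substring step can be reproduced on its own with the example key quoted above; the sketch below is only an illustration, not part of the scripts.

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.appName("digkey-demo").getOrCreate()

    df = spark.createDataFrame([("907024530554A8DB31672800000000512430C070",)], ["_c1"])
    df = df.withColumn("ringkey", df["_c1"]) \
           .withColumn("_c1", F.expr("substring(_c1, 1, length(_c1)-14)"))
    df.show(truncate=False)  # _c1 becomes the 26-character digkey 907024530554A8DB3167280000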