From c28bb9c9af7837c4651c01d4439db4eef0bd8bd6 Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 17 Jun 2021 17:59:00 +0530 Subject: [PATCH 1/6] Added support for anomaly check config --- pydeequ/verification.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/pydeequ/verification.py b/pydeequ/verification.py index b4579cf..f4f7728 100644 --- a/pydeequ/verification.py +++ b/pydeequ/verification.py @@ -1,6 +1,6 @@ from pyspark.sql import SparkSession, DataFrame -from pydeequ.checks import CheckStatus, CheckResult, Check +from pydeequ.checks import CheckStatus, CheckResult, Check, CheckLevel from pydeequ.analyzers import * from pydeequ.anomaly_detection import * import json @@ -8,8 +8,26 @@ # TODO integrate Analyzer context + class AnomalyCheckConfig: - pass + + def __init__(self, level: CheckLevel, description): + self.level = level + self.description = description + + def _get_java_object(self, jvm): + self._jvm = jvm + self._java_level = self.level._get_java_object(self._jvm) + self._check_java_class = self._jvm.com.amazon.deequ.AnomalyCheckConfig + self._anomalyCheckConfig_jvm = self._check_java_class( + self._java_level, + self.description, + getattr(self._check_java_class, 'apply$default3')(), + None, + None + ) + return self._anomalyCheckConfig_jvm + class VerificationResult: """ The results returned from the VerificationSuite @@ -157,7 +175,7 @@ def addCheck(self, check: Check): self._VerificationRunBuilder.addCheck(check._Check) return self - def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckConfig=None): + def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckConfig: AnomalyCheckConfig = None): """ Add a check using anomaly_detection methods. The Anomaly Detection Strategy only checks if the new value is an Anomaly. @@ -167,9 +185,11 @@ def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckCon :param anomalyCheckConfig: Some configuration settings for the Check :return: Adds an anomaly strategy to the run """ - if anomalyCheckConfig: raise NotImplementedError("anomalyCheckConfigs have not been implemented yet, using default value") + anomalyCheckConfig_jvm = None + if anomalyCheckConfig: + anomalyCheckConfig_jvm = anomalyCheckConfig._get_java_object(self._jvm) - AnomalyCheckConfig = self._jvm.scala.Option.apply(anomalyCheckConfig) + AnomalyCheckConfig = self._jvm.scala.Option.apply(anomalyCheckConfig_jvm) anomaly._set_jvm(self._jvm) anomaly_jvm = anomaly._anomaly_jvm From a3945caebc08c190b738377e37df3d7a8dfb8bfb Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 17 Jun 2021 18:06:03 +0530 Subject: [PATCH 2/6] Added support for anomaly check config --- pydeequ/verification.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pydeequ/verification.py b/pydeequ/verification.py index f4f7728..cdccc2c 100644 --- a/pydeequ/verification.py +++ b/pydeequ/verification.py @@ -22,7 +22,7 @@ def _get_java_object(self, jvm): self._anomalyCheckConfig_jvm = self._check_java_class( self._java_level, self.description, - getattr(self._check_java_class, 'apply$default3')(), + getattr(self._check_java_class, 'apply$default$3')(), None, None ) From 6d009bce9487263a1679f3f37467f491e882fc3a Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 17 Jun 2021 18:31:08 +0530 Subject: [PATCH 3/6] Added other default values --- pydeequ/verification.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pydeequ/verification.py b/pydeequ/verification.py index cdccc2c..7cbebad 100644 --- a/pydeequ/verification.py +++ b/pydeequ/verification.py @@ -23,8 +23,8 @@ def _get_java_object(self, jvm): self._java_level, self.description, getattr(self._check_java_class, 'apply$default$3')(), - None, - None + getattr(self._check_java_class, 'apply$default$4')(), + getattr(self._check_java_class, 'apply$default$5')(), ) return self._anomalyCheckConfig_jvm From 8a89e324f35ba72d7a1be41dd0acfe5dec2897ff Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 12 Aug 2021 19:13:44 +0530 Subject: [PATCH 4/6] Added anomaly config test cases --- tests/test_anomaly_detection.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/tests/test_anomaly_detection.py b/tests/test_anomaly_detection.py index ae349ac..f67634d 100644 --- a/tests/test_anomaly_detection.py +++ b/tests/test_anomaly_detection.py @@ -181,7 +181,8 @@ def OnlineNormalStrategy( print(df.collect()) return df.select("check_status").collect() - def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, upperBound): + def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, upperBound, + anomalyCheckConfig: AnomalyCheckConfig = None): metricsRepository = InMemoryMetricsRepository(self.spark) previousKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 1000 * 60) @@ -196,7 +197,7 @@ def SimpleThresholdStrategy(self, df_prev, df_curr, analyzer_func, lowerBound, u .onData(df_curr) .useRepository(metricsRepository) .saveOrAppendResult(currKey) - .addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func) + .addAnomalyCheck(SimpleThresholdStrategy(lowerBound, upperBound), analyzer_func, anomalyCheckConfig) .run() ) @@ -486,6 +487,20 @@ def get_anomalyDetector(self, anomaly): def test_anomalyDetector(self): self.get_anomalyDetector(SimpleThresholdStrategy(1.0, 3.0)) + def test_SimpleThresholdStrategy_Error(self): + config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Error) + # Lower bound is 1 upper bound is 6 (Range: 1-6 rows) + self.assertEqual( + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), [Row(check_status="Error")] + ) + + def test_SimpleThresholdStrategy_Warning(self): + config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Warning) + # Lower bound is 1 upper bound is 6 (Range: 1-6 rows) + self.assertEqual( + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), [Row(check_status="Warning")] + ) + # # def test_RelativeRateOfChangeStrategy(self): # metricsRepository = InMemoryMetricsRepository(self.spark) From b24ca4d875ae94dbca4f63efd0b02443ecf67f9b Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 12 Aug 2021 19:18:26 +0530 Subject: [PATCH 5/6] Added anomaly config test cases --- pydeequ/verification.py | 63 +++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/pydeequ/verification.py b/pydeequ/verification.py index ebf2fce..fb88f0a 100644 --- a/pydeequ/verification.py +++ b/pydeequ/verification.py @@ -1,14 +1,13 @@ +from pydeequ.analyzers import _AnalyzerObject import json from pyspark import SQLContext from pyspark.sql import DataFrame, SparkSession from pydeequ.analyzers import AnalysisRunBuilder -from pydeequ.checks import Check -from pydeequ.checks import CheckLevel +from pydeequ.checks import Check, CheckLevel from pydeequ.pandas_utils import ensure_pyspark_df - # TODO integrate Analyzer context @@ -67,7 +66,7 @@ def metrics(self): @classmethod def successMetricsAsDataFrame( - cls, spark_session: SparkSession, verificationResult, forAnalyzers: list = None, pandas: bool = False + cls, spark_session: SparkSession, verificationResult, forAnalyzers: list = None, pandas: bool = False ): """ The results returned in a Data Frame @@ -134,7 +133,7 @@ def checkResultsAsJson(cls, spark_session: SparkSession, verificationResult, for @classmethod def checkResultsAsDataFrame( - cls, spark_session: SparkSession, verificationResult, forChecks=None, pandas: bool = False + cls, spark_session: SparkSession, verificationResult, forChecks=None, pandas: bool = False ): """ Returns the verificaton Results as a Data Frame @@ -194,7 +193,7 @@ def addCheck(self, check: Check): self._VerificationRunBuilder.addCheck(check._Check) return self - def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckConfig: AnomalyCheckConfig = None): + def addAnomalyCheck(self, anomaly, analyzer: _AnalyzerObject, anomalyCheckConfig=None): """ Add a check using anomaly_detection methods. The Anomaly Detection Strategy only checks if the new value is an Anomaly. @@ -204,7 +203,6 @@ def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckCon :param anomalyCheckConfig: Some configuration settings for the Check :return: Adds an anomaly strategy to the run """ - anomalyCheckConfig_jvm = None if anomalyCheckConfig: anomalyCheckConfig_jvm = anomalyCheckConfig._get_java_object(self._jvm) @@ -220,38 +218,35 @@ def addAnomalyCheck(self, anomaly, analyzer: AnalysisRunBuilder, anomalyCheckCon self._VerificationRunBuilder.addAnomalyCheck(anomaly_jvm, analyzer_jvm, AnomalyCheckConfig) return self + def run(self): + """ + A method that runs the desired VerificationRunBuilder functions on the data to obtain a Verification Result + :return:a verificationResult object + """ + return VerificationResult(self._spark_session, self._VerificationRunBuilder.run()) -def run(self): - """ - A method that runs the desired VerificationRunBuilder functions on the data to obtain a Verification Result - :return:a verificationResult object - """ - return VerificationResult(self._spark_session, self._VerificationRunBuilder.run()) - - -def useRepository(self, repository): - """ - This method reassigns our AnalysisRunBuilder because useRepository returns back a different - class: AnalysisRunBuilderWithRepository - - Sets a metrics repository associated with the current data to enable features like reusing previously computed - results and storing the results of the current run. + def useRepository(self, repository): + """ + This method reassigns our AnalysisRunBuilder because useRepository returns back a different + class: AnalysisRunBuilderWithRepository - :param repository: a metrics repository to store and load results associated with the run - """ - self._VerificationRunBuilder = self._VerificationRunBuilder.useRepository(repository.repository) - return self + Sets a metrics repository associated with the current data to enable features like reusing previously computed + results and storing the results of the current run. + :param repository: a metrics repository to store and load results associated with the run + """ + self._VerificationRunBuilder = self._VerificationRunBuilder.useRepository(repository.repository) + return self -def saveOrAppendResult(self, resultKey): - """ - A shortcut to save the results of the run or append them to the existing results in the metrics repository. + def saveOrAppendResult(self, resultKey): + """ + A shortcut to save the results of the run or append them to the existing results in the metrics repository. - :param resultKey: The result key to identify the current run - :return: :A VerificationRunBuilder.scala object that saves or appends a result - """ - self._VerificationRunBuilder.saveOrAppendResult(resultKey.resultKey) - return self + :param resultKey: The result key to identify the current run + :return: :A VerificationRunBuilder.scala object that saves or appends a result + """ + self._VerificationRunBuilder.saveOrAppendResult(resultKey.resultKey) + return self class VerificationRunBuilderWithSparkSession(VerificationRunBuilder): From 71a3110f9997b722b4647a94cf6762e487297ffa Mon Sep 17 00:00:00 2001 From: Rahul Goyal Date: Thu, 12 Aug 2021 19:32:02 +0530 Subject: [PATCH 6/6] Fixed defect --- tests/test_anomaly_detection.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_anomaly_detection.py b/tests/test_anomaly_detection.py index f67634d..1e397e5 100644 --- a/tests/test_anomaly_detection.py +++ b/tests/test_anomaly_detection.py @@ -491,14 +491,14 @@ def test_SimpleThresholdStrategy_Error(self): config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Error) # Lower bound is 1 upper bound is 6 (Range: 1-6 rows) self.assertEqual( - self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), [Row(check_status="Error")] + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0, config), [Row(check_status="Error")] ) def test_SimpleThresholdStrategy_Warning(self): config = AnomalyCheckConfig(description='test error case', level=CheckLevel.Warning) # Lower bound is 1 upper bound is 6 (Range: 1-6 rows) self.assertEqual( - self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), [Row(check_status="Warning")] + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0, config), [Row(check_status="Warning")] ) #