diff --git a/tests/test_anomaly_detection.py b/tests/test_anomaly_detection.py index ae349ac..d78967b 100644 --- a/tests/test_anomaly_detection.py +++ b/tests/test_anomaly_detection.py @@ -4,10 +4,19 @@ import pytest from pyspark.sql import Row -from pydeequ.analyzers import * -from pydeequ.anomaly_detection import * -from pydeequ.repository import * -from pydeequ.verification import * +from pydeequ.analyzers import MinLength, Size +from pydeequ.anomaly_detection import ( + AbsoluteChangeStrategy, + BatchNormalStrategy, + HoltWinters, + MetricInterval, + OnlineNormalStrategy, + RelativeRateOfChangeStrategy, + SeriesSeasonality, + SimpleThresholdStrategy, +) +from pydeequ.repository import InMemoryMetricsRepository, ResultKey +from pydeequ.verification import VerificationResult, VerificationSuite from tests.conftest import setup_pyspark @@ -19,81 +28,41 @@ def setUpClass(cls): cls.df_1 = cls.sc.parallelize( [ - Row( - a=3, - b=0, - c="colder", - ), - Row( - a=3, - b=5, - c="bolder", - ), + Row(a=3, b=0, c="colder"), + Row(a=3, b=5, c="bolder"), ] ).toDF() cls.df_2 = cls.sc.parallelize( [ - Row( - a=3, - b=0, - c="foo", - ), - Row( - a=3, - b=5, - c="zoo", - ), - Row( - a=100, - b=5, - c="who", - ), - Row( - a=2, - b=30, - c="email", - ), - Row( - a=10, - b=5, - c="cards", - ), + Row(a=3, b=0, c="foo"), + Row(a=3, b=5, c="zoo"), + Row(a=100, b=5, c="who"), + Row(a=2, b=30, c="email"), + Row(a=10, b=5, c="cards"), ] ).toDF() - cls.df_3 = cls.sc.parallelize( - [ - Row( - a=1, - b=23, - c="pool", - ) - ] - ).toDF() + cls.df_3 = cls.sc.parallelize([Row(a=1, b=23, c="pool")]).toDF() - cls.df_4 = cls.sc.parallelize( - [ - Row( - a=1, - b=23, - c="pool", - ) - ] - ).toDF() + cls.df_4 = cls.sc.parallelize([Row(a=1, b=23, c="pool")]).toDF() @classmethod def tearDownClass(cls): cls.spark.sparkContext._gateway.shutdown_callback_server() cls.spark.stop() - def RelativeRateOfChangeStrategy(self, df_prev, df_curr, analyzer_func, maxRateDecrease=None, maxRateIncrease=None): + def 
RelativeRateOfChangeStrategy( + self, df_prev, df_curr, analyzer_func, maxRateDecrease=None, maxRateIncrease=None + ): metricsRepository = InMemoryMetricsRepository(self.spark) previousKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 1000 * 60) VerificationSuite(self.spark).onData(df_prev).useRepository(metricsRepository).saveOrAppendResult( previousKey - ).addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func).run() + ).addAnomalyCheck( + RelativeRateOfChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func + ).run() currKey = ResultKey(self.spark, ResultKey.current_milli_time()) @@ -102,7 +71,9 @@ def RelativeRateOfChangeStrategy(self, df_prev, df_curr, analyzer_func, maxRateD .onData(df_curr) .useRepository(metricsRepository) .saveOrAppendResult(currKey) - .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func) + .addAnomalyCheck( + RelativeRateOfChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func + ) .run() ) @@ -112,13 +83,17 @@ def RelativeRateOfChangeStrategy(self, df_prev, df_curr, analyzer_func, maxRateD print(df.collect()) return df.select("check_status").collect() - def AbsoluteChangeStrategy(self, df_prev, df_curr, analyzer_func, maxRateDecrease=None, maxRateIncrease=None): + def AbsoluteChangeStrategy( + self, df_prev, df_curr, analyzer_func, maxRateDecrease=None, maxRateIncrease=None + ): metricsRepository = InMemoryMetricsRepository(self.spark) previousKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 1000 * 60) VerificationSuite(self.spark).onData(df_prev).useRepository(metricsRepository).saveOrAppendResult( previousKey - ).addAnomalyCheck(AbsoluteChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func).run() + ).addAnomalyCheck( + AbsoluteChangeStrategy(maxRateDecrease, maxRateIncrease, order=1), analyzer_func + ).run() currKey = ResultKey(self.spark, 
ResultKey.current_milli_time()) @@ -154,7 +129,9 @@ def OnlineNormalStrategy( VerificationSuite(self.spark).onData(df_prev).useRepository(metricsRepository).saveOrAppendResult( previousKey ).addAnomalyCheck( - OnlineNormalStrategy(lowerDeviationFactor, upperDeviationFactor, ignoreStartPercentage, ignoreAnomalies), + OnlineNormalStrategy( + lowerDeviationFactor, upperDeviationFactor, ignoreStartPercentage, ignoreAnomalies + ), analyzer_func, ).run() @@ -241,7 +218,8 @@ def BatchNormalStrategy( .useRepository(metricsRepository) .saveOrAppendResult(currKey) .addAnomalyCheck( - BatchNormalStrategy(lowerDeviationFactor, upperDeviationFactor, includeInterval), analyzer_func + BatchNormalStrategy(lowerDeviationFactor, upperDeviationFactor, includeInterval), + analyzer_func, ) .run() ) @@ -264,7 +242,9 @@ def HoltWinters(self, analyzer_func, test, df_prev, df_curr=None): .onData(df_prev) .useRepository(metricsRepository) .saveOrAppendResult(previousKey) - .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), analyzer_func) + .addAnomalyCheck( + HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), analyzer_func + ) .run() ) @@ -279,9 +259,11 @@ def HoltWinters(self, analyzer_func, test, df_prev, df_curr=None): for x in range(14): previousKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 1000 * 60 * x) - VerificationSuite(self.spark).onData(df_prev).useRepository(metricsRepository).saveOrAppendResult( - previousKey - ).addAnomalyCheck(HoltWinters(MetricInterval.Daily, SeriesSeasonality.Weekly), analyzer_func).run() + VerificationSuite(self.spark).onData(df_prev).useRepository( + metricsRepository + ).saveOrAppendResult(previousKey).addAnomalyCheck( + HoltWinters(MetricInterval.Daily, SeriesSeasonality.Weekly), analyzer_func + ).run() currKey = ResultKey(self.spark, ResultKey.current_milli_time()) currResult = ( @@ -299,10 +281,6 @@ def HoltWinters(self, analyzer_func, test, df_prev, df_curr=None): print(df.collect()) 
return df.select("check_status").collect() - # TODO - Failing bcoz of - # can not implement breeze.stats.DescriptiveStats, because it is not an interface - # (breeze.stats.DescriptiveStats is in unnamed module of loader 'app') - @pytest.mark.xfail(reason="TODO: breeze.stats.DescriptiveStats is in unnamed module of loader 'app'") def test_BatchNormalStrategy(self): # Interval is inclusive, so meet the requirements upper value is up to 9 @@ -325,24 +303,35 @@ def test_BatchNormalStrategy(self): def test_OnlineNormalStrategy(self): # Sees if table 1 and 2 are within range - self.assertEqual(self.OnlineNormalStrategy(self.df_1, self.df_2, Size()), [Row(check_status="Success")]) + self.assertEqual( + self.OnlineNormalStrategy(self.df_1, self.df_2, Size()), [Row(check_status="Success")] + ) # df 2 does not meet the requirement self.assertEqual( - self.OnlineNormalStrategy(self.df_3, self.df_2, Size(), lowerDeviationFactor=1.0, upperDeviationFactor=0.5), + self.OnlineNormalStrategy( + self.df_3, self.df_2, Size(), lowerDeviationFactor=1.0, upperDeviationFactor=0.5 + ), [Row(check_status="Warning")], ) # df 3 does not meet the requirement self.assertEqual( - self.OnlineNormalStrategy(self.df_2, self.df_3, Size(), lowerDeviationFactor=0.5, upperDeviationFactor=1.0), + self.OnlineNormalStrategy( + self.df_2, self.df_3, Size(), lowerDeviationFactor=0.5, upperDeviationFactor=1.0 + ), [Row(check_status="Warning")], ) # df 3 does not meet the requirement self.assertEqual( self.OnlineNormalStrategy( - self.df_2, self.df_3, Size(), lowerDeviationFactor=0.5, upperDeviationFactor=1.0, ignoreAnomalies=False + self.df_2, + self.df_3, + Size(), + lowerDeviationFactor=0.5, + upperDeviationFactor=1.0, + ignoreAnomalies=False, ), [Row(check_status="Warning")], ) @@ -360,12 +349,7 @@ def test_OnlineNormalStrategy(self): [Row(check_status="Success")], ) - # TODO - Fix in deequ - Failing bcoz of - # can not implement breeze.stats.DescriptiveStats, because it is not an interface - # 
(breeze.stats.DescriptiveStats is in unnamed module of loader 'app') - @pytest.mark.xfail(reason="TODO: breeze.stats.DescriptiveStats is in unnamed module of loader 'app'") def test_holtWinters(self): - # must have 15 points of data self.assertEqual(self.HoltWinters(Size(), 1, self.df_1), [Row(check_status="Success")]) @@ -376,22 +360,26 @@ def test_holtWinters(self): def test_SimpleThresholdStrategy(self): # Lower bound is 1 upper bound is 6 (Range: 1-6 rows) self.assertEqual( - self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 6.0), [Row(check_status="Success")] + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 6.0), + [Row(check_status="Success")], ) # Lower bound is 1.0 upper bound is 4.0 (df_2, does not meet requirement) self.assertEqual( - self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), [Row(check_status="Warning")] + self.SimpleThresholdStrategy(self.df_1, self.df_2, Size(), 1.0, 4.0), + [Row(check_status="Warning")], ) # Lower bound is 3.0 upper bound is 6.0 (df_1 does not meet requirement) self.assertEqual( - self.SimpleThresholdStrategy(self.df_2, self.df_1, Size(), 3.0, 6.0), [Row(check_status="Warning")] + self.SimpleThresholdStrategy(self.df_2, self.df_1, Size(), 3.0, 6.0), + [Row(check_status="Warning")], ) # Test with another analyzer self.assertEqual( - self.SimpleThresholdStrategy(self.df_2, self.df_1, MinLength("c"), 2.0, 6.0), [Row(check_status="Success")] + self.SimpleThresholdStrategy(self.df_2, self.df_1, MinLength("c"), 2.0, 6.0), + [Row(check_status="Success")], ) def test_AbsoluteChangeStrategy(self): @@ -420,15 +408,19 @@ def test_AbsoluteChangeStrategy(self): ) # MaxRateDecrease = -3.0, data should not decrease by more than .2x - # MaxRateIncrease = 1.0, data should not decrease by more than 1x + # MaxRateIncrease = 1.0, data should not increase by more than 1x self.assertEqual( - self.AbsoluteChangeStrategy(self.df_2, self.df_1, Size(), maxRateDecrease=-3.0, maxRateIncrease=1.0), + 
self.AbsoluteChangeStrategy( + self.df_2, self.df_1, Size(), maxRateDecrease=-3.0, maxRateIncrease=1.0 + ), [Row(check_status="Success")], ) # Test with another analyzer self.assertEqual( - self.AbsoluteChangeStrategy(self.df_2, self.df_1, MinLength("c"), maxRateDecrease=0.0, maxRateIncrease=4.0), + self.AbsoluteChangeStrategy( + self.df_2, self.df_1, MinLength("c"), maxRateDecrease=0.0, maxRateIncrease=4.0 + ), [Row(check_status="Success")], ) @@ -460,9 +452,11 @@ def test_RelativeRateOfChangeStrategy(self): ) # MaxRateDecrease = .2, data should not decrease by more than .2x - # MaxRateIncrease = 1.0, data should not decrease by more than 1x + # MaxRateIncrease = 1.0, data should not increase by more than 1x self.assertEqual( - self.RelativeRateOfChangeStrategy(self.df_2, self.df_1, Size(), maxRateDecrease=0.2, maxRateIncrease=1.0), + self.RelativeRateOfChangeStrategy( + self.df_2, self.df_1, Size(), maxRateDecrease=0.2, maxRateIncrease=1.0 + ), [Row(check_status="Success")], ) @@ -473,57 +467,95 @@ def test_RelativeRateOfChangeStrategy(self): [Row(check_status="Success")], ) - # Todo: test anomaly detector - # Doesn't work in verification suite + # TODO: test anomaly detector + # Doesn't work in verification suite def get_anomalyDetector(self, anomaly): anomaly._set_jvm(self._jvm) strategy_jvm = anomaly._anomaly_jvm - - AnomalyDetector._set_jvm(self._jvm, strategy_jvm) - return AnomalyDetector._anomaly_jvm + raise NotImplementedError + # AnomalyDetector._set_jvm(self._jvm, strategy_jvm) + # return AnomalyDetector._anomaly_jvm @pytest.mark.skip("Not implemented yet!") def test_anomalyDetector(self): self.get_anomalyDetector(SimpleThresholdStrategy(1.0, 3.0)) - # - # def test_RelativeRateOfChangeStrategy(self): - # metricsRepository = InMemoryMetricsRepository(self.spark) - # yesterdaysKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 60* 1000) - # - # self.df_yesterday = self.sc.parallelize([ - # Row(a=3, b=0, ), - # Row(a=3, b=5, )]).toDF() - 
# - # # MaxRateIncrease = 2, data should not increase by more than 2x - # prevResult = VerificationSuite(self.spark).onData(self.df_yesterday) \ - # .useRepository(metricsRepository) \ - # .saveOrAppendResult(yesterdaysKey) \ - # .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size()) \ - # .run() - # - # todaysKey = ResultKey(self.spark, ResultKey.current_milli_time()) - # - # self.df_today = self.sc.parallelize([ - # Row(a=3, b=0, ), - # Row(a=3, b=5, ), - # Row(a=100, b=5,), - # Row(a=2, b=30, ), - # Row(a=10, b=5)]).toDF() - # - # currResult = VerificationSuite(self.spark).onData(self.df_today) \ - # .useRepository(metricsRepository) \ - # .saveOrAppendResult(todaysKey) \ - # .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size()) \ - # .run() - # - # print(VerificationResult.successMetricsAsJson(self.spark, currResult)) - # - # df = VerificationResult.checkResultsAsDataFrame(self.spark, currResult) - # - # print(df.collect()) - # print(df.select('check_status').collect()) - # - # if (currResult.status != "Success"): - # print("Anomaly detected in the Size() metric!") - # metricsRepository.load().forAnalyzers([Size()]).getSuccessMetricsAsDataFrame().show() + def test_RelativeRateOfChangeStrategy_repository(self): + metricsRepository = InMemoryMetricsRepository(self.spark) + yesterdaysKey = ResultKey(self.spark, ResultKey.current_milli_time() - 24 * 60 * 60 * 1000) + + self.df_yesterday = self.sc.parallelize( + [ + Row(a=3, b=0), + Row(a=3, b=5), + ] + ).toDF() + + # MaxRateIncrease = 2, data should not increase by more than 2x + _ = ( + VerificationSuite(self.spark) + .onData(self.df_yesterday) + .useRepository(metricsRepository) + .saveOrAppendResult(yesterdaysKey) + .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size()) + .run() + ) + + todaysKey = ResultKey(self.spark, ResultKey.current_milli_time()) + + self.df_today = self.sc.parallelize( + [ + Row(a=3, b=0), + Row(a=3, b=5), + Row(a=100, b=5), + Row(a=2, 
b=30), + Row(a=10, b=5), + ] + ).toDF() + + currResult = ( + VerificationSuite(self.spark) + .onData(self.df_today) + .useRepository(metricsRepository) + .saveOrAppendResult(todaysKey) + .addAnomalyCheck(RelativeRateOfChangeStrategy(maxRateIncrease=2.0), Size()) + .run() + ) + + success_metrics_json = VerificationResult.successMetricsAsJson(self.spark, currResult) + print(success_metrics_json) + self.assertEqual( + success_metrics_json, [{"entity": "Dataset", "instance": "*", "name": "Size", "value": 5.0}] + ) + + df = VerificationResult.checkResultsAsDataFrame(self.spark, currResult) + df.show(truncate=False) + results = df.select("check_status", "constraint_status", "constraint_message").collect() + self.assertEqual( + results, + [ + Row( + check_status="Warning", + constraint_status="Failure", + constraint_message="Value: 5.0 does not meet the constraint requirement!", + ) + ], + ) + + self.assertEqual(currResult.status, "Warning") + print("Anomaly detected in the Size() metric!") + metrics_df = ( + metricsRepository.load() + .forAnalyzers([Size()]) + .getSuccessMetricsAsDataFrame() + .sort("dataset_date") + ) + metrics_df.show() + metrics = metrics_df.select("entity", "name", "value").collect() + self.assertEqual( + metrics, + [ + Row(entity="Dataset", name="Size", value=2.0), + Row(entity="Dataset", name="Size", value=5.0), + ], + )