From a2ff3b24725e09b9b06c51f396c8a5067e26adaf Mon Sep 17 00:00:00 2001 From: Simmar Kalsi Date: Fri, 10 Apr 2026 20:34:07 +0000 Subject: [PATCH 1/3] fix: metrics for remaining segment types --- pytrendy/post_processing/segments_analyse.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pytrendy/post_processing/segments_analyse.py b/pytrendy/post_processing/segments_analyse.py index 26308de..71c53dc 100644 --- a/pytrendy/post_processing/segments_analyse.py +++ b/pytrendy/post_processing/segments_analyse.py @@ -60,6 +60,12 @@ def analyse_segments(df: pd.DataFrame, value_col: str, segments: list[dict]) -> segment_enhanced['pct_change'] = ( float(val_min / val_max - 1) if val_max != 0 else np.nan ) + # I think this should capture all other segment types + else: + abs_change = float(abs(val_max - val_min)) + segment_enhanced['change'] = abs_change + mean_val = df_segment[value_col].mean() + segment_enhanced['pct_change'] = (float(abs_change / mean_val * 100) if mean_val != 0 else np.nan) # Calculate days & cumulative total change days = (pd.to_datetime(segment['end']) - pd.to_datetime(segment['start'])).days @@ -68,8 +74,7 @@ def analyse_segments(df: pd.DataFrame, value_col: str, segments: list[dict]) -> segment_enhanced['days'] = days # set days # Calculate cumulative total change - if segment['direction'] in ['Up', 'Down']: - segment_enhanced['total_change'] = float(df_segment[value_col].diff().sum()) + segment_enhanced['total_change'] = float(df_segment[value_col].diff().sum()) # Calculate Signal to Noise Ratio signal_power = np.mean(df_segment['signal']**2) @@ -83,7 +88,7 @@ def analyse_segments(df: pd.DataFrame, value_col: str, segments: list[dict]) -> # Rank change, by steepest to shallowest change sorted_segments = sorted(segments_enhanced, key=lambda x: abs(x.get('total_change', 0)), reverse=True) - sorted_trends = [seg for seg in sorted_segments if 'total_change' in seg and abs(seg['total_change']) > 0] + sorted_trends = [seg for seg in sorted_segments if 'total_change' in seg] for i, seg in enumerate(sorted_trends): j = seg['time_index'] - 1 segments_enhanced[j]['change_rank'] = int(i+1) From 3df5f18224125acb49374aff91b6c6c59c4cebcc Mon Sep 17 00:00:00 2001 From: Simmar Kalsi Date: Fri, 10 Apr 2026 22:39:59 +0000 Subject: [PATCH 2/3] fix: classify flat/noise segments and avoid false best selection --- pytrendy/io/results_pytrendy.py | 7 ++++--- .../segments_refine/trend_classify.py | 14 ++++++++++---- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/pytrendy/io/results_pytrendy.py b/pytrendy/io/results_pytrendy.py index 11e1b96..aead8e8 100644 --- a/pytrendy/io/results_pytrendy.py +++ b/pytrendy/io/results_pytrendy.py @@ -33,12 +33,13 @@ def set_best(self) -> None: - `results.best` returns best based on `total_change` (cumulative sum of differences). - Identifies the best trend segment based on steepness and duration. - - The segment with the lowest `change_rank` is selected as the best. + - The segment with the lowest `change_rank` among Up/Down directions is selected as the best. """ - if len(self.segments) == 0 or not any('change_rank' in segment for segment in self.segments): + trends = [seg for seg in self.segments if seg.get('direction') in ['Up', 'Down']] + if not trends: self.best = None return - self.best = min(self.segments, key=lambda x: x.get('change_rank', math.inf)) + self.best = min(trends, key=lambda x: x.get('change_rank', math.inf)) def set_summary(self) -> None: """ diff --git a/pytrendy/post_processing/segments_refine/trend_classify.py b/pytrendy/post_processing/segments_refine/trend_classify.py index 9995a01..6b87eb6 100644 --- a/pytrendy/post_processing/segments_refine/trend_classify.py +++ b/pytrendy/post_processing/segments_refine/trend_classify.py @@ -33,9 +33,6 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l for i, segment in enumerate(segments): - if segment['direction'] not in ['Up', 'Down']: - continue - # Assume some padding for abrupt cases start = pd.to_datetime(segment['start']) - pd.Timedelta(days=2) end = pd.to_datetime(segment['end']) + pd.Timedelta(days=2) @@ -56,7 +53,7 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l else: segments_classified[i]['trend_class'] = 'abrupt' - if segment['direction'] == 'Down': + elif segment['direction'] == 'Down': _, cost_gradual_down, _, _, _ = dtw(df_segment['value_cleaned'], df_class['gradual_down']) _, cost_abrupt_down, _, _, _ = dtw(df_segment['value_cleaned'], df_class['abrupt_down']) @@ -70,6 +67,15 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l else: segments_classified[i]['trend_class'] = 'abrupt' + elif segment['direction'] == 'Flat': + segments_classified[i]['trend_class'] = 'flat' + + elif segment['direction'] == 'Noise': + segments_classified[i]['trend_class'] = 'noise' + + else: + segments_classified[i]['trend_class'] = 'unknown' + # Final condition, hard-classify graduals as abrupt if too short segment_length = (pd.to_datetime(segment['end']) - pd.to_datetime(segment['start'])).days if segment_length < 3: From fe88cd8584af8a1e4ff352a7bad460bc54badea1 Mon Sep 17 00:00:00 2001 From: Simmar Kalsi Date: Sat, 11 Apr 2026 16:36:47 +0000 Subject: [PATCH 3/3] fix: add trend class for non-up/down segments fix: update artifact cleanup to set trend class instead of deleting for non-up/down segments fix: update set_best to only consider up/down segments --- .gitignore | 1 + pytrendy/io/results_pytrendy.py | 7 +++--- pytrendy/post_processing/segments_analyse.py | 3 +-- .../segments_refine/artifact_cleanup.py | 4 ++-- .../segments_refine/trend_classify.py | 23 +++++++++++-------- 5 files changed, 20 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 551dd09..34c9c58 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ htmlcov/ .pytest_cache/ tests/testtemp.py site/ +poetry.lock \ No newline at end of file diff --git a/pytrendy/io/results_pytrendy.py b/pytrendy/io/results_pytrendy.py index aead8e8..2212128 100644 --- a/pytrendy/io/results_pytrendy.py +++ b/pytrendy/io/results_pytrendy.py @@ -33,13 +33,12 @@ def set_best(self) -> None: - `results.best` returns best based on `total_change` (cumulative sum of differences). - Identifies the best trend segment based on steepness and duration. - - The segment with the lowest `change_rank` among Up/Down directions is selected as the best. + - The segment with the lowest `change_rank` is selected as the best. """ - trends = [seg for seg in self.segments if seg.get('direction') in ['Up', 'Down']] - if not trends: + if len(self.segments) == 0 or not any('change_rank' in segment for segment in [s for s in self.segments if s.get('direction') in ['Up', 'Down']]): self.best = None return - self.best = min(trends, key=lambda x: x.get('change_rank', math.inf)) + self.best = min(self.segments, key=lambda x: x.get('change_rank', math.inf)) def set_summary(self) -> None: """ diff --git a/pytrendy/post_processing/segments_analyse.py b/pytrendy/post_processing/segments_analyse.py index 71c53dc..321d190 100644 --- a/pytrendy/post_processing/segments_analyse.py +++ b/pytrendy/post_processing/segments_analyse.py @@ -88,8 +88,7 @@ def analyse_segments(df: pd.DataFrame, value_col: str, segments: list[dict]) -> # Rank change, by steepest to shallowest change sorted_segments = sorted(segments_enhanced, key=lambda x: abs(x.get('total_change', 0)), reverse=True) - sorted_trends = [seg for seg in sorted_segments if 'total_change' in seg] - for i, seg in enumerate(sorted_trends): + for i, seg in enumerate(sorted_segments): j = seg['time_index'] - 1 segments_enhanced[j]['change_rank'] = int(i+1) diff --git a/pytrendy/post_processing/segments_refine/artifact_cleanup.py b/pytrendy/post_processing/segments_refine/artifact_cleanup.py index 6a09d6b..cbc9bba 100644 --- a/pytrendy/post_processing/segments_refine/artifact_cleanup.py +++ b/pytrendy/post_processing/segments_refine/artifact_cleanup.py @@ -311,11 +311,11 @@ def has_partial_overlap_prev(segment: dict, segment_prev: dict) -> bool: # Reclassify as noise if either edge cases met if too_noisy or (is_abrupt_near_noise and not trend_ends_too_close) or is_small_gradual_in_noise: segment['direction'] = 'Noise' - if 'trend_class' in segment: del segment['trend_class'] + segment['trend_class'] = 'noise' if trend_ends_too_close or trend_too_small or trend_too_flat: segment['direction'] = 'Flat' - if 'trend_class' in segment: del segment['trend_class'] + segment['trend_class'] = 'flat' segments_refined.append(segment) diff --git a/pytrendy/post_processing/segments_refine/trend_classify.py b/pytrendy/post_processing/segments_refine/trend_classify.py index 6b87eb6..d49d305 100644 --- a/pytrendy/post_processing/segments_refine/trend_classify.py +++ b/pytrendy/post_processing/segments_refine/trend_classify.py @@ -33,6 +33,9 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l for i, segment in enumerate(segments): + if segment['direction'] not in ['Up', 'Down']: + continue + # Assume some padding for abrupt cases start = pd.to_datetime(segment['start']) - pd.Timedelta(days=2) end = pd.to_datetime(segment['end']) + pd.Timedelta(days=2) @@ -53,7 +56,7 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l else: segments_classified[i]['trend_class'] = 'abrupt' - elif segment['direction'] == 'Down': + if segment['direction'] == 'Down': _, cost_gradual_down, _, _, _ = dtw(df_segment['value_cleaned'], df_class['gradual_down']) _, cost_abrupt_down, _, _, _ = dtw(df_segment['value_cleaned'], df_class['abrupt_down']) @@ -67,18 +70,18 @@ def classify_trends(df: pd.DataFrame, value_col: str, segments: list[dict]) -> l else: segments_classified[i]['trend_class'] = 'abrupt' - elif segment['direction'] == 'Flat': - segments_classified[i]['trend_class'] = 'flat' - - elif segment['direction'] == 'Noise': - segments_classified[i]['trend_class'] = 'noise' - - else: - segments_classified[i]['trend_class'] = 'unknown' - # Final condition, hard-classify graduals as abrupt if too short segment_length = (pd.to_datetime(segment['end']) - pd.to_datetime(segment['start'])).days if segment_length < 3: segments_classified[i]['trend_class'] = 'abrupt' + for segment in segments_classified: + if 'trend_class' not in segment: + if segment['direction'] == 'Flat': + segment['trend_class'] = 'flat' + elif segment['direction'] == 'Noise': + segment['trend_class'] = 'noise' + else: + segment['trend_class'] = 'unknown' + return segments_classified