diff --git a/glue/flagging_script_glue/flagging.py b/glue/flagging_script_glue/flagging.py index dc61ee84..47a1209c 100644 --- a/glue/flagging_script_glue/flagging.py +++ b/glue/flagging_script_glue/flagging.py @@ -186,238 +186,167 @@ def pricing_info( df: pd.DataFrame, permut: tuple, groups: tuple, condos: bool ) -> pd.DataFrame: """ - Creates information about whether the price is an outlier, and its movement. - Also fetches the sandard deviation for the record. - pricing is whether it is a high/low outlier and whether it is a price swing. - which_price is whether it is the raw price, price/sqft or both that are outliers. + Computes pricing deviations and computes lower/upper standard + deviation thresholds based on group means and standard deviations. + Inputs: df (pd.DataFrame): dataframe of sales permut (tuple): tuple of standard deviation boundaries. Ex: (2,2) is 2 std away on both sides. + groups: (tuple): Our statistical grouping columns condos (bool): Specifies whether we are running function for condos or residential Outputs: df (pd.DataFrame): dataframe with 3 extra columns of price info. """ - group_string = create_group_string(groups, "_") - - columns_to_log = ["meta_sale_price"] + group_str = create_group_string(groups, "_") + # Log-transform the columns (price and, if applicable, price per sqft) + cols_to_log = ["meta_sale_price"] + ([] if condos else ["sv_price_per_sqft"]) + df = log_transform(df, cols_to_log) + + # Persist group-level statistics (for raw price and price per sqft) + df["group_mean"] = df.groupby(list(groups))["meta_sale_price"].transform("mean") + df["group_std"] = df.groupby(list(groups))["meta_sale_price"].transform("std") if not condos: - columns_to_log.append("sv_price_per_sqft") - df = log_transform(df, columns_to_log) - - prices = [ - f"sv_price_deviation_{group_string}", - f"sv_cgdr_deviation_{group_string}", - ] - if not condos: - prices.insert(1, f"sv_price_per_sqft_deviation_{group_string}") - - # Persist standard deviation per group - group_std = ( - df.groupby(list(groups), group_keys=False)["meta_sale_price"] - .std(ddof=0) - .reset_index() - ) - group_std = group_std.rename(columns={"meta_sale_price": "group_std"}) - df = df.merge(group_std, on=groups) - - # Add group mean columns - group_mean = ( - df.groupby(list(groups), group_keys=False)["meta_sale_price"] - .mean() - .reset_index() - ) - group_mean = group_mean.rename(columns={"meta_sale_price": "group_mean"}) - df = df.merge(group_mean, on=groups) - - if not condos: - # Persist group sqft standard deviation and group mean - group_sqft_std = ( - df.groupby(list(groups), group_keys=False)["sv_price_per_sqft"] - .std(ddof=0) - .reset_index() - ) - group_sqft_std = group_sqft_std.rename( - columns={"sv_price_per_sqft": "group_sqft_std"} - ) - df = df.merge(group_sqft_std, on=groups) - - group_sqft_mean = ( - df.groupby(list(groups), group_keys=False)["sv_price_per_sqft"] - .mean() - .reset_index() + df["group_sqft_mean"] = df.groupby(list(groups))["sv_price_per_sqft"].transform( + "mean" ) - group_sqft_mean = group_sqft_mean.rename( - columns={"sv_price_per_sqft": "group_sqft_mean"} + df["group_sqft_std"] = df.groupby(list(groups))["sv_price_per_sqft"].transform( + "std" ) - df = df.merge(group_sqft_mean, on=groups) - - # Calculate standard deviations - df[f"sv_price_deviation_{group_string}"] = df.groupby( - list(groups), group_keys=False - )["meta_sale_price"].apply(z_normalize_groupby) + # Compute z-score deviation columns within groups + df[f"sv_price_deviation_{group_str}"] = df.groupby(list(groups))[ + "meta_sale_price" + ].apply(z_normalize_groupby) if not condos: - df[f"sv_price_per_sqft_deviation_{group_string}"] = df.groupby( - list(groups), group_keys=False - )["sv_price_per_sqft"].apply(z_normalize_groupby) - - df[f"sv_cgdr_deviation_{group_string}"] = df.groupby( - list(groups), group_keys=False - )["sv_cgdr"].apply(z_normalize_groupby) - - holds = get_thresh(df, prices, permut, groups) - df["sv_pricing"] = df.apply(price_column, args=(holds, groups, condos), axis=1) + df[f"sv_price_per_sqft_deviation_{group_str}"] = df.groupby(list(groups))[ + "sv_price_per_sqft" + ].apply(z_normalize_groupby) + df[f"sv_cgdr_deviation_{group_str}"] = df.groupby(list(groups))["sv_cgdr"].apply( + z_normalize_groupby + ) + # Vectorized per-row lower and upper thresholds (mean ± std * multiplier) + for col in [f"sv_price_deviation_{group_str}", f"sv_cgdr_deviation_{group_str}"]: + df[f"{col}_lower"] = df.groupby(list(groups))[col].transform("mean") - permut[ + 0 + ] * df.groupby(list(groups))[col].transform("std") + df[f"{col}_upper"] = df.groupby(list(groups))[col].transform("mean") + permut[ + 1 + ] * df.groupby(list(groups))[col].transform("std") if not condos: - df["sv_which_price"] = df.apply(which_price, args=(holds, groups), axis=1) - + col = f"sv_price_per_sqft_deviation_{group_str}" + df[f"{col}_lower"] = df.groupby(list(groups))[col].transform("mean") - permut[ + 0 + ] * df.groupby(list(groups))[col].transform("std") + df[f"{col}_upper"] = df.groupby(list(groups))[col].transform("mean") + permut[ + 1 + ] * df.groupby(list(groups))[col].transform("std") + + # Determine pricing outlier type and (if applicable) which price measure is flagged + df["sv_pricing"] = df.apply(lambda row: price_column(row, groups, condos), axis=1) + if not condos: + df["sv_which_price"] = df.apply(lambda row: which_price(row, groups), axis=1) return df -def which_price(row: pd.Series, thresholds: dict, groups: tuple) -> str: +def which_price(row: pd.Series, groups: tuple) -> str: """ - Determines whether sale_price, price_per_sqft, or both are outliers, - and returns a string resembling it. + Determines which price measure (raw, per sqft, or both) is flagged as an outlier + by comparing deviation values with per-row thresholds. Inputs: - thresholds (dict): dict of thresholds from get_thresh + groups (tuple): tuple of columns used for statistical grouping Outputs: value (str): string saying which of these are outliers. """ - value = "Non-outlier" - group_string = create_group_string(groups, "_") - key = tuple(row[group] for group in groups) - - if thresholds.get(f"sv_price_deviation_{group_string}").get(key) and thresholds.get( - f"sv_price_per_sqft_deviation_{group_string}" - ).get(key): - s_std, *s_std_range = thresholds.get(f"sv_price_deviation_{group_string}").get( - key - ) - s_lower, s_upper = s_std_range - sq_std, *sq_std_range = thresholds.get( - f"sv_price_per_sqft_deviation_{group_string}" - ).get(key) - sq_lower, sq_upper = sq_std_range - if not between_two_numbers( - row[f"sv_price_deviation_{group_string}"], s_lower, s_upper - ) and between_two_numbers( - row[f"sv_price_per_sqft_deviation_{group_string}"], sq_lower, sq_upper - ): - value = "(raw)" - elif between_two_numbers( - row[f"sv_price_deviation_{group_string}"], s_lower, s_upper - ) and not between_two_numbers( - row[f"sv_price_per_sqft_deviation_{group_string}"], sq_lower, sq_upper - ): - value = "(sqft)" - elif not between_two_numbers( - row[f"sv_price_deviation_{group_string}"], s_lower, s_upper - ) and not between_two_numbers( - row[f"sv_price_per_sqft_deviation_{group_string}"], sq_lower, sq_upper - ): - value = "(raw & sqft)" - - return value + group_str = create_group_string(groups, "_") + raw_val = row[f"sv_price_deviation_{group_str}"] + raw_lower = row[f"sv_price_deviation_{group_str}_lower"] + raw_upper = row[f"sv_price_deviation_{group_str}_upper"] + raw_out = not between_two_numbers(raw_val, raw_lower, raw_upper) + + if f"sv_price_per_sqft_deviation_{group_str}" in row: + sqft_val = row[f"sv_price_per_sqft_deviation_{group_str}"] + sqft_lower = row[f"sv_price_per_sqft_deviation_{group_str}_lower"] + sqft_upper = row[f"sv_price_per_sqft_deviation_{group_str}_upper"] + sqft_out = not between_two_numbers(sqft_val, sqft_lower, sqft_upper) + else: + sqft_out = False + + if raw_out and not sqft_out: + return "(raw)" + elif not raw_out and sqft_out: + return "(sqft)" + elif raw_out and sqft_out: + return "(raw & sqft)" + else: + return "Non-outlier" def between_two_numbers(num: int or float, a: int or float, b: int or float) -> bool: return a < num < b -def price_column(row: pd.Series, thresholds: dict, groups: tuple, condos: bool) -> str: +def price_column(row: pd.Series, groups: tuple, condos: bool) -> str: """ + Determines whether the record is a high or low price outlier and, if applicable, + whether it exhibits a price swing. Comparisons are made by checking the record's + deviation against its per-row lower/upper threshold. + Determines whether the record is a high price outlier or a low price outlier. - If the record is also a price change outlier, than add 'swing' to the string. + If the record is also a price change outlier, than 'swing' is added to the string. Inputs: - thresholds (dict): dict of standard deviation thresholds from get_thresh() + groups: (tuple) Columns for statistical grouping condos (bool): Specifies whether we are running function for condos or residential Outputs: value (str): string showing what kind of price outlier the record is. """ + group_str = create_group_string(groups, "_") value = "Not price outlier" - price = False - - group_string = create_group_string(groups, "_") - key = tuple(row[group] for group in groups) + price_flag = False + raw_val = row[f"sv_price_deviation_{group_str}"] + raw_lower = row[f"sv_price_deviation_{group_str}_lower"] + raw_upper = row[f"sv_price_deviation_{group_str}_upper"] - if condos == True: - if thresholds.get(f"sv_price_deviation_{group_string}").get(key): - s_std, *s_std_range = thresholds.get( - f"sv_price_deviation_{group_string}" - ).get(key) - s_lower, s_upper = s_std_range - - if row[f"sv_price_deviation_{group_string}"] > s_upper: - value = "High price" - price = True - elif row[f"sv_price_deviation_{group_string}"] < s_lower: - value = "Low price" - price = True - - if ( - price - and pd.notnull(row[f"sv_cgdr_deviation_{group_string}"]) - and thresholds.get(f"sv_cgdr_deviation_{group_string}").get(key) + if condos: + if raw_val > raw_upper: + value = "High price" + price_flag = True + elif raw_val < raw_lower: + value = "Low price" + price_flag = True + if price_flag and pd.notnull(row.get(f"sv_cgdr_deviation_{group_str}")): + cgdr_val = row[f"sv_cgdr_deviation_{group_str}"] + cgdr_lower = row[f"sv_cgdr_deviation_{group_str}_lower"] + cgdr_upper = row[f"sv_cgdr_deviation_{group_str}_upper"] + if row["sv_price_movement"] == "Away from mean" and not between_two_numbers( + cgdr_val, cgdr_lower, cgdr_upper ): - # not every combo will have pct change info so we need this check - p_std, *p_std_range = thresholds.get( - f"sv_cgdr_deviation_{group_string}" - ).get(key) - - p_lower, p_upper = p_std_range - if row[ - "sv_price_movement" - ] == "Away from mean" and not between_two_numbers( - row[f"sv_cgdr_deviation_{group_string}"], p_lower, p_upper - ): - value += " swing" - + value += " swing" else: - if thresholds.get(f"sv_price_deviation_{group_string}").get( - key - ) and thresholds.get(f"sv_price_per_sqft_deviation_{group_string}").get(key): - s_std, *s_std_range = thresholds.get( - f"sv_price_deviation_{group_string}" - ).get(key) - s_lower, s_upper = s_std_range - - sq_std, *sq_std_range = thresholds.get( - f"sv_price_per_sqft_deviation_{group_string}" - ).get(key) - sq_lower, sq_upper = sq_std_range - - if ( - row[f"sv_price_deviation_{group_string}"] > s_upper - or row[f"sv_price_per_sqft_deviation_{group_string}"] > sq_upper - ): - value = "High price" - price = True - elif ( - row[f"sv_price_deviation_{group_string}"] < s_lower - or row[f"sv_price_per_sqft_deviation_{group_string}"] < sq_lower - ): - value = "Low price" - price = True - - if ( - price - and pd.notnull(row[f"sv_cgdr_deviation_{group_string}"]) - and thresholds.get(f"sv_cgdr_deviation_{group_string}").get(key) - ): - # not every combo will have pct change info so we need this check - p_std, *p_std_range = thresholds.get( - f"sv_cgdr_deviation_{group_string}" - ).get(key) - - p_lower, p_upper = p_std_range + raw_out = raw_val > raw_upper or raw_val < raw_lower + sqft_val = row[f"sv_price_per_sqft_deviation_{group_str}"] + sqft_lower = row[f"sv_price_per_sqft_deviation_{group_str}_lower"] + sqft_upper = row[f"sv_price_per_sqft_deviation_{group_str}_upper"] + sqft_out = sqft_val > sqft_upper or sqft_val < sqft_lower + + if raw_out or sqft_out: + if raw_out: + value = "High price" if raw_val > raw_upper else "Low price" + elif sqft_out: + value = "High price" if sqft_val > sqft_upper else "Low price" + price_flag = True + + if price_flag and pd.notnull(row.get(f"sv_cgdr_deviation_{group_str}")): + cgdr_val = row[f"sv_cgdr_deviation_{group_str}"] + cgdr_lower = row[f"sv_cgdr_deviation_{group_str}_lower"] + cgdr_upper = row[f"sv_cgdr_deviation_{group_str}_upper"] if row[ "sv_price_movement" ] == "Away from mean" and not between_two_numbers( - row[f"sv_cgdr_deviation_{group_string}"], p_lower, p_upper + cgdr_val, cgdr_lower, cgdr_upper ): value += " swing" - return value @@ -675,49 +604,6 @@ def check_days(df: pd.DataFrame, threshold: int) -> pd.DataFrame: return df -def get_thresh(df: pd.DataFrame, cols: list, permut: tuple, groups: tuple) -> dict: - """ - Creates a nested dictionary where the top level key is a column - and the 2nd-level key is a (township, class) combo. - Ex: stds['sale_price'][76, 203] - Needed in order to keep track of specific thresholds for each township/class combo. - Theoretically each std should be 1(because of z_normalization), but in practical terms - it is in a very very small range around 1, so using a uniform cutoff of 2 and -2 - loses us some precision. - - We also want to allow for some flexibility in how the thresholds are calculated; - and this function allows for more flexbility in the event of future changes. - Inputs: - df (pd.DataFrame): Dataframe to create dictionary from. - cols (list): list of columns to get standard deviations for. - permut (tuple): standard deviation range for lower_limit and upper_limit - First term is how many stndard deviations away on the left - Second term is how many standard deviations away on the right. - Outputs: - stds (dict): nested dictionary of std deviations for all columns - from DataFrame. - """ - stds = {} - - for col in cols: - df[col] = df[col].astype(float) - grouped = df.dropna(subset=list(groups) + [col]).groupby(list(groups))[col] - lower_limit = grouped.mean() - (grouped.std(ddof=0) * permut[0]) - upper_limit = grouped.mean() + (grouped.std(ddof=0) * permut[1]) - std = grouped.std(ddof=0) - lower_limit = lower_limit.to_dict() - upper_limit = upper_limit.to_dict() - std = std.to_dict() - - limits = { - x: (std.get(x, 0), lower_limit.get(x, 0), upper_limit.get(x, 0)) - for x in set(std).union(upper_limit, lower_limit) - } - stds[col] = limits - - return stds - - def log_transform(df: pd.DataFrame, columns: list) -> pd.DataFrame: """ Apply log transformation on given column set.