diff --git a/glue/flagging_script_glue/flagging.py b/glue/flagging_script_glue/flagging.py
index 34010d8a..8239befd 100644
--- a/glue/flagging_script_glue/flagging.py
+++ b/glue/flagging_script_glue/flagging.py
@@ -83,12 +83,8 @@ def outlier_taxonomy(df: pd.DataFrame, permut: tuple, groups: tuple, condos: boo
     """
     df = check_days(df, SHORT_TERM_OWNER_THRESHOLD)
-
     df = pricing_info(df, permut, groups, condos=condos)
-
     df = outlier_type(df, condos=condos)
-    df = outlier_flag(df)
-    df = special_flag(df)

     return df


@@ -270,26 +266,6 @@ def pricing_info(
     return df


-def special_flag(df: pd.DataFrame) -> pd.DataFrame:
-    """
-    Creates column that checks whether there is a special flag for this record.
-    Inputs:
-        df (pd.DataFrame): dataframe to add flags onto
-    Outputs:
-        df (pd.DataFrame): dataframe with 'special_flags' column
-    """
-    cond = [
-        (df["sv_name_match"] != "No match"),
-        (df["sv_short_owner"] == "Short-term owner"),
-        (df["sv_transaction_type"] == "legal_entity-legal_entity"),
-    ]
-    labels = ["Family sale", "Home flip sale", "Non-person sale"]
-
-    df["sv_special_flags"] = np.select(cond, labels, default="Not special")
-
-    return df
-
-
 def which_price(row: pd.Series, thresholds: dict, groups: tuple) -> str:
     """
     Determines whether sale_price, price_per_sqft, or both are outliers,
@@ -598,11 +574,10 @@ def get_sale_counts(dups: pd.DataFrame) -> pd.DataFrame:
     v_counts = (
         dups.pin.value_counts()
         .reset_index()
-        .rename(columns={"index": "pin", "pin": "sv_sale_dup_counts"})
+        .rename(columns={"count": "sv_sale_dup_counts"})
     )

-    # Explicitly specify the merging columns
-    dups = pd.merge(dups, v_counts, on="pin")
+    dups = pd.merge(dups, v_counts)

     return dups
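Editor's note on the `get_sale_counts()` hunk above: dropping the `{"index": "pin", ...}` rename matches the pandas >= 2.0 behavior of `value_counts().reset_index()`, which names the count column "count" and keeps the original column name for the values. This assumes the pipeline now runs on pandas 2.x, which the new `{"count": ...}` rename implies. A minimal sketch with toy `pin` values (not project data):

import pandas as pd

dups = pd.DataFrame({"pin": ["100", "100", "200"]})

# pandas >= 2.0: columns come back as ["pin", "count"] rather than ["index", "pin"]
v_counts = dups.pin.value_counts().reset_index()
print(v_counts)
#    pin  count
# 0  100      2
# 1  200      1

# After renaming "count", the merge can rely on the shared "pin" column,
# so no explicit on="pin" is needed.
v_counts = v_counts.rename(columns={"count": "sv_sale_dup_counts"})
print(pd.merge(dups, v_counts))
#    pin  sv_sale_dup_counts
# 0  100                   2
# 1  100                   2
# 2  200                   1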
@@ -767,131 +742,77 @@ def z_normalize_groupby(s: pd.Series):

 def outlier_type(df: pd.DataFrame, condos: bool) -> pd.DataFrame:
     """
-    Runs np.select that creates an outlier taxonomy.
+    This function creates indicator columns for each distinct outlier type, split
+    between price and characteristic outliers. These columns are prefixed with 'sv_ind_'.
+
     Inputs:
-        df (pd.DataFrame): dataframe with necessary columns created from previous functions.
+        df (pd.DataFrame): Dataframe
     Outputs:
-        df (pd.DataFrame): dataframe with 'sv_outlier_type' column.
+        df (pd.DataFrame): Dataframe with indicator columns for each flag type
     """
-    if condos == True:
-        conditions = [
-            (df["sv_short_owner"] == "Short-term owner")
-            & (df["sv_pricing"].str.contains("High")),
-            (df["sv_name_match"] != "No match")
-            & (df["sv_pricing"].str.contains("High")),
-            (
-                df[["sv_buyer_category", "sv_seller_category"]]
-                .eq("legal_entity")
-                .any(axis=1)
-            )
-            & (df["sv_pricing"].str.contains("High")),
-            (df["sv_anomaly"] == "Outlier") & (df["sv_pricing"].str.contains("High")),
-            (df["sv_pricing"].str.contains("High price swing")),
-            (df["sv_pricing"].str.contains("High")),
-            (df["sv_short_owner"] == "Short-term owner")
-            & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_name_match"] != "No match")
-            & (df["sv_pricing"].str.contains("Low")),
-            (
-                df[["sv_buyer_category", "sv_seller_category"]]
-                .eq("legal_entity")
-                .any(axis=1)
-            )
-            & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_anomaly"] == "Outlier") & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_pricing"].str.contains("Low price swing")),
-            (df["sv_pricing"].str.contains("Low")),
+
+    char_conditions = [
+        df["sv_short_owner"] == "Short-term owner",
+        df["sv_name_match"] != "No match",
+        df[["sv_buyer_category", "sv_seller_category"]].eq("legal_entity").any(axis=1),
+        df["sv_anomaly"] == "Outlier",
+        df["sv_pricing"].str.contains("High price swing")
+        | df["sv_pricing"].str.contains("Low price swing"),
+    ]
+
+    # Define labels for characteristic-based reasons
+    char_labels = [
+        "sv_ind_char_short_term_owner",
+        "sv_ind_char_family_sale",
+        "sv_ind_char_non_person_sale",
+        "sv_ind_char_statistical_anomaly",
+        "sv_ind_char_price_swing_homeflip",
+    ]
+
+    if condos:
+        # Define conditions for price-based reasons
+        price_conditions = [
+            df["sv_pricing"].str.contains("High"),
+            df["sv_pricing"].str.contains("Low"),
         ]
-        labels = [
-            "Home flip sale (high)",
-            "Family sale (high)",
-            "Non-person sale (high)",
-            "Anomaly (high)",
-            "High price swing",
-            "High price (raw)",
-            "Home flip sale (low)",
-            "Family sale (low)",
-            "Non-person sale (low)",
-            "Anomaly (low)",
-            "Low price swing",
-            "Low price (raw)",
+        # Define labels for price-based reasons
+        price_labels = [
+            "sv_ind_price_high_price",
+            "sv_ind_price_low_price",
         ]
     else:
-        conditions = [
-            (df["sv_short_owner"] == "Short-term owner")
-            & (df["sv_pricing"].str.contains("High")),
-            (df["sv_name_match"] != "No match")
-            & (df["sv_pricing"].str.contains("High")),
+        # Define conditions for price-based reasons
+        price_conditions = [
             (
-                df[["sv_buyer_category", "sv_seller_category"]]
-                .eq("legal_entity")
-                .any(axis=1)
-            )
-            & (df["sv_pricing"].str.contains("High")),
-            (df["sv_anomaly"] == "Outlier") & (df["sv_pricing"].str.contains("High")),
-            (df["sv_pricing"].str.contains("High price swing")),
-            (df["sv_pricing"].str.contains("High"))
-            & (df["sv_which_price"] == "(raw & sqft)"),
-            (df["sv_pricing"].str.contains("High")) & (df["sv_which_price"] == "(raw)"),
-            (df["sv_pricing"].str.contains("High"))
-            & (df["sv_which_price"] == "(sqft)"),
-            (df["sv_short_owner"] == "Short-term owner")
-            & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_name_match"] != "No match")
-            & (df["sv_pricing"].str.contains("Low")),
+                df["sv_pricing"].str.contains("High")
+                & (df["sv_which_price"].str.contains("raw"))
+            ),
             (
-                df[["sv_buyer_category", "sv_seller_category"]]
-                .eq("legal_entity")
-                .any(axis=1)
-            )
-            & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_anomaly"] == "Outlier") & (df["sv_pricing"].str.contains("Low")),
-            (df["sv_pricing"].str.contains("Low price swing")),
swing")), + df["sv_pricing"].str.contains("Low") + & (df["sv_which_price"].str.contains("raw")) + ), + (df["sv_pricing"].str.contains("High")) + & (df["sv_which_price"].str.contains("sqft")), (df["sv_pricing"].str.contains("Low")) - & (df["sv_which_price"] == "(raw & sqft)"), - (df["sv_pricing"].str.contains("Low")) & (df["sv_which_price"] == "(raw)"), - (df["sv_pricing"].str.contains("Low")) & (df["sv_which_price"] == "(sqft)"), + & (df["sv_which_price"].str.contains("sqft")), ] - labels = [ - "Home flip sale (high)", - "Family sale (high)", - "Non-person sale (high)", - "Anomaly (high)", - "High price swing", - "High price (raw & sqft)", - "High price (raw)", - "High price (sqft)", - "Home flip sale (low)", - "Family sale (low)", - "Non-person sale (low)", - "Anomaly (low)", - "Low price swing", - "Low price (raw & sqft)", - "Low price (raw)", - "Low price (sqft)", + # Define labels for price-based reasons + price_labels = [ + "sv_ind_price_high_price", + "sv_ind_price_low_price", + "sv_ind_price_high_price_sqft", + "sv_ind_price_low_price_sqft", ] - df["sv_outlier_type"] = np.select(conditions, labels, default="Not outlier") + combined_conditions = price_conditions + char_conditions + combined_labels = price_labels + char_labels - return df - - -def outlier_flag(df: pd.DataFrame) -> pd.DataFrame: - """ - Creates a flag that shows whether the record is an - outlier (a special flag) according to our outlier taxonomy. - Inputs: - df (pd.DataFrame): dataframe to create outlier flag - Outputs: - df (pd.DataFrame): dataframe with 'is_outlier' column - """ - - df["sv_is_outlier"] = np.select( - [(df["sv_outlier_type"] == "Not outlier")], [0], default=1 - ) + # Create indicator columns for each flag type + for label, condition in zip(combined_labels, combined_conditions): + df[label] = condition.astype(int) return df diff --git a/glue/sales_val_flagging.py b/glue/sales_val_flagging.py index 6deb4d4a..b1ab9527 100644 --- a/glue/sales_val_flagging.py +++ b/glue/sales_val_flagging.py @@ -88,7 +88,7 @@ def ptax_adjustment(df, groups, ptax_sd, condos: bool): This function manually applies a ptax adjustment, keeping only ptax flags that are outside of a certain standard deviation range in terms of raw price or price per sqft. It creates the - new column and preserves the old ptax column + new column and preserves the old ptax column. 
diff --git a/glue/sales_val_flagging.py b/glue/sales_val_flagging.py
index 6deb4d4a..b1ab9527 100644
--- a/glue/sales_val_flagging.py
+++ b/glue/sales_val_flagging.py
@@ -88,7 +88,7 @@ def ptax_adjustment(df, groups, ptax_sd, condos: bool):
     This function manually applies a ptax adjustment, keeping only
     ptax flags that are outside of a certain standard deviation range
     in terms of raw price or price per sqft. It creates the
-    new column and preserves the old ptax column
+    new column and preserves the old ptax column.

     Inputs:
         df: dataframe after flagging has been done
@@ -102,45 +102,49 @@ def ptax_adjustment(df, groups, ptax_sd, condos: bool):
     group_string = "_".join(groups)

     if not condos:
-        df["ptax_flag_w_deviation"] = df["ptax_flag_original"] & (
+        df["sv_ind_ptax_flag_w_high_price"] = df["ptax_flag_original"] & (
             (df[f"sv_price_deviation_{group_string}"] >= ptax_sd[1])
-            | (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0])
-            | (df[f"sv_price_per_sqft_deviation_{group_string}"] >= ptax_sd[1])
-            | (df[f"sv_price_per_sqft_deviation_{group_string}"] <= -ptax_sd[0])
         )

-        # Determine the ptax direction
-        conditions = [
-            (df[f"sv_price_deviation_{group_string}"] >= ptax_sd[1])
-            | (df[f"sv_price_per_sqft_deviation_{group_string}"] >= ptax_sd[1]),
-            (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0])
-            | (df[f"sv_price_per_sqft_deviation_{group_string}"] <= -ptax_sd[0]),
-        ]
+        df["sv_ind_ptax_flag_w_high_price_sqft"] = df["ptax_flag_original"] & (
+            (df[f"sv_price_per_sqft_deviation_{group_string}"] >= ptax_sd[1])
+        )
+
+        df["sv_ind_ptax_flag_w_low_price"] = df["ptax_flag_original"] & (
+            (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0])
+        )
+
+        df["sv_ind_ptax_flag_w_low_price_sqft"] = df["ptax_flag_original"] & (
+            (df[f"sv_price_per_sqft_deviation_{group_string}"] <= -ptax_sd[0])
+        )
+
     else:
-        df["ptax_flag_w_deviation"] = df["ptax_flag_original"] & (
+        df["sv_ind_ptax_flag_w_high_price"] = df["ptax_flag_original"] & (
             (df[f"sv_price_deviation_{group_string}"] >= ptax_sd[1])
-            | (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0])
         )

-        # Determine the ptax direction
-        conditions = [
-            (df[f"sv_price_deviation_{group_string}"] >= ptax_sd[1]),
-            (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0]),
-        ]
+        df["sv_ind_ptax_flag_w_low_price"] = df["ptax_flag_original"] & (
+            (df[f"sv_price_deviation_{group_string}"] <= -ptax_sd[0])
+        )

-    directions = ["High", "Low"]
-    df["ptax_direction"] = np.select(conditions, directions, default=np.nan)
+    df["sv_ind_ptax_flag"] = df["ptax_flag_original"].astype(int)

     return df


-def group_size_adjustment(df, stat_groups: list, min_threshold, condos: bool):
+def classify_outliers(df, stat_groups: list, min_threshold):
     """
-    Within the groups of sales we are looking at to flag outliers, some
-    are very small, with a large portion of groups with even 1 total sale.
+    This function does the following:
+
+    1. We use all of the indicator columns created by outlier_type() in the
+    Mansueto flagging script to populate our sv_outlier_reason1, sv_outlier_reason2,
+    and sv_outlier_reason3 fields. We populate them first with ptax, then price, then char
+    reasons.

-    This function manually sets all sales to 'Not Outlier' if they belong
-    to a group that is under our 'min_threshold' argument.
+    2. We implement our group threshold requirement. In the statistical flagging process,
+    if the group a sale belongs to has fewer than N=30 sales, we manually set these flags
+    to non-outlier status, even if they were flagged in the Mansueto script. This requirement
+    is bypassed for ptax outliers - we don't apply the group threshold in that case.

     Inputs:
         df: The data right after we perform the flagging script (go()), when the exploded
@@ -152,30 +156,117 @@ def group_size_adjustment(df, stat_groups: list, min_threshold, condos: bool):
         df: dataframe with newly manually adjusted outlier values.
""" + group_counts = df.groupby(stat_groups).size().reset_index(name="count") filtered_groups = group_counts[group_counts["count"] <= min_threshold] # Merge df_flagged with filtered_groups on the columns to get the matching rows - merged_df = pd.merge( + df = pd.merge( df, filtered_groups[stat_groups], on=stat_groups, how="left", indicator=True ) - # Modify the .loc condition to include checking for sv_outlier_type values - condition = (merged_df["_merge"] == "both") & (merged_df["sv_is_outlier"] == 1) + # Assign sv_outlier_reasons + for idx in range(1, 4): + df[f"sv_outlier_reason{idx}"] = np.nan + + outlier_type_crosswalk = { + "sv_ind_price_high_price": "High price", + "sv_ind_ptax_flag_w_high_price": "High price", + "sv_ind_price_low_price": "Low price", + "sv_ind_ptax_flag_w_low_price": "Low price", + "sv_ind_price_high_price_sqft": "High price per square foot", + "sv_ind_ptax_flag_w_high_price_sqft": "High price per square foot", + "sv_ind_price_low_price_sqft": "Low price per square foot", + "sv_ind_ptax_flag_w_low_price_sqft": "Low price per square foot", + "sv_ind_ptax_flag": "PTAX-203 Exclusion", + "sv_ind_char_short_term_owner": "Short-term owner", + "sv_ind_char_family_sale": "Family Sale", + "sv_ind_char_non_person_sale": "Non-person sale", + "sv_ind_char_statistical_anomaly": "Statistical Anomaly", + "sv_ind_char_price_swing_homeflip": "Price swing / Home flip", + } + + """ + During our statistical flagging process, we automatically discard + a sale's eligibility for outlier status if the number of sales in + the statistical grouping is below a certain threshold. The list - + `group_thresh_price_fix` along with the ['_merge'] column will allow + us to exclude these sales for the sv_is_outlier status. + + Since the `sv_is_outlier` column requires a price value, we simply + do not assign these price outlier flags if the group number is below a certain + threshold + + Note: This doesn't apply for sales that also have a ptax outlier status. + In this case, we still assign the price outlier status. 
+ """ + group_thresh_price_fix = [ + "sv_ind_price_high_price", + "sv_ind_price_low_price", + "sv_ind_price_high_price_sqft", + "sv_ind_price_low_price_sqft", + ] - # Using .loc[] to set the desired values for rows meeting the condition - merged_df.loc[condition, "sv_outlier_type"] = "Not outlier" - merged_df.loc[condition, "sv_is_outlier"] = 0 + def fill_outlier_reasons(row): + reasons_added = set() # Set to track reasons already added + + for reason_ind_col in outlier_type_crosswalk: + current_reason = outlier_type_crosswalk[reason_ind_col] + # Add a check to ensure that only specific reasons are added if _merge is not 'both' + if ( + reason_ind_col in row + and row[reason_ind_col] + and current_reason + not in reasons_added # Check if the reason is already added + # Apply group threshold logic: `row["_merge"]` will be `both` when the group threshold + # is not met, but only price indicators (`group_thresh_price_fix`) should use this threshold, + # since ptax indicators don't currently utilize group threshold logic + and not ( + row["_merge"] == "both" and reason_ind_col in group_thresh_price_fix + ) + ): + row[f"sv_outlier_reason{len(reasons_added) + 1}"] = current_reason + reasons_added.add(current_reason) # Add current reason to the set + if len(reasons_added) >= 3: + break + + return row + + df = df.apply(fill_outlier_reasons, axis=1) # Drop the _merge column - df_flagged_updated = merged_df.drop(columns=["_merge"]) + df = df.drop(columns=["_merge"]) + + # Assign outlier status + values_to_check = { + "High price", + "Low price", + "High price per square foot", + "Low price per square foot", + } + + df["sv_is_outlier"] = np.where( + df[[f"sv_outlier_reason{idx}" for idx in range(1, 4)]] + .isin(values_to_check) + .any(axis=1), + True, + False, + ) # Add group column to eventually write to athena sale.flag table. 
diff --git a/manual_flagging/flagging.py b/manual_flagging/flagging.py
index 2b471d4d..7e8db8bf 100755
--- a/manual_flagging/flagging.py
+++ b/manual_flagging/flagging.py
@@ -315,18 +315,20 @@ def create_bins_and_labels(input_list):
     # Make a copy of the data frame to edit
     print(f"\n Enacting group threshold and creating ptax data for {df_name}")
     df_copy = df_info["df"].copy()
-    df_copy = flg.group_size_adjustment(
-        df=df_copy,
-        stat_groups=df_info["columns"],
-        min_threshold=inputs["min_groups_threshold"],
-        condos=df_info["condos_boolean"],
-    )
+
     df_copy = flg.ptax_adjustment(
         df=df_copy,
         groups=df_info["columns"],
         ptax_sd=inputs["ptax_sd"],
         condos=df_info["condos_boolean"],
     )
+
+    df_copy = flg.classify_outliers(
+        df=df_copy,
+        stat_groups=df_info["columns"],
+        min_threshold=inputs["min_groups_threshold"],
+    )
+
     """
     Modify the 'group' column by appending '-market_value', this is done to
     make sure that a two different groups with the same run_id won't
diff --git a/manual_flagging/requirements.txt b/manual_flagging/requirements.txt
index b2587ff3..af7d945f 100644
--- a/manual_flagging/requirements.txt
+++ b/manual_flagging/requirements.txt
@@ -1,4 +1,4 @@
-awswrangler==2.20.1
+awswrangler==3.7.3
 boto3==1.28.26
 botocore==1.31.26
 certifi==2023.7.22