app(flowerhub): add FedXGBoost for financial fraud detection implementation #6807
eo4929 wants to merge 2 commits into flwrlabs:main from
Conversation
Pull request overview
Adds a new Flower Hub app implementing a federated XGBoost-based workflow for financial fraud detection, including client/server apps, data utilities, and ensemble aggregation.
Changes:
- Introduces a Flower ServerApp/ClientApp training + evaluation loop for "FedXGBBagging".
- Adds dataset preprocessing, partitioning utilities, and XGBoost model (de)serialization helpers.
- Adds a large fed_xgb_bagging.py module implementing bagging and similarity-based aggregation utilities.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| examples/FinancialFraudDetection-app/pyproject.toml | Declares the app package metadata, dependencies, and Flower app entrypoints/config. |
| examples/FinancialFraudDetection-app/frauddetection/task.py | Implements preprocessing, data loading/partitioning, training, evaluation, and model serialization helpers. |
| examples/FinancialFraudDetection-app/frauddetection/server_app.py | Implements the federated orchestration loop, collects per-round client models, builds an ensemble, and runs evaluation. |
| examples/FinancialFraudDetection-app/frauddetection/client_app.py | Implements per-client local training and evaluation handlers and transmits serialized boosters. |
| examples/FinancialFraudDetection-app/frauddetection/fed_xgb_bagging.py | Adds ensemble/bagging and similarity-based utilities used server-side for prediction/evaluation. |
| examples/FinancialFraudDetection-app/frauddetection/__init__.py | Adds package marker and module docstring. |
```toml
[project]
name = "federated-fraud-detection"
version = "1.0.0"
description = "Federated Financial Fraud Detection with XGBoost and Flower"
license = "Apache-2.0"
dependencies = [
    "flwr[simulation]>=1.26.1",
    "xgboost>=2.0.0",
    "scikit-learn>=1.3.0",
    "pandas>=2.0.0",
    "numpy>=1.24.0",
]

[tool.hatch.build.targets.wheel]
packages = ["."]
```
`packages = ["."]` is very likely to produce an incorrect wheel (it may include unintended files and/or fail to package `frauddetection` as an importable package). Define the actual package directory (e.g., `frauddetection`) via Hatch's package selection, and consider adding `requires-python`, since the code uses modern typing syntax (e.g., `dict | None` in task.py) that requires Python 3.10+.
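A hedged sketch of what that could look like in pyproject.toml, assuming the package directory is `frauddetection` as the file paths in this PR suggest:

```toml
[project]
# The codebase uses `dict | None` union syntax, which needs Python 3.10+
requires-python = ">=3.10"

[tool.hatch.build.targets.wheel]
# Ship the actual import package rather than everything under the repo root
packages = ["frauddetection"]
```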
@@ -0,0 +1,191 @@
```python
"""frauddetection: XGBoost model training and data utilities."""

import json
```
`json` is imported but not used in this module. Removing it avoids dead imports and keeps the module lint-clean.
Suggested change:
```diff
- import json
```
```python
df = pd.read_csv(data_csv)
df = df.sample(frac=1, random_state=42).reset_index(drop=True)

n = len(df)
size = n // num_partitions
start = partition_id * size
end = start + size if partition_id < num_partitions - 1 else n
partition = df.iloc[start:end].reset_index(drop=True)

X, y = preprocess_df(partition)
X_train, X_test, y_train, y_test = _split(X, y)
return X_train, X_test, y_train, y_test
```
If `len(df) < num_partitions`, then `size` becomes 0 and most partitions will be empty (`partition` has 0 rows). That will cause `train_test_split` to throw (no samples) or produce invalid behavior. Consider handling `size == 0` explicitly (e.g., cap `num_partitions` at `n`, compute split indices using `np.array_split`, or raise a clear error).
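One possible fix, sketched under the assumption that the loader receives the same `partition_id`/`num_partitions` arguments as in the diff: let `np.array_split` compute the partition bounds, which distributes remainder rows evenly instead of concentrating them in the last partition, and fail fast when a partition would be empty.

```python
import numpy as np
import pandas as pd


def load_partition(df: pd.DataFrame, partition_id: int, num_partitions: int) -> pd.DataFrame:
    """Return one roughly equal-sized partition of df, raising if any would be empty."""
    if num_partitions > len(df):
        raise ValueError(
            f"Cannot split {len(df)} rows into {num_partitions} non-empty partitions"
        )
    # np.array_split balances the remainder across partitions, so sizes
    # differ by at most one row and no partition is ever empty here.
    indices = np.array_split(np.arange(len(df)), num_partitions)[partition_id]
    return df.iloc[indices].reset_index(drop=True)
```

This is a sketch of the `np.array_split` option; capping `num_partitions` at `n` instead of raising is an equally valid choice for the PR author.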
```python
# Send the *first* collected model to ``fraction_evaluate`` of clients
# as a representative model so they can report per-partition metrics.
n_eval = max(1, int(fraction_evaluate * n_clients))
eval_node_ids = node_ids[:n_eval]

# Load the first model as the "representative" model for evaluation
with open(all_model_paths[0], "rb") as f:
    rep_model_bytes = f.read()
rep_model_array = np.frombuffer(rep_model_bytes, dtype=np.uint8).copy()
```
This will crash with IndexError when all_model_paths is empty (e.g., n_clients == 0, no replies, or if model collection fails earlier). Add an explicit guard before indexing and either skip evaluation or raise a clear runtime error when no models were collected.
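A minimal guard, assuming the variable names from the diff (`all_model_paths`); whether to skip the evaluation round or abort entirely is a design choice for the PR author:

```python
def load_representative_model(all_model_paths: list) -> bytes:
    """Read the first collected model, failing loudly if none were collected."""
    if not all_model_paths:
        # Raising a clear error beats an opaque IndexError from `[0]`.
        raise RuntimeError(
            "No client models were collected this round; "
            "cannot build a representative model for evaluation."
        )
    with open(all_model_paths[0], "rb") as f:
        return f.read()
```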
```python
print("\n[Server] Central evaluation on held-out server data …")
X_test, y_test = _load_central_test(csv_path)
ensemble.test_data = X_test
y_pred, y_prob = ensemble.predict(X_test, y_true=y_test)
```
Passing y_true=y_test into ensemble.predict (which auto-selects a best-F1 threshold) tunes the decision threshold on the same held-out set you then report metrics on, inflating results (data leakage). For reporting central test metrics, use a fixed threshold (e.g., 0.5) or select the threshold on a separate validation split.
Suggested change:
```diff
- y_pred, y_prob = ensemble.predict(X_test, y_true=y_test)
+ y_pred, y_prob = ensemble.predict(X_test)
```
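If a tuned threshold is still wanted, it can be selected on a separate validation split and then frozen before scoring the test set. A sketch under the assumption that the ensemble exposes probabilities; `precision_recall_curve` comes from scikit-learn, which is already a declared dependency:

```python
import numpy as np
from sklearn.metrics import precision_recall_curve


def best_f1_threshold(y_val: np.ndarray, val_prob: np.ndarray) -> float:
    """Pick the probability threshold maximizing F1 on a *validation* split."""
    precision, recall, thresholds = precision_recall_curve(y_val, val_prob)
    # precision/recall have one more entry than thresholds (the trailing
    # p=1, r=0 point has no threshold), so drop the last element of each.
    f1 = 2 * precision[:-1] * recall[:-1] / np.clip(
        precision[:-1] + recall[:-1], 1e-12, None
    )
    return float(thresholds[int(np.argmax(f1))])
```

The returned threshold is then applied unchanged to the held-out test probabilities, so reported test metrics are no longer tuned on the data they are computed from.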
```python
self.config = config
self.result_path = result_path
self.history = []
self.test_data = None

self.models = [self._load_model(path) for path in model_paths]

logger.info(f"Initialized EachBankModel for bank_name_round_number={self.config['bank_name_round_number']}")
```
config defaults to None, but the constructor immediately subscripts it (self.config['bank_name_round_number']), which will raise TypeError when config is None and KeyError when the key is missing. Make config always a dict (e.g., self.config = config or {}) and use .get(...) with defaults in logs/filenames to keep the class usable as a general utility.
Suggested change:
```diff
- self.config = config
- self.result_path = result_path
- self.history = []
- self.test_data = None
- self.models = [self._load_model(path) for path in model_paths]
- logger.info(f"Initialized EachBankModel for bank_name_round_number={self.config['bank_name_round_number']}")
+ # Ensure config is always a dictionary to avoid TypeError/KeyError when accessing it.
+ self.config = config or {}
+ self.result_path = result_path
+ self.history = []
+ self.test_data = None
+ self.models = [self._load_model(path) for path in model_paths]
+ bank_name_round_number = self.config.get("bank_name_round_number", "unknown")
+ logger.info(f"Initialized EachBankModel for bank_name_round_number={bank_name_round_number}")
```
```python
def save_metrics_history(self, metrics, detection_summary, test_time=None, output_path=None):
    logger.info("Saving metrics and detection summary to history")

    def convert_to_serializable(obj):
        if isinstance(obj, (np.generic, np.int64, np.float32)):
            return obj.item()
        elif isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, pd.Series):
            return obj.to_dict()
        elif isinstance(obj, pd.DataFrame):
            return obj.to_dict(orient="index")
        return obj

    def flatten_grouped_stats(grouped_stats_df: pd.DataFrame) -> dict:
        result = {}
        for group in grouped_stats_df.index:
            stats_dict = {}
            for (feature, stat), value in grouped_stats_df.loc[group].items():
                stats_dict[f"{feature}__{stat}"] = value
            result[group] = stats_dict
        return result

    metrics_path = output_path / "metrics.json"
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=4, default=convert_to_serializable)
```
`output_path` defaults to None but is treated as a Path (`output_path / "metrics.json"`), which will throw at runtime if the caller doesn't pass it. Either make `output_path` a required Path parameter (preferred), or add a guard/auto-create a default directory before using it.
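One way to apply the preferred fix, sketched with a simplified signature (`save_metrics` is a hypothetical standalone helper; the real method carries more arguments):

```python
import json
from pathlib import Path


def save_metrics(metrics: dict, output_path: Path) -> Path:
    """Write metrics to <output_path>/metrics.json; output_path is required."""
    output_path = Path(output_path)
    # Auto-create the directory so a fresh run doesn't fail on open().
    output_path.mkdir(parents=True, exist_ok=True)
    metrics_path = output_path / "metrics.json"
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=4)
    return metrics_path
```

Making the parameter required moves the failure from deep inside serialization to the call site, where it is far easier to diagnose.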
```python
def _metadata_similarity(meta_i: Dict, meta_j: Dict) -> float:

    score_parts: List[Tuple[str, float, float]] = []  # (ì´ë¦, ì ì, ê°ì¤ì¹)
```
There are multiple comments/log lines in this module that appear to be mojibake (garbled encoding) and/or non-English in a way that’s not readable in the repository’s context. Please normalize these to clear, UTF-8, developer-facing English comments/log messages so future maintainers can understand intent (and to avoid tooling/display issues).
Suggested change:
```diff
- score_parts: List[Tuple[str, float, float]] = []  # (ì´ë¦, ì ì, ê°ì¤ì¹)
+ score_parts: List[Tuple[str, float, float]] = []  # (name, score, weight)
```
```python
import os
```
os is imported but not used in this file. Remove it to keep the module clean and avoid lint failures.
Suggested change:
```diff
- import os
```
Summary
Validation
I followed the instructions in https://www.notion.so/flowerlabs/Guide-How-to-Publish-Apps-on-Flower-Hub-1d1d8ccd59cf8073a742da2c83ceb89b.