diff --git a/src/openreversefeed/adapters/base.py b/src/openreversefeed/adapters/base.py index f0901c4..fb9776e 100644 --- a/src/openreversefeed/adapters/base.py +++ b/src/openreversefeed/adapters/base.py @@ -35,6 +35,14 @@ class FeedAdapter(ABC): field_map: dict[str, str] type_flip_map: dict[str, str] + # Transaction types the adapter actively refuses. Rows with a + # transaction_type in this set are dropped by the cleaner before the rest + # of the pipeline runs. Used for CAMS TICOB / TOCOB — the source system + # rejects these at validation time because the COB (close of business) + # suffix is not a transferable order and will produce bogus positions if + # classified as a TI/TO. + rejected_types: set[str] = set() + @abstractmethod def parse(self, file_path: str | Path) -> pd.DataFrame: """Read the file from disk, return a raw DataFrame (source column names).""" diff --git a/src/openreversefeed/adapters/cams.py b/src/openreversefeed/adapters/cams.py index cdb4dec..dbc269f 100644 --- a/src/openreversefeed/adapters/cams.py +++ b/src/openreversefeed/adapters/cams.py @@ -25,6 +25,20 @@ "BROKCODE": "broker_code", "PANNO": "pan", "INVNAME": "investor_name", + # CAMS ships an "REINVEST_F" column that encodes whether the dividend + # option on the scheme is a reinvestment or a payout. We capture the + # raw flag so validators / downstream can check it against the scheme + # master's plan_type. + "REINVEST_F": "dividend_option_flag", +} + +# Mapping from the raw CAMS REINVEST_F flag to the canonical plan_type +# vocabulary the scheme master stores. 'Y' means reinvest, 'N' means +# payout. Anything else (empty, 'P' for pure growth, nulls) → None, +# which means "the feed doesn't tell us, don't assert anything." 
+CAMS_REINVEST_F_TO_PLAN_TYPE: dict[str, str] = { + "Y": "idcw_reinvest", + "N": "idcw_payout", } _TYPE_FLIP_MAP: dict[str, str] = { @@ -38,7 +52,7 @@ "DP": "D", } -_BUY_TYPES = {"P", "SI", "TI", "D", "BON"} +_BUY_TYPES = {"P", "SI", "TI", "D", "BON", "NFO"} _SELL_TYPES = {"R", "SO", "TO", "DP"} _NO_EFFECT_TYPES = {"N", "J"} @@ -52,8 +66,19 @@ "D": "dividend", "DP": "dividend_payout", "BON": "bonus", + # NFO (New Fund Offer): initial subscription to a new scheme during the + # offer period. Classified as a purchase in the ledger — the investor is + # acquiring units at the NFO price (usually ₹10) and the outcome is a + # buy position just like a regular purchase. + "NFO": "new_fund_offer", } +# CAMS transaction types we actively refuse. TICOB / TOCOB are the +# "close of business" variants of transfer in/out and the source system +# rejects them at validation time. If you have a registrar that ships real +# COB data you want to process, subclass CamsAdapter and clear this set. +_REJECTED_TYPES = {"TICOB", "TOCOB"} + class _CamsPairStrategy(PairRemovalStrategy): def remove(self, df: pd.DataFrame) -> pd.DataFrame: @@ -87,6 +112,7 @@ class CamsAdapter(FeedAdapter): discriminator_headers: set[str] = set() field_map = _FIELD_MAP type_flip_map = _TYPE_FLIP_MAP + rejected_types = _REJECTED_TYPES def parse(self, file_path: str | Path) -> pd.DataFrame: path = Path(file_path) @@ -113,6 +139,20 @@ def normalize(self, raw: pd.DataFrame) -> pd.DataFrame: else: df["__source_meta"] = [{}] * len(df) + # Translate the CAMS REINVEST_F flag into the canonical plan_type + # vocabulary the scheme master uses. Leaves None where the feed + # is silent so the validator can skip the check cleanly. 
# Canonical plan_type for each KFintech DIVOPT string. Lookups are done on
# the upper-cased, whitespace-collapsed form of the raw text, so
# "idcw payout" and "IDCW PAYOUT" land on the same key. The pre-2021
# "DIVIDEND ..." wording and the current "IDCW ..." wording are generated
# from one table so the two vocabularies always map to the same values.
KFINTECH_DIVOPT_TO_PLAN_TYPE: dict[str, str] = {
    **{
        f"{wording} {option}": plan_type
        for wording in ("IDCW", "DIVIDEND")
        for option, plan_type in (
            ("PAYOUT", "idcw_payout"),
            ("REINVESTMENT", "idcw_reinvest"),
            ("REINVEST", "idcw_reinvest"),
        )
    },
    "GROWTH": "growth",
    "GROWTH PAYOUT": "growth",
}
+ "NFO": (Action.BUY, "new_fund_offer"), } _FLAG_TO_ACTION = { @@ -97,6 +121,22 @@ def _normalize_with_map(self, raw: pd.DataFrame, fmap: dict[str, str]) -> pd.Dat df["__source_meta"] = raw[unknown_cols].to_dict(orient="records") else: df["__source_meta"] = [{}] * len(df) + + # Translate the KFintech DIVOPT text into the canonical plan_type + # vocabulary. Legacy files use "DIVIDEND PAYOUT" wording, post-SEBI + # (2021) files use "IDCW PAYOUT" — both map to the same value. + if "dividend_option_flag" in df.columns: + df["plan_type_from_feed"] = ( + df["dividend_option_flag"] + .astype(str) + .str.strip() + .str.upper() + .str.replace(r"\s+", " ", regex=True) + .map(KFINTECH_DIVOPT_TO_PLAN_TYPE) + ) + else: + df["plan_type_from_feed"] = None + df.insert(0, "registrar_row_index", range(len(df))) return df diff --git a/src/openreversefeed/core/account_resolver.py b/src/openreversefeed/core/account_resolver.py index a4cc62f..2fcdd1c 100644 --- a/src/openreversefeed/core/account_resolver.py +++ b/src/openreversefeed/core/account_resolver.py @@ -17,6 +17,20 @@ class PanNotFoundError(Exception): pass +def _canonicalize_name(value: Any) -> str: + """Normalize an investor name for equality comparison. + + Collapses whitespace runs, upper-cases, strips punctuation-like + leading/trailing spaces. Deliberately NOT a fuzzy match — two + names only tie if they agree after canonicalization. This catches + the common "RAJESH KUMAR" vs "Rajesh Kumar" (double space) vs + " rajesh kumar" case without introducing false positives. + """ + if value is None: + return "" + return " ".join(str(value).upper().split()) + + def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]: pan = str(row["pan"]).strip().upper() by_ownership = cache.accounts_by_pan.get(pan) @@ -32,6 +46,21 @@ def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]: if ot and ot in by_ownership: return by_ownership[ot] + # Investor-name fallback for family PANs. 
When multiple accounts share + # the PAN and the row doesn't declare an ownership_type we can match + # against, try a canonicalised name equality against the stored + # account names. The adapters already emit an investor_name column + # (CAMS INVNAME, KFintech INV_NAME). + feed_name = _canonicalize_name(row.get("investor_name")) + if feed_name: + matches = [ + acc + for acc in by_ownership.values() + if _canonicalize_name(acc.get("name")) == feed_name + ] + if len(matches) == 1: + return matches[0] + # Prefer 'individual' default if "individual" in by_ownership: return by_ownership["individual"] diff --git a/src/openreversefeed/core/cleaner.py b/src/openreversefeed/core/cleaner.py index 81c6597..46ab970 100644 --- a/src/openreversefeed/core/cleaner.py +++ b/src/openreversefeed/core/cleaner.py @@ -20,6 +20,15 @@ def run(self, df: pd.DataFrame, adapter: FeedAdapter) -> pd.DataFrame: step = df.copy() + # Drop rows whose transaction_type is in the adapter's rejected set + # (e.g. CAMS TICOB / TOCOB). These do not represent real orders and + # silently classifying them as transfer in/out would corrupt positions. 
+ rejected = getattr(adapter, "rejected_types", set()) or set() + if rejected and "transaction_type" in step.columns: + step = step[~step["transaction_type"].isin(rejected)].reset_index(drop=True) + if step.empty: + return step + # Step 4b — pair removal (registrar-specific) if adapter.registrar is Registrar.KFINTECH: step = remove_kfintech_pairs(step) diff --git a/src/openreversefeed/core/validator.py b/src/openreversefeed/core/validator.py index 9fd2174..8f59c04 100644 --- a/src/openreversefeed/core/validator.py +++ b/src/openreversefeed/core/validator.py @@ -39,7 +39,23 @@ def validate_row(row: dict[str, Any], cache: PrewarmCache) -> None: if pan not in cache.accounts_by_pan: raise ValidationError(CorrectionType.PAN_NOT_FOUND, f"PAN not in accounts: {pan}") - if row["scheme_code"] not in cache.schemes_by_code: + scheme = cache.schemes_by_code.get(row["scheme_code"]) + if scheme is None: raise ValidationError( CorrectionType.SCHEME_NOT_FOUND, f"scheme not found: {row['scheme_code']}" ) + + # Dividend option mismatch: if the feed explicitly declares a + # plan_type (CAMS REINVEST_F, KFintech DIVOPT) and it disagrees with + # the scheme master, route the row to the correction queue instead of + # silently accepting it. This catches the common post-SEBI-2021 case + # where historical feed files still use "DIVIDEND" wording while the + # master has been migrated to "IDCW". 
def test_family_pan_falls_back_to_investor_name_match():
    """A family PAN with no usable ownership_type resolves by investor name.

    The feed name differs from the stored account name only in letter case
    and surrounding whitespace, so canonical name equality must select the
    joint account rather than the HUF one.
    """
    stored_accounts = {
        "huf": {"id": "u1", "name": "Suresh Kumar HUF"},
        "joint": {"id": "u2", "name": "Rajesh Kumar"},
    }
    cache = PrewarmCache(accounts_by_pan={"P1": stored_accounts})
    feed_row = {"pan": "P1", "ownership_type": None, "investor_name": " rajesh kumar "}

    resolved = resolve_account(feed_row, cache)

    assert resolved["id"] == "u2"
def test_cams_classify_nfo_is_buy_new_fund_offer():
    """An NFO row classifies as a non-reversal BUY tagged 'new_fund_offer'."""
    nfo_row = {"transaction_type": "NFO", "transaction_mode": "N"}

    action, tag, is_rev = CamsAdapter().classify_row(nfo_row)

    assert action is Action.BUY
    assert tag == "new_fund_offer"
    assert is_rev is False
"scheme_code": "S1", + "units": 100.0, + "amount": 10000.0, + "transaction_date": date(2025, 1, 1), + "transaction_mode": "N", + "transaction_type": ttype, + "transaction_number": f"T{i}", + "registrar_row_index": i, + "__source_meta": [{}], + } + ) + df = pd.DataFrame(rows) + out = cleaner.run(df, adapter) + assert len(out) == 2 + assert set(out["transaction_type"]) == {"P"} + + def test_cleaner_kfintech_preserves_group_columns_through_conflict_resolution(): """Regression: the conflict resolver must not strip transaction_number or folio_number from the cleaned DataFrame, otherwise the composite-key diff --git a/tests/unit/test_validator.py b/tests/unit/test_validator.py index 53d251f..e63c1c6 100644 --- a/tests/unit/test_validator.py +++ b/tests/unit/test_validator.py @@ -62,3 +62,42 @@ def test_missing_folio_number(): validate_row(row, _CACHE) assert exc_info.value.correction_type is CorrectionType.OTHER assert "folio_number" in exc_info.value.message + + +def test_plan_type_mismatch_between_feed_and_scheme_raises(): + """When the feed's translated plan_type disagrees with the scheme + master, the row must not silently pass. 
@pytest.mark.parametrize("silent_value", [None, ""])
def test_plan_type_silent_feed_does_not_block(silent_value):
    """A feed that declares no plan_type must never block the row.

    Both "silent" spellings are covered: ``None`` and the empty string.
    In either case the scheme master remains the source of truth and
    ``validate_row`` must return without raising.
    """
    # NOTE(review): the adapters derive plan_type_from_feed with Series.map,
    # which yields NaN (a truthy float) rather than None for unmapped flags.
    # Confirm rows are NaN-scrubbed before they reach validate_row; if not,
    # such rows would be reported as plan_type mismatches.
    cache = PrewarmCache(
        schemes_by_code={"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_payout"}},
        accounts_by_pan={"ABCDE1234F": {"individual": {"id": "acc-1"}}},
    )
    row = dict(_VALID)
    row["plan_type_from_feed"] = silent_value
    validate_row(row, cache)  # should not raise