From 6e1807d17c028e1b5026ba869d7fbf40f67cce3b Mon Sep 17 00:00:00 2001 From: sandeep raju <161712005+sandeeprjs92@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:24:43 +0530 Subject: [PATCH 1/3] add nfo classification and reject cams ticob/tocob MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Acting on a LinkedIn comment about edge-case transaction types. NFO (New Fund Offer) is a standard BSE STAR MF purchase type used for initial subscriptions during a fund offer period. Classify it as (BUY, new_fund_offer) in both CAMS and KFintech adapters. Add NFO to the CAMS _BUY_TYPES set. CAMS TICOB and TOCOB (transfer in/out close-of-business variants) are not real orders — the source pipeline rejects them at validation time. Port that rejection by adding a per-adapter rejected_types set and filtering those rows out in Cleaner.run before anything else runs. Default is an empty set so other adapters are unaffected. I also researched FC and standalone COB via web search and the BSE STAR MF file structure spec. Neither is documented in public CAMS or BSE references and neither appears in the upstream source pipeline. Leaving them unhandled until a commenter or maintainer can clarify the specific registrar semantics — guessing at buy/sell for a code I cannot verify would be worse than a visible no-op. --- src/openreversefeed/adapters/base.py | 8 ++++++ src/openreversefeed/adapters/cams.py | 14 ++++++++++- src/openreversefeed/adapters/kfintech.py | 3 +++ src/openreversefeed/core/cleaner.py | 9 +++++++ tests/unit/test_cams_adapter.py | 14 +++++++++++ tests/unit/test_cleaner.py | 31 ++++++++++++++++++++++++ 6 files changed, 78 insertions(+), 1 deletion(-) diff --git a/src/openreversefeed/adapters/base.py b/src/openreversefeed/adapters/base.py index f0901c4..fb9776e 100644 --- a/src/openreversefeed/adapters/base.py +++ b/src/openreversefeed/adapters/base.py @@ -35,6 +35,14 @@ class FeedAdapter(ABC): field_map: dict[str, str] type_flip_map: dict[str, str] + # Transaction types the adapter actively refuses. Rows with a + # transaction_type in this set are dropped by the cleaner before the rest + # of the pipeline runs. Used for CAMS TICOB / TOCOB — the source system + # rejects these at validation time because the COB (close of business) + # suffix is not a transferable order and will produce bogus positions if + # classified as a TI/TO. + rejected_types: set[str] = set() + @abstractmethod def parse(self, file_path: str | Path) -> pd.DataFrame: """Read the file from disk, return a raw DataFrame (source column names).""" diff --git a/src/openreversefeed/adapters/cams.py b/src/openreversefeed/adapters/cams.py index cdb4dec..001fa67 100644 --- a/src/openreversefeed/adapters/cams.py +++ b/src/openreversefeed/adapters/cams.py @@ -38,7 +38,7 @@ "DP": "D", } -_BUY_TYPES = {"P", "SI", "TI", "D", "BON"} +_BUY_TYPES = {"P", "SI", "TI", "D", "BON", "NFO"} _SELL_TYPES = {"R", "SO", "TO", "DP"} _NO_EFFECT_TYPES = {"N", "J"} @@ -52,8 +52,19 @@ "D": "dividend", "DP": "dividend_payout", "BON": "bonus", + # NFO (New Fund Offer): initial subscription to a new scheme during the + # offer period. Classified as a purchase in the ledger — the investor is + # acquiring units at the NFO price (usually ₹10) and the outcome is a + # buy position just like a regular purchase. + "NFO": "new_fund_offer", } +# CAMS transaction types we actively refuse. TICOB / TOCOB are the +# "close of business" variants of transfer in/out and the source system +# rejects them at validation time. If you have a registrar that ships real +# COB data you want to process, subclass CamsAdapter and clear this set. +_REJECTED_TYPES = {"TICOB", "TOCOB"} + class _CamsPairStrategy(PairRemovalStrategy): def remove(self, df: pd.DataFrame) -> pd.DataFrame: @@ -87,6 +98,7 @@ class CamsAdapter(FeedAdapter): discriminator_headers: set[str] = set() field_map = _FIELD_MAP type_flip_map = _TYPE_FLIP_MAP + rejected_types = _REJECTED_TYPES def parse(self, file_path: str | Path) -> pd.DataFrame: path = Path(file_path) diff --git a/src/openreversefeed/adapters/kfintech.py b/src/openreversefeed/adapters/kfintech.py index a24afd2..d7996a2 100644 --- a/src/openreversefeed/adapters/kfintech.py +++ b/src/openreversefeed/adapters/kfintech.py @@ -61,6 +61,9 @@ "R": (Action.SELL, "redemption"), "D": (Action.BUY, "dividend"), "DP": (Action.SELL, "dividend_payout"), + # NFO (New Fund Offer): initial subscription during the offer period. + # Classified as a buy just like a regular purchase. + "NFO": (Action.BUY, "new_fund_offer"), } _FLAG_TO_ACTION = { diff --git a/src/openreversefeed/core/cleaner.py b/src/openreversefeed/core/cleaner.py index 81c6597..46ab970 100644 --- a/src/openreversefeed/core/cleaner.py +++ b/src/openreversefeed/core/cleaner.py @@ -20,6 +20,15 @@ def run(self, df: pd.DataFrame, adapter: FeedAdapter) -> pd.DataFrame: step = df.copy() + # Drop rows whose transaction_type is in the adapter's rejected set + # (e.g. CAMS TICOB / TOCOB). These do not represent real orders and + # silently classifying them as transfer in/out would corrupt positions. + rejected = getattr(adapter, "rejected_types", set()) or set() + if rejected and "transaction_type" in step.columns: + step = step[~step["transaction_type"].isin(rejected)].reset_index(drop=True) + if step.empty: + return step + # Step 4b — pair removal (registrar-specific) if adapter.registrar is Registrar.KFINTECH: step = remove_kfintech_pairs(step) diff --git a/tests/unit/test_cams_adapter.py b/tests/unit/test_cams_adapter.py index c9542d1..2d8155e 100644 --- a/tests/unit/test_cams_adapter.py +++ b/tests/unit/test_cams_adapter.py @@ -97,6 +97,20 @@ def test_cams_classify_non_financial_no_effect(): assert action is Action.NO_EFFECT +def test_cams_classify_nfo_is_buy_new_fund_offer(): + adapter = CamsAdapter() + action, tag, is_rev = adapter.classify_row( + {"transaction_type": "NFO", "transaction_mode": "N"} + ) + assert action is Action.BUY + assert tag == "new_fund_offer" + assert is_rev is False + + +def test_cams_rejected_types_contains_ticob_and_tocob(): + assert CamsAdapter.rejected_types == {"TICOB", "TOCOB"} + + def test_cams_composite_key_deterministic(): adapter = CamsAdapter() row = { diff --git a/tests/unit/test_cleaner.py b/tests/unit/test_cleaner.py index c01ddbf..1e95153 100644 --- a/tests/unit/test_cleaner.py +++ b/tests/unit/test_cleaner.py @@ -35,6 +35,37 @@ def test_cleaner_runs_all_steps_on_minimal_cams_df(): assert "action_tag" in out.columns +def test_cleaner_drops_cams_ticob_and_tocob_rows(): + """CAMS close-of-business variants (TICOB/TOCOB) are unsupported by the + reference pipeline — the source system rejects them at validation time + and the cleaner must drop them before classification, otherwise the + prefix matcher would silently treat them as ordinary TI/TO transfers.""" + cleaner = Cleaner() + adapter = CamsAdapter() + rows = [] + for i, ttype in enumerate(["P", "TICOB", "TOCOB", "P"]): + rows.append( + { + "transaction_id": f"ID{i}", + "folio_number": "F1", + "product_code": "P1", + "scheme_code": "S1", + "units": 100.0, + "amount": 10000.0, + "transaction_date": date(2025, 1, 1), + "transaction_mode": "N", + "transaction_type": ttype, + "transaction_number": f"T{i}", + "registrar_row_index": i, + "__source_meta": [{}], + } + ) + df = pd.DataFrame(rows) + out = cleaner.run(df, adapter) + assert len(out) == 2 + assert set(out["transaction_type"]) == {"P"} + + def test_cleaner_kfintech_preserves_group_columns_through_conflict_resolution(): """Regression: the conflict resolver must not strip transaction_number or folio_number from the cleaned DataFrame, otherwise the composite-key From 9a0fb1d10fa1fde349c28ad61dd394ac57277e80 Mon Sep 17 00:00:00 2001 From: sandeep raju <161712005+sandeeprjs92@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:27:05 +0530 Subject: [PATCH 2/3] capture dividend option flag and reject plan_type mismatches Acting on a LinkedIn comment about dividend option mismatches. CAMS ships a REINVEST_F column ('Y' = reinvest, 'N' = payout) and KFintech ships DIVOPT with human-readable strings that differ across file vintages: legacy files still use 'DIVIDEND PAYOUT' / 'DIVIDEND REINVESTMENT' wording while post-SEBI-2021 files use 'IDCW PAYOUT' / 'IDCW REINVESTMENT'. Both map to the same canonical plan_type in the scheme master. Each adapter's normalize() now emits a plan_type_from_feed column populated from the raw flag via a case-insensitive, whitespace- collapsed lookup. The per-row validator reads that column and raises a ValidationError (CorrectionType.OTHER) whenever the feed's declared plan_type disagrees with the scheme master's value. When the feed is silent on dividend option (empty column or 'GROWTH' with no payout/reinvest distinction), the scheme master remains the source of truth and validation passes. --- src/openreversefeed/adapters/cams.py | 28 +++++++++++++++++ src/openreversefeed/adapters/kfintech.py | 37 ++++++++++++++++++++++ src/openreversefeed/core/validator.py | 18 ++++++++++- tests/unit/test_validator.py | 39 ++++++++++++++++++++++++ 4 files changed, 121 insertions(+), 1 deletion(-) diff --git a/src/openreversefeed/adapters/cams.py b/src/openreversefeed/adapters/cams.py index 001fa67..dbc269f 100644 --- a/src/openreversefeed/adapters/cams.py +++ b/src/openreversefeed/adapters/cams.py @@ -25,6 +25,20 @@ "BROKCODE": "broker_code", "PANNO": "pan", "INVNAME": "investor_name", + # CAMS ships an "REINVEST_F" column that encodes whether the dividend + # option on the scheme is a reinvestment or a payout. We capture the + # raw flag so validators / downstream can check it against the scheme + # master's plan_type. + "REINVEST_F": "dividend_option_flag", +} + +# Mapping from the raw CAMS REINVEST_F flag to the canonical plan_type +# vocabulary the scheme master stores. 'Y' means reinvest, 'N' means +# payout. Anything else (empty, 'P' for pure growth, nulls) → None, +# which means "the feed doesn't tell us, don't assert anything." +CAMS_REINVEST_F_TO_PLAN_TYPE: dict[str, str] = { + "Y": "idcw_reinvest", + "N": "idcw_payout", } _TYPE_FLIP_MAP: dict[str, str] = { @@ -125,6 +139,20 @@ def normalize(self, raw: pd.DataFrame) -> pd.DataFrame: else: df["__source_meta"] = [{}] * len(df) + # Translate the CAMS REINVEST_F flag into the canonical plan_type + # vocabulary the scheme master uses. Leaves None where the feed + # is silent so the validator can skip the check cleanly. + if "dividend_option_flag" in df.columns: + df["plan_type_from_feed"] = ( + df["dividend_option_flag"] + .astype(str) + .str.strip() + .str.upper() + .map(CAMS_REINVEST_F_TO_PLAN_TYPE) + ) + else: + df["plan_type_from_feed"] = None + df.insert(0, "registrar_row_index", range(len(df))) return df diff --git a/src/openreversefeed/adapters/kfintech.py b/src/openreversefeed/adapters/kfintech.py index d7996a2..fd1c0a4 100644 --- a/src/openreversefeed/adapters/kfintech.py +++ b/src/openreversefeed/adapters/kfintech.py @@ -28,6 +28,27 @@ "PANNO": "pan", "INV_NAME": "investor_name", "BROK_CODE": "broker_code", + # KFintech DIVOPT encodes the dividend / growth plan for the scheme. + # Legacy files use "DIVIDEND PAYOUT" / "DIVIDEND REINVESTMENT" strings, + # post-SEBI-2021 files use "IDCW PAYOUT" / "IDCW REINVESTMENT". The + # validator reconciles the feed flag against the scheme master. + "DIVOPT": "dividend_option_flag", +} + +# Mapping from the raw KFintech DIVOPT text to the canonical plan_type +# vocabulary. Matches are case-insensitive and whitespace-collapsed so +# "idcw payout" and "IDCW PAYOUT" resolve to the same value. Legacy +# "DIVIDEND ..." wording is mapped identically — SEBI renamed these to +# IDCW in 2021 but KFintech still ships old strings on historical files. +KFINTECH_DIVOPT_TO_PLAN_TYPE: dict[str, str] = { + "IDCW PAYOUT": "idcw_payout", + "IDCW REINVESTMENT": "idcw_reinvest", + "IDCW REINVEST": "idcw_reinvest", + "DIVIDEND PAYOUT": "idcw_payout", + "DIVIDEND REINVESTMENT": "idcw_reinvest", + "DIVIDEND REINVEST": "idcw_reinvest", + "GROWTH": "growth", + "GROWTH PAYOUT": "growth", } _FORMAT2_FIELD_MAP = dict(_FORMAT1_FIELD_MAP) @@ -100,6 +121,22 @@ def _normalize_with_map(self, raw: pd.DataFrame, fmap: dict[str, str]) -> pd.Dat df["__source_meta"] = raw[unknown_cols].to_dict(orient="records") else: df["__source_meta"] = [{}] * len(df) + + # Translate the KFintech DIVOPT text into the canonical plan_type + # vocabulary. Legacy files use "DIVIDEND PAYOUT" wording, post-SEBI + # (2021) files use "IDCW PAYOUT" — both map to the same value. + if "dividend_option_flag" in df.columns: + df["plan_type_from_feed"] = ( + df["dividend_option_flag"] + .astype(str) + .str.strip() + .str.upper() + .str.replace(r"\s+", " ", regex=True) + .map(KFINTECH_DIVOPT_TO_PLAN_TYPE) + ) + else: + df["plan_type_from_feed"] = None + df.insert(0, "registrar_row_index", range(len(df))) return df diff --git a/src/openreversefeed/core/validator.py b/src/openreversefeed/core/validator.py index 9fd2174..8f59c04 100644 --- a/src/openreversefeed/core/validator.py +++ b/src/openreversefeed/core/validator.py @@ -39,7 +39,23 @@ def validate_row(row: dict[str, Any], cache: PrewarmCache) -> None: if pan not in cache.accounts_by_pan: raise ValidationError(CorrectionType.PAN_NOT_FOUND, f"PAN not in accounts: {pan}") - if row["scheme_code"] not in cache.schemes_by_code: + scheme = cache.schemes_by_code.get(row["scheme_code"]) + if scheme is None: raise ValidationError( CorrectionType.SCHEME_NOT_FOUND, f"scheme not found: {row['scheme_code']}" ) + + # Dividend option mismatch: if the feed explicitly declares a + # plan_type (CAMS REINVEST_F, KFintech DIVOPT) and it disagrees with + # the scheme master, route the row to the correction queue instead of + # silently accepting it. This catches the common post-SEBI-2021 case + # where historical feed files still use "DIVIDEND" wording while the + # master has been migrated to "IDCW". + feed_plan = row.get("plan_type_from_feed") + scheme_plan = scheme.get("plan_type") + if feed_plan and scheme_plan and feed_plan != scheme_plan: + raise ValidationError( + CorrectionType.OTHER, + f"plan_type mismatch: feed says '{feed_plan}', " + f"scheme master says '{scheme_plan}' for {row['scheme_code']}", + ) diff --git a/tests/unit/test_validator.py b/tests/unit/test_validator.py index 53d251f..e63c1c6 100644 --- a/tests/unit/test_validator.py +++ b/tests/unit/test_validator.py @@ -62,3 +62,42 @@ def test_missing_folio_number(): validate_row(row, _CACHE) assert exc_info.value.correction_type is CorrectionType.OTHER assert "folio_number" in exc_info.value.message + + +def test_plan_type_mismatch_between_feed_and_scheme_raises(): + """When the feed's translated plan_type disagrees with the scheme + master, the row must not silently pass. This is the LinkedIn-comment + case: historical files with 'DIVIDEND PAYOUT' wording against a + master that's been migrated to 'idcw_reinvest'.""" + cache = PrewarmCache( + schemes_by_code={"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_reinvest"}}, + accounts_by_pan={"ABCDE1234F": {"individual": {"id": "acc-1"}}}, + ) + row = dict(_VALID) + row["plan_type_from_feed"] = "idcw_payout" + with pytest.raises(ValidationError) as exc_info: + validate_row(row, cache) + assert exc_info.value.correction_type is CorrectionType.OTHER + assert "plan_type mismatch" in exc_info.value.message + + +def test_plan_type_matching_feed_and_scheme_passes(): + cache = PrewarmCache( + schemes_by_code={"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_payout"}}, + accounts_by_pan={"ABCDE1234F": {"individual": {"id": "acc-1"}}}, + ) + row = dict(_VALID) + row["plan_type_from_feed"] = "idcw_payout" + validate_row(row, cache) # should not raise + + +def test_plan_type_silent_feed_does_not_block(): + """When the feed doesn't declare a plan_type (empty / None), we must + not block the row — the scheme master remains the source of truth.""" + cache = PrewarmCache( + schemes_by_code={"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_payout"}}, + accounts_by_pan={"ABCDE1234F": {"individual": {"id": "acc-1"}}}, + ) + row = dict(_VALID) + row["plan_type_from_feed"] = None + validate_row(row, cache) # should not raise From e4bd1dee8c2e3b86033d8be95b53858ca9eeb972 Mon Sep 17 00:00:00 2001 From: sandeep raju <161712005+sandeeprjs92@users.noreply.github.com> Date: Wed, 15 Apr 2026 10:42:22 +0530 Subject: [PATCH 3/3] resolve family pan by canonical investor name when ownership_type is silent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Acting on a LinkedIn comment about messy / inconsistent investor data on family PANs. When a family PAN maps to multiple accounts and the row doesn't carry an ownership_type the resolver can tie-break with, try an investor-name equality match after canonicalisation. Canonicalisation is upper-case + whitespace collapse — deliberately not fuzzy, because false positives on family accounts are worse than an AmbiguousPanError the operator can fix in the correction queue. Only activates when (a) the PAN has multiple accounts, (b) the row carries an investor_name, (c) exactly one stored account matches. If two stored accounts share the same canonical name or the row is silent on the name, we fall through to the existing individual default / ambiguous path unchanged. The adapters already map CAMS INVNAME and KFintech INV_NAME to the investor_name canonical column, so no adapter change needed. --- src/openreversefeed/core/account_resolver.py | 29 +++++++++++ tests/unit/test_account_resolver.py | 51 ++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/src/openreversefeed/core/account_resolver.py b/src/openreversefeed/core/account_resolver.py index a4cc62f..2fcdd1c 100644 --- a/src/openreversefeed/core/account_resolver.py +++ b/src/openreversefeed/core/account_resolver.py @@ -17,6 +17,20 @@ class PanNotFoundError(Exception): pass +def _canonicalize_name(value: Any) -> str: + """Normalize an investor name for equality comparison. + + Collapses whitespace runs, upper-cases, strips punctuation-like + leading/trailing spaces. Deliberately NOT a fuzzy match — two + names only tie if they agree after canonicalization. This catches + the common "RAJESH KUMAR" vs "Rajesh Kumar" (double space) vs + " rajesh kumar" case without introducing false positives. + """ + if value is None: + return "" + return " ".join(str(value).upper().split()) + + def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]: pan = str(row["pan"]).strip().upper() by_ownership = cache.accounts_by_pan.get(pan) @@ -32,6 +46,21 @@ def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]: if ot and ot in by_ownership: return by_ownership[ot] + # Investor-name fallback for family PANs. When multiple accounts share + # the PAN and the row doesn't declare an ownership_type we can match + # against, try a canonicalised name equality against the stored + # account names. The adapters already emit an investor_name column + # (CAMS INVNAME, KFintech INV_NAME). + feed_name = _canonicalize_name(row.get("investor_name")) + if feed_name: + matches = [ + acc + for acc in by_ownership.values() + if _canonicalize_name(acc.get("name")) == feed_name + ] + if len(matches) == 1: + return matches[0] + # Prefer 'individual' default if "individual" in by_ownership: return by_ownership["individual"] diff --git a/tests/unit/test_account_resolver.py b/tests/unit/test_account_resolver.py index 4122354..14c07f9 100644 --- a/tests/unit/test_account_resolver.py +++ b/tests/unit/test_account_resolver.py @@ -47,3 +47,54 @@ def test_pan_not_found(): cache = PrewarmCache(accounts_by_pan={}) with pytest.raises(PanNotFoundError): resolve_account({"pan": "P1"}, cache) + + +def test_family_pan_falls_back_to_investor_name_match(): + """When the ownership_type can't tie-break a family PAN, a canonical + name match picks the right account. Catches the common + 'Rajesh Kumar' vs 'RAJESH KUMAR' whitespace/case drift between the + feed file and the account master.""" + cache = PrewarmCache( + accounts_by_pan={ + "P1": { + "huf": {"id": "u1", "name": "Suresh Kumar HUF"}, + "joint": {"id": "u2", "name": "Rajesh Kumar"}, + } + } + ) + row = {"pan": "P1", "ownership_type": None, "investor_name": " rajesh kumar "} + acc = resolve_account(row, cache) + assert acc["id"] == "u2" + + +def test_family_pan_name_fallback_needs_unique_match(): + """If two stored accounts both match the canonical name, fall through + to the individual default / ambiguous error rather than picking one.""" + cache = PrewarmCache( + accounts_by_pan={ + "P1": { + "huf": {"id": "u1", "name": "Rajesh Kumar"}, + "joint": {"id": "u2", "name": "Rajesh Kumar"}, + } + } + ) + row = {"pan": "P1", "ownership_type": None, "investor_name": "RAJESH KUMAR"} + with pytest.raises(AmbiguousPanError) as exc_info: + resolve_account(row, cache) + assert set(exc_info.value.candidate_ids) == {"u1", "u2"} + + +def test_family_pan_name_fallback_ignored_without_investor_name(): + """If the feed row doesn't ship an investor name at all, the fallback + must not activate — we rely on the existing individual default.""" + cache = PrewarmCache( + accounts_by_pan={ + "P1": { + "individual": {"id": "u1", "name": "Rajesh Kumar"}, + "joint": {"id": "u2", "name": "Suresh Kumar"}, + } + } + ) + row = {"pan": "P1", "ownership_type": None, "investor_name": None} + acc = resolve_account(row, cache) + assert acc["id"] == "u1"