Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/openreversefeed/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ class FeedAdapter(ABC):
field_map: dict[str, str]
type_flip_map: dict[str, str]

# Transaction types the adapter actively refuses. Rows with a
# transaction_type in this set are dropped by the cleaner before the rest
# of the pipeline runs. Used for CAMS TICOB / TOCOB — the source system
# rejects these at validation time because rows carrying the COB (close
# of business) suffix are not real transfer orders and would produce
# bogus positions if classified as a plain TI/TO.
rejected_types: set[str] = set()

@abstractmethod
def parse(self, file_path: str | Path) -> pd.DataFrame:
"""Read the file from disk, return a raw DataFrame (source column names)."""
Expand Down
42 changes: 41 additions & 1 deletion src/openreversefeed/adapters/cams.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,20 @@
"BROKCODE": "broker_code",
"PANNO": "pan",
"INVNAME": "investor_name",
# CAMS ships an "REINVEST_F" column that encodes whether the dividend
# option on the scheme is a reinvestment or a payout. We capture the
# raw flag so validators / downstream can check it against the scheme
# master's plan_type.
"REINVEST_F": "dividend_option_flag",
}

# Mapping from the raw CAMS REINVEST_F flag to the canonical plan_type
# vocabulary the scheme master stores. 'Y' means reinvest, 'N' means
# payout. Anything else (empty, 'P' for pure growth, nulls) has no entry
# here, which means "the feed doesn't tell us, don't assert anything."
# NOTE: a plain dict .get() yields None for such flags, but a pandas
# Series.map over this dict yields NaN, so consumers must treat both
# None and NaN as "no value".
CAMS_REINVEST_F_TO_PLAN_TYPE: dict[str, str] = {
"Y": "idcw_reinvest",
"N": "idcw_payout",
}

_TYPE_FLIP_MAP: dict[str, str] = {
Expand All @@ -38,7 +52,7 @@
"DP": "D",
}

_BUY_TYPES = {"P", "SI", "TI", "D", "BON"}
_BUY_TYPES = {"P", "SI", "TI", "D", "BON", "NFO"}
_SELL_TYPES = {"R", "SO", "TO", "DP"}
_NO_EFFECT_TYPES = {"N", "J"}

Expand All @@ -52,8 +66,19 @@
"D": "dividend",
"DP": "dividend_payout",
"BON": "bonus",
# NFO (New Fund Offer): initial subscription to a new scheme during the
# offer period. Classified as a purchase in the ledger — the investor is
# acquiring units at the NFO price (usually ₹10) and the outcome is a
# buy position just like a regular purchase.
"NFO": "new_fund_offer",
}

# CAMS transaction types we actively refuse. TICOB / TOCOB are the
# "close of business" variants of transfer in/out and the source system
# rejects them at validation time. If you have a registrar that ships real
# COB data you want to process, subclass CamsAdapter and override
# rejected_types with a new empty set. Do not call .clear() on this set:
# CamsAdapter.rejected_types is assigned this very object, so mutating it
# in place would change the behavior of every CamsAdapter user.
_REJECTED_TYPES = {"TICOB", "TOCOB"}


class _CamsPairStrategy(PairRemovalStrategy):
def remove(self, df: pd.DataFrame) -> pd.DataFrame:
Expand Down Expand Up @@ -87,6 +112,7 @@ class CamsAdapter(FeedAdapter):
discriminator_headers: set[str] = set()
field_map = _FIELD_MAP
type_flip_map = _TYPE_FLIP_MAP
rejected_types = _REJECTED_TYPES

def parse(self, file_path: str | Path) -> pd.DataFrame:
path = Path(file_path)
Expand All @@ -113,6 +139,20 @@ def normalize(self, raw: pd.DataFrame) -> pd.DataFrame:
else:
df["__source_meta"] = [{}] * len(df)

# Translate the CAMS REINVEST_F flag into the canonical plan_type
# vocabulary the scheme master uses. Flags with no mapping entry come
# out of Series.map as a missing value (NaN, not None), so downstream
# code must treat NaN as "feed is silent" when skipping the check.
if "dividend_option_flag" in df.columns:
df["plan_type_from_feed"] = (
df["dividend_option_flag"]
.astype(str)
.str.strip()
.str.upper()
.map(CAMS_REINVEST_F_TO_PLAN_TYPE)
)
else:
df["plan_type_from_feed"] = None

df.insert(0, "registrar_row_index", range(len(df)))
return df

Expand Down
40 changes: 40 additions & 0 deletions src/openreversefeed/adapters/kfintech.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,27 @@
"PANNO": "pan",
"INV_NAME": "investor_name",
"BROK_CODE": "broker_code",
# KFintech DIVOPT encodes the dividend / growth plan for the scheme.
# Legacy files use "DIVIDEND PAYOUT" / "DIVIDEND REINVESTMENT" strings,
# post-SEBI-2021 files use "IDCW PAYOUT" / "IDCW REINVESTMENT". The
# validator reconciles the feed flag against the scheme master.
"DIVOPT": "dividend_option_flag",
}

# Mapping from the raw KFintech DIVOPT text to the canonical plan_type
# vocabulary. The lookup input is stripped, upper-cased and
# whitespace-collapsed before it is mapped (see _normalize_with_map), so
# "idcw payout" and "IDCW  PAYOUT" resolve to the same value — and every
# key below must therefore be upper-case with single spaces. Legacy
# "DIVIDEND ..." wording is mapped identically — SEBI renamed these to
# IDCW in 2021 but KFintech still ships old strings on historical files.
# Text with no entry here is left as a missing value by Series.map.
KFINTECH_DIVOPT_TO_PLAN_TYPE: dict[str, str] = {
"IDCW PAYOUT": "idcw_payout",
"IDCW REINVESTMENT": "idcw_reinvest",
"IDCW REINVEST": "idcw_reinvest",
"DIVIDEND PAYOUT": "idcw_payout",
"DIVIDEND REINVESTMENT": "idcw_reinvest",
"DIVIDEND REINVEST": "idcw_reinvest",
"GROWTH": "growth",
# NOTE(review): "GROWTH PAYOUT" is treated as plain growth — presumably a
# variant seen in real files; confirm against sample feeds.
"GROWTH PAYOUT": "growth",
}

_FORMAT2_FIELD_MAP = dict(_FORMAT1_FIELD_MAP)
Expand Down Expand Up @@ -61,6 +82,9 @@
"R": (Action.SELL, "redemption"),
"D": (Action.BUY, "dividend"),
"DP": (Action.SELL, "dividend_payout"),
# NFO (New Fund Offer): initial subscription during the offer period.
# Classified as a buy just like a regular purchase.
"NFO": (Action.BUY, "new_fund_offer"),
}

_FLAG_TO_ACTION = {
Expand Down Expand Up @@ -97,6 +121,22 @@ def _normalize_with_map(self, raw: pd.DataFrame, fmap: dict[str, str]) -> pd.Dat
df["__source_meta"] = raw[unknown_cols].to_dict(orient="records")
else:
df["__source_meta"] = [{}] * len(df)

# Translate the KFintech DIVOPT text into the canonical plan_type
# vocabulary. Legacy files use "DIVIDEND PAYOUT" wording, post-SEBI
# (2021) files use "IDCW PAYOUT" — both map to the same value.
if "dividend_option_flag" in df.columns:
df["plan_type_from_feed"] = (
df["dividend_option_flag"]
.astype(str)
.str.strip()
.str.upper()
.str.replace(r"\s+", " ", regex=True)
.map(KFINTECH_DIVOPT_TO_PLAN_TYPE)
)
else:
df["plan_type_from_feed"] = None

df.insert(0, "registrar_row_index", range(len(df)))
return df

Expand Down
29 changes: 29 additions & 0 deletions src/openreversefeed/core/account_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ class PanNotFoundError(Exception):
pass


def _canonicalize_name(value: Any) -> str:
"""Normalize an investor name for equality comparison.

Collapses whitespace runs, upper-cases, strips punctuation-like
leading/trailing spaces. Deliberately NOT a fuzzy match — two
names only tie if they agree after canonicalization. This catches
the common "RAJESH KUMAR" vs "Rajesh Kumar" (double space) vs
" rajesh kumar" case without introducing false positives.
"""
if value is None:
return ""
return " ".join(str(value).upper().split())


def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]:
pan = str(row["pan"]).strip().upper()
by_ownership = cache.accounts_by_pan.get(pan)
Expand All @@ -32,6 +46,21 @@ def resolve_account(row: dict[str, Any], cache: PrewarmCache) -> dict[str, Any]:
if ot and ot in by_ownership:
return by_ownership[ot]

# Investor-name fallback for family PANs. When multiple accounts share
# the PAN and the row doesn't declare an ownership_type we can match
# against, try a canonicalised name equality against the stored
# account names. The adapters already emit an investor_name column
# (CAMS INVNAME, KFintech INV_NAME).
feed_name = _canonicalize_name(row.get("investor_name"))
if feed_name:
matches = [
acc
for acc in by_ownership.values()
if _canonicalize_name(acc.get("name")) == feed_name
]
if len(matches) == 1:
return matches[0]

# Prefer 'individual' default
if "individual" in by_ownership:
return by_ownership["individual"]
Expand Down
9 changes: 9 additions & 0 deletions src/openreversefeed/core/cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ def run(self, df: pd.DataFrame, adapter: FeedAdapter) -> pd.DataFrame:

step = df.copy()

# Drop rows whose transaction_type is in the adapter's rejected set
# (e.g. CAMS TICOB / TOCOB). These do not represent real orders and
# silently classifying them as transfer in/out would corrupt positions.
rejected = getattr(adapter, "rejected_types", set()) or set()
if rejected and "transaction_type" in step.columns:
step = step[~step["transaction_type"].isin(rejected)].reset_index(drop=True)
if step.empty:
return step

# Step 4b — pair removal (registrar-specific)
if adapter.registrar is Registrar.KFINTECH:
step = remove_kfintech_pairs(step)
Expand Down
18 changes: 17 additions & 1 deletion src/openreversefeed/core/validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,23 @@ def validate_row(row: dict[str, Any], cache: PrewarmCache) -> None:
if pan not in cache.accounts_by_pan:
raise ValidationError(CorrectionType.PAN_NOT_FOUND, f"PAN not in accounts: {pan}")

if row["scheme_code"] not in cache.schemes_by_code:
scheme = cache.schemes_by_code.get(row["scheme_code"])
if scheme is None:
raise ValidationError(
CorrectionType.SCHEME_NOT_FOUND, f"scheme not found: {row['scheme_code']}"
)

# Dividend option mismatch: if the feed explicitly declares a
# plan_type (CAMS REINVEST_F, KFintech DIVOPT) and it disagrees with
# the scheme master, route the row to the correction queue instead of
# silently accepting it. The adapters already map legacy "DIVIDEND ..."
# and post-SEBI-2021 "IDCW ..." wording to the same canonical value, so
# a mismatch here is a genuine disagreement (e.g. payout vs. reinvest),
# not a naming difference.
feed_plan = row.get("plan_type_from_feed")
scheme_plan = scheme.get("plan_type")
if feed_plan and scheme_plan and feed_plan != scheme_plan:
raise ValidationError(
CorrectionType.OTHER,
f"plan_type mismatch: feed says '{feed_plan}', "
f"scheme master says '{scheme_plan}' for {row['scheme_code']}",
)
51 changes: 51 additions & 0 deletions tests/unit/test_account_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,54 @@ def test_pan_not_found():
cache = PrewarmCache(accounts_by_pan={})
with pytest.raises(PanNotFoundError):
resolve_account({"pan": "P1"}, cache)


def test_family_pan_falls_back_to_investor_name_match():
    """A shared PAN with no usable ownership_type resolves by investor name.

    The feed name differs from the stored account name only in case and
    surrounding whitespace, so the canonical comparison must still select
    the 'joint' account.
    """
    family_accounts = {
        "huf": {"id": "u1", "name": "Suresh Kumar HUF"},
        "joint": {"id": "u2", "name": "Rajesh Kumar"},
    }
    cache = PrewarmCache(accounts_by_pan={"P1": family_accounts})
    feed_row = {"pan": "P1", "ownership_type": None, "investor_name": " rajesh kumar "}
    resolved = resolve_account(feed_row, cache)
    assert resolved["id"] == "u2"


def test_family_pan_name_fallback_needs_unique_match():
    """The name fallback must not pick a winner among several matches.

    Both stored accounts carry the same canonical name as the feed row, so
    resolution is expected to end in AmbiguousPanError listing both ids.
    """
    same_name_accounts = {
        "huf": {"id": "u1", "name": "Rajesh Kumar"},
        "joint": {"id": "u2", "name": "Rajesh Kumar"},
    }
    cache = PrewarmCache(accounts_by_pan={"P1": same_name_accounts})
    feed_row = {"pan": "P1", "ownership_type": None, "investor_name": "RAJESH KUMAR"}
    with pytest.raises(AmbiguousPanError) as caught:
        resolve_account(feed_row, cache)
    reported_ids = set(caught.value.candidate_ids)
    assert reported_ids == {"u1", "u2"}


def test_family_pan_name_fallback_ignored_without_investor_name():
    """Without an investor name the 'individual' account is still chosen.

    The row carries investor_name=None, so the name fallback has nothing to
    compare and the resolver is expected to return the 'individual' entry.
    """
    accounts = {
        "individual": {"id": "u1", "name": "Rajesh Kumar"},
        "joint": {"id": "u2", "name": "Suresh Kumar"},
    }
    cache = PrewarmCache(accounts_by_pan={"P1": accounts})
    nameless_row = {"pan": "P1", "ownership_type": None, "investor_name": None}
    resolved = resolve_account(nameless_row, cache)
    assert resolved["id"] == "u1"
assert acc["id"] == "u1"
14 changes: 14 additions & 0 deletions tests/unit/test_cams_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ def test_cams_classify_non_financial_no_effect():
assert action is Action.NO_EFFECT


def test_cams_classify_nfo_is_buy_new_fund_offer():
    """An NFO row classifies as a non-reversal BUY tagged 'new_fund_offer'."""
    adapter = CamsAdapter()
    nfo_row = {"transaction_type": "NFO", "transaction_mode": "N"}
    action, tag, is_rev = adapter.classify_row(nfo_row)
    assert action is Action.BUY
    assert tag == "new_fund_offer"
    assert is_rev is False


def test_cams_rejected_types_contains_ticob_and_tocob():
    """CamsAdapter's class-level rejected set is exactly TICOB and TOCOB."""
    expected_rejections = {"TICOB", "TOCOB"}
    assert CamsAdapter.rejected_types == expected_rejections


def test_cams_composite_key_deterministic():
adapter = CamsAdapter()
row = {
Expand Down
31 changes: 31 additions & 0 deletions tests/unit/test_cleaner.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,37 @@ def test_cleaner_runs_all_steps_on_minimal_cams_df():
assert "action_tag" in out.columns


def test_cleaner_drops_cams_ticob_and_tocob_rows():
    """The cleaner removes CAMS TICOB/TOCOB rows and keeps the rest.

    Four otherwise-identical rows are fed in with types P, TICOB, TOCOB, P;
    only the two purchase rows may survive Cleaner.run.
    """
    cleaner = Cleaner()
    adapter = CamsAdapter()
    txn_types = ["P", "TICOB", "TOCOB", "P"]
    records = [
        {
            "transaction_id": f"ID{idx}",
            "folio_number": "F1",
            "product_code": "P1",
            "scheme_code": "S1",
            "units": 100.0,
            "amount": 10000.0,
            "transaction_date": date(2025, 1, 1),
            "transaction_mode": "N",
            "transaction_type": txn_type,
            "transaction_number": f"T{idx}",
            "registrar_row_index": idx,
            "__source_meta": [{}],
        }
        for idx, txn_type in enumerate(txn_types)
    ]
    frame = pd.DataFrame(records)
    cleaned = cleaner.run(frame, adapter)
    assert len(cleaned) == 2
    assert set(cleaned["transaction_type"]) == {"P"}


def test_cleaner_kfintech_preserves_group_columns_through_conflict_resolution():
"""Regression: the conflict resolver must not strip transaction_number or
folio_number from the cleaned DataFrame, otherwise the composite-key
Expand Down
39 changes: 39 additions & 0 deletions tests/unit/test_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,42 @@ def test_missing_folio_number():
validate_row(row, _CACHE)
assert exc_info.value.correction_type is CorrectionType.OTHER
assert "folio_number" in exc_info.value.message


def test_plan_type_mismatch_between_feed_and_scheme_raises():
    """A feed plan_type that contradicts the scheme master is rejected.

    The row declares 'idcw_payout' while the master stores 'idcw_reinvest';
    validate_row must raise a ValidationError of type OTHER whose message
    names the plan_type mismatch.
    """
    scheme_master = {"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_reinvest"}}
    accounts = {"ABCDE1234F": {"individual": {"id": "acc-1"}}}
    cache = PrewarmCache(schemes_by_code=scheme_master, accounts_by_pan=accounts)
    row = {**_VALID, "plan_type_from_feed": "idcw_payout"}
    with pytest.raises(ValidationError) as caught:
        validate_row(row, cache)
    error = caught.value
    assert error.correction_type is CorrectionType.OTHER
    assert "plan_type mismatch" in error.message


def test_plan_type_matching_feed_and_scheme_passes():
    """Agreeing feed and scheme-master plan_type values validate cleanly."""
    scheme_master = {"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_payout"}}
    accounts = {"ABCDE1234F": {"individual": {"id": "acc-1"}}}
    cache = PrewarmCache(schemes_by_code=scheme_master, accounts_by_pan=accounts)
    row = {**_VALID, "plan_type_from_feed": "idcw_payout"}
    # Passing means no exception escapes.
    validate_row(row, cache)


def test_plan_type_silent_feed_does_not_block():
    """A row whose plan_type_from_feed is None is not rejected.

    With no plan declared by the feed there is nothing to compare against
    the scheme master, so validation must complete without raising.
    """
    scheme_master = {"SCH1": {"id": 1, "amc_id": 10, "plan_type": "idcw_payout"}}
    accounts = {"ABCDE1234F": {"individual": {"id": "acc-1"}}}
    cache = PrewarmCache(schemes_by_code=scheme_master, accounts_by_pan=accounts)
    row = {**_VALID, "plan_type_from_feed": None}
    # Passing means no exception escapes.
    validate_row(row, cache)
Loading