1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@
**/**/*pycache*
/*venv
.vscode/
.idea/
52 changes: 52 additions & 0 deletions README.md
@@ -42,6 +42,58 @@ Included is a [random data generator][random-fire] which will generate data in l
### Testing
You can run tests locally via `./run_tests.sh` or view the [CI test results here][travis-ci]

### Spark
The Fire pyspark module ensures data flows according to the FIRE data model specification,
resulting in the high quality standards required for the transmission and processing of regulatory data.
We ensure raw data is schematized according to FIRE entity specifications and curated based on entity constraints.

#### Schematizing raw records

Even though records may sometimes "look" structured (e.g. JSON files), enforcing a schema is not just good
practice; in enterprise settings, it guarantees that any missing field is still expected, unexpected fields are
discarded, and data types are fully evaluated (e.g. a date should be treated as a date object, not a string).
In the example below, we enforce a schema on incoming CSV files for the collateral entity.
This process is called data schematization.

```python
from fire.spark import FireModel
fire_model = FireModel().load("collateral")
fire_schema = fire_model.schema

collateral_df = spark \
.read \
.format("csv") \
.schema(fire_schema) \
.load("/path/to/raw/data")
```
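The point about data types is easy to illustrate even outside Spark: once values are parsed into proper date objects, ordering and comparison behave correctly, which raw strings do not guarantee. A minimal plain-Python sketch (the sample dates are made up for illustration):

```python
from datetime import datetime

# Two dates as raw day/month/year strings
raw = ["9/12/2020", "10/1/2021"]

# As strings, "10/1/2021" sorts before "9/12/2020" -- the wrong order
assert sorted(raw) == ["10/1/2021", "9/12/2020"]

# Parsed into date objects, the ordering is chronological
parsed = sorted(datetime.strptime(d, "%d/%m/%Y").date() for d in raw)
assert parsed[0].year == 2020
```

This is the kind of silent error that schematizing raw records up front is meant to prevent.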

#### Constraints

Applying a schema is one thing; enforcing its constraints is another.
Given the multiplicity properties of an entity, we can detect whether a field is mandatory.
Given an enumeration object, we can ensure the consistency of its values.

```python
from fire.spark import FireModel
fire_model = FireModel().load("collateral")
fire_constraints = fire_model.constraints
```

The resulting constraints (expressed as Spark SQL) can be evaluated against a Spark DataFrame, in batch or in real time.

```python
from pyspark.sql import functions as F

# fire_constraints is a mapping of constraint name to Spark SQL expression
@F.udf("array<string>")
def failed_expectations(expectations):
    # return the names of the constraints that did not hold for this record
    return [name for name, success in zip(fire_constraints.keys(), expectations) if not success]

collateral_invalid = collateral_df \
  .withColumn("_fire", F.array([F.expr(value) for value in fire_constraints.values()])) \
  .withColumn("_fire", failed_expectations("_fire")) \
  .filter(F.size("_fire") > 0)
```
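The filtering above boils down to pairing constraint names with their boolean outcomes and keeping the failures. A minimal pure-Python sketch of that pairing (no Spark needed; the constraint names and results below are made up for illustration):

```python
# Hypothetical constraint names and their evaluated results for one record
constraint_names = ["id_is_mandatory", "value_is_positive", "currency_in_enum"]
results = [True, False, True]

# Keep the names of the constraints that did not hold
failed = [name for name, success in zip(constraint_names, results) if not success]
print(failed)  # ['value_is_positive']
```

Records with a non-empty failure list are the ones quarantined as `collateral_invalid` above.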


---
[fire]: https://suade.org/fire/
Empty file added fire/__init__.py
Empty file.