Changes from all commits
49 commits
b1d9c97
Added dagster and query examples (#4)
rodalynbarce Aug 15, 2023
3584996
Added Circular Average/Standard Deviation examples
rodalynbarce Aug 16, 2023
3389ebc
Updated link
rodalynbarce Aug 16, 2023
7aa2009
Added token
rodalynbarce Aug 16, 2023
cff20b6
Updated link
rodalynbarce Aug 16, 2023
1bd25e1
Merge pull request #8 from rodalynbarce/feature/00441
cching95 Aug 16, 2023
c0f0bb0
Initial Commit
vbayon Aug 22, 2023
3befe54
Added Envinronment
vbayon Aug 23, 2023
89f44ec
add python delta sample
JamesKnBr Sep 1, 2023
ceeb40a
Merge pull request #11 from JamesKnBr/main
cching95 Sep 1, 2023
751610b
change relative links
JamesKnBr Sep 1, 2023
22eb27e
Merge pull request #12 from JamesKnBr/main
cching95 Sep 4, 2023
6b8af8e
Added MISO example
rodalynbarce Aug 10, 2023
f38b809
Added RTDIP and Java links
rodalynbarce Aug 10, 2023
ef2d132
Update folder name
rodalynbarce Aug 15, 2023
2e766cf
Update link
rodalynbarce Aug 15, 2023
dc000c8
Update link
rodalynbarce Aug 15, 2023
2239ad7
Update link
rodalynbarce Aug 15, 2023
eb0c453
Added dagster and query examples (#4)
rodalynbarce Aug 15, 2023
d35e3a1
Added MISO sample pipeline
rodalynbarce Aug 21, 2023
72c6908
Update link
rodalynbarce Aug 21, 2023
a3ee41f
Added PJM
rodalynbarce Sep 4, 2023
8dff1c3
Updated doc links
rodalynbarce Sep 4, 2023
d2b641b
Updated pipeline links
rodalynbarce Sep 4, 2023
8cc2cc2
Merge remote-tracking branch 'origin/main' into feature/00434
rodalynbarce Sep 4, 2023
6eb4180
Merge pull request #13 from rodalynbarce/feature/00434
cching95 Sep 4, 2023
16eea66
Add EdgeX eventhub to delta
JamesKnBr Sep 5, 2023
71caec5
Merge pull request #16 from JamesKnBr/samples/00015
cching95 Sep 5, 2023
f20d825
Merge branch 'main' into feature/00007
vbayon Oct 4, 2023
56c1523
Updated samples and documentation
vbayon Nov 1, 2023
a6bb8bc
Edited documentation
vbayon Nov 1, 2023
5be9a8a
Updated
vbayon Nov 3, 2023
ea8bd29
Updated
vbayon Nov 3, 2023
a55f7b5
Documentation update
vbayon Nov 3, 2023
d7a51ca
Documentation updated
vbayon Nov 3, 2023
a594389
Documentation updated
vbayon Nov 3, 2023
ad5a82a
Added Dagster Installers
vbayon Nov 3, 2023
8ae1bc3
Updated
vbayon Nov 4, 2023
a452b74
Updated
vbayon Nov 4, 2023
e4189f6
Updated
vbayon Nov 5, 2023
f7030ec
Updated
vbayon Nov 11, 2023
c0cce10
Updated
vbayon Nov 11, 2023
7f90338
Updated
vbayon Nov 11, 2023
2e0d2dc
Updated
vbayon Nov 11, 2023
bd0e66f
Updated
vbayon Nov 11, 2023
824d13e
Updated
vbayon Nov 11, 2023
aa2c7e1
Updated
vbayon Nov 12, 2023
5d0a935
Notebook reenabled
vbayon Nov 13, 2023
a92e9dd
Documentation Update
vbayon Nov 13, 2023
31 changes: 31 additions & 0 deletions pipelines/deploy/EdgeX-Eventhub-to-Delta/README.md
@@ -0,0 +1,31 @@
# EdgeX Eventhub to Delta Pipeline

This article provides a guide on how to execute a pipeline that batch reads EdgeX data from an Eventhub and writes it to a Delta table locally using the RTDIP SDK. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python (3.10) environment.

## Prerequisites
This pipeline job requires the following package:

* [rtdip-sdk](../../../../getting-started/installation.md#installing-the-rtdip-sdk)


## Components
|Name|Description|
|---------------------------|----------------------|
|[SparkEventhubSource](../../../code-reference/pipelines/sources/spark/eventhub.md)|Reads data from an Eventhub.|
|[BinaryToStringTransformer](../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.|
|[EdgeXOPCUAJsonToPCDMTransformer](../../../code-reference/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.md)|Converts EdgeX OPC UA JSON data to the Process Control Data Model (PCDM).|
|[SparkDeltaDestination](../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|

## Common Errors
|Error|Solution|
|---------------------------|----------------------|
|`com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/ErrorClassesJsonReader`|The Delta version in the Spark session must be compatible with your local PySpark version. See [here](https://docs.delta.io/latest/releases.html){ target="_blank" } for version compatibility.|
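
At the time of writing, `delta-core` 2.3.x targets Spark 3.3.x and 2.4.x targets Spark 3.4.x (always confirm against the linked releases page). Below is a minimal sketch, assuming only that PySpark is installed locally, for checking your PySpark version before pinning the Delta package in `spark.jars.packages`:

```python
import pyspark

# Assumed mapping at the time of writing; confirm against the Delta releases page
delta_for_spark = {
    "3.3": "io.delta:delta-core_2.12:2.3.0",
    "3.4": "io.delta:delta-core_2.12:2.4.0",
}

spark_minor = ".".join(pyspark.__version__.split(".")[:2])
print(spark_minor, "->", delta_for_spark.get(spark_minor, "see https://docs.delta.io/latest/releases.html"))
```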



## Example
Below is an example of how to batch read EdgeX data from an Eventhub and write it to a Delta table locally.

```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py"
```
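
Once the pipeline has run, a quick way to confirm the write is to read the destination back. A minimal sketch, assuming the Delta-enabled `spark` session and the placeholder destination path from `pipeline.py` below:

```python
# "{/path/to/destination}" is the same placeholder path used in pipeline.py;
# "spark" is the Delta-enabled SparkSession created there
df = spark.read.format("delta").load("{/path/to/destination}")
df.show(5)
```
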
31 changes: 31 additions & 0 deletions pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py
@@ -0,0 +1,31 @@
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import EdgeXOPCUAJsonToPCDMTransformer
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import json


def edgeX_eventhub_to_delta():

    # Spark session setup not required if running in Databricks
    spark = (SparkSession.builder.appName("MySparkSession")
             .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0,com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22")
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .getOrCreate())

    ehConf = {
        "eventhubs.connectionString": "{EventhubConnectionString}",
        "eventhubs.consumerGroup": "{EventhubConsumerGroup}",
        "eventhubs.startingPosition": json.dumps({"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})
    }

    source = SparkEventhubSource(spark, ehConf).read_batch()
    string_data = BinaryToStringTransformer(source, "body", "body").transform()
    PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data, "body").transform()
    SparkDeltaDestination(data=PCDM_data, options={}, destination="{/path/to/destination}").write_batch()


if __name__ == "__main__":
    edgeX_eventhub_to_delta()
66 changes: 66 additions & 0 deletions pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/README.md
@@ -0,0 +1,66 @@
# Fledge Pipeline using Dagster and Databricks Connect

This article provides a guide on how to deploy a pipeline in Dagster using the RTDIP SDK and Databricks Connect. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python (3.10) environment.

!!! note "Note"
    Reading from Eventhubs is currently not supported on Databricks Connect.

## Prerequisites
Deployment using Databricks Connect requires:

* a Databricks workspace

* a cluster in the same workspace

* a personal access token

Further information on Databricks requirements can be found [here](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html#requirements).


This pipeline job requires the following packages:

* [rtdip-sdk](../../../../../getting-started/installation.md#installing-the-rtdip-sdk)

* [databricks-connect](https://pypi.org/project/databricks-connect/)

* [dagster](https://docs.dagster.io/getting-started/install)


!!! note "Dagster Installation"
For Mac users with an M1 or M2 chip, installation of dagster should be done as follows:
```
pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels
```

## Components
|Name|Description|
|---------------------------|----------------------|
|[SparkDeltaSource](../../../../code-reference/pipelines/sources/spark/delta.md)|Reads data from a Delta table.|
|[BinaryToStringTransformer](../../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.|
|[FledgeOPCUAJsonToPCDMTransformer](../../../../code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md)|Converts a Spark DataFrame column containing a json string to the Process Control Data Model.|
|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|

## Authentication
For Databricks authentication, the following fields should be added to a configuration profile in your [`.databrickscfg`](https://docs.databricks.com/en/dev-tools/auth.html#config-profiles) file:

```
[PROFILE]
host = https://{workspace_instance}
token = dapi...
cluster_id = {cluster_id}
```

This profile should match the configuration in your `DatabricksSession` in the example below, as it will be used by the [Databricks extension](https://docs.databricks.com/en/dev-tools/vscode-ext-ref.html#configure-the-extension) in VS Code to authenticate to your Databricks cluster.
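
As a quick sanity check, the same profile can be used to open a remote session and run a trivial query before wiring up the pipeline. This is a minimal sketch, assuming a databricks-connect release whose builder accepts an SDK `Config`; the profile name is a placeholder:

```python
from databricks.connect import DatabricksSession
from databricks.sdk.core import Config

# "PROFILE" is a placeholder; use the profile name from your ~/.databrickscfg
spark = DatabricksSession.builder.sdkConfig(Config(profile="PROFILE")).getOrCreate()

# A trivial query to confirm the session reaches the cluster
print(spark.range(3).collect())
```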

## Example
Below is an example of how to set up a pipeline to read Fledge data from a Delta table, transform it to RTDIP's [PCDM model](../../../../../domains/process_control/data_model.md) and write it to a Delta table.

```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py"
```

## Deploy
The following command starts Dagster locally and loads the pipeline:
`dagster dev -f <path/to/file.py>`

Open the link provided by the command above, go to the Launchpad tab and click Run to execute the pipeline.
36 changes: 36 additions & 0 deletions pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py
@@ -0,0 +1,36 @@
from dagster import Definitions, ResourceDefinition, graph, op
from databricks.connect import DatabricksSession
from rtdip_sdk.pipelines.sources.spark.delta import SparkDeltaSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

# Databricks cluster configuration
databricks_resource = ResourceDefinition.hardcoded_resource(
    DatabricksSession.builder.remote(
        host="https://{workspace_instance_name}",
        token="{token}",
        cluster_id="{cluster_id}"
    ).getOrCreate()
)

# Pipeline
@op(required_resource_keys={"databricks"})
def pipeline(context):
    spark = context.resources.databricks
    source = SparkDeltaSource(spark, {}, "{path_to_table}").read_batch()
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()


@graph
def fledge_pipeline():
    pipeline()


fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "databricks": databricks_resource
    }
)

defs = Definitions(jobs=[fledge_pipeline_job])
38 changes: 38 additions & 0 deletions pipelines/deploy/Fledge-Dagster-Pipeline-Local/README.md
@@ -0,0 +1,38 @@
# Fledge Pipeline using Dagster

This article provides a guide on how to deploy a pipeline in Dagster using the RTDIP SDK. This pipeline was tested on an M2 MacBook Pro using VS Code in a Python (3.10) environment.

## Prerequisites
This pipeline job requires the following packages:

* [rtdip-sdk](../../../../../getting-started/installation.md#installing-the-rtdip-sdk)

* [dagster](https://docs.dagster.io/getting-started/install)


!!! note "Dagster Installation"
For Mac users with an M1 or M2 chip, installation of dagster should be done as follows:
```
pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels
```

## Components
|Name|Description|
|---------------------------|----------------------|
|[SparkEventhubSource](../../../../code-reference/pipelines/sources/spark/eventhub.md)|Reads data from an Eventhub.|
|[BinaryToStringTransformer](../../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.|
|[FledgeOPCUAJsonToPCDMTransformer](../../../../code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md)|Converts a Spark DataFrame column containing a json string to the Process Control Data Model.|
|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|

## Example
Below is an example of how to set up a pipeline to read Fledge data from an Eventhub, transform it to RTDIP's [PCDM model](../../../../../domains/process_control/data_model.md) and write it to a Delta table on your machine.

```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py"
```

## Deploy
The following command starts Dagster locally and loads the pipeline:
`dagster dev -f <path/to/file.py>`

Open the link provided by the command above, go to the Launchpad tab and click Run to execute the pipeline.
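
If you prefer to trigger the job without the web UI, for example in a quick local test, Dagster jobs can also be run in-process. A minimal sketch, assuming the example above is saved as `pipeline.py` (the module name depends on your file name):

```python
# Hypothetical module name; adjust the import to match your file
from pipeline import fledge_pipeline_job

result = fledge_pipeline_job.execute_in_process()
print(result.success)
```
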
69 changes: 69 additions & 0 deletions pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py
@@ -0,0 +1,69 @@
import json
from datetime import datetime as dt
from dagster import Definitions, graph, op
from dagster_pyspark.resources import pyspark_resource
from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer
from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination

# PySpark cluster configuration
packages = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,io.delta:delta-core_2.12:2.4.0"
my_pyspark_resource = pyspark_resource.configured(
    {"spark_conf": {"spark.default.parallelism": 1,
                    "spark.jars.packages": packages,
                    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
                    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
                    }
     }
)

# EventHub configuration
eventhub_connection_string = "{eventhub_connection_string}"
eventhub_consumer_group = "{eventhub_consumer_group}"

startOffset = "-1"
endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")

startingEventPosition = {
    "offset": startOffset,
    "seqNo": -1,
    "enqueuedTime": None,
    "isInclusive": True
}

endingEventPosition = {
    "offset": None,
    "seqNo": -1,
    "enqueuedTime": endTime,
    "isInclusive": True
}

ehConf = {
    "eventhubs.connectionString": eventhub_connection_string,
    "eventhubs.consumerGroup": eventhub_consumer_group,
    "eventhubs.startingPosition": json.dumps(startingEventPosition),
    "eventhubs.endingPosition": json.dumps(endingEventPosition),
    "maxEventsPerTrigger": 1000
}

# Pipeline
@op(required_resource_keys={"spark"})
def pipeline(context):
    # The resource is registered under the key "spark" in to_job below
    spark = context.resources.spark.spark_session
    source = SparkEventhubSource(spark, ehConf).read_batch()
    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()


@graph
def fledge_pipeline():
    pipeline()


fledge_pipeline_job = fledge_pipeline.to_job(
    resource_defs={
        "spark": my_pyspark_resource
    }
)

defs = Definitions(jobs=[fledge_pipeline_job])
43 changes: 43 additions & 0 deletions pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/README.md
@@ -0,0 +1,43 @@
# MISO Pipeline using RTDIP and Databricks
This article provides a guide on how to deploy a MISO pipeline from a local file to a Databricks workflow using the RTDIP SDK. It was tested on an M2 MacBook Pro using VS Code in a Conda (Python 3.11) environment. RTDIP Pipeline Components provide Databricks with all the required Python packages and JARs to execute each component; these are set up automatically during workflow creation.

## Prerequisites
This pipeline assumes you have a Databricks workspace and have followed the installation instructions as specified in the Getting Started section. In particular, ensure you have installed the following:

* [RTDIP SDK](../../../../../getting-started/installation.md#installing-the-rtdip-sdk)

* [Java](../../../../../getting-started/installation.md#java)

!!! note "RTDIP SDK Installation"
Ensure you have installed the RTDIP SDK as follows:
```
pip install "rtdip-sdk[pipelines]"
```

## Components
|Name|Description|
|---------------------------|----------------------|
|[MISODailyLoadISOSource](../../../../code-reference/pipelines/sources/spark/iso/miso_daily_load_iso.md)|Reads daily load data from the MISO API.|
|[MISOToMDMTransformer](../../../../code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md)|Converts raw MISO data into the Meters Data Model.|
|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|
|[DatabricksSDKDeploy](../../../../code-reference/pipelines/deploy/databricks.md)|Deploys an RTDIP Pipeline to Databricks Workflows leveraging the Databricks [SDK](https://docs.databricks.com/dev-tools/sdk-python.html).|
|[DeltaTableOptimizeUtility](../../../../code-reference/pipelines/utilities/spark/delta_table_optimize.md)|[Optimizes](https://docs.delta.io/latest/optimizations-oss.html) a Delta table.|
|[DeltaTableVacuumUtility](../../../../code-reference/pipelines/utilities/spark/delta_table_vacuum.md)|[Vacuums](https://docs.delta.io/latest/delta-utility.html#-delta-vacuum) a Delta table.|

## Example
Below is an example of how to set up a pipeline job to read daily load data from the MISO API, transform it into the Meters Data Model and write it to a Delta table.
```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/pipeline.py"
```

## Maintenance
The RTDIP SDK can also be used to maintain Delta tables in Databricks. Below is an example of how to set up a maintenance job that optimizes and vacuums the MISO tables written in the previous example.
```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/maintenance.py"
```
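
For reference, the RTDIP utilities above wrap Delta Lake's `OPTIMIZE` and `VACUUM` operations. A minimal sketch of the equivalent Spark SQL, assuming it runs on the Databricks cluster (for example as a notebook task) and using a placeholder table name:

```python
from pyspark.sql import SparkSession

# "{table_name}" is a placeholder for the MISO Delta table written by the pipeline
spark = SparkSession.builder.getOrCreate()
spark.sql("OPTIMIZE {table_name}")
spark.sql("VACUUM {table_name} RETAIN 168 HOURS")
```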

## Deploy
Deployment to Databricks uses the Databricks [SDK](https://docs.databricks.com/en/dev-tools/sdk-python.html). Users can control the job's configuration, including the cluster and schedule.
```python
--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/deploy.py"
```