diff --git a/pipelines/deploy/EdgeX-Eventhub-to-Delta/README.md b/pipelines/deploy/EdgeX-Eventhub-to-Delta/README.md
new file mode 100644
index 0000000..7c8b78b
--- /dev/null
+++ b/pipelines/deploy/EdgeX-Eventhub-to-Delta/README.md
@@ -0,0 +1,31 @@
+# EdgeX Eventhub to Delta Pipeline
+
+This article provides a guide on how to execute a pipeline that batch reads EdgeX data from an Eventhub and writes it to a Delta table locally using the RTDIP SDK. This pipeline was tested on an M2 Macbook Pro using VS Code in a Python (3.10) environment.
+
+## Prerequisites
+This pipeline job requires the packages:
+
+* [rtdip-sdk](../../../../getting-started/installation.md#installing-the-rtdip-sdk)
+
+
+## Components
+|Name|Description|
+|---------------------------|----------------------|
+|[SparkEventhubSource](../../../code-reference/pipelines/sources/spark/eventhub.md)|Reads data from an Eventhub.|
+|[BinaryToStringTransformer](../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.|
+|[EdgeXOPCUAJsonToPCDMTransformer](../../../code-reference/pipelines/transformers/spark/edgex_opcua_json_to_pcdm.md)|Converts EdgeX OPC UA JSON data to the Process Control Data Model (PCDM).|
+|[SparkDeltaDestination](../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|
+
+## Common Errors
+|Error|Solution|
+|---------------------------|----------------------|
+|`com.google.common.util.concurrent.ExecutionError: java.lang.NoClassDefFoundError: org/apache/spark/ErrorClassesJsonReader`|The Delta version in the Spark Session must be compatible with your local Pyspark version. See [here](https://docs.delta.io/latest/releases.html){ target="_blank" } for version compatibility.|
+
+
+
+## Example
+Below is an example of how to batch read EdgeX data from an Eventhub and write it to a Delta table locally.
+
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py"
+```
\ No newline at end of file
diff --git a/pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py b/pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py
new file mode 100644
index 0000000..a32c802
--- /dev/null
+++ b/pipelines/deploy/EdgeX-Eventhub-to-Delta/pipeline.py
@@ -0,0 +1,31 @@
+from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource
+from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer
+from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
+from rtdip_sdk.pipelines.transformers.spark.edgex_opcua_json_to_pcdm import EdgeXOPCUAJsonToPCDMTransformer
+from pyspark.sql import SparkSession
+from pyspark.sql.types import *
+from pyspark.sql.functions import *
+import json
+
+
+def edgeX_eventhub_to_delta():
+
+    # Spark session setup not required if running in Databricks
+    spark = (SparkSession.builder.appName("MySparkSession")
+        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0,com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22")
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
+        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
+        .getOrCreate())
+
+    ehConf = {
+        "eventhubs.connectionString": "{EventhubConnectionString}",
+        "eventhubs.consumerGroup": "{EventhubConsumerGroup}",
+        "eventhubs.startingPosition": json.dumps({"offset": "0", "seqNo": -1, "enqueuedTime": None, "isInclusive": True})}
+
+    source = SparkEventhubSource(spark, ehConf).read_batch()
+    string_data = 
BinaryToStringTransformer(source,"body", "body").transform() + PCDM_data = EdgeXOPCUAJsonToPCDMTransformer(string_data,"body").transform() + SparkDeltaDestination(data= PCDM_data, options= {}, destination="{/path/to/destination}").write_batch() + +if __name__ == "__main__": + edgeX_eventhub_to_delta() \ No newline at end of file diff --git a/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/README.md b/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/README.md new file mode 100644 index 0000000..caff39c --- /dev/null +++ b/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/README.md @@ -0,0 +1,66 @@ +# Fledge Pipeline using Dagster and Databricks Connect + +This article provides a guide on how to deploy a pipeline in dagster using the RTDIP SDK and Databricks Connect. This pipeline was tested on an M2 Macbook Pro using VS Code in a Python (3.10) environment. + +!!! note "Note" + Reading from Eventhubs is currently not supported on Databricks Connect. + +## Prerequisites +Deployment using Databricks Connect requires: + +* a Databricks workspace + +* a cluster in the same workspace + +* a personal access token + +Further information on Databricks requirements can be found [here](https://docs.databricks.com/en/dev-tools/databricks-connect-ref.html#requirements). + + +This pipeline job requires the packages: + +* [rtdip-sdk](../../../../../getting-started/installation.md#installing-the-rtdip-sdk) + +* [databricks-connect](https://pypi.org/project/databricks-connect/) + +* [dagster](https://docs.dagster.io/getting-started/install) + + +!!! note "Dagster Installation" + For Mac users with an M1 or M2 chip, installation of dagster should be done as follows: + ``` + pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels + ``` + +## Components +|Name|Description| +|---------------------------|----------------------| +|[SparkDeltaSource](../../../../code-reference/pipelines/sources/spark/delta.md)|Read data from a Delta table.| +|[BinaryToStringTransformer](../../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.| +|[FledgeOPCUAJsonToPCDMTransformer](../../../../code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md)|Converts a Spark DataFrame column containing a json string to the Process Control Data Model.| +|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.| + +## Authentication +For Databricks authentication, the following fields should be added to a configuration profile in your [`.databrickscfg`](https://docs.databricks.com/en/dev-tools/auth.html#config-profiles) file: + +``` +[PROFILE] +host = https://{workspace_instance} +token = dapi... +cluster_id = {cluster_id} +``` + +This profile should match the configurations in your `DatabricksSession` in the example below as it will be used by the [Databricks extension](https://docs.databricks.com/en/dev-tools/vscode-ext-ref.html#configure-the-extension) in VS Code for authenticating your Databricks cluster. + +## Example +Below is an example of how to set up a pipeline to read Fledge data from a Delta table, transform it to RTDIP's [PCDM model](../../../../../domains/process_control/data_model.md) and write it to a Delta table. 
+ +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py" +``` + +## Deploy +The following command deploys the pipeline to dagster: +`dagster dev -f ` + +Using the link provided from the command above, click on Launchpad and hit run to run the pipeline. \ No newline at end of file diff --git a/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py b/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py new file mode 100644 index 0000000..d03332f --- /dev/null +++ b/pipelines/deploy/Fledge-Dagster-Pipeline-Databricks/pipeline.py @@ -0,0 +1,36 @@ +from dagster import Definitions, ResourceDefinition, graph, op +from databricks.connect import DatabricksSession +from rtdip_sdk.pipelines.sources.spark.delta import SparkDeltaSource +from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer +from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer +from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination + +# Databricks cluster configuration +databricks_resource = ResourceDefinition.hardcoded_resource( + DatabricksSession.builder.remote( + host = "https://{workspace_instance_name}", + token = "{token}", + cluster_id = "{cluster_id}" + ).getOrCreate() +) + +# Pipeline +@op(required_resource_keys={"databricks"}) +def pipeline(context): + spark = context.resources.databricks + source = SparkDeltaSource(spark, {}, "{path_to_table}").read_batch() + transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform() + transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform() + SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch() + +@graph +def fledge_pipeline(): + pipeline() + +fledge_pipeline_job = fledge_pipeline.to_job( + resource_defs={ + "databricks": databricks_resource + } +) + +defs = Definitions(jobs=[fledge_pipeline_job]) \ No newline at end of file diff --git a/pipelines/deploy/Fledge-Dagster-Pipeline-Local/README.md b/pipelines/deploy/Fledge-Dagster-Pipeline-Local/README.md new file mode 100644 index 0000000..1cbb5a1 --- /dev/null +++ b/pipelines/deploy/Fledge-Dagster-Pipeline-Local/README.md @@ -0,0 +1,38 @@ +# Fledge Pipeline using Dagster + +This article provides a guide on how to deploy a pipeline in dagster using the RTDIP SDK. This pipeline was tested on an M2 Macbook Pro using VS Code in a Python (3.10) environment. + +## Prerequisites +This pipeline job requires the packages: + +* [rtdip-sdk](../../../../../getting-started/installation.md#installing-the-rtdip-sdk) + +* [dagster](https://docs.dagster.io/getting-started/install) + + +!!! 
note "Dagster Installation" + For Mac users with an M1 or M2 chip, installation of dagster should be done as follows: + ``` + pip install dagster dagster-webserver --find-links=https://github.com/dagster-io/build-grpcio/wiki/Wheels + ``` + +## Components +|Name|Description| +|---------------------------|----------------------| +|[SparkEventhubSource](../../../../code-reference/pipelines/sources/spark/eventhub.md)|Read data from an Eventhub.| +|[BinaryToStringTransformer](../../../../code-reference/pipelines/transformers/spark/binary_to_string.md)|Converts a Spark DataFrame column from binary to string.| +|[FledgeOPCUAJsonToPCDMTransformer](../../../../code-reference/pipelines/transformers/spark/fledge_opcua_json_to_pcdm.md)|Converts a Spark DataFrame column containing a json string to the Process Control Data Model.| +|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.| + +## Example +Below is an example of how to set up a pipeline to read Fledge data from an Eventhub, transform it to RTDIP's [PCDM model](../../../../../domains/process_control/data_model.md) and write it to a Delta table on your machine. + +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py" +``` + +## Deploy +The following command deploys the pipeline to dagster: +`dagster dev -f ` + +Using the link provided from the command above, click on Launchpad and hit run to run the pipeline. \ No newline at end of file diff --git a/pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py b/pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py new file mode 100644 index 0000000..ba0cefd --- /dev/null +++ b/pipelines/deploy/Fledge-Dagster-Pipeline-Local/pipeline.py @@ -0,0 +1,69 @@ +import json +from datetime import datetime as dt +from dagster import Definitions, graph, op +from dagster_pyspark.resources import pyspark_resource +from rtdip_sdk.pipelines.sources.spark.eventhub import SparkEventhubSource +from rtdip_sdk.pipelines.transformers.spark.binary_to_string import BinaryToStringTransformer +from rtdip_sdk.pipelines.transformers.spark.fledge_opcua_json_to_pcdm import FledgeOPCUAJsonToPCDMTransformer +from rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination + +# PySpark cluster configuration +packages = "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,io.delta:delta-core_2.12:2.4.0" +my_pyspark_resource = pyspark_resource.configured( + {"spark_conf": {"spark.default.parallelism": 1, + "spark.jars.packages": packages, + "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", + "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" + } + } +) + +# EventHub configuration +eventhub_connection_string = "{eventhub_connection_string}" +eventhub_consumer_group = "{eventhub_consumer_group}" + +startOffset = "-1" +endTime = dt.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ") + +startingEventPosition = { + "offset": startOffset, + "seqNo": -1, + "enqueuedTime": None, + "isInclusive": True +} + +endingEventPosition = { + "offset": None, + "seqNo": -1, + "enqueuedTime": endTime, + "isInclusive": True +} + +ehConf = { +'eventhubs.connectionString' : eventhub_connection_string, +'eventhubs.consumerGroup': eventhub_consumer_group, +'eventhubs.startingPosition' : json.dumps(startingEventPosition), +'eventhubs.endingPosition' : json.dumps(endingEventPosition), +'maxEventsPerTrigger': 1000 +} + +# Pipeline 
+@op(required_resource_keys={"spark"})
+def pipeline(context):
+    spark = context.resources.spark.spark_session
+    source = SparkEventhubSource(spark, ehConf).read_batch()
+    transformer = BinaryToStringTransformer(source, "{source_column_name}", "{target_column_name}").transform()
+    transformer = FledgeOPCUAJsonToPCDMTransformer(transformer, "{source_column_name}").transform()
+    SparkDeltaDestination(transformer, {}, "{path_to_table}").write_batch()
+
+@graph
+def fledge_pipeline():
+    pipeline()
+
+fledge_pipeline_job = fledge_pipeline.to_job(
+    resource_defs={
+        "spark": my_pyspark_resource
+    }
+)
+
+defs = Definitions(jobs=[fledge_pipeline_job])
\ No newline at end of file
diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/README.md b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/README.md
new file mode 100644
index 0000000..78ae943
--- /dev/null
+++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/README.md
@@ -0,0 +1,43 @@
+# MISO Pipeline using RTDIP and Databricks
+This article provides a guide on how to deploy a MISO pipeline from a local file to a Databricks workflow using the RTDIP SDK. It was tested on an M2 Macbook Pro using VS Code in a Conda (3.11) environment. RTDIP Pipeline Components provide Databricks with all the required Python packages and JARs to execute each component; these are set up automatically during workflow creation.
+
+## Prerequisites
+This pipeline assumes you have a Databricks workspace and have followed the installation instructions as specified in the Getting Started section. In particular ensure you have installed the following:
+
+* [RTDIP SDK](../../../../../getting-started/installation.md#installing-the-rtdip-sdk)
+
+* [Java](../../../../../getting-started/installation.md#java)
+
+!!! note "RTDIP SDK Installation"
+    Ensure you have installed the RTDIP SDK as follows:
+    ```
+    pip install "rtdip-sdk[pipelines]"
+    ```
+
+## Components
+|Name|Description|
+|---------------------------|----------------------|
+|[MISODailyLoadISOSource](../../../../code-reference/pipelines/sources/spark/iso/miso_daily_load_iso.md)|Reads daily load data from the MISO API.|
+|[MISOToMDMTransformer](../../../../code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md)|Converts MISO raw data into the Meters Data Model.|
+|[SparkDeltaDestination](../../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|
+|[DatabricksSDKDeploy](../../../../code-reference/pipelines/deploy/databricks.md)|Deploys an RTDIP Pipeline to Databricks Workflows leveraging the Databricks [SDK](https://docs.databricks.com/dev-tools/sdk-python.html).|
+|[DeltaTableOptimizeUtility](../../../../code-reference/pipelines/utilities/spark/delta_table_optimize.md)|[Optimizes](https://docs.delta.io/latest/optimizations-oss.html) a Delta table.|
+|[DeltaTableVacuumUtility](../../../../code-reference/pipelines/utilities/spark/delta_table_vacuum.md)|[Vacuums](https://docs.delta.io/latest/delta-utility.html#-delta-vacuum) a Delta table.|
+
+## Example
+Below is an example of how to set up a pipeline job to read daily load data from the MISO API, transform it into the Meters Data Model and write it to a Delta table.
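+
+Note that `pipeline.py` and `maintenance.py` below reference `spark` without creating it; when they run as Databricks notebook tasks (see the Deploy section), the workspace provides the `SparkSession` automatically. To run them from your own machine instead, create a session first, for example with Databricks Connect. The sketch below assumes `databricks-connect` is installed and a default profile with `host`, `token` and `cluster_id` exists in your `.databrickscfg`.
+
+```python
+# Only needed outside a Databricks notebook, where `spark` is not pre-defined.
+from databricks.connect import DatabricksSession
+
+spark = DatabricksSession.builder.getOrCreate()
+```
+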
+```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/pipeline.py" +``` + +## Maintenance +The RTDIP SDK can be used to maintain Delta tables in Databricks, an example of how to set up a maintenance job to optimize and vacuum the MISO tables written from the previous example is provided below. +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/maintenance.py" +``` + +## Deploy +Deployment to Databricks uses the Databricks [SDK](https://docs.databricks.com/en/dev-tools/sdk-python.html). Users have the option to control the job's configurations including the cluster and schedule. +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/deploy.py" +``` \ No newline at end of file diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/deploy.py b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/deploy.py new file mode 100644 index 0000000..9ecd09b --- /dev/null +++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/deploy.py @@ -0,0 +1,83 @@ +from rtdip_sdk.pipelines.deploy import DatabricksSDKDeploy, CreateJob, JobCluster, ClusterSpec, Task, NotebookTask, AutoScale, RuntimeEngine, DataSecurityMode, CronSchedule, Continuous, PauseStatus +from rtdip_sdk.authentication.azure import DefaultAuth + +def deploy(): + credential = DefaultAuth().authenticate() + access_token = credential.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token + + DATABRICKS_WORKSPACE = "{databricks-workspace-url}" + + # Create clusters + cluster_list = [] + cluster_list.append(JobCluster( + job_cluster_key="pipeline-cluster", + new_cluster=ClusterSpec( + node_type_id="Standard_E4ds_v5", + autoscale=AutoScale(min_workers=1, max_workers=8), + spark_version="13.3.x-scala2.12", + data_security_mode=DataSecurityMode.SINGLE_USER, + runtime_engine=RuntimeEngine.STANDARD + ) + )) + + # Create tasks + task_list = [] + task_list.append(Task( + task_key="pipeline", + job_cluster_key="pipeline-cluster", + notebook_task=NotebookTask( + notebook_path="{path/to/pipeline.py}" + ) + )) + + # Create a Databricks Job for the Task + job = CreateJob( + name="rtdip-miso-batch-pipeline-job", + job_clusters=cluster_list, + tasks=task_list, + continuous=Continuous(pause_status=PauseStatus.UNPAUSED) + ) + + # Deploy to Databricks + databricks_pipeline_job = DatabricksSDKDeploy(databricks_job=job, host=DATABRICKS_WORKSPACE, token=access_token, workspace_directory="{path/to/databricks/workspace/directory}") + databricks_pipeline_job.deploy() + + cluster_list = [] + cluster_list.append(JobCluster( + job_cluster_key="maintenance-cluster", + new_cluster=ClusterSpec( + node_type_id="Standard_E4ds_v5", + autoscale=AutoScale(min_workers=1, max_workers=3), + spark_version="13.3.x-scala2.12", + data_security_mode=DataSecurityMode.SINGLE_USER, + runtime_engine=RuntimeEngine.PHOTON + ) + )) + + task_list = [] + task_list.append(Task( + task_key="rtdip-miso-maintenance-task", + job_cluster_key="maintenance-cluster", + notebook_task=NotebookTask( + notebook_path="{path/to/maintenance.py}" + ) + )) + + # Create a Databricks Job for the Task + job = CreateJob( + name="rtdip-miso-maintenance-job", + job_clusters=cluster_list, + tasks=task_list, + schedule=CronSchedule( + quartz_cron_expression="4 * * * * ?", + timezone_id="UTC", + pause_status=PauseStatus.UNPAUSED + ) + ) + + # Deploy to Databricks 
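+    # The maintenance job above is scheduled with the quartz expression "4 * * * * ?",
+    # which fires at second 4 of every minute; widen it (for example to a daily schedule)
+    # if that is more frequent than the OPTIMIZE/VACUUM maintenance needs.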
+ databricks_pipeline_job = DatabricksSDKDeploy(databricks_job=job, host=DATABRICKS_WORKSPACE, token=access_token, workspace_directory="{path/to/databricks/workspace/directory}") + databricks_pipeline_job.deploy() + +if __name__ == "__main__": + deploy() \ No newline at end of file diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/maintenance.py b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/maintenance.py new file mode 100644 index 0000000..cfe17aa --- /dev/null +++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/maintenance.py @@ -0,0 +1,22 @@ +from rtdip_sdk.pipelines.utilities import DeltaTableOptimizeUtility, DeltaTableVacuumUtility + +def maintenance(): + TABLE_NAMES = [ + "{path.to.table.miso_usage_data}", + "{path.to.table.miso_meta_data}" + ] + + for table in TABLE_NAMES: + + DeltaTableOptimizeUtility( + spark=spark, + table_name=table + ).execute() + + DeltaTableVacuumUtility( + spark=spark, + table_name=table + ).execute() + +if __name__ == "__main__": + maintenance() \ No newline at end of file diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/pipeline.py b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/pipeline.py new file mode 100644 index 0000000..75b28d9 --- /dev/null +++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Databricks/pipeline.py @@ -0,0 +1,44 @@ +from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource +from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer +from rtdip_sdk.pipelines.destinations import SparkDeltaDestination + +def pipeline(): + source_df = MISODailyLoadISOSource( + spark = spark, + options = { + "load_type": "actual", + "date": "20230520", + } + ).read_batch() + + transform_value_df = MISOToMDMTransformer( + spark=spark, + data=source_df, + output_type= "usage" + ).transform() + + transform_meta_df = MISOToMDMTransformer( + spark=spark, + data=source_df, + output_type= "meta" + ).transform() + + SparkDeltaDestination( + data=transform_value_df, + options={ + "partitionBy":"timestamp" + }, + destination="miso_usage_data" + ).write_batch() + + SparkDeltaDestination( + data=transform_meta_df, + options={ + "partitionBy":"timestamp" + }, + destination="miso_meta_data", + mode="overwrite" + ).write_batch() + +if __name__ == "__main__": + pipeline() \ No newline at end of file diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/README.md b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/README.md new file mode 100644 index 0000000..bef58ce --- /dev/null +++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/README.md @@ -0,0 +1,34 @@ +# MISO Pipeline using RTDIP +This article provides a guide on how to execute a MISO pipeline using RTDIP. This pipeline was tested on an M2 Macbook Pro using VS Code in a Conda (3.11) environment. + +## Prerequisites +This pipeline assumes you have followed the installation instructions as specified in the Getting Started section. In particular ensure you have installed the following: + +* [RTDIP SDK](../../../../getting-started/installation.md#installing-the-rtdip-sdk) + +* [Java](../../../../getting-started/installation.md#java) + +!!! 
note "RTDIP SDK Installation" + Ensure you have installed the RTDIP SDK as follows: + ``` + pip install "rtdip-sdk[pipelines,pyspark]" + ``` + +## Components +|Name|Description| +|---------------------------|----------------------| +|[MISODailyLoadISOSource](../../../code-reference/pipelines/sources/spark/iso/miso_daily_load_iso.md)|Read daily load data from MISO API.| +|[MISOToMDMTransformer](../../../code-reference/pipelines/transformers/spark/iso/miso_to_mdm.md)|Converts MISO Raw data into Meters Data Model.| +|[SparkDeltaDestination](../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.| + +## Example +Below is an example of how to set up a pipeline to read daily load data from the MISO API, transform it into the Meters Data Model and write it to a Delta table. +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/pipeline.py:6:" +``` + +!!! note "Using environments" + If using an environment, include the following lines at the top of your script to prevent a difference in Python versions in worker and driver: + ```python + --8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/pipeline.py::5" + ``` \ No newline at end of file diff --git a/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/pipeline.py b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/pipeline.py new file mode 100644 index 0000000..7ba1ce3 --- /dev/null +++ b/pipelines/deploy/MISODailyLoad-Batch-Pipeline-Local/pipeline.py @@ -0,0 +1,53 @@ +import sys, os + +os.environ['PYSPARK_PYTHON'] = sys.executable +os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable + +from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource +from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer +from rtdip_sdk.pipelines.destinations import SparkDeltaDestination +from pyspark.sql import SparkSession + +def pipeline(): + spark = SparkSession.builder.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")\ + .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")\ + .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate() + + source_df = MISODailyLoadISOSource( + spark = spark, + options = { + "load_type": "actual", + "date": "20230520", + } + ).read_batch() + + transform_value_df = MISOToMDMTransformer( + spark=spark, + data=source_df, + output_type= "usage" + ).transform() + + transform_meta_df = MISOToMDMTransformer( + spark=spark, + data=source_df, + output_type= "meta" + ).transform() + + SparkDeltaDestination( + data=transform_value_df, + options={ + "partitionBy":"timestamp" + }, + destination="miso_usage_data" + ).write_batch() + + SparkDeltaDestination( + data=transform_meta_df, + options={ + "partitionBy":"timestamp" + }, + destination="miso_meta_data" + ).write_batch() + +if __name__ == "__main__": + pipeline() \ No newline at end of file diff --git a/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/README.md b/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/README.md new file mode 100644 index 0000000..d2f500b --- /dev/null +++ b/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/README.md @@ -0,0 +1,34 @@ +# MISO Pipeline using RTDIP +This article provides a guide on how to execute a MISO pipeline using RTDIP. This pipeline was tested on an M2 Macbook Pro using VS Code in a Conda (3.11) environment. 
+
+## Prerequisites
+This pipeline assumes you have a valid API key from [PJM](https://apiportal.pjm.com/) and have followed the installation instructions as specified in the Getting Started section. In particular ensure you have installed the following:
+
+* [RTDIP SDK](../../../../getting-started/installation.md#installing-the-rtdip-sdk)
+
+* [Java](../../../../getting-started/installation.md#java)
+
+!!! note "RTDIP SDK Installation"
+    Ensure you have installed the RTDIP SDK as follows:
+    ```
+    pip install "rtdip-sdk[pipelines,pyspark]"
+    ```
+
+## Components
+|Name|Description|
+|---------------------------|----------------------|
+|[PJMDailyLoadISOSource](../../../code-reference/pipelines/sources/spark/iso/pjm_daily_load_iso.md)|Reads daily load data from the PJM API.|
+|[PJMToMDMTransformer](../../../code-reference/pipelines/transformers/spark/iso/pjm_to_mdm.md)|Converts PJM raw data into the Meters Data Model.|
+|[SparkDeltaDestination](../../../code-reference/pipelines/destinations/spark/delta.md)|Writes to a Delta table.|
+
+## Example
+Below is an example of how to set up a pipeline to read daily load data from the PJM API, transform it into the Meters Data Model and write it to a Delta table.
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/pipeline.py:6:"
+```
+
+!!! note "Using environments"
+    If using an environment, include the following lines at the top of your script to prevent a mismatch between the Python versions used by the worker and the driver:
+    ```python
+    --8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/pipeline.py::5"
+    ```
\ No newline at end of file
diff --git a/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/pipeline.py b/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/pipeline.py
new file mode 100644
index 0000000..8fb7d91
--- /dev/null
+++ b/pipelines/deploy/PJMDailyLoad-Batch-Pipeline-Local/pipeline.py
@@ -0,0 +1,53 @@
+import sys, os
+
+os.environ['PYSPARK_PYTHON'] = sys.executable
+os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
+
+from rtdip_sdk.pipelines.sources import PJMDailyLoadISOSource
+from rtdip_sdk.pipelines.transformers import PJMToMDMTransformer
+from rtdip_sdk.pipelines.destinations import SparkDeltaDestination
+from pyspark.sql import SparkSession
+
+def pipeline():
+    spark = SparkSession.builder.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")\
+        .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")\
+        .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog").getOrCreate()
+
+    source_df = PJMDailyLoadISOSource(
+        spark = spark,
+        options = {
+            "api_key": "{api_key}",
+            "load_type": "actual"
+        }
+    ).read_batch()
+
+    transform_value_df = PJMToMDMTransformer(
+        spark=spark,
+        data=source_df,
+        output_type= "usage"
+    ).transform()
+
+    transform_meta_df = PJMToMDMTransformer(
+        spark=spark,
+        data=source_df,
+        output_type= "meta"
+    ).transform()
+
+    SparkDeltaDestination(
+        data=transform_value_df,
+        options={
+            "partitionBy":"timestamp"
+        },
+        destination="pjm_usage_data"
+    ).write_batch()
+
+    SparkDeltaDestination(
+        data=transform_meta_df,
+        options={
+            "partitionBy":"timestamp"
+        },
+        destination="pjm_meta_data"
+    ).write_batch()
+
+if __name__ == "__main__":
+    pipeline()
\ No newline at end of file
diff --git a/pipelines/deploy/Python-Delta-to-Delta/README.md b/pipelines/deploy/Python-Delta-to-Delta/README.md
new file mode 100644
index 0000000..9dfc253
--- /dev/null +++ b/pipelines/deploy/Python-Delta-to-Delta/README.md @@ -0,0 +1,22 @@ +# Python Delta Local Pipeline + +This article provides a guide on how to execute a simple Delta Table copy locally without Spark using the RTDIP SDK. This pipeline was tested on an M2 Macbook Pro using VS Code in a Python (3.10) environment. + +## Prerequisites +This pipeline job requires the packages: + +* [rtdip-sdk](../../../../getting-started/installation.md#installing-the-rtdip-sdk) + + +## Components +|Name|Description| +|---------------------------|----------------------| +|[PythonDeltaSource](../../../code-reference/pipelines/sources/python/delta.md)|Reads data from a Delta Table.| +|[PythonDeltaDestination](../../../code-reference/pipelines/destinations/python/delta.md)|Writes to a Delta table.| + +## Example +Below is an example of how to read from and write to Delta Tables locally without the need for Spark + +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/pipelines/deploy/Python-Delta-to-Delta/pipeline.py" +``` \ No newline at end of file diff --git a/pipelines/deploy/Python-Delta-to-Delta/pipeline.py b/pipelines/deploy/Python-Delta-to-Delta/pipeline.py new file mode 100644 index 0000000..a3fb585 --- /dev/null +++ b/pipelines/deploy/Python-Delta-to-Delta/pipeline.py @@ -0,0 +1,6 @@ +from rtdip_sdk.pipelines.sources.python.delta import PythonDeltaSource +from rtdip_sdk.pipelines.destinations.python.delta import PythonDeltaDestination + +source = PythonDeltaSource("{/path/to/source/table}").read_batch() + +destination = PythonDeltaDestination(source, "{/path/to/destination/table}", mode="append").write_batch() diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/Dockerfile b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/Dockerfile new file mode 100644 index 0000000..22468cf --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/Dockerfile @@ -0,0 +1,16 @@ +FROM ubuntu:22.04 +RUN apt-get update && apt-get upgrade -y +RUN apt-get install curl -y +RUN apt-get install unzip -y +RUN apt-get autoclean -y +RUN apt-get autoremove -y +RUN useradd -rm -d /home/rtdip -s /bin/bash -g root -G sudo -u 1001 rtdip +COPY run_conda_installer.sh /home/rtdip/ +RUN mkdir -p /home/rtdip/apps/lfenergy +COPY environment.yml /home/rtdip/apps/lfenergy/ +COPY MISO_pipeline_sample*.* /home/rtdip/apps/lfenergy/ +COPY dagster.yaml /home/rtdip/apps/lfenergy/ +COPY workspace.yaml /home/rtdip/apps/lfenergy/ +RUN chmod +x /home/rtdip/run_conda_installer.sh +WORKDIR /home/rtdip +ENTRYPOINT ["/home/rtdip/run_conda_installer.sh"] diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample.py b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample.py new file mode 100644 index 0000000..a54870a --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python +# coding: utf-8 + +# In[1]: + + +from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource +from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer +from rtdip_sdk.pipelines.destinations import SparkDeltaDestination +from pyspark.sql import SparkSession + +import shutil +import os + +spark_warehouse_local_path: str = "spark-warehouse" +if os.path.exists(spark_warehouse_local_path) and os.path.isdir(spark_warehouse_local_path): + try: + shutil.rmtree("spark-warehouse") + except Exception as ex: + print(str(ex)) + +spark = ( + SparkSession.builder.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") + 
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ) + .getOrCreate() +) + +source_df = MISODailyLoadISOSource( + spark=spark, + options={ + "load_type": "actual", + "date": "20230520", + }, +).read_batch() + +transform_value_df = MISOToMDMTransformer( + spark=spark, data=source_df, output_type="usage" +).transform() + +transform_meta_df = MISOToMDMTransformer( + spark=spark, data=source_df, output_type="meta" +).transform() + +SparkDeltaDestination( + data=transform_value_df, + options={"partitionBy": "timestamp"}, + destination="miso_usage_data", +).write_batch() + +SparkDeltaDestination( + data=transform_meta_df, + options={"partitionBy": "timestamp"}, + destination="miso_meta_data", +).write_batch() + +spark.stop() diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample_dagster.py b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample_dagster.py new file mode 100644 index 0000000..90b0b0d --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/MISO_pipeline_sample_dagster.py @@ -0,0 +1,65 @@ +from rtdip_sdk.pipelines.destinations import SparkDeltaDestination +from rtdip_sdk.pipelines.transformers import MISOToMDMTransformer +from rtdip_sdk.pipelines.sources import MISODailyLoadISOSource + +from pyspark.sql import SparkSession + +from dagster import asset + +import shutil +import os + + +@asset +def run_miso_ingest(): + + # First: Clear local files + spark_warehouse_local_path: str = "spark-warehouse" + if os.path.exists(spark_warehouse_local_path) and os.path.isdir(spark_warehouse_local_path): + try: + shutil.rmtree("spark-warehouse") + except Exception as ex: + print(str(ex)) + + spark = ( + SparkSession.builder.config( + "spark.jars.packages", "io.delta:delta-core_2.12:2.4.0" + ) + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config( + "spark.sql.catalog.spark_catalog", + "org.apache.spark.sql.delta.catalog.DeltaCatalog", + ) + .getOrCreate() + ) + + # Build the query + source_df = MISODailyLoadISOSource( + spark=spark, + options={ + "load_type": "actual", + "date": "20230520", + }, + ).read_batch() + + transform_value_df = MISOToMDMTransformer( + spark=spark, data=source_df, output_type="usage" + ).transform() + + transform_meta_df = MISOToMDMTransformer( + spark=spark, data=source_df, output_type="meta" + ).transform() + + SparkDeltaDestination( + data=transform_value_df, + options={"partitionBy": "timestamp"}, + destination="miso_usage_data", + ).write_batch() + + SparkDeltaDestination( + data=transform_meta_df, + options={"partitionBy": "timestamp"}, + destination="miso_meta_data", + ).write_batch() + + spark.stop() diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/README.md b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/README.md new file mode 100644 index 0000000..77af9ef --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/README.md @@ -0,0 +1,25 @@ +# Spark Single Node Dagster MySql Notebook AWS Integration + +This article provides a guide on how to create a conda based self-contained environment to run LFEnergy RTDIP that integrates the following components: +* Java JDK and Apache Spark (Single node configuration). Currently, v3.4.1 Spark (PySpark) has been configured and tested. +* AWS Libraries (e.g for accessing files in AWS S3 if required). +* Jupyter Notebook server. +* Dagster (MySQL backend). 
+ +The components of this environment are all pinned to a specific source distribution of RTDIP and have been tested in x86 Windows (using gitbash) and Linux environments. + +## Prerequisites +* Docker desktop or another local Docker environment (e.g. Ubuntu Docker). +* gitbash environment for Windows environments. + +When the installation completes, a Jupyter notebook will be running locally on port 8080 and Dagster Webserver will be running on port 3000 +Please check that this port is available or change the configuration in the installer if required. + +# Deployment +Run *run_in_docker.sh*. After the installer completes: +* At http://localhost:8080/ a jupyter notebook server will be running. Notebooks can be created to run for example new RTDIP pipelines. Sample MISO_pipeline_sample.py provided. This pipeline can be run in a Notebook. +* At http://localhost:3000/ Dagster Server with the sample MISO_pipeline_sample_dagster.py configured as a Dagster asset. +* To test the Notebook environment, create a new notebook and copy the contents of MISO_pipeline_sample.py into it and run it. This pipeline queries MISO (Midcontinent Independent System Operator) and saves the results of the query locally under a newly created directory called spark-warehouse. +* To test the Dagster environment, materialize the asset. +* For debugging purposes and running from inside the container, a file called *conda_environment_lfenergy.sh* is created under /home/rtdip/apps/lfenergy. Please use this file to activate the conda environment (e.g. *source ./conda_environment_lfenergy.sh; conda activate lfenergy*) within the container. + diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/dagster.yaml b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/dagster.yaml new file mode 100644 index 0000000..d8ea6e3 --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/dagster.yaml @@ -0,0 +1,47 @@ +# MYSQL Configuration +scheduler: + module: dagster.core.scheduler + class: DagsterDaemonScheduler + +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + +run_storage: + module: dagster_mysql.run_storage + class: MySQLRunStorage + config: + mysql_db: + hostname: DAGSTER_MYSQL_HOST + username: DAGSTER_MYSQL_USER + password: DAGSTER_MYSQL_PASSWORD + db_name: DAGSTER_MYSQL_DB + port: DAGSTER_MYSQL_PORT + + +event_log_storage: + module: dagster_mysql.event_log + class: MySQLEventLogStorage + config: + mysql_db: + hostname: DAGSTER_MYSQL_HOST + username: DAGSTER_MYSQL_USER + password: DAGSTER_MYSQL_PASSWORD + db_name: DAGSTER_MYSQL_DB + port: DAGSTER_MYSQL_PORT + +schedule_storage: + module: dagster_mysql.schedule_storage + class: MySQLScheduleStorage + config: + mysql_db: + hostname: DAGSTER_MYSQL_HOST + username: DAGSTER_MYSQL_USER + password: DAGSTER_MYSQL_PASSWORD + db_name: DAGSTER_MYSQL_DB + port: DAGSTER_MYSQL_PORT + + +run_launcher: + module: dagster.core.launcher + class: DefaultRunLauncher \ No newline at end of file diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/environment.yml b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/environment.yml new file mode 100644 index 0000000..82f7625 --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/environment.yml @@ -0,0 +1,63 @@ +# Pinned 09/20/2023 +name: lfenergy +channels: + - conda-forge + - defaults +dependencies: + - python>=3.8,<3.12 + # - mkdocs-material==9.1.21 + - mkdocs-material-extensions==1.1.1 + - jinja2==3.1.2 + - pytest==7.4.0 + - pytest-mock==3.11.1 + - pytest-cov==4.1.0 + - 
pylint==2.17.4 + - pip==23.1.2 + - turbodbc==4.5.10 + - numpy>=1.23.4 + - pandas>=1.5.2,<3.0.0 + - oauthlib>=3.2.2 + - cryptography>=38.0.3 + - azure-identity==1.12.0 + - azure-storage-file-datalake==12.12.0 + - azure-keyvault-secrets==4.7.0 + - boto3==1.28.2 + - pyodbc==4.0.39 + - fastapi==0.100.1 + - httpx==0.24.1 + - trio==0.22.1 + - pyspark==3.4.1 + - delta-spark>=2.2.0,<3.1.0 + - grpcio>=1.48.1 + - grpcio-status>=1.48.1 + - googleapis-common-protos>=1.56.4 + - openai==0.27.8 + - mkdocstrings==0.22.0 + - mkdocstrings-python==1.4.0 + - mkdocs-macros-plugin==1.0.1 + - pygments==2.16.1 + - pymdown-extensions==10.1.0 + - databricks-sql-connector==2.9.3 + - databricks-sdk==0.6.0 + - semver==3.0.0 + - xlrd==2.0.1 + - pygithub==1.59.0 + - strawberry-graphql[fastapi,pydantic]==0.194.4 + - web3==6.5.0 + - twine==4.0.2 + - delta-sharing-python==0.7.4 + - polars==0.18.8 + - moto[s3]==4.1.14 + - xarray>=2023.1.0,<2023.8.0 + - ecmwf-api-client==1.6.3 + - netCDF4==1.6.4 + - black==23.7.0 + - pip: + - dependency-injector==4.41.0 + - azure-functions==1.15.0 + - nest_asyncio==1.5.6 + - hvac==1.1.1 + - langchain==0.0.291 + - build==0.10.0 + - deltalake==0.10.1 + - mkdocs-material==9.2.0b3 diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_conda_installer.sh b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_conda_installer.sh new file mode 100644 index 0000000..d2e414b --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_conda_installer.sh @@ -0,0 +1,288 @@ +#!/usr/bin/env bash +# Dependencies: x86_64 Architecture, Linux (Tested on Ubuntu >= 20.04 LTS) curl, unzip +start_time=`date +%s` +echo "PATH: $PATH" +CONDA_CHECK_CMD="conda" +CONDA_CMD_DESCRIPTION=$CONDA_CHECK_CMD +CONDA_INSTALLER_NAME="Miniconda3-latest-Linux-x86_64.sh" +CONDA_INSTALLER_URL="https://repo.anaconda.com/miniconda/$CONDA_INSTALLER_NAME" +DEPLOYER_TMP_DIR=$(echo ${TMPDIR:-/tmp}"/DEPLOYER") +MINICONDA_NAME=miniconda +MINICONDA_PATH=$HOME/$MINICONDA_NAME/ +PATH=$MINICONDA_PATH/bin:$PATH +CONDA_ENV="lfenergy" +CONDA_ENV_HOME=$(pwd)/apps/$CONDA_ENV +mkdir -p $CONDA_ENV_HOME +CWD=$(pwd) +echo "Current Working Directory: $CWD" +echo "CONDA ENV HOME: $CONDA_ENV_HOME" +echo "DEPLOYER TMP Dir: $DEPLOYER_TMP_DIR" +if ! command -v $CONDA_CHECK_CMD &> /dev/null +then + echo "Current dir:" + echo "$CONDA_CMD_DESCRIPTION could not be found. Going to Install it" + mkdir -p $DEPLOYER_TMP_DIR + echo "Working Dir to download conda:" + cd $DEPLOYER_TMP_DIR + pwd + curl -O --url $CONDA_INSTALLER_URL + chmod +x $DEPLOYER_TMP_DIR/*.sh + bash $CONDA_INSTALLER_NAME -b -p $HOME/miniconda +fi +cd $CONDA_ENV_HOME +echo "Current Dir:" +pwd + +echo "Updating Conda" +conda update -n base conda -y +echo "Installing Mamba Solver" +conda install -n base conda-libmamba-solver -y +echo "Setting Solver to libmama" +conda config --set solver libmamba + +# RTDIP +export RTDIP_FILE_NAME="InnowattsRelease.zip" +export RTDIP_DOWNLOAD_URL="https://github.com/vbayon/core/archive/refs/heads/$RTDIP_FILE_NAME" +export RTDIP_DIR="core-InnowattsRelease" + +echo "Installing RTDIP ***********************************" +rm -rf ./$RTDIP_DIR +rm -rf ./api +rm -rf ./sdk +curl -L -o $RTDIP_FILE_NAME $RTDIP_DOWNLOAD_URL +unzip -o ./$RTDIP_FILE_NAME > /dev/null +cp -r ./$RTDIP_DIR/src/sdk/python/* . 
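+# The SDK source copied above now sits in $CONDA_ENV_HOME so that `conda develop $CONDA_ENV_HOME`
+# (run further below) can put it on the environment's import path; the downloaded archive is no
+# longer needed after this point.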
+rm ./$RTDIP_FILE_NAME + + +echo "Creating the environment with [CONDA]: CONDA LIBMAMBA SOLVER" +## Copying the env file +rm ./environment.yml +cp ./$RTDIP_DIR/environment.yml ./ +find ./environment.yml -type f -exec sed -i 's/rtdip-sdk/lfenergy/g' {} \; +conda env create -f environment.yml + +# +# JDK +echo "JDK jdk-17.0.2 ***********************************" +export JAVA_VERSION="jdk-17.0.2" +export JDK_FILE_NAME="openjdk-17.0.2_linux-x64_bin.tar.gz" +export JDK_DOWNLOAD_URL="https://download.java.net/java/GA/jdk17.0.2/dfd4a8d0985749f896bed50d7138ee7f/8/GPL/$JDK_FILE_NAME" + +if [ -f "$CONDA_ENV/$JDK_FILE_NAME" ]; then + echo "$CONDA_ENV/$JDK_FILE_NAME Exists" + echo "Removing JDK: $JDK_FILE_NAME" + rm -rf $CONDA_ENV/$JAVA_VERSION + unlink $HOME/JDK +fi + +if test -f "$JDK_FILE_NAME"; +then + echo "$JDK_FILE_NAME exists" +else + echo "$JDK_FILE_NAME does not exists. Downloading it from $JDK_DOWNLOAD_URL" + curl -o $JDK_FILE_NAME $JDK_DOWNLOAD_URL +fi + +tar xvfz $JDK_FILE_NAME > /dev/null +ln -s $CONDA_ENV_HOME/$JAVA_VERSION $HOME/JDK +export JAVA_HOME=$HOME/JDK +export PATH=$HOME/JDK/bin:$PATH + +# SPARK 3.4.1 +echo "Installing SPARK 3.4.1***********************************" +export SPARK_VERSION="spark-3.4.1-bin-hadoop3" +export SPARK_FILE_NAME="spark-3.4.1-bin-hadoop3.tgz" +export SPARK_DOWNLOAD_URL="https://dlcdn.apache.org/spark/spark-3.4.1/$SPARK_FILE_NAME" + + +if [ -f "$CONDA_ENV/$SPARK_VERSION" ]; then + echo "$CONDA_ENV/$SPARK_FILE_NAME Exists" + echo "Removing Spark: $SPARK_FILE_NAME" + rm -rf $CONDA_ENV/$SPARK_VERSION + unlink $HOME/SPARK +fi + +if test -f "$SPARK_FILE_NAME"; +then + echo "$SPARK_FILE_NAME exists" +else + echo "$SPARK_FILE_NAME does not exists. Downloading it from $SPARK_DOWNLOAD_URL" + curl -o $SPARK_FILE_NAME $SPARK_DOWNLOAD_URL +fi + + +tar xvfz $SPARK_FILE_NAME > /dev/null +ln -s $CONDA_ENV_HOME/$SPARK_VERSION $HOME/SPARK +export SPARK_HOME=$HOME/SPARK + + +# Extra libraries +echo "Installing Extra Libs ***********************************" +export AWS_JAVA_SDK_BUNDLE_JAR_FILE_NAME="aws-java-sdk-bundle-1.11.1026.jar" +export AWS_JAVA_SDK_BUNDLE_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/1.11.1026/$AWS_JAVA_SDK_BUNDLE_JAR_FILE_NAME" + +export HADOOP_AWS_JAR_FILE_NAME="hadoop-aws-3.3.2.jar" +export HADOOP_AWS_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/3.3.2/$HADOOP_AWS_JAR_FILE_NAME" + +export HADOOP_COMMON_JAR_FILE_NAME="hadoop-common-3.3.2.jar" +export HADOOP_COMMON_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-common/3.3.2/$HADOOP_COMMON_JAR_FILE_NAME" + +export HADOOP_HDFS_JAR_FILE_NAME="hadoop-hdfs-3.3.2.jar" +export HADOOP_HDFS_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/3.3.2/$HADOOP_HDFS_JAR_FILE_NAME" + +export WOODSTOCK_CORE_JAR_FILE_NAME="woodstox-core-6.5.1.jar" +export WOODSTOCK_CORE_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/fasterxml/woodstox/woodstox-core/6.5.1/$WOODSTOCK_CORE_JAR_FILE_NAME" + +export STAX2_API_JAR_FILE_NAME="stax2-api-4.2.1.jar" +export STAX2_API__JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/codehaus/woodstox/stax2-api/4.2.1/$STAX2_API_JAR_FILE_NAME" + +export COMMONS_CONFIGURATION_JAR_FILE_NAME="commons-configuration2-2.9.0.jar" +export COMMONS_CONFIGURATION_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/apache/commons/commons-configuration2/2.9.0/$COMMONS_CONFIGURATION_JAR_FILE_NAME" + +export RE2J_JAR_FILE_NAME="re2j-1.7.jar" +export 
RE2J_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/google/re2j/re2j/1.7/$RE2J_JAR_FILE_NAME" + +export AZURE_EVENTHUBS_SPARK_JAR_FILE_NAME="azure-eventhubs-spark_2.12-2.3.22.jar" +export AZURE_EVENTHUBS_SPARK_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/microsoft/azure/azure-eventhubs-spark_2.12/2.3.22/$AZURE_EVENTHUBS_SPARK_JAR_FILE_NAME" + +export AZURE_EVENTHUBS_JAR_FILE_NAME="azure-eventhubs-3.3.0.jar" +export AZURE_EVENTHUBS_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/com/microsoft/azure/azure-eventhubs/3.3.0/$AZURE_EVENTHUBS_JAR_FILE_NAME" + +export SCALA_JAVA8_COMPAT_JAR_FILE_NAME="scala-java8-compat_2.12-1.0.2.jar" +export SCALA_JAVA8_COMPAT_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/scala-lang/modules/scala-java8-compat_2.12/1.0.2/$SCALA_JAVA8_COMPAT_JAR_FILE_NAME" + +export PROTON_J_JAR_FILE_NAME="proton-j-0.34.1.jar" +export PROTON_J_JAR_DOWNLOAD_URL="https://repo1.maven.org/maven2/org/apache/qpid/proton-j/0.34.1/$PROTON_J_JAR_FILE_NAME" + +curl -o $AWS_JAVA_SDK_BUNDLE_JAR_FILE_NAME $AWS_JAVA_SDK_BUNDLE_JAR_DOWNLOAD_URL +mv $AWS_JAVA_SDK_BUNDLE_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $HADOOP_AWS_JAR_FILE_NAME $HADOOP_AWS_JAR_DOWNLOAD_URL +mv $HADOOP_AWS_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $HADOOP_COMMON_JAR_FILE_NAME $HADOOP_COMMON_JAR_DOWNLOAD_URL +mv $HADOOP_COMMON_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $HADOOP_HDFS_JAR_FILE_NAME $HADOOP_HDFS_JAR_DOWNLOAD_URL +mv $HADOOP_HDFS_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $WOODSTOCK_CORE_JAR_FILE_NAME $WOODSTOCK_CORE_JAR_DOWNLOAD_URL +mv $WOODSTOCK_CORE_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $STAX2_API_JAR_FILE_NAME $STAX2_API__JAR_DOWNLOAD_URL +mv $STAX2_API_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $COMMONS_CONFIGURATION_JAR_FILE_NAME $COMMONS_CONFIGURATION_JAR_DOWNLOAD_URL +mv $COMMONS_CONFIGURATION_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $RE2J_JAR_FILE_NAME $RE2J_JAR_DOWNLOAD_URL +mv $RE2J_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $AZURE_EVENTHUBS_SPARK_JAR_FILE_NAME $AZURE_EVENTHUBS_SPARK_JAR_DOWNLOAD_URL +mv $AZURE_EVENTHUBS_SPARK_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $AZURE_EVENTHUBS_JAR_FILE_NAME $AZURE_EVENTHUBS_JAR_DOWNLOAD_URL +mv $AZURE_EVENTHUBS_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $SCALA_JAVA8_COMPAT_JAR_FILE_NAME $SCALA_JAVA8_COMPAT_JAR_DOWNLOAD_URL +mv $SCALA_JAVA8_COMPAT_JAR_FILE_NAME $SPARK_HOME/jars + +curl -o $PROTON_J_JAR_FILE_NAME $PROTON_J_JAR_DOWNLOAD_URL +mv $PROTON_J_JAR_FILE_NAME $SPARK_HOME/jars + +echo "Finished INSTALLING $JAVA_VERSION and $SPARK_VERSION and Extra Libraries" + + Password Generator +apt-get install pwgen +## +echo "Installing MySQL" +# Generate random passwords +MYSQL_ROOT_PASSWORD=$(pwgen -s -c -n 10) +MYSQL_DAGSTER_PASSWORD=$(pwgen -s -c -n 10) + +# MySQL Config for MySQL Clients connecting to MySQL Server +export MYSQL_HOSTNAME="localhost" +export MYSQL_PORT="3306" +export MYSQL_DAGSTER_DATABASE_NAME="dagster" +export MYSQL_VOLUME="/tmp" +export MYSQL_DAGSTER_USERNAME="dagster" + +apt-get install debconf -y +debconf-set-selections <<< "mysql-server mysql-server/root_password password $MYSQL_ROOT_PASSWORD" +debconf-set-selections <<< "mysql-server mysql-server/root_password_again password $MYSQL_ROOT_PASSWORD" + + +apt-get install -y mysql-server +apt-get install -y mysql-client +service mysql --full-restart +mysql -uroot -p$MYSQL_ROOT_PASSWORD -e "CREATE DATABASE ${MYSQL_DAGSTER_DATABASE_NAME} CHARACTER SET utf8 COLLATE utf8_unicode_ci;;" +mysql -uroot -p$MYSQL_ROOT_PASSWORD -e "CREATE USER ${MYSQL_DAGSTER_USERNAME}@localhost 
IDENTIFIED BY '${MYSQL_DAGSTER_PASSWORD}';" +mysql -uroot -p$MYSQL_ROOT_PASSWORD -e "GRANT ALL PRIVILEGES ON ${MYSQL_DAGSTER_DATABASE_NAME}.* TO '${MYSQL_DAGSTER_USERNAME}'@'localhost';" +mysql -uroot -p$MYSQL_ROOT_PASSWORD -e "FLUSH PRIVILEGES;" + +export DAGSTER_HOME=$CONDA_ENV_HOME + +sed -i "s/DAGSTER_MYSQL_HOST/$MYSQL_HOSTNAME/" $DAGSTER_HOME/dagster.yaml +sed -i "s/DAGSTER_MYSQL_USER/$MYSQL_DAGSTER_USERNAME/" $DAGSTER_HOME/dagster.yaml +sed -i "s/DAGSTER_MYSQL_PASSWORD/$MYSQL_DAGSTER_PASSWORD/" $DAGSTER_HOME/dagster.yaml +sed -i "s/DAGSTER_MYSQL_DB/$MYSQL_DAGSTER_DATABASE_NAME/" $DAGSTER_HOME/dagster.yaml +sed -i "s/DAGSTER_MYSQL_PORT/$MYSQL_PORT/" $DAGSTER_HOME/dagster.yaml + +eval "$(conda shell.bash hook)" +conda config --set default_threads 4 +conda env list +# Load by default conda environment vars when running in container +# To avoid error: CommandNotFoundError: Your shell has not been properly configured to use 'conda activate'. +# source $HOME/$MINICONDA_NAME/etc/profile.d/conda.sh +## +conda install -y conda-build +conda activate $CONDA_ENV + +# Adding source code to the lib path +conda develop $CONDA_ENV_HOME + +conda info +echo "Finished Installing Conda [Mamba] Env $CONDA_ENV" +end_time=`date +%s` +runtime=$((end_time-start_time)) +echo "Total Installation Runtime: $runtime [seconds]" +echo "Test environment not intended for using in production. Backup any changes made to this environment" +# +export DAGSTER_PORT="3000" +export HOST="0.0.0.0" +CONDA_ENVIRONMENT_FILE_NAME="conda_environment_$CONDA_ENV.sh" +echo "#!/usr/bin/env bash" > $CONDA_ENVIRONMENT_FILE_NAME +echo "export PATH=$PATH" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export JAVA_HOME=$JAVA_HOME" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export SPARK_HOME=$SPARK_HOME" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export DAGSTER_HOME=$DAGSTER_HOME" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export HOST=$HOST" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export DAGSTER_PORT=$DAGSTER_PORT" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export MYSQL_DAGSTER_USERNAME=$MYSQL_DAGSTER_USERNAME" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "export MYSQL_DAGSTER_PASSWORD=$MYSQL_DAGSTER_PASSWORD" >> $CONDA_ENVIRONMENT_FILE_NAME +echo "source $HOME/$MINICONDA_NAME/etc/profile.d/conda.sh" >> $CONDA_ENVIRONMENT_FILE_NAME +chmod +x $CONDA_ENVIRONMENT_FILE_NAME +echo "export SPARK_HOME=$SPARK_HOME" +echo "NOTEBOOK_PORT: $NOTEBOOK_PORT" +# Install and Run Notebook +conda install -y notebook=6.5.4 +export NOTEBOOK_PORT="8080" +# Install and run Dagster +echo "Going to install dagster" +conda install -y dagster=1.5.6 +conda install -y dagster-mysql=1.5.6 +echo "Going to install dagster-webserver" +yes | pip install dagster-webserver==1.5.6 +echo "Going to run Jupyter on host:$HOST/port:$NOTEBOOK_PORT" +jupyter notebook --no-browser --port=$NOTEBOOK_PORT --ip=$HOST --NotebookApp.token='' --NotebookApp.password='' --allow-root & +echo "Going to run Dagster on host:$HOST/port:$DAGSTER_PORT " +echo "Running Dagster daemon" +dagster-daemon run > dagster_daemon_logs.txt 2>&1 & +echo "Allowing for dagster-daemon to start running" +sleep 60 +echo "Running Webserver" +dagster-webserver -h $HOST -p $DAGSTER_PORT > dagster_webserver_logs.txt 2>&1 & +tail -f *.txt +sleep infinity + diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_in_docker.sh b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_in_docker.sh new file mode 100644 index 0000000..6fbf2d8 --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/run_in_docker.sh @@ -0,0 +1,7 @@ 
+#!/usr/bin/env bash +docker container stop rtdip +docker container rm rtdip +docker system prune -a -f +docker image rm "rtdip:Dockerfile" +docker build -t "rtdip:Dockerfile" . +docker run --name rtdip --publish 8080:8080 --publish 3000:3000 "rtdip:Dockerfile" diff --git a/pipelines/deploy/Spark-Single-Node-Notebook-AWS/workspace.yaml b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/workspace.yaml new file mode 100644 index 0000000..41b38ec --- /dev/null +++ b/pipelines/deploy/Spark-Single-Node-Notebook-AWS/workspace.yaml @@ -0,0 +1,4 @@ +# workspace.yaml + +load_from: + - python_file: MISO_pipeline_sample_dagster.py \ No newline at end of file diff --git a/queries/Circular-Average/README.md b/queries/Circular-Average/README.md new file mode 100644 index 0000000..b268e22 --- /dev/null +++ b/queries/Circular-Average/README.md @@ -0,0 +1,30 @@ +# Circular Average + +[Circular Average](../../code-reference/query/circular-average.md) - A function that receives a dataframe of raw tag data and computes the circular mean for samples in a range, returning the results. + +## Prerequisites +Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section. + +This example is using [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect. You can find other ways to authenticate here. The alternative built in connection methods are either by [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md). 
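+
+For intuition on the result: a circular mean treats each sample as an angle on a circle spanning `lower_bound` to `upper_bound`, so values sitting just either side of the wrap-around point average to the boundary rather than to the midpoint of the range. The sketch below is only the textbook formula, included to illustrate what the query computes; it is not the SDK's implementation.
+
+```python
+import numpy as np
+
+# Illustrative circular mean over values bounded by [lower_bound, upper_bound],
+# e.g. compass headings in 0-360. Not the SDK's implementation.
+def circular_mean(values, lower_bound=0, upper_bound=360):
+    span = upper_bound - lower_bound
+    angles = 2 * np.pi * (np.asarray(values, dtype=float) - lower_bound) / span
+    mean_angle = np.arctan2(np.mean(np.sin(angles)), np.mean(np.cos(angles)))
+    return lower_bound + (mean_angle % (2 * np.pi)) * span / (2 * np.pi)
+
+print(circular_mean([350, 10]))  # ~0 (equivalently 360), not the arithmetic mean of 180
+```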
+ +## Parameters +|Name|Type|Description| +|---|---|---| +|business_unit|str|Business unit of the data| +region|str|Region| +asset|str|Asset| +data_security_level|str|Level of data security| +data_type|str|Type of the data (float, integer, double, string) +tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]| +start_date|str|Start date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)| +end_date|str|End date (Either a date in the format YY-MM-DD or a datetime in the format YYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)| +time_interval_rate|str|The time interval rate (numeric input)| +time_interval_unit|str|The time interval unit (second, minute, day, hour)| +lower_bound|int|Lower boundary for the sample range| +upper_bound|int|Upper boundary for the sample range| +include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False| + +## Example +```python +--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Circular-Average/circular_average.py" +``` \ No newline at end of file diff --git a/queries/Circular-Average/circular_average.py b/queries/Circular-Average/circular_average.py new file mode 100644 index 0000000..556e2dc --- /dev/null +++ b/queries/Circular-Average/circular_average.py @@ -0,0 +1,25 @@ +from rtdip_sdk.authentication.azure import DefaultAuth +from rtdip_sdk.connectors import DatabricksSQLConnection +from rtdip_sdk.queries import circular_average + +auth = DefaultAuth().authenticate() +token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token +connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token) + +parameters = { + "business_unit": "{business_unit}", + "region": "{region}", + "asset": "{asset_name}", + "data_security_level": "{security_level}", + "data_type": "float", + "tag_names": ["{tag_name_1}", "{tag_name_2}"], + "start_date": "2023-01-01", + "end_date": "2023-01-31", + "time_interval_rate": "15", + "time_interval_unit": "minute", + "lower_bound": 0, + "upper_bound": 360, + "include_bad_data": True, +} +x = circular_average.get(connection, parameters) +print(x) diff --git a/queries/Circular-Standard-Deviation/README.md b/queries/Circular-Standard-Deviation/README.md new file mode 100644 index 0000000..8d7bff9 --- /dev/null +++ b/queries/Circular-Standard-Deviation/README.md @@ -0,0 +1,30 @@ +# Circular Standard Deviation + +[Circular Standard Deviation](../../code-reference/query/circular-standard-deviation.md) - A function that receives a dataframe of raw tag data and computes the circular standard deviation for samples assumed to be in the range, returning the results. + +## Prerequisites +Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section. + +This example is using [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect. You can find other ways to authenticate here. The alternative built in connection methods are either by [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md). 
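+
+As with the circular average, the spread is measured on the circle rather than on the number line: it is derived from the mean resultant length of the samples, so values straddling the wrap-around point report a small deviation. A short sketch of the standard formula, purely for illustration and not the SDK's implementation:
+
+```python
+import numpy as np
+
+# Illustrative circular standard deviation over [lower_bound, upper_bound],
+# based on the mean resultant length R. Not the SDK's implementation.
+def circular_std(values, lower_bound=0, upper_bound=360):
+    span = upper_bound - lower_bound
+    angles = 2 * np.pi * (np.asarray(values, dtype=float) - lower_bound) / span
+    resultant = np.hypot(np.mean(np.sin(angles)), np.mean(np.cos(angles)))
+    return np.sqrt(-2 * np.log(resultant)) * span / (2 * np.pi)
+
+print(circular_std([350, 10]))   # ~10, a small spread across the 0/360 wrap-around
+print(circular_std([170, 190]))  # ~10, the same spread away from the wrap-around
+```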
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit of the data|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|start_date|str|Start date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|end_date|str|End date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|time_interval_rate|str|The time interval rate (numeric input)|
+|time_interval_unit|str|The time interval unit (second, minute, day, hour)|
+|lower_bound|int|Lower boundary for the sample range|
+|upper_bound|int|Upper boundary for the sample range|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Circular-Standard-Deviation/circular_standard_deviation.py"
+```
\ No newline at end of file
diff --git a/queries/Circular-Standard-Deviation/circular_standard_deviation.py b/queries/Circular-Standard-Deviation/circular_standard_deviation.py
new file mode 100644
index 0000000..34b3601
--- /dev/null
+++ b/queries/Circular-Standard-Deviation/circular_standard_deviation.py
@@ -0,0 +1,25 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import circular_standard_deviation
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "time_interval_rate": "15",
+    "time_interval_unit": "minute",
+    "lower_bound": 0,
+    "upper_bound": 360,
+    "include_bad_data": True,
+}
+x = circular_standard_deviation.get(connection, parameters)
+print(x)
diff --git a/queries/Interpolate/README.md b/queries/Interpolate/README.md
new file mode 100644
index 0000000..057acb2
--- /dev/null
+++ b/queries/Interpolate/README.md
@@ -0,0 +1,34 @@
+# Interpolate
+
+[Interpolate](../../code-reference/query/interpolate.md) - takes resampling one step further to estimate the values of unknown data points that fall between existing, known data points. In addition to the resampling parameters, interpolation also requires:
+
+Interpolation Method - Forward Fill, Backward Fill or Linear
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
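+
+The three interpolation methods behave differently around gaps: forward fill repeats the last known value, backward fill takes the next known value, and linear draws a straight line between the two. The pandas sketch below only illustrates that difference on a toy series; it is not how the SDK computes the result.
+
+```python
+import pandas as pd
+
+# A series with two missing values between known points
+s = pd.Series([1.0, None, None, 4.0])
+
+print(s.ffill().tolist())        # forward fill:  [1.0, 1.0, 1.0, 4.0]
+print(s.bfill().tolist())        # backward fill: [1.0, 4.0, 4.0, 4.0]
+print(s.interpolate().tolist())  # linear:        [1.0, 2.0, 3.0, 4.0]
+```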
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit of the data|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|start_date|str|Start date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|end_date|str|End date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|sample_rate|int|(deprecated) Please use time_interval_rate instead. See below.|
+|sample_unit|str|(deprecated) Please use time_interval_unit instead. See below.|
+|time_interval_rate|str|The time interval rate (numeric input)|
+|time_interval_unit|str|The time interval unit (second, minute, day, hour)|
+|agg_method|str|Aggregation Method (first, last, avg, min, max)|
+|interpolation_method|str|Interpolation method (forward_fill, backward_fill, linear)|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Interpolate/interpolate.py"
+```
\ No newline at end of file
diff --git a/queries/Interpolate/interpolate.py b/queries/Interpolate/interpolate.py
new file mode 100644
index 0000000..3698e95
--- /dev/null
+++ b/queries/Interpolate/interpolate.py
@@ -0,0 +1,25 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import interpolate
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "time_interval_rate": "15",
+    "time_interval_unit": "minute",
+    "agg_method": "first",
+    "interpolation_method": "forward_fill",
+    "include_bad_data": True,
+}
+x = interpolate.get(connection, parameters)
+print(x)
diff --git a/queries/Interpolation-at-Time/README.md b/queries/Interpolation-at-Time/README.md
new file mode 100644
index 0000000..fe1ba21
--- /dev/null
+++ b/queries/Interpolation-at-Time/README.md
@@ -0,0 +1,28 @@
+# Interpolation at Time
+
+[Interpolation at Time](../../code-reference/query/interpolation-at-time.md) - works out the linear interpolation at a specific time based on the points before and after. This is achieved by providing the following parameter:
+
+Timestamps - A list of one or more timestamps
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
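+
+Conceptually, the query looks up the raw points either side of each requested timestamp and linearly interpolates between them. The sketch below is a minimal illustration of that calculation only; it is not the SDK's implementation.
+
+```python
+from datetime import datetime
+
+def interpolate_at(target, before, after):
+    # 'before' and 'after' are (timestamp, value) pairs either side of the target time.
+    (t0, v0), (t1, v1) = before, after
+    fraction = (target - t0).total_seconds() / (t1 - t0).total_seconds()
+    return v0 + fraction * (v1 - v0)
+
+print(interpolate_at(
+    datetime(2023, 1, 1, 12, 0, 0),
+    (datetime(2023, 1, 1, 0, 0, 0), 10.0),
+    (datetime(2023, 1, 2, 0, 0, 0), 20.0),
+))  # 15.0, halfway between the surrounding points
+```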
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit of the data|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|timestamps|list|List of timestamp or timestamps in the format YYYY-MM-DDTHH:MM:SS or YYYY-MM-DDTHH:MM:SS+zz:zz where %z is the timezone. (Example +00:00 is the UTC timezone)|
+|window_length|int|Add longer window time in days for the start or end of specified date to cater for edge cases.|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Interpolation-at-Time/interpolation_at_time.py"
+```
\ No newline at end of file
diff --git a/queries/Interpolation-at-Time/interpolation_at_time.py b/queries/Interpolation-at-Time/interpolation_at_time.py
new file mode 100644
index 0000000..441ab07
--- /dev/null
+++ b/queries/Interpolation-at-Time/interpolation_at_time.py
@@ -0,0 +1,20 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import interpolation_at_time
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "timestamps": ["2023-01-01", "2023-01-02"],
+    "window_length": 1,
+}
+x = interpolation_at_time.get(connection, parameters)
+print(x)
diff --git a/queries/Metadata/README.md b/queries/Metadata/README.md
new file mode 100644
index 0000000..d32ea96
--- /dev/null
+++ b/queries/Metadata/README.md
@@ -0,0 +1,22 @@
+# Metadata
+
+[Metadata](../../code-reference/query/metadata.md) queries provide contextual information for time series measurements and include information such as names, descriptions and units of measure.
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
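+
+Because `tag_names` is optional, it can be omitted (or passed as an empty list) to query metadata without filtering to specific tags. The sketch below is a variation on the Example further down, using the same placeholder values:
+
+```python
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import metadata
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+# No "tag_names" key, so the query is not filtered to specific tags
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+}
+x = metadata.get(connection, parameters)
+print(x)
+```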
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|tag_names|list (optional)|Either pass a list of tagname/tagnames ["tag_1", "tag_2"] or leave the list blank [] or leave the parameter out completely|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Metadata/metadata.py"
+```
\ No newline at end of file
diff --git a/queries/Metadata/metadata.py b/queries/Metadata/metadata.py
new file mode 100644
index 0000000..05009d7
--- /dev/null
+++ b/queries/Metadata/metadata.py
@@ -0,0 +1,17 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import metadata
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+}
+x = metadata.get(connection, parameters)
+print(x)
diff --git a/queries/Raw/README.md b/queries/Raw/README.md
new file mode 100644
index 0000000..0df6883
--- /dev/null
+++ b/queries/Raw/README.md
@@ -0,0 +1,26 @@
+# Raw
+
+[Raw](../../code-reference/query/raw.md) facilitates performing raw extracts of time series data, typically filtered by a Tag Name or Device Name and an event time.
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
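+
+`start_date` and `end_date` also accept datetimes with a timezone offset, which is useful when sub-day precision or an explicit timezone is needed. The sketch below is a variation on the Example further down, using the same placeholder values:
+
+```python
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import raw
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    # Timezone-aware bounds in the format YYYY-MM-DDTHH:MM:SS+zz:zz
+    "start_date": "2023-01-01T00:00:00+00:00",
+    "end_date": "2023-01-31T23:59:59+00:00",
+    "include_bad_data": False,
+}
+x = raw.get(connection, parameters)
+print(x)
+```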
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|start_date|str|Start date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|end_date|str|End date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Raw/raw.py"
+```
\ No newline at end of file
diff --git a/queries/Raw/raw.py b/queries/Raw/raw.py
new file mode 100644
index 0000000..9de0b9d
--- /dev/null
+++ b/queries/Raw/raw.py
@@ -0,0 +1,21 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import raw
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "include_bad_data": True,
+}
+x = raw.get(connection, parameters)
+print(x)
diff --git a/queries/Resample/README.md b/queries/Resample/README.md
new file mode 100644
index 0000000..3d1754b
--- /dev/null
+++ b/queries/Resample/README.md
@@ -0,0 +1,37 @@
+# Resample
+
+[Resample](../../code-reference/query/resample.md) enables changing the frequency of time series observations. This is achieved by providing the following parameters:
+
+Sample Rate - (deprecated, use Time Interval Rate)
+Sample Unit - (deprecated, use Time Interval Unit)
+Time Interval Rate - The time interval rate
+Time Interval Unit - The time interval unit (second, minute, day, hour)
+Aggregation Method - Aggregations including first, last, avg, min, max
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
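+
+Only the interval and aggregation parameters need to change to alter the output frequency, for example resampling to hourly averages rather than 15 minute first values. The sketch below is a variation on the Example further down, using the same placeholder values:
+
+```python
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import resample

+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    # Hourly averages instead of 15 minute first values
+    "time_interval_rate": "1",
+    "time_interval_unit": "hour",
+    "agg_method": "avg",
+    "include_bad_data": True,
+}
+x = resample.get(connection, parameters)
+print(x)
+```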
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit of the data|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|start_date|str|Start date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|end_date|str|End date (Either a date in the format YYYY-MM-DD or a datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|sample_rate|int|(deprecated) Please use time_interval_rate instead. See below.|
+|sample_unit|str|(deprecated) Please use time_interval_unit instead. See below.|
+|time_interval_rate|str|The time interval rate (numeric input)|
+|time_interval_unit|str|The time interval unit (second, minute, day, hour)|
+|agg_method|str|Aggregation Method (first, last, avg, min, max)|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Resample/resample.py"
+```
\ No newline at end of file
diff --git a/queries/Resample/resample.py b/queries/Resample/resample.py
new file mode 100644
index 0000000..1a23e9e
--- /dev/null
+++ b/queries/Resample/resample.py
@@ -0,0 +1,24 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import resample
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "time_interval_rate": "15",
+    "time_interval_unit": "minute",
+    "agg_method": "first",
+    "include_bad_data": True,
+}
+x = resample.get(connection, parameters)
+print(x)
diff --git a/queries/Time-Weighted-Average/README.md b/queries/Time-Weighted-Average/README.md
new file mode 100644
index 0000000..a6f5caa
--- /dev/null
+++ b/queries/Time-Weighted-Average/README.md
@@ -0,0 +1,37 @@
+# Time Weighted Average
+
+[Time Weighted Averages](../../code-reference/query/time-weighted-average.md) provide an unbiased average when working with irregularly sampled data. The RTDIP SDK requires the following parameters to perform time weighted average queries:
+
+Window Size Mins - (deprecated, use Time Interval Rate and Time Interval Unit)
+Time Interval Rate - The time interval rate
+Time Interval Unit - The time interval unit (second, minute, day, hour)
+Window Length - Adds a longer window time for the start or end of specified date to cater for edge cases
+Step - Data points with step "enabled" or "disabled". The options for step are "true", "false" or "metadata" as string types. For "metadata", the query requires that the TagName has a step column configured correctly in the metadata table
+
+## Prerequisites
+Ensure you have installed the RTDIP SDK as specified in the [Getting Started](../../../getting-started/installation.md#installing-the-rtdip-sdk) section.
+
+This example uses [DefaultAuth()](../../code-reference/authentication/azure.md) and [DatabricksSQLConnection()](../../code-reference/query/db-sql-connector.md) to authenticate and connect; other authentication methods are also supported. The alternative built-in connection methods are [PYODBCSQLConnection()](../../code-reference/query/pyodbc-sql-connector.md), [TURBODBCSQLConnection()](../../code-reference/query/turbodbc-sql-connector.md) or [SparkConnection()](../../code-reference/query/spark-connector.md).
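+
+If the step behaviour of each tag is configured in the metadata table, `step` can be set to `"metadata"` instead of hard-coding `"true"` or `"false"`. The sketch below is a variation on the Example further down, using the same placeholder values:
+
+```python
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import time_weighted_average
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "time_interval_rate": "15",
+    "time_interval_unit": "minute",
+    "window_length": 1,
+    "include_bad_data": True,
+    # Take the step setting for each tag from the metadata table
+    "step": "metadata",
+}
+x = time_weighted_average.get(connection, parameters)
+print(x)
+```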
+
+## Parameters
+|Name|Type|Description|
+|---|---|---|
+|business_unit|str|Business unit|
+|region|str|Region|
+|asset|str|Asset|
+|data_security_level|str|Level of data security|
+|data_type|str|Type of the data (float, integer, double, string)|
+|tag_names|list|List of tagname or tagnames ["tag_1", "tag_2"]|
+|start_date|str|Start date (Either a UTC date in the format YYYY-MM-DD or a UTC datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|end_date|str|End date (Either a UTC date in the format YYYY-MM-DD or a UTC datetime in the format YYYY-MM-DDTHH:MM:SS or specify the timezone offset in the format YYYY-MM-DDTHH:MM:SS+zz:zz)|
+|window_size_mins|int|(deprecated) Window size in minutes. Please use time_interval_rate and time_interval_unit below instead|
+|time_interval_rate|str|The time interval rate (numeric input)|
+|time_interval_unit|str|The time interval unit (second, minute, day, hour)|
+|window_length|int|Add longer window time in days for the start or end of specified date to cater for edge cases|
+|include_bad_data|bool|Include "Bad" data points with True or remove "Bad" data points with False|
+|step|str|Data points with step "enabled" or "disabled". The options for step are "true", "false" or "metadata". "metadata" will retrieve the step value from the metadata table|
+
+## Example
+```python
+--8<-- "https://raw.githubusercontent.com/rtdip/samples/main/queries/Time-Weighted-Average/time_weighted_average.py"
+```
\ No newline at end of file
diff --git a/queries/Time-Weighted-Average/time_weighted_average.py b/queries/Time-Weighted-Average/time_weighted_average.py
new file mode 100644
index 0000000..6ba9afd
--- /dev/null
+++ b/queries/Time-Weighted-Average/time_weighted_average.py
@@ -0,0 +1,25 @@
+from rtdip_sdk.authentication.azure import DefaultAuth
+from rtdip_sdk.connectors import DatabricksSQLConnection
+from rtdip_sdk.queries import time_weighted_average
+
+auth = DefaultAuth().authenticate()
+token = auth.get_token("2ff814a6-3304-4ab8-85cb-cd0e6f879c1d/.default").token
+connection = DatabricksSQLConnection("{server_hostname}", "{http_path}", token)
+
+parameters = {
+    "business_unit": "{business_unit}",
+    "region": "{region}",
+    "asset": "{asset_name}",
+    "data_security_level": "{security_level}",
+    "data_type": "float",
+    "tag_names": ["{tag_name_1}", "{tag_name_2}"],
+    "start_date": "2023-01-01",
+    "end_date": "2023-01-31",
+    "time_interval_rate": "15",
+    "time_interval_unit": "minute",
+    "window_length": 1,
+    "include_bad_data": True,
+    "step": "true",
+}
+x = time_weighted_average.get(connection, parameters)
+print(x)