diff --git a/custom-recipes/pi-system-interpolate/recipe.json b/custom-recipes/pi-system-interpolate/recipe.json new file mode 100644 index 0000000..5954daa --- /dev/null +++ b/custom-recipes/pi-system-interpolate/recipe.json @@ -0,0 +1,55 @@ +{ + "meta": { + "label": "Interpolate", + "description": "Interpolate the transposed PI values", + "icon": "icon-dku-timeseries-resample icon-DKU_timeseries_resample" + }, + "kind": "PYTHON", + "selectableFromDataset": "input_dataset", + "inputRoles": [ + { + "name": "input_dataset", + "label": "Dataset containing transposed values", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": true + } + ], + + "outputRoles": [ + { + "name": "api_output", + "label": "Interpolated output", + "description": "", + "arity": "UNARY", + "required": true, + "acceptsDataset": true + } + ], + "params": [ + { + "type": "SEPARATOR", + "name": "separator_input", + "label": "Input parameters" + }, + { + "name": "datetime_column", + "label": "Time column", + "type": "COLUMN", + "columnRole": "input_dataset", + "allowedColumnTypes": [ + "date" + ], + "mandatory": true + }, + { + "name": "show_advanced_parameters", + "label": "Show advanced parameters", + "description": "", + "type": "BOOLEAN", + "defaultValue": false + } + ], + "resourceKeys": [] +} diff --git a/custom-recipes/pi-system-interpolate/recipe.py b/custom-recipes/pi-system-interpolate/recipe.py new file mode 100644 index 0000000..d5e02da --- /dev/null +++ b/custom-recipes/pi-system-interpolate/recipe.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +import dataiku +from dataiku.customrecipe import get_input_names_for_role, get_recipe_config, get_output_names_for_role +import pandas +from safe_logger import SafeLogger +from osisoft_constants import OSIsoftConstants +from osisoft_plugin_common import reorder_dataframe, iso_to_epoch, get_datetime_from_row + + +logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"]) + 
+logger.info("PIWebAPI Interpolate recipe v{}".format( + OSIsoftConstants.PLUGIN_VERSION +)) +current_timestamps_cache = [] +current_values_cache = [] +next_timestamps_cache = [] +next_values_cache = [] + + +input_dataset = get_input_names_for_role('input_dataset') +config = get_recipe_config() +dku_flow_variables = dataiku.get_flow_variables() + +output_names_stats = get_output_names_for_role('api_output') +output_dataset = dataiku.Dataset(output_names_stats[0]) + +logger.info("Initialization with config={}".format(logger.filter_secrets(config))) + +datetime_column = config.get("datetime_column") + +column_name_suffix_margin = max([ + len(OSIsoftConstants.VALUE_COLUMN_SUFFIX), + len(OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX) +]) + +input_parameters_dataset = dataiku.Dataset(input_dataset[0]) +input_parameters_dataframe = input_parameters_dataset.get_dataframe() + +columns_to_interpolate = [] +for column in input_parameters_dataframe.columns: + if column.endswith(OSIsoftConstants.VALUE_COLUMN_SUFFIX): + columns_name = column.split(OSIsoftConstants.VALUE_COLUMN_SUFFIX)[0] + # Todo: check that the timestamp is there too before adding to the list + columns_to_interpolate.append(columns_name) + +logger.info("Columns to interpolate: {}".format(columns_to_interpolate)) + +results = [] +time_last_request = None +client = None +previous_server_url = "" +groupby_list = {} +file_counter = 0 +previous_row = None +first_dataframe = True +final_row = {} + +with output_dataset.get_writer() as writer: + for index, input_parameters_row in input_parameters_dataframe.iterrows(): + output_rows = [] + this_row = input_parameters_row.to_dict() + reference_time = iso_to_epoch(get_datetime_from_row(input_parameters_row, datetime_column)) + if previous_row is None: + previous_row = this_row + previous_reference_time = reference_time + continue + # At this stage previous_row is past, this_row is present + for column_to_interpolate in columns_to_interpolate: + sample_time = 
iso_to_epoch(previous_row.get("{}{}".format(column_to_interpolate, OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX))) + value = previous_row.get("{}{}".format(column_to_interpolate, OSIsoftConstants.VALUE_COLUMN_SUFFIX)) + if sample_time == previous_reference_time: + # This sample can go in output straight away + previous_row["{}{}".format(column_to_interpolate, OSIsoftConstants.INTERPOLATED_COLUMN_SUFFIX)] = value + elif sample_time < previous_reference_time: + # Sample is in the past, so next one is in the future + future_value = this_row.get("{}{}".format(column_to_interpolate, OSIsoftConstants.VALUE_COLUMN_SUFFIX)) + future_time = iso_to_epoch(this_row.get("{}{}".format(column_to_interpolate, OSIsoftConstants.TIMESTAMP_COLUMN_SUFFIX))) + slope = (future_value - value) / (future_time - sample_time) + interpolated_value = value + slope * (previous_reference_time - sample_time) + interpolated_column_name = "{}{}".format(column_to_interpolate, OSIsoftConstants.INTERPOLATED_COLUMN_SUFFIX) + previous_row[interpolated_column_name] = interpolated_value + elif sample_time > previous_reference_time: + # Temporal paradox, that should never happen + raise Exception("Time issue: On row {}, timestamp for {} is advance of the reference timestamp".format(index, column_to_interpolate)) + output_dataframe = pandas.DataFrame([previous_row]) + output_dataframe = reorder_dataframe(output_dataframe, [datetime_column]) + if first_dataframe: + output_dataset.write_schema_from_dataframe(output_dataframe) + first_dataframe = False + writer.write_dataframe(output_dataframe) + previous_row = this_row + previous_reference_time = reference_time diff --git a/custom-recipes/pi-system-transpose/recipe.py b/custom-recipes/pi-system-transpose/recipe.py index 1505911..d7c34e7 100644 --- a/custom-recipes/pi-system-transpose/recipe.py +++ b/custom-recipes/pi-system-transpose/recipe.py @@ -1,14 +1,13 @@ # -*- coding: utf-8 -*- import dataiku from dataiku.customrecipe import get_input_names_for_role,
get_recipe_config, get_output_names_for_role -import pandas as pd +import pandas from safe_logger import SafeLogger import os from temp_utils import CustomTmpFile from osisoft_constants import OSIsoftConstants -import dateutil.parser from column_name import normalise_name -from osisoft_plugin_common import reorder_dataframe +from osisoft_plugin_common import reorder_dataframe, get_datetime_from_row logger = SafeLogger("pi-system plugin", forbiden_keys=["token", "password"]) @@ -36,33 +35,6 @@ def parse_timestamp_and_value(line): return date, value -def get_datetime_from_string(datetime): - try: - time_stamp = dateutil.parser.isoparse(datetime) - return time_stamp - except: - pass - return None - - -def get_datetime_from_pandas(datetime): - try: - time_stamp = datetime.strftime('%Y-%m-%dT%H:%M:%SZ') - return time_stamp - except: - pass - return None - - -def get_datetime_from_row(row, datetime_column): - raw_datetime = row[datetime_column] - if type(raw_datetime) is str: - formated_datetime = get_datetime_from_string(raw_datetime) - else: - formated_datetime = get_datetime_from_pandas(raw_datetime) - return formated_datetime - - def get_latest_values_at_timestamp(file_handles, seek_timestamp): attribute_index = 0 values = {} @@ -220,7 +192,7 @@ def clean_cache(groupby_list): synchronize_on_identifier: value }) unnested_items_rows.append(dictionary) - unnested_items_rows = pd.DataFrame(unnested_items_rows) + unnested_items_rows = pandas.DataFrame(unnested_items_rows) unnested_items_rows = reorder_dataframe(unnested_items_rows, [OSIsoftConstants.TIMESTAMP_COLUMN_NAME, synchronize_on_identifier]) if first_dataframe: output_dataset.write_schema_from_dataframe(unnested_items_rows) diff --git a/python-lib/osisoft_constants.py b/python-lib/osisoft_constants.py index 69d5200..184ac4f 100644 --- a/python-lib/osisoft_constants.py +++ b/python-lib/osisoft_constants.py @@ -204,6 +204,7 @@ class OSIsoftConstants(object): DEFAULT_SCHEME = "https" DEFAULT_WAIT_BEFORE_RETRY = 60 
DKU_ERROR_KEY = "Errors" + INTERPOLATED_COLUMN_SUFFIX = "_ip" LINKS = "Links" MAXIMUM_RETRIES_ON_THROTTLING = 5 POSSIBLE_WEB_ID_STARTS = ["F1", "I1", "P1", "L1", "D1"] @@ -376,6 +377,7 @@ class OSIsoftConstants(object): SEARCH_PATH = "search" STREAM_PATH = "streams" STREAMSETS_PATH = "streamsets" + TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ" TIMESTAMP_COLUMN_NAME = "Timestamp" TIMESTAMP_COLUMN_SUFFIX = "_ts" PIWEBAPI_AF_ENDPOINTS = { diff --git a/python-lib/osisoft_plugin_common.py b/python-lib/osisoft_plugin_common.py index 492a11a..569fd0f 100644 --- a/python-lib/osisoft_plugin_common.py +++ b/python-lib/osisoft_plugin_common.py @@ -373,6 +373,33 @@ def iso_to_epoch(iso_timestamp): return epoch_timestamp +def get_datetime_from_row(row, datetime_column): + raw_datetime = row[datetime_column] + if type(raw_datetime) is str: + formated_datetime = get_datetime_from_string(raw_datetime) + else: + formated_datetime = get_datetime_from_pandas(raw_datetime) + return formated_datetime + + +def get_datetime_from_string(datetime): + try: + _ = date_parser.isoparse(datetime) + return datetime + except Exception: + pass + return None + + +def get_datetime_from_pandas(datetime): + try: + time_stamp = datetime.strftime(OSIsoftConstants.TIME_FORMAT) + return time_stamp + except Exception: + pass + return None + + def reorder_dataframe(unnested_items_rows, first_elements): columns = unnested_items_rows.columns.tolist() for first_element in reversed(first_elements):