From 3e2f410b292416ba25af28e9f94893bbfdcd02dd Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Mon, 27 Oct 2025 17:13:20 +0100
Subject: [PATCH 1/6] add batch processing to AF hierarchy downloader

---
 .../pi-system_hierarchy/connector.json |   4 +-
 .../pi-system_hierarchy/connector.py   | 159 +++++++++++++++++-
 2 files changed, 156 insertions(+), 7 deletions(-)

diff --git a/python-connectors/pi-system_hierarchy/connector.json b/python-connectors/pi-system_hierarchy/connector.json
index 4185155..31754ee 100644
--- a/python-connectors/pi-system_hierarchy/connector.json
+++ b/python-connectors/pi-system_hierarchy/connector.json
@@ -60,7 +60,7 @@
             "label": " ",
             "type": "BOOLEAN",
             "description": "Use batch mode",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.must_retrieve_metrics==true",
+            "visibilityCondition": "model.show_advanced_parameters==true",
             "defaultValue": false
         },
         {
@@ -68,7 +68,7 @@
            "label": " ",
            "type": "INT",
            "description": "Batch size",
-            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true && model.must_retrieve_metrics==true",
+            "visibilityCondition": "model.show_advanced_parameters==true && model.use_batch_mode==true",
            "minI": 1,
            "defaultValue": 500
        },
diff --git a/python-connectors/pi-system_hierarchy/connector.py b/python-connectors/pi-system_hierarchy/connector.py
index 030d73b..bcf49bd 100644
--- a/python-connectors/pi-system_hierarchy/connector.py
+++ b/python-connectors/pi-system_hierarchy/connector.py
@@ -31,6 +31,8 @@ def __init__(self, config, plugin_config):
             is_debug_mode=is_debug_mode,
             network_timer=self.network_timer
         )
+        self.use_batch_mode = config.get("use_batch_mode", False)
+        self.batch_size = config.get("batch_size", 500)
 
     def get_read_schema(self):
         return None
@@ -41,12 +43,18 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
                       partition_id=None, records_limit = -1):
         limit = RecordsLimit(records_limit)
         headers = self.client.get_requests_headers()
         json_response = self.client.get(url=self.database_endpoint, headers=headers, params={}, error_source="traverse")
-        next_url = self.client.extract_link_with_key(json_response, "Elements")
-        for item in self.recurse_next_item(next_url):
-            if limit.is_reached():
-                break
-            yield item
+        if self.use_batch_mode:
+            for item in self.batch_next_item(json_response, type="Database"):
+                if limit.is_reached():
+                    break
+                yield item
+        else:
+            next_url = self.client.extract_link_with_key(json_response, "Elements")
+            for item in self.recurse_next_item(next_url):
+                if limit.is_reached():
+                    break
+                yield item
 
     def recurse_next_item(self, next_url, parent=None, type=None):
         logger.info("recurse_next_item")
@@ -82,6 +90,147 @@ def recurse_next_item(self, next_url, parent=None, type=None):
                 "Id": item.get("Id")
             }
 
+    def batch_next_item(self, next_item, parent=None, type=None):
+        todo_list = []
+        todo_list.append(
+            {
+                "url": self.client.extract_link_with_key(next_item, "Elements"),
+                "parent": next_item.get("Name"),
+                "type": "Database"
+            }
+        )
+        batch_requests_parameters = []
+        while todo_list:
+            item = todo_list.pop()
+            request_kwargs = {
+                "url": item.get("url"),
+                "headers": self.client.get_requests_headers()
+            }
+            batch_requests_parameters.append(request_kwargs)
+            if not todo_list or len(batch_requests_parameters) >= self.batch_size:
+                json_responses = self.client._batch_requests(batch_requests_parameters)
+                batch_requests_parameters = []
+                for json_response in json_responses:
+                    response_content = json_response.get("Content", {})
+                    links = response_content.get("Links", {})
+                    next_link = links.get("Next", {})
+                    # a Next link means the listing is paginated: queue the continuation page
+                    if next_link:
+                        todo_list.append(
+                            {
+                                "url": next_link
+                            }
+                        )
+                    retrieved_items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                    for retrieved_item in retrieved_items:
+                        retrieved_item_path = retrieved_item.get("Path")
+                        elements_url = self.client.extract_link_with_key(retrieved_item, "Elements")
+                        attributes_url = self.client.extract_link_with_key(retrieved_item, "Attributes")
+                        if elements_url:
+                            todo_list.append(
+                                {
+                                    "url": elements_url,
+                                    "type": "Element",
+                                    "parent": retrieved_item_path
+                                }
+                            )
+                        if attributes_url:
+                            todo_list.append(
+                                {
+                                    "url": attributes_url,
+                                    "type": "Attribute",
+                                    "parent": retrieved_item_path
+                                }
+                            )
+                        yield {
+                            "ItemType": type,
+                            "Name": retrieved_item.get("Name"),
+                            "Type": retrieved_item.get("Type"),
+                            "Description": retrieved_item.get("Description"),
+                            "Path": retrieved_item.get("Path"),
+                            "Parent": parent,
+                            "DefaultUnitsName": retrieved_item.get("DefaultUnitsName"),
+                            "TemplateName": retrieved_item.get("TemplateName"),
+                            "CategoryNames": retrieved_item.get("CategoryNames"),
+                            "ExtendedProperties": retrieved_item.get("ExtendedProperties"),
+                            "Step": retrieved_item.get("Step"),
+                            "WebId": retrieved_item.get("WebId"),
+                            "Id": retrieved_item.get("Id")
+                        }
+
+
+    def batch_recurse_next_item(self, next_items, parents=None, type=None):
+        # logger.info("batch_recurse_next_item")
+        if not isinstance(next_items, list):
+            next_items = [next_items]
+        if not isinstance(parents, list):
+            parents = [parents]
+        batch_requests_parameters = []
+        types = []
+        items_parents_names = []
+        for next_item in next_items:
+            next_item_name = next_item.get("Path")
+            next_elements_url = self.client.extract_link_with_key(next_item, "Elements")
+            if next_elements_url:
+                request_kwargs = {
+                    "url": next_elements_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Element")
+                items_parents_names.append(next_item_name)
+            next_attributes_url = self.client.extract_link_with_key(next_item, "Attributes")
+            if next_attributes_url:
+                request_kwargs = {
+                    "url": next_attributes_url,
+                    "headers": self.client.get_requests_headers()
+                }
+                batch_requests_parameters.append(request_kwargs)
+                types.append("Attribute")
+                items_parents_names.append(next_item_name)
+        if batch_requests_parameters:
+            json_responses = self.client._batch_requests(batch_requests_parameters)
+            # approach 1: recurse on each response of the batch individually
+            # (approach 2 below collects all responses first and recurses on them in one go)
+            # for json_response in json_responses:
+            #     response_content = json_response.get("Content", {})
+            #     if OSIsoftConstants.DKU_ERROR_KEY in response_content:
+            #         # TODO: surface per-response errors
+            #         pass
+            #     items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+            #     batched_items = self.batch_recurse_next_item(items)
+            #     for item in batched_items:
+            #         yield item
+            # approach 2:
+            next_batch_items = []
+            for json_response in json_responses:
+                response_content = json_response.get("Content", {})
+                links = response_content.get("Links", {})
+                next_link = links.get("Next", {})
+                # TODO: paginated responses (Next link) are not followed here yet
+                items = response_content.get(OSIsoftConstants.API_ITEM_KEY, [])
+                next_batch_items.extend(items)
+            batched_items = self.batch_recurse_next_item(next_batch_items, parents=items_parents_names)
+            for item in batched_items:
+                yield item
+
+        for item, parent in zip(next_items, parents):
+            yield {
+                "ItemType": type,
+                "Name": item.get("Name"),
+                "Type": item.get("Type"),
+                "Description": item.get("Description"),
+                "Path": item.get("Path"),
+                "Parent": parent,
+                "DefaultUnitsName": item.get("DefaultUnitsName"),
+                "TemplateName": item.get("TemplateName"),
+                "CategoryNames": item.get("CategoryNames"),
+                "ExtendedProperties": item.get("ExtendedProperties"),
+                "Step": item.get("Step"),
+                "WebId": item.get("WebId"),
+                "Id": item.get("Id")
+            }
+
     def get_writer(self, dataset_schema=None, dataset_partitioning=None,
                    partition_id=None, write_mode="OVERWRITE"):
         raise NotImplementedError

From 11f9efa634678c8a26757ae95d114b110b55bf35 Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Thu, 13 Nov 2025 10:38:51 +0100
Subject: [PATCH 2/6] preselect the batch mode

---
 python-connectors/pi-system_hierarchy/connector.json | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python-connectors/pi-system_hierarchy/connector.json b/python-connectors/pi-system_hierarchy/connector.json
index 31754ee..f66a4ed 100644
--- a/python-connectors/pi-system_hierarchy/connector.json
+++ b/python-connectors/pi-system_hierarchy/connector.json
@@ -21,7 +21,7 @@
            "label": " ",
            "type": "BOOLEAN",
            "description": "Show advanced parameters",
-            "defaultValue": false
+            "defaultValue": true
        },
        {
            "name": "server_url",
@@ -61,7 +61,7 @@
            "type": "BOOLEAN",
            "description": "Use batch mode",
            "visibilityCondition": "model.show_advanced_parameters==true",
-            "defaultValue": false
+            "defaultValue": true
        },
        {
            "name": "batch_size",

From 9426f33b885c49e74e3cd1cbc33357a86daebf28 Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Wed, 19 Nov 2025 13:14:45 +0100
Subject: [PATCH 3/6] fixing parent column

---
 .../pi-system_hierarchy/connector.py | 23 ++++++++++++++-----
 1 file changed, 17 insertions(+), 6 deletions(-)

diff --git a/python-connectors/pi-system_hierarchy/connector.py b/python-connectors/pi-system_hierarchy/connector.py
index bcf49bd..fac8d13 100644
--- a/python-connectors/pi-system_hierarchy/connector.py
+++ b/python-connectors/pi-system_hierarchy/connector.py
@@ -1,3 +1,4 @@
+import datetime
 from dataiku.connector import Connector
 from osisoft_client import OSIsoftClient
 from safe_logger import SafeLogger
@@ -40,12 +41,14 @@ def get_read_schema(self):
     def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
                       partition_id=None, records_limit = -1):
         limit = RecordsLimit(records_limit)
+        start_time = datetime.datetime.now()
         headers = self.client.get_requests_headers()
         json_response = self.client.get(url=self.database_endpoint, headers=headers, params={}, error_source="traverse")
+        server_name = json_response.get("ExtendedProperties", {}).get("DefaultPIServer", {}).get("Value", "Unknown server name")
 
         if self.use_batch_mode:
-            for item in self.batch_next_item(json_response, type="Database"):
+            for item in self.batch_next_item(json_response, parent=server_name, type="Database"):
                 if limit.is_reached():
                     break
                 yield item
@@ -55,6 +58,10 @@ def generate_rows(self, dataset_schema=None, dataset_partitioning=None,
                 if limit.is_reached():
                     break
                 yield item
+        end_time = datetime.datetime.now()
+        duration = end_time - start_time
+        logger.info("generate_rows overall duration = {}s".format(duration.total_seconds()))
+        logger.info("Network timer:{}".format(self.network_timer.get_report()))
 
     def recurse_next_item(self, next_url, parent=None, type=None):
         logger.info("recurse_next_item")
@@ -95,11 +102,12 @@ def batch_next_item(self, next_item, parent=None, type=None):
         todo_list.append(
             {
                 "url": self.client.extract_link_with_key(next_item, "Elements"),
-                "parent": next_item.get("Name"),
+                "parent": "\\\\" + parent + "\\" + next_item.get("Name"),
                 "type": "Database"
             }
         )
         batch_requests_parameters = []
+        parent_of_batched_items = []
         while todo_list:
             item = todo_list.pop()
             request_kwargs = {
@@ -107,10 +115,11 @@ def batch_next_item(self, next_item, parent=None, type=None):
                 "headers": self.client.get_requests_headers()
             }
             batch_requests_parameters.append(request_kwargs)
+            parent_of_batched_items.append(item.get("parent"))
             if not todo_list or len(batch_requests_parameters) >= self.batch_size:
                 json_responses = self.client._batch_requests(batch_requests_parameters)
                 batch_requests_parameters = []
-                for json_response in json_responses:
+                for parent_of_batched_item, json_response in zip(parent_of_batched_items, json_responses):
                     response_content = json_response.get("Content", {})
                     links = response_content.get("Links", {})
                     next_link = links.get("Next", {})
@@ -131,7 +140,7 @@ def batch_next_item(self, next_item, parent=None, type=None):
                                 {
                                     "url": elements_url,
                                     "type": "Element",
-                                    "parent": retrieved_item_path
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
                                 }
                             )
                         if attributes_url:
@@ -139,7 +148,7 @@ def batch_next_item(self, next_item, parent=None, type=None):
                                 {
                                     "url": attributes_url,
                                     "type": "Attribute",
-                                    "parent": retrieved_item_path
+                                    "parent": parent_of_batched_item + "\\" + retrieved_item.get("Name")
                                 }
                             )
                         yield {
@@ -148,7 +157,8 @@ def batch_next_item(self, next_item, parent=None, type=None):
                             "Type": retrieved_item.get("Type"),
                             "Description": retrieved_item.get("Description"),
                             "Path": retrieved_item.get("Path"),
-                            "Parent": parent,
+                            "LinkPath": "{}\\{}".format(parent_of_batched_item, retrieved_item.get("Name")),
+                            "Parent": parent_of_batched_item,
                             "DefaultUnitsName": retrieved_item.get("DefaultUnitsName"),
                             "TemplateName": retrieved_item.get("TemplateName"),
                             "CategoryNames": retrieved_item.get("CategoryNames"),
@@ -157,6 +167,7 @@ def batch_next_item(self, next_item, parent=None, type=None):
                             "WebId": retrieved_item.get("WebId"),
                             "Id": retrieved_item.get("Id")
                         }
+                parent_of_batched_items = []
 
 
     def batch_recurse_next_item(self, next_items, parents=None, type=None):

From 45f12821ba48d41e33a4f72d19e192dce7e9da38 Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Wed, 26 Nov 2025 09:39:54 +0100
Subject: [PATCH 4/6] v1.4.1

---
 plugin.json | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/plugin.json b/plugin.json
index 517456c..27db1a9 100644
--- a/plugin.json
+++ b/plugin.json
@@ -1,6 +1,6 @@
 {
     "id": "pi-system",
-    "version": "1.4.0",
+    "version": "1.4.1",
     "meta": {
         "label": "PI System",
         "description": "Retrieve data from your OSIsoft PI System servers",

From 0010b3fdee35013be2d5feb0095022ff2bf59754 Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Wed, 26 Nov 2025 09:40:04 +0100
Subject: [PATCH 5/6] update changelog

---
 CHANGELOG.md | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2fc652a..febc0de 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,9 @@
 # Changelog
 
+## [Version 1.4.1](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.1) - Feature release - 2025-11-26
+
+- Add an AF hierarchy downloader
+
 ## [Version 1.4.0](https://github.com/dataiku/dss-plugin-pi-server/releases/tag/v1.4.0) - Feature release - 2025-09-22
 
 - Add write recipe

From 7f18196c5816bd05097a4e406c06a33ef792c905 Mon Sep 17 00:00:00 2001
From: Alex Bourret
Date: Wed, 26 Nov 2025 09:40:16 +0100
Subject: [PATCH 6/6] beta 1

---
 python-lib/osisoft_constants.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python-lib/osisoft_constants.py b/python-lib/osisoft_constants.py
index 20c8efa..ce9b74a 100644
--- a/python-lib/osisoft_constants.py
+++ b/python-lib/osisoft_constants.py
@@ -405,7 +405,7 @@ class OSIsoftConstants(object):
         "Security": "{base_url}/eventframes/{webid}/security",
         "SecurityEntries": "{base_url}/eventframes/{webid}/securityentries"
     }
-    PLUGIN_VERSION = "1.4.0-beta.1"
+    PLUGIN_VERSION = "1.4.1-beta.1"
     VALUE_COLUMN_SUFFIX = "_val"
     WEB_API_PATH = "piwebapi"
    WRITE_HEADERS = {'X-Requested-With': 'XmlHttpRequest'}
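
Reviewer note: PATCH 1 replaces the one-request-per-node recursion with a queue-and-flush traversal, and PATCH 3 fixes the parent bookkeeping that goes with it. Below is a minimal, self-contained sketch of that pattern, for review purposes only. The in-memory TREE and the fetch_batch stub are hypothetical stand-ins for the PI Web API Elements endpoints and OSIsoftClient._batch_requests, and pagination (Next links) is left out; only the queue/flush/zip bookkeeping mirrors the connector code.

# Toy hierarchy: each key is an "Elements" URL, each value its child elements.
TREE = {
    "db/elements": ["plant1", "plant2"],
    "plant1/elements": ["pump1"],
    "plant2/elements": [],
    "pump1/elements": [],
}


def fetch_batch(urls):
    # Hypothetical stand-in for OSIsoftClient._batch_requests:
    # one round trip answering many URLs at once.
    return [{"Items": TREE.get(url, [])} for url in urls]


def traverse(root_url, root_name, batch_size=2):
    todo = [{"url": root_url, "parent": root_name}]
    pending_urls = []
    pending_parents = []
    while todo:
        entry = todo.pop()
        pending_urls.append(entry["url"])
        pending_parents.append(entry["parent"])
        # Flush when the buffer is full or nothing is left to queue.
        if not todo or len(pending_urls) >= batch_size:
            responses = fetch_batch(pending_urls)
            pending_urls = []
            # Requests and their parent paths are index-aligned, hence the zip.
            for parent, response in zip(pending_parents, responses):
                for name in response["Items"]:
                    todo.append({"url": name + "/elements", "parent": parent + "\\" + name})
                    yield {"Name": name, "Parent": parent}
            # Reset the parents together with the flushed requests.
            pending_parents = []


for row in traverse("db/elements", "\\\\server\\db"):
    print(row)

Because the request URLs and their parent paths live in two index-aligned buffers, they must be flushed and reset together; resetting the parent buffer anywhere else desynchronizes the zip, which is the alignment issue PATCH 3 addresses. The payoff of the batch mode is that round trips drop from one per element to roughly one per batch_size elements.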