From 89307f0bc98c2e9cfadc0d0b4be697b3093c9906 Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 15:33:57 +0200
Subject: [PATCH 1/7] feat: Implement SharePoint data extraction sources and
 configuration classes

---
 sources/sharepoint/__init__.py                | 106 ++++++++
 sources/sharepoint/helpers.py                 | 245 ++++++++++++++++++
 sources/sharepoint/sharepoint_files_config.py |  76 ++++++
 sources/sharepoint_pipeline.py                |  62 +++++
 4 files changed, 489 insertions(+)
 create mode 100644 sources/sharepoint/__init__.py
 create mode 100644 sources/sharepoint/helpers.py
 create mode 100644 sources/sharepoint/sharepoint_files_config.py
 create mode 100644 sources/sharepoint_pipeline.py

diff --git a/sources/sharepoint/__init__.py b/sources/sharepoint/__init__.py
new file mode 100644
index 000000000..4d53d983f
--- /dev/null
+++ b/sources/sharepoint/__init__.py
@@ -0,0 +1,106 @@
+from typing import Iterator, Dict
+import re
+
+import dlt
+from dlt.common.typing import TDataItems
+from dlt.common.configuration.specs import configspec, BaseConfiguration
+from loguru import logger
+import pandas as pd
+
+from .helpers import SharepointClient
+from .sharepoint_files_config import SharepointFilesConfig, SharepointListConfig
+
+
+@configspec
+class SharepointCredentials(BaseConfiguration):
+    client_id: str = None
+    tenant_id: str = None
+    site_id: str = None
+    client_secret: str = None
+    sub_site_id: str = ""
+
+
+@dlt.source(name="sharepoint_list", max_table_nesting=0)
+def sharepoint_list(
+    sharepoint_list_config: SharepointListConfig,
+    credentials: SharepointCredentials = dlt.secrets.value,
+) -> Iterator[Dict[str, str]]:
+    client: SharepointClient = SharepointClient(**credentials)
+    client.connect()
+    logger.info(f"Connected to SharePoint site: {client.site_info}")
+
+    def get_pipe(sharepoint_list_config: SharepointListConfig):
+        def get_records(sharepoint_list_config: SharepointListConfig):
+            data = client.get_items_from_list(list_title=sharepoint_list_config.list_title, select=sharepoint_list_config.select)
+            yield from data
+        return dlt.resource(get_records, name=sharepoint_list_config.table_name)(sharepoint_list_config)
+    yield get_pipe(sharepoint_list_config=sharepoint_list_config)
+
+
+@dlt.source(name="sharepoint_files", max_table_nesting=0)
+def sharepoint_files(
+    sharepoint_files_config: SharepointFilesConfig,
+    credentials: SharepointCredentials = dlt.secrets.value,
+):
+    client: SharepointClient = SharepointClient(**credentials)
+    client.connect()
+    logger.info(f"Connected to SharePoint site: {client.site_info}")
+
+    def get_files(
+        sharepoint_files_config: SharepointFilesConfig,
+        last_update_timestamp: dlt.sources.incremental = dlt.sources.incremental(
+            cursor_path="lastModifiedDateTime",
+            initial_value="2020-01-01T00:00:00Z",
+            primary_key=(),
+        ),
+    ):
+        current_last_value = last_update_timestamp.last_value
+        logger.debug(f"current_last_value: {current_last_value}")
+        for file_item in client.get_files_from_path(
+            folder_path=sharepoint_files_config.folder_path,
+            file_name_startswith=sharepoint_files_config.file_name_startswith,
+            pattern=sharepoint_files_config.pattern,
+        ):
+            if file_item["size"] > sharepoint_files_config.file_size_limit:
+                logger.warning(f"File {file_item['name']} is too large, skipping")
+                raise RuntimeError(
+                    f"File {file_item['name']} is larger than the limit of"
+                    f" {sharepoint_files_config.file_size_limit} bytes."
+                )
+            logger.debug(
+                "filtering files based on lastModifiedDateTime, compare to last_value:"
+                f" {current_last_value}"
+            )
+            if file_item["lastModifiedDateTime"] > current_last_value or not sharepoint_files_config.is_file_incremental:
+                logger.info(
+                    f"Processing file after lastModifiedDateTime filter: {file_item['name']}"
+                )
+
+                file_item["pd_function"] = sharepoint_files_config.file_type.get_pd_function()
+                file_item["pd_kwargs"] = sharepoint_files_config.pandas_kwargs
+                yield file_item
+            else:
+                logger.info(
+                    f"Skipping file {file_item['name']} based on lastModifiedDateTime filter"
+                )
+
+    def get_records(file_item: Dict) -> TDataItems:
+        chunksize = file_item["pd_kwargs"].get("chunksize", None)
+        file_io = client.get_file_bytes_io(file_item=file_item)
+
+        if chunksize:
+            with file_item["pd_function"](file_io, **file_item["pd_kwargs"]) as reader:
+                for num, chunk in enumerate(reader):
+                    logger.info(f"Processing chunk {num} of {file_item['name']}")
+                    yield chunk
+        else:
+            df = file_item["pd_function"](file_io, **file_item["pd_kwargs"])
+            yield df
+        logger.debug(f"get_records done for {file_item['name']}")
+
+    def get_pipe(sharepoint_files_config: SharepointFilesConfig):
+        return dlt.resource(get_files, name=f"{sharepoint_files_config.table_name}_files")(sharepoint_files_config) | dlt.transformer(
+            get_records, name=sharepoint_files_config.table_name, parallelized=False
+        )
+
+    yield get_pipe(sharepoint_files_config=sharepoint_files_config)
diff --git a/sources/sharepoint/helpers.py b/sources/sharepoint/helpers.py
new file mode 100644
index 000000000..7e534ba69
--- /dev/null
+++ b/sources/sharepoint/helpers.py
@@ -0,0 +1,245 @@
+from typing import Dict, Union, List, Tuple
+from io import BytesIO
+import re
+
+from msal import ConfidentialClientApplication
+from loguru import logger
+from dlt.sources.helpers.rest_client import RESTClient
+from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
+from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
+
+
+class SharepointClient:
+    # * playground: https://developer.microsoft.com/en-us/graph/graph-explorer
+    # * If the result contains more results, Microsoft Graph returns an @odata.nextLink property
+
+    def __init__(
+        self,
+        client_id: str,
+        tenant_id: str,
+        site_id: str,
+        client_secret: str,
+        sub_site_id: str = "",
+    ) -> None:
+        self.client_id = client_id
+        self.tenant_id = tenant_id
+        self.client_secret = client_secret
+        self.sub_site_id = sub_site_id
+        self.site_id = site_id
+        if not all([self.client_id, self.tenant_id, self.client_secret, self.site_id]):
+            raise ValueError(
+                "client_id, tenant_id, client_secret and site_id are required for connect to"
+                " SharePoint"
+            )
+        self.graph_api_url = "https://graph.microsoft.com/v1.0/sites"
+        self.graph_site_url = f"{self.graph_api_url}/{self.site_id}"
+        if self.sub_site_id:
+            self.graph_site_url += f"/sites/{self.sub_site_id}"
+
+    def connect(self) -> None:
+        authority = f"https://login.microsoftonline.com/{self.tenant_id}"
+        scope = ["https://graph.microsoft.com/.default"]
+
+        app = ConfidentialClientApplication(
+            self.client_id,
+            authority=authority,
+            client_credential=self.client_secret,
+        )
+
+        # Get the access token
+        token_response = app.acquire_token_for_client(scopes=scope)
+        access_token = token_response.get("access_token", None)
+
+        if access_token:
+            self.client = RESTClient(
+                base_url=self.graph_site_url,
+                auth=BearerTokenAuth(access_token),
+                paginator=JSONLinkPaginator(next_url_path="@odata.nextLink"),
+            )
+            logger.success(f"Connected to SharePoint site id: {self.site_id} successfully")
+        else:
+            raise ConnectionError("Connection failed : ", token_response)
+
+    @property
+    def sub_sites(self) -> List:
+        url = f"{self.graph_site_url}/sites"
+        response = self.client.get(url)
+        site_info = response.json()
+        if "value" in site_info:
+            return site_info["value"]
+        else:
+            logger.warning(f"No subsite found in {url}")
+
+    @property
+    def site_info(self) -> Dict:
+        url = f"{self.graph_site_url}"
+        response = self.client.get(url)
+        site_info = response.json()
+        if not "error" in site_info:
+            return site_info
+        else:
+            logger.warning(f"No site_info found in {url}")
+
+    def get_all_lists_in_site(self) -> List[Dict]:
+        url = f"{self.graph_site_url}/lists"
+        res = self.client.get(url)
+        res.raise_for_status()
+        lists_info = res.json()
+        if "value" in lists_info:
+            all_items = lists_info["value"]
+            filtered_lists = [
+                item for item in all_items
+                if item.get("list", {}).get("template") == "genericList"
+                and "Lists" in item.get("webUrl", "")
+            ]
+            return filtered_lists
+        else:
+            filtered_lists = []
+        if not filtered_lists:
+            logger.warning(f"No lists found in {url}")
+        return filtered_lists
+
+    def get_items_from_list(self, list_title: str, select:str = None) -> List[Dict]:
+        #TODO, pagination not yet implemented
+        logger.warning(
+            "Pagination is not implemented for get_items_from_list, "
+            "it will return only first page of items."
+        )
+        all_lists = self.get_all_lists_in_site()
+        filtered_lists = [
+            x for x in all_lists
+            if x.get("list", {}).get("template") == "genericList"
+            and "Lists" in x.get("webUrl", "")
+        ]
+
+        possible_list_titles = [x["displayName"] for x in filtered_lists]
+        if list_title not in possible_list_titles:
+            raise ValueError(
+                f"List with title '{list_title}' not found in site {self.site_id}. "
+                f"Available lists: {possible_list_titles}"
+            )
" + f"Available lists: {possible_list_titles}" + ) + + # Get the list ID + list_id = next( + x["id"] for x in filtered_lists if x["displayName"] == list_title + ) + + url = f"{self.graph_site_url}/lists/{list_id}/items?expand=fields" + if select: + url += f"(select={select})" + res = self.client.get(url) + res.raise_for_status() + items_info = res.json() + + if "value" in items_info: + output = [x.get("fields", {}) for x in items_info["value"]] + else: + output = [] + if output: + logger.info(f"Got {len(output)} items from list: {list_title}") + return output + else: + logger.warning(f"No items found in list: {list_title}, with select: {select}") + + def get_files_from_path( + self, folder_path: str, file_name_startswith: str, pattern: str = None + ) -> Dict: + folder_url = ( + f"{self.graph_site_url}/drive/root:/{folder_path}:/children?$filter=startswith(name," + f" '{file_name_startswith}')" + ) + logger.debug(f"Getting files from folder with endpoint: {folder_url}") + res = self.client.get(folder_url) + file_and_folder_items = res.json().get("value", []) + file_items = [x for x in file_and_folder_items if "file" in x.keys()] + if pattern: + logger.debug(f"Filtering files with pattern: {pattern}") + file_items = [x for x in file_items if re.search(pattern, x["name"])] + + logger.debug(f"Got number files from ms graph api: {len(file_items)}") + return file_items + + def get_file_bytes_io(self, file_item: Dict): + file_url = file_item["@microsoft.graph.downloadUrl"] + response = self.client.get(file_url) + if response.status_code == 200: + bytes_io = BytesIO(response.content) + logger.info( + f"File {file_item['name']} downloaded to BytesIO, size: {len(bytes_io.getvalue())}" + ) + return bytes_io + else: + raise FileNotFoundError(f"File not found: {file_item['name']} or can't be downloaded") + + def archive_file(self, file_item: Dict, archive_folder_path: str, new_file_name: str) -> None: + url = f"{self.graph_site_url}/drive/items/{file_item['id']}" + archive_folder_path = self.remove_driver_root_in_path(archive_folder_path) + archive_folder_id = self.create_folder_if_not_exists(folder_path=archive_folder_path) + body = { + "parentReference": {"id": archive_folder_id}, + "name": new_file_name, + } + res = self.client.patch(url, json=body) + if res.status_code == 200: + logger.success( + f"File {file_item['name']} renamed to {new_file_name} in {archive_folder_path}" + ) + else: + raise RuntimeError(f"File {file_item['name']} can't be renamed to {new_file_name}") + + def safe_get_folder_id(self, folder_path: str) -> Union[str, None]: + folder_url = f"{self.graph_site_url}/drive/root:/{folder_path}" + res = self.client.get(folder_url) + if res.status_code == 200: + return res.json()["id"] + + def list_folder(self, folder_path: str) -> Tuple[List, List]: + """List sub folders and files in folder_path + + Args: + folder_path (str): folder_path from sharepoint + + Returns: + Tuple[List, List]: (List of folders, List of files) + """ + if r"/" not in folder_path: + raise ValueError(f"Invalid folder path: {folder_path}, must contain '/'") + folder_url = f"{self.graph_site_url}/drive/root:/{folder_path}:/children" + logger.info(f"Listing from folder_path: {folder_path} using {folder_url}") + res = self.client.get(folder_url) + file_and_folder_items = res.json().get("value", []) + file_items = [x for x in file_and_folder_items if "file" in x.keys()] + folder_items = [x for x in file_and_folder_items if "folder" in x.keys()] + return (folder_items, file_items) + + def create_folder(self, folder_path: 
+    def create_folder(self, folder_path: str) -> str:
+        if r"/" not in folder_path:
+            raise ValueError(f"Invalid folder path: {folder_path}, must contain '/'")
+        parent_folder, folder_name = folder_path.rsplit("/", 1)
+        parent_folder_id = self.safe_get_folder_id(parent_folder)
+        if not parent_folder_id:
+            raise ValueError(f"Parent folder {parent_folder} not found")
+        logger.debug(f"Creating folder {folder_name} in {parent_folder}")
+        folder_url = f"{self.graph_site_url}/drive/items/{parent_folder_id}/children"
+        body = {
+            "name": folder_name,
+            "folder": {},
+            "@microsoft.graph.conflictBehavior": "fail",
+        }
+        res = self.client.post(folder_url, json=body)
+        if res.status_code == 201:
+            logger.success(f"Folder {folder_name} created")
+            return res.json()["id"]
+        else:
+            raise RuntimeError(f"Folder {folder_name} can't be created")
+
+    def create_folder_if_not_exists(self, folder_path: str) -> str:
+        folder_id = self.safe_get_folder_id(folder_path)
+        if folder_id:
+            logger.info(f"Folder {folder_path} already exists")
+            return folder_id
+        else:
+            return self.create_folder(folder_path)
+
+    def remove_driver_root_in_path(self, path: str) -> str:
+        return re.sub(r"^/drive/root:/", "", path)
diff --git a/sources/sharepoint/sharepoint_files_config.py b/sources/sharepoint/sharepoint_files_config.py
new file mode 100644
index 000000000..fd9c5ebd6
--- /dev/null
+++ b/sources/sharepoint/sharepoint_files_config.py
@@ -0,0 +1,76 @@
+from typing import Iterator, Optional, Sequence, List, Dict
+import re
+from enum import Enum
+
+from loguru import logger
+import pandas as pd
+from pydantic import BaseModel
+
+
+
+class FileType(Enum):
+    EXCEL = "excel"
+    CSV = "csv"
+    JSON = "json"
+    PARQUET = "parquet"
+    SAS = "sas"
+    SPSS = "spss"
+    SAV = "sav"
+
+    def get_pd_function(self):
+        return {
+            self.EXCEL: pd.read_excel,
+            self.CSV: pd.read_csv,
+            self.JSON: pd.read_json,
+            self.PARQUET: pd.read_parquet,
+            self.SAS: pd.read_sas,
+            self.SPSS: pd.read_spss,
+        }[self]
+
+
+class SharepointListConfig(BaseModel):
+    table_name: str
+    list_title: str
+    select: Optional[str] = None
+    limit: Optional[int] = None
+    is_incremental: Optional[bool] = False
+
+    def __init__(self, **data):
+        super().__init__(**data)
+        if self.is_incremental is True:
+            raise NotImplementedError(
+                "Incremental loading for Sharepoint List is not implemented yet."
+            )
+
+class SharepointFilesConfig(BaseModel):
+    file_type: FileType
+    folder_path: str
+    table_name: str
+    file_name_startswith: str
+    pattern: Optional[str] = ".*"
+    pandas_kwargs: Dict = {}
+    limit: Optional[int] = None
+    file_size_limit: Optional[int] = 100_000_000  # 100 MB
+    is_compressed_folder: Optional[bool] = False
+    if_apply_str_to_all_columns: Optional[bool] = True
+    is_file_incremental: bool = False
+
+    def __init__(self, **data):
+        super().__init__(**data)
+        self.folder_path = validate_folder_path(self.folder_path)
+        self.pattern = f"^{self.file_name_startswith}{self.pattern}"
+
+
+def validate_folder_path(folder_path: str) -> str:
+    if folder_path.startswith("/"):
+        folder_path = folder_path[1:]
+    if folder_path.endswith("/"):
+        folder_path = folder_path[:-1]
+    if not re.compile(r"^[a-zA-Z0-9_\-/\s\.]*$").match(folder_path):
+        raise ValueError(
+            "Invalid folder path, only alphanumeric characters, dashes and underscores are"
+            f" allowed: {folder_path}"
+        )
+    if re.compile(r"//").search(folder_path):
+        raise ValueError(f"Invalid folder path with double slashes: {folder_path}")
+    return folder_path
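
Note: in the file above, FileType declares a SAV member, but get_pd_function() has no
entry for it, so FileType.SAV.get_pd_function() raises KeyError. A minimal sketch of
the missing mapping (hypothetical, not part of this series), assuming .sav files
should go through pandas' SPSS reader (pd.read_spss, which needs pyreadstat installed):

    def get_pd_function(self):
        return {
            self.EXCEL: pd.read_excel,
            self.CSV: pd.read_csv,
            self.JSON: pd.read_json,
            self.PARQUET: pd.read_parquet,
            self.SAS: pd.read_sas,
            self.SPSS: pd.read_spss,
            self.SAV: pd.read_spss,  # assumption: pd.read_spss also reads .sav files
        }[self]
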
diff --git a/sources/sharepoint_pipeline.py b/sources/sharepoint_pipeline.py
new file mode 100644
index 000000000..57b83b1ef
--- /dev/null
+++ b/sources/sharepoint_pipeline.py
@@ -0,0 +1,62 @@
+
+import dlt
+from sharepoint import sharepoint_list, sharepoint_files, SharepointCredentials
+from sharepoint.sharepoint_files_config import SharepointFilesConfig, SharepointListConfig
+
+if __name__ == "__main__":
+    # --- 1. Define SharePoint credentials ---
+    credentials = SharepointCredentials(
+        client_id="your-client-id",
+        tenant_id="your-tenant-id",
+        site_id="your-site-id",
+        client_secret="your-client-secret",
+        sub_site_id=""
+    )
+
+    # --- 2. Configure SharePoint list extraction ---
+    list_config = SharepointListConfig(
+        list_title="test_list",
+        select="Title,ins",
+        table_name="sharepoint_list_table"
+    )
+
+    # --- 3. Configure SharePoint file extraction ---
+    files_config = SharepointFilesConfig(
+        folder_path="General/sharepoint_test",
+        file_name_startswith="test_",
+        pattern=r".*\.csv$",
+        file_type="csv",
+        table_name="sharepoint_reports",
+        is_file_incremental=True,
+        file_size_limit=5_000_000,
+        pandas_kwargs={}
+    )
+
+    # --- 4. Create the DLT pipeline (destination = DuckDB) ---
+    pipeline = dlt.pipeline(
+        pipeline_name="sharepoint_to_duckdb",
+        destination="duckdb",
+        dataset_name="sharepoint_data",
+        full_refresh=False
+    )
+
+    # --- 5. Run both sources and load to DuckDB ---
+    print("Loading SharePoint List data...")
+    list_load_info = pipeline.run(
+        sharepoint_list(sharepoint_list_config=list_config, credentials=credentials)
+    )
+    print(list_load_info)
+    with pipeline.sql_client() as client:
+        df = client.execute("SELECT * FROM sharepoint_list_table LIMIT 10").df()
+        print(df)
+
+
+    print("Loading SharePoint Files data...")
+    files_load_info = pipeline.run(
+        sharepoint_files(sharepoint_files_config=files_config, credentials=credentials)
+    )
+    print(files_load_info)
+
+    with pipeline.sql_client() as client:
+        df = client.execute("SELECT * FROM sharepoint_reports LIMIT 10").df()
+        print(df)
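
Note: get_items_from_list() in helpers.py warns that pagination is unimplemented,
although the RESTClient built in connect() already carries a JSONLinkPaginator that
follows @odata.nextLink. A sketch of how paging could look (hypothetical, not part of
this series; `sp` is a connected SharepointClient and `list_id` a known list id):

    # RESTClient.paginate() keeps requesting @odata.nextLink until the last page;
    # each page yields the item dicts from Graph's "value" array.
    all_fields = []
    for page in sp.client.paginate(f"/lists/{list_id}/items?expand=fields"):
        all_fields.extend(item.get("fields", {}) for item in page)
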
From d9fe95cd9f167c302e01e071c1afa0db9ca7b2a1 Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 15:36:56 +0200
Subject: [PATCH 2/7] Fix: Remove file size limit from SharePoint file
 configuration and extraction

---
 sources/sharepoint/__init__.py                | 6 ------
 sources/sharepoint/sharepoint_files_config.py | 1 -
 sources/sharepoint_pipeline.py                | 1 -
 3 files changed, 8 deletions(-)

diff --git a/sources/sharepoint/__init__.py b/sources/sharepoint/__init__.py
index 4d53d983f..1740e1c70 100644
--- a/sources/sharepoint/__init__.py
+++ b/sources/sharepoint/__init__.py
@@ -61,12 +61,6 @@ def get_files(
             file_name_startswith=sharepoint_files_config.file_name_startswith,
             pattern=sharepoint_files_config.pattern,
         ):
-            if file_item["size"] > sharepoint_files_config.file_size_limit:
-                logger.warning(f"File {file_item['name']} is too large, skipping")
-                raise RuntimeError(
-                    f"File {file_item['name']} is larger than the limit of"
-                    f" {sharepoint_files_config.file_size_limit} bytes."
-                )
             logger.debug(
                 "filtering files based on lastModifiedDateTime, compare to last_value:"
                 f" {current_last_value}"
diff --git a/sources/sharepoint/sharepoint_files_config.py b/sources/sharepoint/sharepoint_files_config.py
index fd9c5ebd6..0e748c2c8 100644
--- a/sources/sharepoint/sharepoint_files_config.py
+++ b/sources/sharepoint/sharepoint_files_config.py
@@ -50,7 +50,6 @@ class SharepointFilesConfig(BaseModel):
     pattern: Optional[str] = ".*"
     pandas_kwargs: Dict = {}
     limit: Optional[int] = None
-    file_size_limit: Optional[int] = 100_000_000  # 100 MB
     is_compressed_folder: Optional[bool] = False
     if_apply_str_to_all_columns: Optional[bool] = True
     is_file_incremental: bool = False
diff --git a/sources/sharepoint_pipeline.py b/sources/sharepoint_pipeline.py
index 57b83b1ef..0e63653ff 100644
--- a/sources/sharepoint_pipeline.py
+++ b/sources/sharepoint_pipeline.py
@@ -28,7 +28,6 @@
         file_type="csv",
         table_name="sharepoint_reports",
         is_file_incremental=True,
-        file_size_limit=5_000_000,
         pandas_kwargs={}
     )
 
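
Note: with the hard limit gone, get_file_bytes_io() will download oversized files in
full. If a cap is still wanted, the caller can re-impose it; a sketch assuming dlt's
DltResource.add_filter and the f"{table_name}_files" resource naming from __init__.py:

    source = sharepoint_files(
        sharepoint_files_config=files_config, credentials=credentials
    )
    # Drop oversized file items before the transformer downloads them (100 MB cap).
    source.resources["sharepoint_reports_files"].add_filter(
        lambda file_item: file_item["size"] <= 100_000_000
    )
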
From dc6e8f5ce24d92d326b917dcba1f119d03663139 Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 15:39:31 +0200
Subject: [PATCH 3/7] Refactor: Replace loguru logger with dlt.common logger in
 SharePoint modules

---
 sources/sharepoint/__init__.py                | 2 +-
 sources/sharepoint/helpers.py                 | 2 +-
 sources/sharepoint/sharepoint_files_config.py | 4 +---
 3 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/sources/sharepoint/__init__.py b/sources/sharepoint/__init__.py
index 1740e1c70..3d5670b50 100644
--- a/sources/sharepoint/__init__.py
+++ b/sources/sharepoint/__init__.py
@@ -4,7 +4,7 @@
 import dlt
 from dlt.common.typing import TDataItems
 from dlt.common.configuration.specs import configspec, BaseConfiguration
-from loguru import logger
+from dlt.common import logger
 import pandas as pd
 
 from .helpers import SharepointClient
diff --git a/sources/sharepoint/helpers.py b/sources/sharepoint/helpers.py
index 7e534ba69..3ccf19fbe 100644
--- a/sources/sharepoint/helpers.py
+++ b/sources/sharepoint/helpers.py
@@ -3,7 +3,7 @@
 import re
 
 from msal import ConfidentialClientApplication
-from loguru import logger
+from dlt.common import logger
 from dlt.sources.helpers.rest_client import RESTClient
 from dlt.sources.helpers.rest_client.auth import BearerTokenAuth
 from dlt.sources.helpers.rest_client.paginators import JSONLinkPaginator
diff --git a/sources/sharepoint/sharepoint_files_config.py b/sources/sharepoint/sharepoint_files_config.py
index 0e748c2c8..1daf69254 100644
--- a/sources/sharepoint/sharepoint_files_config.py
+++ b/sources/sharepoint/sharepoint_files_config.py
@@ -1,13 +1,11 @@
-from typing import Iterator, Optional, Sequence, List, Dict
+from typing import Optional, Dict
 import re
 from enum import Enum
 
-from loguru import logger
 import pandas as pd
 from pydantic import BaseModel
 
 
-
 class FileType(Enum):
     EXCEL = "excel"
     CSV = "csv"
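
Note: dlt.common.logger wraps the standard logging module, so loguru-only levels are
not available after this swap. helpers.py still calls logger.success() at this point,
which would likely fail with an AttributeError on the first connect(); PATCH 5 below
addresses exactly that call. A sketch of the level mapping, assuming stdlib levels:

    from dlt.common import logger
    logger.info("Connected to SharePoint")  # closest stdlib match for loguru's success()
    logger.warning("No items found")        # exists in both loggers, unchanged
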
From ad83f5709288c935c194be422e43b16bfd544b03 Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 15:44:28 +0200
Subject: [PATCH 4/7] Refactor: Remove unused methods and imports from
 SharepointClient class

---
 sources/sharepoint/helpers.py | 75 +----------------------------------
 1 file changed, 1 insertion(+), 74 deletions(-)

diff --git a/sources/sharepoint/helpers.py b/sources/sharepoint/helpers.py
index 3ccf19fbe..e88e42f25 100644
--- a/sources/sharepoint/helpers.py
+++ b/sources/sharepoint/helpers.py
@@ -1,4 +1,4 @@
-from typing import Dict, Union, List, Tuple
+from typing import Dict, List
 from io import BytesIO
 import re
 
@@ -170,76 +170,3 @@ def get_file_bytes_io(self, file_item: Dict):
             return bytes_io
         else:
             raise FileNotFoundError(f"File not found: {file_item['name']} or can't be downloaded")
-
-    def archive_file(self, file_item: Dict, archive_folder_path: str, new_file_name: str) -> None:
-        url = f"{self.graph_site_url}/drive/items/{file_item['id']}"
-        archive_folder_path = self.remove_driver_root_in_path(archive_folder_path)
-        archive_folder_id = self.create_folder_if_not_exists(folder_path=archive_folder_path)
-        body = {
-            "parentReference": {"id": archive_folder_id},
-            "name": new_file_name,
-        }
-        res = self.client.patch(url, json=body)
-        if res.status_code == 200:
-            logger.success(
-                f"File {file_item['name']} renamed to {new_file_name} in {archive_folder_path}"
-            )
-        else:
-            raise RuntimeError(f"File {file_item['name']} can't be renamed to {new_file_name}")
-
-    def safe_get_folder_id(self, folder_path: str) -> Union[str, None]:
-        folder_url = f"{self.graph_site_url}/drive/root:/{folder_path}"
-        res = self.client.get(folder_url)
-        if res.status_code == 200:
-            return res.json()["id"]
-
-    def list_folder(self, folder_path: str) -> Tuple[List, List]:
-        """List sub folders and files in folder_path
-
-        Args:
-            folder_path (str): folder_path from sharepoint
-
-        Returns:
-            Tuple[List, List]: (List of folders, List of files)
-        """
-        if r"/" not in folder_path:
-            raise ValueError(f"Invalid folder path: {folder_path}, must contain '/'")
-        folder_url = f"{self.graph_site_url}/drive/root:/{folder_path}:/children"
-        logger.info(f"Listing from folder_path: {folder_path} using {folder_url}")
-        res = self.client.get(folder_url)
-        file_and_folder_items = res.json().get("value", [])
-        file_items = [x for x in file_and_folder_items if "file" in x.keys()]
-        folder_items = [x for x in file_and_folder_items if "folder" in x.keys()]
-        return (folder_items, file_items)
-
-    def create_folder(self, folder_path: str) -> str:
-        if r"/" not in folder_path:
-            raise ValueError(f"Invalid folder path: {folder_path}, must contain '/'")
-        parent_folder, folder_name = folder_path.rsplit("/", 1)
-        parent_folder_id = self.safe_get_folder_id(parent_folder)
-        if not parent_folder_id:
-            raise ValueError(f"Parent folder {parent_folder} not found")
-        logger.debug(f"Creating folder {folder_name} in {parent_folder}")
-        folder_url = f"{self.graph_site_url}/drive/items/{parent_folder_id}/children"
-        body = {
-            "name": folder_name,
-            "folder": {},
-            "@microsoft.graph.conflictBehavior": "fail",
-        }
-        res = self.client.post(folder_url, json=body)
-        if res.status_code == 201:
-            logger.success(f"Folder {folder_name} created")
-            return res.json()["id"]
-        else:
-            raise RuntimeError(f"Folder {folder_name} can't be created")
-
-    def create_folder_if_not_exists(self, folder_path: str) -> str:
-        folder_id = self.safe_get_folder_id(folder_path)
-        if folder_id:
-            logger.info(f"Folder {folder_path} already exists")
-            return folder_id
-        else:
-            return self.create_folder(folder_path)
-
-    def remove_driver_root_in_path(self, path: str) -> str:
-        return re.sub(r"^/drive/root:/", "", path)

From c94cb2f888af479f27812d683f6685a0d643fe5d Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 15:52:33 +0200
Subject: [PATCH 5/7] fix: Change log level from success to info for SharePoint
 connection message

---
 sources/sharepoint/helpers.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sources/sharepoint/helpers.py b/sources/sharepoint/helpers.py
index e88e42f25..852e71a19 100644
--- a/sources/sharepoint/helpers.py
+++ b/sources/sharepoint/helpers.py
@@ -56,7 +56,7 @@ def connect(self) -> None:
                 auth=BearerTokenAuth(access_token),
                 paginator=JSONLinkPaginator(next_url_path="@odata.nextLink"),
             )
-            logger.success(f"Connected to SharePoint site id: {self.site_id} successfully")
+            logger.info(f"Connected to SharePoint site id: {self.site_id} successfully")
         else:
             raise ConnectionError("Connection failed : ", token_response)
 
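
Note: after the PATCH 4 trim, SharepointClient only exposes read paths: site and list
metadata, list items, file listing, and file download. A minimal usage sketch, assuming
a registered Azure AD app with application permissions on the site and the placeholder
values from sources/sharepoint_pipeline.py:

    client = SharepointClient(
        client_id="your-client-id",
        tenant_id="your-tenant-id",
        site_id="your-site-id",
        client_secret="your-client-secret",
    )
    client.connect()
    rows = client.get_items_from_list(list_title="test_list", select="Title")
    files = client.get_files_from_path("General/sharepoint_test", "test_")
    buf = client.get_file_bytes_io(file_item=files[0])  # BytesIO of the first match
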
file_item["lastModifiedDateTime"] > current_last_value or not sharepoint_files_config.is_file_incremental: + if ( + file_item["lastModifiedDateTime"] > current_last_value + or not sharepoint_files_config.is_file_incremental + ): logger.info( f"Processing file after lastModifiedDateTime filter: {file_item['name']}" ) - file_item["pd_function"] = sharepoint_files_config.file_type.get_pd_function() + file_item["pd_function"] = ( + sharepoint_files_config.file_type.get_pd_function() + ) file_item["pd_kwargs"] = sharepoint_files_config.pandas_kwargs yield file_item else: @@ -93,7 +105,9 @@ def get_records(file_item: Dict) -> TDataItems: logger.debug(f"get_records done for {file_item['name']}") def get_pipe(sharepoint_files_config: SharepointFilesConfig): - return dlt.resource(get_files, name=f"{sharepoint_files_config.table_name}_files")(sharepoint_files_config) | dlt.transformer( + return dlt.resource( + get_files, name=f"{sharepoint_files_config.table_name}_files" + )(sharepoint_files_config) | dlt.transformer( get_records, name=sharepoint_files_config.table_name, parallelized=False ) diff --git a/sources/sharepoint/helpers.py b/sources/sharepoint/helpers.py index 852e71a19..e634bc374 100644 --- a/sources/sharepoint/helpers.py +++ b/sources/sharepoint/helpers.py @@ -88,7 +88,8 @@ def get_all_lists_in_site(self) -> List[Dict]: if "value" in lists_info: all_items = lists_info["value"] filtered_lists = [ - item for item in all_items + item + for item in all_items if item.get("list", {}).get("template") == "genericList" and "Lists" in item.get("webUrl", "") ] @@ -99,15 +100,16 @@ def get_all_lists_in_site(self) -> List[Dict]: logger.warning(f"No lists found in {url}") return filtered_lists - def get_items_from_list(self, list_title: str, select:str = None) -> List[Dict]: - #TODO, pagination not yet implemented + def get_items_from_list(self, list_title: str, select: str = None) -> List[Dict]: + # TODO, pagination not yet implemented logger.warning( "Pagination is not implemented for get_items_from_list, " "it will return only first page of items." ) all_lists = self.get_all_lists_in_site() filtered_lists = [ - x for x in all_lists + x + for x in all_lists if x.get("list", {}).get("template") == "genericList" and "Lists" in x.get("webUrl", "") ] @@ -139,7 +141,9 @@ def get_items_from_list(self, list_title: str, select:str = None) -> List[Dict]: logger.info(f"Got {len(output)} items from list: {list_title}") return output else: - logger.warning(f"No items found in list: {list_title}, with select: {select}") + logger.warning( + f"No items found in list: {list_title}, with select: {select}" + ) def get_files_from_path( self, folder_path: str, file_name_startswith: str, pattern: str = None @@ -169,4 +173,6 @@ def get_file_bytes_io(self, file_item: Dict): ) return bytes_io else: - raise FileNotFoundError(f"File not found: {file_item['name']} or can't be downloaded") + raise FileNotFoundError( + f"File not found: {file_item['name']} or can't be downloaded" + ) diff --git a/sources/sharepoint/sharepoint_files_config.py b/sources/sharepoint/sharepoint_files_config.py index 1daf69254..1a8e4d8a1 100644 --- a/sources/sharepoint/sharepoint_files_config.py +++ b/sources/sharepoint/sharepoint_files_config.py @@ -40,6 +40,7 @@ def __init__(self, **data): "Incremental loading for Sharepoint List is not implemented yet." 
             )
 
+
 class SharepointFilesConfig(BaseModel):
     file_type: FileType
     folder_path: str

From 34776d1b62ab49e0d710e31432033e524bf65a78 Mon Sep 17 00:00:00 2001
From: sd41847
Date: Thu, 23 Oct 2025 16:44:41 +0200
Subject: [PATCH 7/7] fix: Remove unused attributes from SharepointListConfig
 and SharepointFilesConfig classes

---
 sources/sharepoint/sharepoint_files_config.py | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/sources/sharepoint/sharepoint_files_config.py b/sources/sharepoint/sharepoint_files_config.py
index 1a8e4d8a1..e1bc5a821 100644
--- a/sources/sharepoint/sharepoint_files_config.py
+++ b/sources/sharepoint/sharepoint_files_config.py
@@ -30,7 +30,6 @@ class SharepointListConfig(BaseModel):
     table_name: str
     list_title: str
     select: Optional[str] = None
-    limit: Optional[int] = None
    is_incremental: Optional[bool] = False
 
     def __init__(self, **data):
@@ -48,9 +47,6 @@ class SharepointFilesConfig(BaseModel):
     file_name_startswith: str
     pattern: Optional[str] = ".*"
     pandas_kwargs: Dict = {}
-    limit: Optional[int] = None
-    is_compressed_folder: Optional[bool] = False
-    if_apply_str_to_all_columns: Optional[bool] = True
     is_file_incremental: bool = False
 
     def __init__(self, **data):
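
Note: sources/sharepoint_pipeline.py still reads results with client.execute(...),
which may not exist on dlt's sql client; the documented pattern is execute_query()
used as a context manager (or execute_sql() for plain row lists). Recent dlt versions
also deprecate the full_refresh flag used in step 4 in favor of dev_mode. A sketch
against the DuckDB destination from the example:

    with pipeline.sql_client() as client:
        with client.execute_query("SELECT * FROM sharepoint_reports LIMIT 10") as cursor:
            df = cursor.df()  # the DBApi cursor exposes the result as a pandas DataFrame
    print(df)
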