From b309ccdf08a9129fb1718d4c86fd59fddc45ae31 Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:02:37 +0200 Subject: [PATCH 1/4] update: use scystream-sdk 1.4 --- main.py | 115 ++++++++++++---------------- requirements.txt | 2 +- test/test_explanation_entrypoint.py | 28 ++----- test/test_lda_entrypoint.py | 35 ++++----- test/test_lda_modeler.py | 14 ++-- 5 files changed, 78 insertions(+), 116 deletions(-) diff --git a/main.py b/main.py index f5fb3b9..928895c 100644 --- a/main.py +++ b/main.py @@ -1,16 +1,15 @@ import logging -import hashlib -import pandas as pd from scystream.sdk.core import entrypoint +from scystream.sdk.database_handling.database_manager import ( + PandasDatabaseOperations, +) from scystream.sdk.env.settings import ( EnvSettings, InputSettings, OutputSettings, - PostgresSettings, + DatabaseSettings, ) -from sqlalchemy import create_engine, text -from sqlalchemy.sql import quoted_name from algorithms.lda import LDAModeler from algorithms.models import PreprocessedDocument @@ -20,35 +19,20 @@ logging.basicConfig( level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) -def _normalize_table_name(table_name: str) -> str: - max_length = 63 - if len(table_name) <= max_length: - return table_name - digest = hashlib.sha1(table_name.encode("utf-8")).hexdigest()[:10] - prefix_length = max_length - len(digest) - 1 - return f"{table_name[:prefix_length]}_{digest}" - - -def _resolve_db_table(settings: PostgresSettings) -> str: - normalized_name = _normalize_table_name(settings.DB_TABLE) - settings.DB_TABLE = normalized_name - return normalized_name - - -class PreprocessedDocuments(PostgresSettings, InputSettings): +class PreprocessedDocuments(DatabaseSettings, InputSettings): __identifier__ = "preprocessed_docs" -class DocTopicOutput(PostgresSettings, OutputSettings): +class DocTopicOutput(DatabaseSettings, OutputSettings): __identifier__ = "docs_to_topics" -class TopicTermsOutput(PostgresSettings, OutputSettings): +class TopicTermsOutput(DatabaseSettings, OutputSettings): __identifier__ = "top_terms_per_topic" @@ -64,21 +48,22 @@ class LDATopicModeling(EnvSettings): topic_term: TopicTermsOutput -class TopicTermsInput(PostgresSettings, InputSettings): +class TopicTermsInput(DatabaseSettings, InputSettings): __identifier__ = "topic_terms_input" -class QueryInformationInput(PostgresSettings, InputSettings): +class QueryInformationInput(DatabaseSettings, InputSettings): """ Our TopicExplaination needs some kind of information about the actual query executed,this query information includes the query and the source Looking like: query, source, created_at """ + __identifier__ = "query_information_input" -class ExplanationsOutput(PostgresSettings, OutputSettings): +class ExplanationsOutput(DatabaseSettings, OutputSettings): __identifier__ = "explanations_output" @@ -92,33 +77,6 @@ class TopicExplanation(EnvSettings): explanations_output: ExplanationsOutput -def _make_engine(settings: PostgresSettings): - return create_engine( - f"postgresql+psycopg2://{settings.PG_USER}:{settings.PG_PASS}" - f"@{settings.PG_HOST}:{int(settings.PG_PORT)}/" - ) - - -def write_df_to_postgres(df, settings: PostgresSettings): - resolved_table_name = _resolve_db_table(settings) - logger.info(f"Writing DataFrame to DB table '{resolved_table_name}'…") - engine = _make_engine(settings) - table_name = quoted_name(resolved_table_name, quote=True) - df.to_sql(table_name, engine, if_exists="replace", index=False) - logger.info( - "Successfully wrote %s rows to '%s'.", - len(df), - resolved_table_name, - ) - - -def read_table_from_postgres(settings: PostgresSettings) -> pd.DataFrame: - resolved_table_name = _resolve_db_table(settings) - engine = _make_engine(settings) - query = text(f'SELECT * FROM "{resolved_table_name}";') - return pd.read_sql(query, engine) - - def parse_pg_array(val): if isinstance(val, str): return val.strip("{}").split(",") @@ -130,12 +88,16 @@ def lda_topic_modeling(settings): logger.info("Starting LDA topic modeling pipeline…") logger.info("Querying normalized docs from db...") - normalized_docs = read_table_from_postgres(settings.preprocessed_docs) + preprocessed_docs_db = PandasDatabaseOperations( + settings.preprocessed_docs.DB_DSN + ) + normalized_docs = preprocessed_docs_db.read( + table=settings.preprocessed_docs.DB_TABLE + ) preprocessed_docs = [ PreprocessedDocument( - doc_id=row["doc_id"], - tokens=parse_pg_array(row["tokens"]) + doc_id=row["doc_id"], tokens=parse_pg_array(row["tokens"]) ) for _, row in normalized_docs.iterrows() ] @@ -153,7 +115,7 @@ def lda_topic_modeling(settings): max_iter=settings.MAX_ITER, learning_method=settings.LEARNING_METHOD, random_state=42, - n_top_words=settings.N_TOP_WORDS + n_top_words=settings.N_TOP_WORDS, ) lda.fit() @@ -161,8 +123,16 @@ def lda_topic_modeling(settings): topic_terms = lda.extract_topic_terms() # TODO: Use Spark Integration here - write_df_to_postgres(doc_topics, settings.doc_topic) - write_df_to_postgres(topic_terms, settings.topic_term) + logging.info("Writing dataframes to db...") + doc_topic_db = PandasDatabaseOperations(settings.doc_topic.DB_DSN) + topic_terms_db = PandasDatabaseOperations(settings.topic_term.DB_DSN) + + doc_topic_db.write( + table=settings.doc_topic.DB_TABLE, data=doc_topics, mode="overwrite" + ) + topic_terms_db.write( + table=settings.topic_term.DB_TABLE, data=topic_terms, mode="overwrite" + ) @entrypoint(TopicExplanation) @@ -170,25 +140,36 @@ def topic_explanation(settings): logger.info("Starting topic explaination...") logging.info("Querying topic terms from db...") - topic_terms = read_table_from_postgres(settings.topic_terms) + topic_terms_db = PandasDatabaseOperations(settings.topic_terms.DB_DSN) + topic_terms = topic_terms_db.read(table=settings.topic_terms.DB_TABLE) logging.info("Querying query information from db...") - query_information = read_table_from_postgres(settings.query_information) + query_info_db = PandasDatabaseOperations(settings.query_information.DB_DSN) + query_information = query_info_db.read( + table=settings.query_information.DB_TABLE + ) metadata = query_information.iloc[0] + print(metadata) explainer = TopicExplainer( - model_name=settings.MODEL_NAME, - api_key=settings.OLLAMA_API_KEY + model_name=settings.MODEL_NAME, api_key=settings.OLLAMA_API_KEY ) - explainations = explainer.explain_topics( + explanations = explainer.explain_topics( topic_terms=topic_terms, search_query=metadata["query"], source=metadata["source"], - created_at=metadata["created_at"] + created_at=metadata["created_at"], ) - write_df_to_postgres(explainations, settings.explanations_output) + explainations_output_db = PandasDatabaseOperations( + settings.explanations_output.DB_DSN + ) + explainations_output_db.write( + table=settings.explanations_output.DB_TABLE, + data=explanations, + mode="overwrite", + ) logging.info("Topic explanation block finished.") diff --git a/requirements.txt b/requirements.txt index 8137824..a3a186c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -scystream-sdk==1.2.2 +scystream-sdk[database,postgres]==1.4.0 scikit-learn==1.7.2 pandas==2.3.2 numpy==2.3.3 diff --git a/test/test_explanation_entrypoint.py b/test/test_explanation_entrypoint.py index 930e7b5..4a4bd7f 100644 --- a/test/test_explanation_entrypoint.py +++ b/test/test_explanation_entrypoint.py @@ -19,7 +19,7 @@ def postgres_conn(): port=5432, user="postgres", password="postgres", - database="postgres" + database="postgres", ) conn.autocommit = True yield conn @@ -84,26 +84,14 @@ def test_topic_explanation_with_real_ollama(postgres_conn): # ------------------------------------------------------------------ env = { - "topic_terms_input_PG_HOST": "127.0.0.1", - "topic_terms_input_PG_PORT": "5432", - "topic_terms_input_PG_USER": "postgres", - "topic_terms_input_PG_PASS": "postgres", + "topic_terms_input_DB_DSN": "postgresql://postgres:postgres@127.0.0.1:5432/postgres", "topic_terms_input_DB_TABLE": topic_terms_table, - - "query_information_input_PG_HOST": "127.0.0.1", - "query_information_input_PG_PORT": "5432", - "query_information_input_PG_USER": "postgres", - "query_information_input_PG_PASS": "postgres", + "query_information_input_DB_DSN": "postgresql://postgres:postgres@127.0.0.1:5432/postgres", "query_information_input_DB_TABLE": query_information_table, - - "explanations_output_PG_HOST": "127.0.0.1", - "explanations_output_PG_PORT": "5432", - "explanations_output_PG_USER": "postgres", - "explanations_output_PG_PASS": "postgres", + "explanations_output_DB_DSN": "postgresql://postgres:postgres@127.0.0.1:5432/postgres", "explanations_output_DB_TABLE": explanations_output_table, - "MODEL_NAME": "gpt-oss:120b", - "API_KEY": os.environ.get("OLLAMA_API_KEY") + "API_KEY": os.environ.get("OLLAMA_API_KEY"), } for k, v in env.items(): @@ -120,10 +108,10 @@ def test_topic_explanation_with_real_ollama(postgres_conn): # ------------------------------------------------------------------ cur.execute( - f"SELECT * FROM public.{explanations_output_table} ORDER BY topic_id;") + f"SELECT * FROM public.{explanations_output_table} ORDER BY topic_id;" + ) results = pd.DataFrame( - cur.fetchall(), - columns=[desc[0] for desc in cur.description] + cur.fetchall(), columns=[desc[0] for desc in cur.description] ) assert len(results) == 5 diff --git a/test/test_lda_entrypoint.py b/test/test_lda_entrypoint.py index 7087b70..bb18d4b 100644 --- a/test/test_lda_entrypoint.py +++ b/test/test_lda_entrypoint.py @@ -27,7 +27,7 @@ def postgres_conn(): port=5432, user=POSTGRES_USER, password=POSTGRES_PWD, - database="postgres" + database="postgres", ) conn.autocommit = True yield conn @@ -52,24 +52,11 @@ def test_lda_entrypoint(postgres_conn): env = { "N_TOPICS": "5", - - "preprocessed_docs_PG_HOST": "127.0.0.1", - "preprocessed_docs_PG_PORT": "5432", - "preprocessed_docs_PG_USER": POSTGRES_USER, - "preprocessed_docs_PG_PASS": POSTGRES_PWD, + "preprocessed_docs_DB_DSN": f"postgresql://{POSTGRES_USER}:{POSTGRES_PWD}@127.0.0.1:5432/postgres", "preprocessed_docs_DB_TABLE": "norm_docs", - - - "docs_to_topics_PG_HOST": "127.0.0.1", - "docs_to_topics_PG_PORT": "5432", - "docs_to_topics_PG_USER": POSTGRES_USER, - "docs_to_topics_PG_PASS": POSTGRES_PWD, + "docs_to_topics_DB_DSN": f"postgresql://{POSTGRES_USER}:{POSTGRES_PWD}@127.0.0.1:5432/postgres", "docs_to_topics_DB_TABLE": doc_topic_table_name, - - "top_terms_per_topic_PG_HOST": "127.0.0.1", - "top_terms_per_topic_PG_PORT": "5432", - "top_terms_per_topic_PG_USER": POSTGRES_USER, - "top_terms_per_topic_PG_PASS": POSTGRES_PWD, + "top_terms_per_topic_DB_DSN": f"postgresql://{POSTGRES_USER}:{POSTGRES_PWD}@127.0.0.1:5432/postgres", "top_terms_per_topic_DB_TABLE": topic_terms_table_name, } @@ -82,8 +69,9 @@ def test_lda_entrypoint(postgres_conn): # 1. doc-topic distribution cur.execute(f"SELECT * FROM public.{doc_topic_table_name} ORDER BY 1;") - doc_topics = pd.DataFrame(cur.fetchall(), columns=[ - desc[0] for desc in cur.description]) + doc_topics = pd.DataFrame( + cur.fetchall(), columns=[desc[0] for desc in cur.description] + ) assert len(doc_topics) == 2 # expect N_TOPICS + 1 for doc_id assert doc_topics.shape[1] == N_TOPICS + 1 @@ -91,8 +79,11 @@ def test_lda_entrypoint(postgres_conn): # 2. topic-term listing cur.execute( f"SELECT * FROM public.{ - topic_terms_table_name} ORDER BY topic_id, weight DESC;") - topic_terms = pd.DataFrame(cur.fetchall(), columns=[ - desc[0] for desc in cur.description]) + topic_terms_table_name + } ORDER BY topic_id, weight DESC;" + ) + topic_terms = pd.DataFrame( + cur.fetchall(), columns=[desc[0] for desc in cur.description] + ) assert len(topic_terms) > 0 assert "term" in topic_terms.columns diff --git a/test/test_lda_modeler.py b/test/test_lda_modeler.py index 77b71f2..1f35c13 100644 --- a/test/test_lda_modeler.py +++ b/test/test_lda_modeler.py @@ -14,11 +14,13 @@ def small_vocab(): @pytest.fixture def small_dtm(): # 2 documents, 4 terms - return np.array([ - [0, 0, 2, 0], - [1, 1, 0, 3], - [0, 1, 1, 1], - ]) + return np.array( + [ + [0, 0, 2, 0], + [1, 1, 0, 3], + [0, 1, 1, 1], + ] + ) def test_lda_fit(small_dtm, small_vocab): @@ -54,7 +56,7 @@ def test_extract_topic_terms(small_dtm, small_vocab): vocab=small_vocab, doc_ids=["1", "2", "3"], n_topics=2, - n_top_words=2 + n_top_words=2, ) lda.fit() df = lda.extract_topic_terms() From f0da41b99dab17eef37a8d0da2b1e71496a6289f Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:06:50 +0200 Subject: [PATCH 2/4] chore: update cbc.yaml --- cbc.yaml | 40 +++++++++++----------------------------- 1 file changed, 11 insertions(+), 29 deletions(-) diff --git a/cbc.yaml b/cbc.yaml index 7a61d14..28ff7d0 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -1,9 +1,9 @@ -author: Paul Kalhorn +author: Paul Kalhorn description: Compute Block that offers topic modeling algorithms docker_image: ghcr.io/rwth-time/topic-modeling/topic-modeling entrypoints: lda_topic_modeling: - description: Sklearn LDA Topic Modeling + description: Sklearn LDA Topic Modeling envs: LEARNING_METHOD: batch MAX_ITER: 10 @@ -12,64 +12,46 @@ entrypoints: inputs: preprocessed_docs: config: + preprocessed_docs_DB_DSN: null preprocessed_docs_DB_TABLE: null - preprocessed_docs_PG_HOST: null - preprocessed_docs_PG_PASS: null - preprocessed_docs_PG_PORT: null - preprocessed_docs_PG_USER: null - description: A database table, expected to have the doc_id, and tokens (list of strings) + description: A database table, expected to have the doc_id, and tokens (list of strings) type: pg_table outputs: doc_topic: config: + docs_to_topics_DB_DSN: null docs_to_topics_DB_TABLE: null - docs_to_topics_PG_HOST: null - docs_to_topics_PG_PASS: null - docs_to_topics_PG_PORT: null - docs_to_topics_PG_USER: null description: A table that maps documents to their topic-likelihoods type: pg_table topic_term: config: + top_terms_per_topic_DB_DSN: null top_terms_per_topic_DB_TABLE: null - top_terms_per_topic_PG_HOST: null - top_terms_per_topic_PG_PASS: null - top_terms_per_topic_PG_PORT: null - top_terms_per_topic_PG_USER: null description: A table that lists most likely terms for a topic type: pg_table topic_explanation: - description: Explains the topics using the query and top terms + description: Explains the topics using the query and top terms envs: MODEL_NAME: gpt-oss:120b OLLAMA_API_KEY: '' inputs: query_information: config: + query_information_input_DB_DSN: null query_information_input_DB_TABLE: null - query_information_input_PG_HOST: null - query_information_input_PG_PASS: null - query_information_input_PG_PORT: null - query_information_input_PG_USER: null - description: Information of the query used, must contain query, source, created_at + description: Information of the query used, must contain query, source, created_at type: pg_table topic_terms: config: + topic_terms_input_DB_DSN: null topic_terms_input_DB_TABLE: null - topic_terms_input_PG_HOST: null - topic_terms_input_PG_PASS: null - topic_terms_input_PG_PORT: null - topic_terms_input_PG_USER: null description: A table that lists most likely terms for a topic type: pg_table outputs: explanations_output: config: + explanations_output_DB_DSN: null explanations_output_DB_TABLE: null - explanations_output_PG_HOST: null - explanations_output_PG_PASS: null - explanations_output_PG_PORT: null - explanations_output_PG_USER: null description: Output of the generated explanations, contains topic_id and description type: pg_table name: Topic-Modeling From cea0e3c70379241520a302b75e674a267a51636b Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:12:52 +0200 Subject: [PATCH 3/4] chore: ignore tests for linting --- .github/workflows/ci.yaml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index ac2ff03..66e9e03 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -23,6 +23,8 @@ jobs: - name: Run flake8 uses: py-actions/flake8@v2 + with: + args: --exclude=test validate-compute-block: name: Validate Compute Block Config @@ -94,7 +96,7 @@ jobs: env: OLLAMA_API_KEY: ${{ secrets.OLLAMA_API_KEY }} run: pytest -vv - + build: name: Build docker image runs-on: ubuntu-latest @@ -121,7 +123,7 @@ jobs: tags: | type=ref, event=pr type=raw, value=latest, enable=${{ (github.ref == format('refs/heads/{0}', 'main')) }} - + - name: Build and push Docker image uses: docker/build-push-action@v5 with: From c590576ca2cbdb09df6dfe08ac5a93555f47330c Mon Sep 17 00:00:00 2001 From: Paul Kalhorn Date: Thu, 9 Apr 2026 16:15:38 +0200 Subject: [PATCH 4/4] fix: use correct type in cbc --- cbc.yaml | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cbc.yaml b/cbc.yaml index 28ff7d0..d59f1ab 100644 --- a/cbc.yaml +++ b/cbc.yaml @@ -15,20 +15,20 @@ entrypoints: preprocessed_docs_DB_DSN: null preprocessed_docs_DB_TABLE: null description: A database table, expected to have the doc_id, and tokens (list of strings) - type: pg_table + type: database_table outputs: doc_topic: config: docs_to_topics_DB_DSN: null docs_to_topics_DB_TABLE: null description: A table that maps documents to their topic-likelihoods - type: pg_table + type: database_table topic_term: config: top_terms_per_topic_DB_DSN: null top_terms_per_topic_DB_TABLE: null description: A table that lists most likely terms for a topic - type: pg_table + type: database_table topic_explanation: description: Explains the topics using the query and top terms envs: @@ -40,18 +40,18 @@ entrypoints: query_information_input_DB_DSN: null query_information_input_DB_TABLE: null description: Information of the query used, must contain query, source, created_at - type: pg_table + type: database_table topic_terms: config: topic_terms_input_DB_DSN: null topic_terms_input_DB_TABLE: null description: A table that lists most likely terms for a topic - type: pg_table + type: database_table outputs: explanations_output: config: explanations_output_DB_DSN: null explanations_output_DB_TABLE: null description: Output of the generated explanations, contains topic_id and description - type: pg_table + type: database_table name: Topic-Modeling