From 3fed218d6d2a69b90d21215a08cfeb1a059c8603 Mon Sep 17 00:00:00 2001 From: swong3 Date: Fri, 14 Nov 2025 18:51:39 +0000 Subject: [PATCH 1/5] Added gowalla config --- .../gowalla_data_preprocessor_config.py | 307 ++++++++++++++++++ 1 file changed, 307 insertions(+) create mode 100644 examples/id_embeddings/gowalla_data_preprocessor_config.py diff --git a/examples/id_embeddings/gowalla_data_preprocessor_config.py b/examples/id_embeddings/gowalla_data_preprocessor_config.py new file mode 100644 index 000000000..21af4ac14 --- /dev/null +++ b/examples/id_embeddings/gowalla_data_preprocessor_config.py @@ -0,0 +1,307 @@ +""" +Data Preprocessor Configuration for the Gowalla bipartite graph dataset. + +This configuration defines how to preprocess the Gowalla user-item interaction +graph for use with GiGL. The graph consists of: +- User nodes (one node type) +- Item nodes (another node type) +- User-to-item edges (interactions) + +The configuration handles creating node tables with degree features from the +raw edge data and defines the preprocessing specs for both nodes and edges. +""" + +from typing import Any, Final + +from google.cloud.bigquery.job import WriteDisposition + +from gigl.common.logger import Logger +from gigl.env.pipelines_config import get_resource_config +from gigl.src.common.types import AppliedTaskIdentifier +from gigl.src.common.types.graph_data import EdgeType, EdgeUsageType, NodeType, Relation +from gigl.src.common.types.pb_wrappers.gigl_resource_config import ( + GiglResourceConfigWrapper, +) +from gigl.src.common.utils.bq import BqUtils +from gigl.src.data_preprocessor.lib.data_preprocessor_config import ( + DataPreprocessorConfig, + build_ingestion_feature_spec_fn, + build_passthrough_transform_preprocessing_fn, +) +from gigl.src.data_preprocessor.lib.ingest.bigquery import ( + BigqueryEdgeDataReference, + BigqueryNodeDataReference, +) +from gigl.src.data_preprocessor.lib.ingest.reference import ( + EdgeDataReference, + NodeDataReference, +) +from gigl.src.data_preprocessor.lib.types import ( + EdgeDataPreprocessingSpec, + EdgeOutputIdentifier, + NodeDataPreprocessingSpec, + NodeOutputIdentifier, +) + +logger = Logger() + +# Node type names +USER_NODE_TYPE_NAME: Final[str] = "user" +ITEM_NODE_TYPE_NAME: Final[str] = "item" + +# Column names +NODE_ID_COLUMN: Final[str] = "node_id" +DEGREE_COLUMN: Final[str] = "degree" +SRC_COLUMN: Final[str] = "from_user_id" +DST_COLUMN: Final[str] = "to_item_id" + + +class GowallaDataPreprocessorConfig(DataPreprocessorConfig): + """ + Data preprocessor configuration for the Gowalla bipartite graph dataset. + + This config handles: + 1. Creating node tables (users and items) with degree features from edge data + 2. Defining preprocessing specs for user and item nodes + 3. 
Defining preprocessing specs for user-item edges + + Args: + edge_table (str): Full BigQuery path to the edge table (e.g., "project.dataset.train_edges") + **kwargs: Additional configuration arguments (currently unused but maintained for extensibility) + """ + + def __init__(self, edge_table: str, **kwargs: Any) -> None: + super().__init__() + + # Store the edge table path + self._edge_table = edge_table + logger.info(f"Initializing Gowalla config with edge table: {self._edge_table}") + + # Define node types + self._user_node_type = NodeType(USER_NODE_TYPE_NAME) + self._item_node_type = NodeType(ITEM_NODE_TYPE_NAME) + + # Define edge type (user -> item) + self._user_item_edge_type = EdgeType( + src_node_type=self._user_node_type, + dst_node_type=self._item_node_type, + relation=Relation("to"), + ) + + # Get resource config + self._resource_config: GiglResourceConfigWrapper = get_resource_config() + + # Node tables will be created in prepare_for_pipeline + self._user_node_table: str = "" + self._item_node_table: str = "" + + def prepare_for_pipeline( + self, applied_task_identifier: AppliedTaskIdentifier + ) -> None: + """ + Prepare node tables before running the preprocessing pipeline. + + This method creates user and item node tables from the edge table, + with node degree as a simple feature. + + Args: + applied_task_identifier (AppliedTaskIdentifier): Unique identifier for this pipeline run + """ + logger.info("Preparing node tables for Gowalla dataset...") + + bq_utils = BqUtils(project=self._resource_config.project) + + # Define table paths + table_prefix = ( + f"{self._resource_config.project}." + f"{self._resource_config.temp_assets_bq_dataset_name}." + f"{applied_task_identifier}" + ) + + self._user_node_table = f"{table_prefix}_user_nodes" + self._item_node_table = f"{table_prefix}_item_nodes" + + # Create user node table with degree feature + logger.info(f"Creating user node table: {self._user_node_table}") + user_node_query = f""" + SELECT + {SRC_COLUMN} AS {NODE_ID_COLUMN}, + COUNT(*) AS {DEGREE_COLUMN} + FROM + `{self._edge_table}` + GROUP BY + {SRC_COLUMN} + """ + + bq_utils.run_query( + query=user_node_query, + labels={}, + destination=self._user_node_table, + write_disposition=WriteDisposition.WRITE_TRUNCATE, + ) + logger.info(f"Created user node table with degree features") + + # Create item node table with degree feature + logger.info(f"Creating item node table: {self._item_node_table}") + item_node_query = f""" + SELECT + {DST_COLUMN} AS {NODE_ID_COLUMN}, + COUNT(*) AS {DEGREE_COLUMN} + FROM + `{self._edge_table}` + GROUP BY + {DST_COLUMN} + """ + + bq_utils.run_query( + query=item_node_query, + labels={}, + destination=self._item_node_table, + write_disposition=WriteDisposition.WRITE_TRUNCATE, + ) + logger.info(f"Created item node table with degree features") + + # Log statistics + user_count = bq_utils.count_number_of_rows_in_bq_table( + bq_table=self._user_node_table, labels={} + ) + item_count = bq_utils.count_number_of_rows_in_bq_table( + bq_table=self._item_node_table, labels={} + ) + logger.info(f"Node tables created: {user_count} users, {item_count} items") + + def get_nodes_preprocessing_spec( + self, + ) -> dict[NodeDataReference, NodeDataPreprocessingSpec]: + """ + Define preprocessing specifications for user and item nodes. + + Returns: + dict[NodeDataReference, NodeDataPreprocessingSpec]: Mapping of node data + references to their preprocessing specs. 
+ """ + node_data_ref_to_preprocessing_specs: dict[ + NodeDataReference, NodeDataPreprocessingSpec + ] = {} + + # User node preprocessing spec + logger.info("Defining user node preprocessing spec...") + user_node_data_ref = BigqueryNodeDataReference( + reference_uri=self._user_node_table, + node_type=self._user_node_type, + ) + + user_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[NODE_ID_COLUMN, DEGREE_COLUMN], + ) + + user_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + user_node_output_id = NodeOutputIdentifier(NODE_ID_COLUMN) + + node_data_ref_to_preprocessing_specs[ + user_node_data_ref + ] = NodeDataPreprocessingSpec( + identifier_output=user_node_output_id, + features_outputs=[DEGREE_COLUMN], + labels_outputs=[], # No labels for unsupervised tasks + feature_spec_fn=user_feature_spec_fn, + preprocessing_fn=user_preprocessing_fn, + ) + + # Item node preprocessing spec + logger.info("Defining item node preprocessing spec...") + item_node_data_ref = BigqueryNodeDataReference( + reference_uri=self._item_node_table, + node_type=self._item_node_type, + ) + + item_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[NODE_ID_COLUMN, DEGREE_COLUMN], + ) + + item_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + item_node_output_id = NodeOutputIdentifier(NODE_ID_COLUMN) + + node_data_ref_to_preprocessing_specs[ + item_node_data_ref + ] = NodeDataPreprocessingSpec( + identifier_output=item_node_output_id, + features_outputs=[DEGREE_COLUMN], + labels_outputs=[], # No labels for unsupervised tasks + feature_spec_fn=item_feature_spec_fn, + preprocessing_fn=item_preprocessing_fn, + ) + + logger.info("Node preprocessing specs defined for users and items") + return node_data_ref_to_preprocessing_specs + + def get_edges_preprocessing_spec( + self, + ) -> dict[EdgeDataReference, EdgeDataPreprocessingSpec]: + """ + Define preprocessing specifications for user-item edges. + + Returns: + dict[EdgeDataReference, EdgeDataPreprocessingSpec]: Mapping of edge data + references to their preprocessing specs. + """ + edge_data_ref_to_preprocessing_specs: dict[ + EdgeDataReference, EdgeDataPreprocessingSpec + ] = {} + + logger.info("Defining edge preprocessing spec...") + + # Main edge data reference (user -> item interactions) + main_edge_data_ref = BigqueryEdgeDataReference( + reference_uri=self._edge_table, + edge_type=self._user_item_edge_type, + edge_usage_type=EdgeUsageType.MAIN, + ) + + # Edge output identifier + edge_output_id = EdgeOutputIdentifier( + src_node=NodeOutputIdentifier(SRC_COLUMN), + dst_node=NodeOutputIdentifier(DST_COLUMN), + ) + + # Feature spec (just src and dst, no edge features) + edge_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[SRC_COLUMN, DST_COLUMN], + ) + + # Passthrough preprocessing (no transformations needed) + edge_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + edge_data_ref_to_preprocessing_specs[ + main_edge_data_ref + ] = EdgeDataPreprocessingSpec( + identifier_output=edge_output_id, + features_outputs=[], # No edge features in raw data + labels_outputs=[], # No edge labels + feature_spec_fn=edge_feature_spec_fn, + preprocessing_fn=edge_preprocessing_fn, + ) + + logger.info("Edge preprocessing spec defined") + return edge_data_ref_to_preprocessing_specs + + +def get_gowalla_preprocessor_config(edge_table: str, **kwargs: Any) -> GowallaDataPreprocessorConfig: + """ + Factory function to create a GowallaDataPreprocessorConfig instance. 
+ + Args: + edge_table (str): Full BigQuery path to the edge table + **kwargs: Additional configuration arguments + + Returns: + GowallaDataPreprocessorConfig: Configured preprocessor instance + + Example: + >>> config = get_gowalla_preprocessor_config( + ... edge_table="snap-umap-dev.gigl_temp_assets.swong3_gowalla_user_to_item_edge_table" + ... ) + """ + return GowallaDataPreprocessorConfig(edge_table=edge_table, **kwargs) From 68189de4a663b0a7c448a9ea4e560ae6e4b53271 Mon Sep 17 00:00:00 2001 From: swong3 Date: Fri, 14 Nov 2025 18:59:16 +0000 Subject: [PATCH 2/5] remove string --- examples/id_embeddings/gowalla_data_preprocessor_config.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/examples/id_embeddings/gowalla_data_preprocessor_config.py b/examples/id_embeddings/gowalla_data_preprocessor_config.py index 21af4ac14..f8630bf73 100644 --- a/examples/id_embeddings/gowalla_data_preprocessor_config.py +++ b/examples/id_embeddings/gowalla_data_preprocessor_config.py @@ -299,9 +299,5 @@ def get_gowalla_preprocessor_config(edge_table: str, **kwargs: Any) -> GowallaDa Returns: GowallaDataPreprocessorConfig: Configured preprocessor instance - Example: - >>> config = get_gowalla_preprocessor_config( - ... edge_table="snap-umap-dev.gigl_temp_assets.swong3_gowalla_user_to_item_edge_table" - ... ) """ return GowallaDataPreprocessorConfig(edge_table=edge_table, **kwargs) From 386b53508089fe09465ee1bd9c7f1209f781e574 Mon Sep 17 00:00:00 2001 From: swong3 Date: Thu, 11 Dec 2025 17:39:55 +0000 Subject: [PATCH 3/5] Updated data_prepro file --- .../gowalla_data_preprocessor_config.py | 239 ++++++++++++++---- 1 file changed, 196 insertions(+), 43 deletions(-) diff --git a/examples/id_embeddings/gowalla_data_preprocessor_config.py b/examples/id_embeddings/gowalla_data_preprocessor_config.py index f8630bf73..f36cd9f91 100644 --- a/examples/id_embeddings/gowalla_data_preprocessor_config.py +++ b/examples/id_embeddings/gowalla_data_preprocessor_config.py @@ -61,31 +61,52 @@ class GowallaDataPreprocessorConfig(DataPreprocessorConfig): Data preprocessor configuration for the Gowalla bipartite graph dataset. This config handles: - 1. Creating node tables (users and items) with degree features from edge data + 1. Creating node tables (users and items) with degree features from both train and test edges 2. Defining preprocessing specs for user and item nodes - 3. Defining preprocessing specs for user-item edges + 3. 
Defining preprocessing specs for train and test edges (as separate edge types) Args: - edge_table (str): Full BigQuery path to the edge table (e.g., "project.dataset.train_edges") + train_edge_table (str): Full BigQuery path to the training edge table + test_edge_table (str): Full BigQuery path to the test edge table **kwargs: Additional configuration arguments (currently unused but maintained for extensibility) """ - def __init__(self, edge_table: str, **kwargs: Any) -> None: + def __init__(self, train_edge_table: str, test_edge_table: str, **kwargs: Any) -> None: super().__init__() - # Store the edge table path - self._edge_table = edge_table - logger.info(f"Initializing Gowalla config with edge table: {self._edge_table}") + # Store the edge table paths + self._train_edge_table = train_edge_table + self._test_edge_table = test_edge_table + logger.info(f"Initializing Gowalla config with train edge table: {self._train_edge_table}") + logger.info(f"Initializing Gowalla config with test edge table: {self._test_edge_table}") # Define node types self._user_node_type = NodeType(USER_NODE_TYPE_NAME) self._item_node_type = NodeType(ITEM_NODE_TYPE_NAME) - # Define edge type (user -> item) - self._user_item_edge_type = EdgeType( + # Define edge types (bidirectional: user->item and item->user for both train and test) + # Forward edges: user -> item + self._user_to_train_item_edge_type = EdgeType( src_node_type=self._user_node_type, dst_node_type=self._item_node_type, - relation=Relation("to"), + relation=Relation("to_train"), + ) + self._user_to_test_item_edge_type = EdgeType( + src_node_type=self._user_node_type, + dst_node_type=self._item_node_type, + relation=Relation("to_test"), + ) + + # Reverse edges: item -> user + self._item_to_train_user_edge_type = EdgeType( + src_node_type=self._item_node_type, + dst_node_type=self._user_node_type, + relation=Relation("to_train"), + ) + self._item_to_test_user_edge_type = EdgeType( + src_node_type=self._item_node_type, + dst_node_type=self._user_node_type, + relation=Relation("to_test"), ) # Get resource config @@ -101,13 +122,13 @@ def prepare_for_pipeline( """ Prepare node tables before running the preprocessing pipeline. - This method creates user and item node tables from the edge table, - with node degree as a simple feature. + This method creates user and item node tables from BOTH train and test edge tables, + with node degree as a simple feature (aggregated across both train and test). 
Args: applied_task_identifier (AppliedTaskIdentifier): Unique identifier for this pipeline run """ - logger.info("Preparing node tables for Gowalla dataset...") + logger.info("Preparing node tables for Gowalla dataset (combining train and test edges)...") bq_utils = BqUtils(project=self._resource_config.project) @@ -118,19 +139,36 @@ def prepare_for_pipeline( f"{applied_task_identifier}" ) + logger.info(f"Table prefix: {table_prefix}") + self._user_node_table = f"{table_prefix}_user_nodes" self._item_node_table = f"{table_prefix}_item_nodes" - # Create user node table with degree feature + # Create user node table with degree feature (combining train and test edges) logger.info(f"Creating user node table: {self._user_node_table}") user_node_query = f""" + SELECT + {NODE_ID_COLUMN}, + SUM({DEGREE_COLUMN}) AS {DEGREE_COLUMN} + FROM ( + SELECT + {SRC_COLUMN} AS {NODE_ID_COLUMN}, + COUNT(*) AS {DEGREE_COLUMN} + FROM + `{self._train_edge_table}` + GROUP BY + {SRC_COLUMN} + UNION ALL SELECT {SRC_COLUMN} AS {NODE_ID_COLUMN}, COUNT(*) AS {DEGREE_COLUMN} FROM - `{self._edge_table}` + `{self._test_edge_table}` + GROUP BY + {SRC_COLUMN} + ) GROUP BY - {SRC_COLUMN} + {NODE_ID_COLUMN} """ bq_utils.run_query( @@ -139,18 +177,33 @@ def prepare_for_pipeline( destination=self._user_node_table, write_disposition=WriteDisposition.WRITE_TRUNCATE, ) - logger.info(f"Created user node table with degree features") + logger.info(f"Created user node table with degree features from train and test edges") - # Create item node table with degree feature + # Create item node table with degree feature (combining train and test edges) logger.info(f"Creating item node table: {self._item_node_table}") item_node_query = f""" + SELECT + {NODE_ID_COLUMN}, + SUM({DEGREE_COLUMN}) AS {DEGREE_COLUMN} + FROM ( + SELECT + {DST_COLUMN} AS {NODE_ID_COLUMN}, + COUNT(*) AS {DEGREE_COLUMN} + FROM + `{self._train_edge_table}` + GROUP BY + {DST_COLUMN} + UNION ALL SELECT {DST_COLUMN} AS {NODE_ID_COLUMN}, COUNT(*) AS {DEGREE_COLUMN} FROM - `{self._edge_table}` + `{self._test_edge_table}` + GROUP BY + {DST_COLUMN} + ) GROUP BY - {DST_COLUMN} + {NODE_ID_COLUMN} """ bq_utils.run_query( @@ -159,7 +212,7 @@ def prepare_for_pipeline( destination=self._item_node_table, write_disposition=WriteDisposition.WRITE_TRUNCATE, ) - logger.info(f"Created item node table with degree features") + logger.info(f"Created item node table with degree features from train and test edges") # Log statistics user_count = bq_utils.count_number_of_rows_in_bq_table( @@ -241,7 +294,13 @@ def get_edges_preprocessing_spec( self, ) -> dict[EdgeDataReference, EdgeDataPreprocessingSpec]: """ - Define preprocessing specifications for user-item edges. + Define preprocessing specifications for bidirectional train and test edges. + + Returns four separate edge specs: + 1. Training edges: user -> to_train -> item + 2. Test edges: user -> to_test -> item + 3. Reverse training edges: item -> to_train -> user + 4. 
Reverse test edges: item -> to_test -> user Returns: dict[EdgeDataReference, EdgeDataPreprocessingSpec]: Mapping of edge data @@ -251,53 +310,147 @@ def get_edges_preprocessing_spec( EdgeDataReference, EdgeDataPreprocessingSpec ] = {} - logger.info("Defining edge preprocessing spec...") + logger.info("Defining edge preprocessing specs for bidirectional train and test edges...") + + # ========== Forward Training Edges: user -> to_train -> item ========== + user_to_train_item_edge_ref = BigqueryEdgeDataReference( + reference_uri=self._train_edge_table, + edge_type=self._user_to_train_item_edge_type, + edge_usage_type=EdgeUsageType.MAIN, + ) + + user_to_train_item_output_id = EdgeOutputIdentifier( + src_node=NodeOutputIdentifier(SRC_COLUMN), + dst_node=NodeOutputIdentifier(DST_COLUMN), + ) + + user_to_train_item_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[SRC_COLUMN, DST_COLUMN], + ) + + user_to_train_item_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + edge_data_ref_to_preprocessing_specs[ + user_to_train_item_edge_ref + ] = EdgeDataPreprocessingSpec( + identifier_output=user_to_train_item_output_id, + features_outputs=[], + labels_outputs=[], + feature_spec_fn=user_to_train_item_feature_spec_fn, + preprocessing_fn=user_to_train_item_preprocessing_fn, + ) + logger.info("Forward training edge spec defined (user -> to_train -> item)") - # Main edge data reference (user -> item interactions) - main_edge_data_ref = BigqueryEdgeDataReference( - reference_uri=self._edge_table, - edge_type=self._user_item_edge_type, + # ========== Forward Test Edges: user -> to_test -> item ========== + user_to_test_item_edge_ref = BigqueryEdgeDataReference( + reference_uri=self._test_edge_table, + edge_type=self._user_to_test_item_edge_type, edge_usage_type=EdgeUsageType.MAIN, ) - # Edge output identifier - edge_output_id = EdgeOutputIdentifier( + user_to_test_item_output_id = EdgeOutputIdentifier( src_node=NodeOutputIdentifier(SRC_COLUMN), dst_node=NodeOutputIdentifier(DST_COLUMN), ) - # Feature spec (just src and dst, no edge features) - edge_feature_spec_fn = build_ingestion_feature_spec_fn( + user_to_test_item_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[SRC_COLUMN, DST_COLUMN], + ) + + user_to_test_item_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + edge_data_ref_to_preprocessing_specs[ + user_to_test_item_edge_ref + ] = EdgeDataPreprocessingSpec( + identifier_output=user_to_test_item_output_id, + features_outputs=[], + labels_outputs=[], + feature_spec_fn=user_to_test_item_feature_spec_fn, + preprocessing_fn=user_to_test_item_preprocessing_fn, + ) + logger.info("Forward test edge spec defined (user -> to_test -> item)") + + # ========== Reverse Training Edges: item -> to_train -> user ========== + # Same BigQuery table, but swap src and dst + item_to_train_user_edge_ref = BigqueryEdgeDataReference( + reference_uri=self._train_edge_table, + edge_type=self._item_to_train_user_edge_type, + edge_usage_type=EdgeUsageType.MAIN, + ) + + # Swap src and dst for reverse direction + item_to_train_user_output_id = EdgeOutputIdentifier( + src_node=NodeOutputIdentifier(DST_COLUMN), # item (was dst) + dst_node=NodeOutputIdentifier(SRC_COLUMN), # user (was src) + ) + + item_to_train_user_feature_spec_fn = build_ingestion_feature_spec_fn( + fixed_int_fields=[SRC_COLUMN, DST_COLUMN], + ) + + item_to_train_user_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + + edge_data_ref_to_preprocessing_specs[ + 
item_to_train_user_edge_ref + ] = EdgeDataPreprocessingSpec( + identifier_output=item_to_train_user_output_id, + features_outputs=[], + labels_outputs=[], + feature_spec_fn=item_to_train_user_feature_spec_fn, + preprocessing_fn=item_to_train_user_preprocessing_fn, + ) + logger.info("Reverse training edge spec defined (item -> to_train -> user)") + + # ========== Reverse Test Edges: item -> to_test -> user ========== + # Same BigQuery table, but swap src and dst + item_to_test_user_edge_ref = BigqueryEdgeDataReference( + reference_uri=self._test_edge_table, + edge_type=self._item_to_test_user_edge_type, + edge_usage_type=EdgeUsageType.MAIN, + ) + + # Swap src and dst for reverse direction + item_to_test_user_output_id = EdgeOutputIdentifier( + src_node=NodeOutputIdentifier(DST_COLUMN), # item (was dst) + dst_node=NodeOutputIdentifier(SRC_COLUMN), # user (was src) + ) + + item_to_test_user_feature_spec_fn = build_ingestion_feature_spec_fn( fixed_int_fields=[SRC_COLUMN, DST_COLUMN], ) - # Passthrough preprocessing (no transformations needed) - edge_preprocessing_fn = build_passthrough_transform_preprocessing_fn() + item_to_test_user_preprocessing_fn = build_passthrough_transform_preprocessing_fn() edge_data_ref_to_preprocessing_specs[ - main_edge_data_ref + item_to_test_user_edge_ref ] = EdgeDataPreprocessingSpec( - identifier_output=edge_output_id, - features_outputs=[], # No edge features in raw data - labels_outputs=[], # No edge labels - feature_spec_fn=edge_feature_spec_fn, - preprocessing_fn=edge_preprocessing_fn, + identifier_output=item_to_test_user_output_id, + features_outputs=[], + labels_outputs=[], + feature_spec_fn=item_to_test_user_feature_spec_fn, + preprocessing_fn=item_to_test_user_preprocessing_fn, ) + logger.info("Reverse test edge spec defined (item -> to_test -> user)") - logger.info("Edge preprocessing spec defined") + logger.info("All edge preprocessing specs defined (4 edge types: forward and reverse for train and test)") return edge_data_ref_to_preprocessing_specs -def get_gowalla_preprocessor_config(edge_table: str, **kwargs: Any) -> GowallaDataPreprocessorConfig: +def get_gowalla_preprocessor_config( + train_edge_table: str, test_edge_table: str, **kwargs: Any +) -> GowallaDataPreprocessorConfig: """ Factory function to create a GowallaDataPreprocessorConfig instance. 
Args: - edge_table (str): Full BigQuery path to the edge table + train_edge_table (str): Full BigQuery path to the training edge table + test_edge_table (str): Full BigQuery path to the test edge table **kwargs: Additional configuration arguments Returns: GowallaDataPreprocessorConfig: Configured preprocessor instance """ - return GowallaDataPreprocessorConfig(edge_table=edge_table, **kwargs) + return GowallaDataPreprocessorConfig( + train_edge_table=train_edge_table, test_edge_table=test_edge_table, **kwargs + ) From e4c0654bee9d4066a5591c9ae74d7274d6d617a9 Mon Sep 17 00:00:00 2001 From: swong3 Date: Thu, 18 Dec 2025 21:42:28 +0000 Subject: [PATCH 4/5] Extracted SQL query into reusable function --- .../gowalla_data_preprocessor_config.py | 127 +++++++++--------- 1 file changed, 65 insertions(+), 62 deletions(-) diff --git a/examples/id_embeddings/gowalla_data_preprocessor_config.py b/examples/id_embeddings/gowalla_data_preprocessor_config.py index f36cd9f91..9f65a9101 100644 --- a/examples/id_embeddings/gowalla_data_preprocessor_config.py +++ b/examples/id_embeddings/gowalla_data_preprocessor_config.py @@ -7,8 +7,9 @@ - Item nodes (another node type) - User-to-item edges (interactions) -The configuration handles creating node tables with degree features from the -raw edge data and defines the preprocessing specs for both nodes and edges. +The configuration handles creating node tables from the raw edge data and +defines the preprocessing specs for both nodes and edges. Note that node features +are not included since LightGCN learns embeddings directly from node IDs. """ from typing import Any, Final @@ -51,17 +52,54 @@ # Column names NODE_ID_COLUMN: Final[str] = "node_id" -DEGREE_COLUMN: Final[str] = "degree" SRC_COLUMN: Final[str] = "from_user_id" DST_COLUMN: Final[str] = "to_item_id" +def _build_node_table_query( + node_id_column: str, + train_edge_table: str, + test_edge_table: str, +) -> str: + """ + Build a SQL query to create a node table from train and test edge tables. + + The query aggregates nodes from both edge tables to ensure all nodes + (even those only appearing in test edges) are included. + + Args: + node_id_column (str): The column name in the edge table representing this node type + (e.g., SRC_COLUMN for user nodes, DST_COLUMN for item nodes) + train_edge_table (str): Full BigQuery path to the training edge table + test_edge_table (str): Full BigQuery path to the test edge table + + Returns: + str: SQL query that creates a node table with node_id column + """ + return f""" + SELECT + {node_id_column} AS {NODE_ID_COLUMN} + FROM + `{train_edge_table}` + UNION DISTINCT + SELECT + {node_id_column} AS {NODE_ID_COLUMN} + FROM + `{test_edge_table}` + """ + + class GowallaDataPreprocessorConfig(DataPreprocessorConfig): """ Data preprocessor configuration for the Gowalla bipartite graph dataset. + NOTE: The gowalla dataset we are using [1] has pre-split edges. As such we use their split in this pre-processor. + Those edges were uploaded to BQ with GiGL/python/gigl/scripts/load_gowalla_to_bq.py + + [1]: https://github.com/xiangwang1223/neural_graph_collaborative_filtering/tree/master/Data/gowalla + This config handles: - 1. Creating node tables (users and items) with degree features from both train and test edges + 1. Creating node tables (users and items) from both train and test edges 2. Defining preprocessing specs for user and item nodes 3. 
Defining preprocessing specs for train and test edges (as separate edge types) @@ -122,8 +160,8 @@ def prepare_for_pipeline( """ Prepare node tables before running the preprocessing pipeline. - This method creates user and item node tables from BOTH train and test edge tables, - with node degree as a simple feature (aggregated across both train and test). + This method creates user and item node tables from BOTH train and test edge tables. + This ensures all nodes are included, even those that only appear in test edges. Args: applied_task_identifier (AppliedTaskIdentifier): Unique identifier for this pipeline run @@ -144,32 +182,13 @@ def prepare_for_pipeline( self._user_node_table = f"{table_prefix}_user_nodes" self._item_node_table = f"{table_prefix}_item_nodes" - # Create user node table with degree feature (combining train and test edges) + # Create user node table logger.info(f"Creating user node table: {self._user_node_table}") - user_node_query = f""" - SELECT - {NODE_ID_COLUMN}, - SUM({DEGREE_COLUMN}) AS {DEGREE_COLUMN} - FROM ( - SELECT - {SRC_COLUMN} AS {NODE_ID_COLUMN}, - COUNT(*) AS {DEGREE_COLUMN} - FROM - `{self._train_edge_table}` - GROUP BY - {SRC_COLUMN} - UNION ALL - SELECT - {SRC_COLUMN} AS {NODE_ID_COLUMN}, - COUNT(*) AS {DEGREE_COLUMN} - FROM - `{self._test_edge_table}` - GROUP BY - {SRC_COLUMN} - ) - GROUP BY - {NODE_ID_COLUMN} - """ + user_node_query = _build_node_table_query( + node_id_column=SRC_COLUMN, + train_edge_table=self._train_edge_table, + test_edge_table=self._test_edge_table, + ) bq_utils.run_query( query=user_node_query, @@ -177,34 +196,15 @@ def prepare_for_pipeline( destination=self._user_node_table, write_disposition=WriteDisposition.WRITE_TRUNCATE, ) - logger.info(f"Created user node table with degree features from train and test edges") + logger.info(f"Created user node table from train and test edges") - # Create item node table with degree feature (combining train and test edges) + # Create item node table logger.info(f"Creating item node table: {self._item_node_table}") - item_node_query = f""" - SELECT - {NODE_ID_COLUMN}, - SUM({DEGREE_COLUMN}) AS {DEGREE_COLUMN} - FROM ( - SELECT - {DST_COLUMN} AS {NODE_ID_COLUMN}, - COUNT(*) AS {DEGREE_COLUMN} - FROM - `{self._train_edge_table}` - GROUP BY - {DST_COLUMN} - UNION ALL - SELECT - {DST_COLUMN} AS {NODE_ID_COLUMN}, - COUNT(*) AS {DEGREE_COLUMN} - FROM - `{self._test_edge_table}` - GROUP BY - {DST_COLUMN} - ) - GROUP BY - {NODE_ID_COLUMN} - """ + item_node_query = _build_node_table_query( + node_id_column=DST_COLUMN, + train_edge_table=self._train_edge_table, + test_edge_table=self._test_edge_table, + ) bq_utils.run_query( query=item_node_query, @@ -212,7 +212,7 @@ def prepare_for_pipeline( destination=self._item_node_table, write_disposition=WriteDisposition.WRITE_TRUNCATE, ) - logger.info(f"Created item node table with degree features from train and test edges") + logger.info(f"Created item node table from train and test edges") # Log statistics user_count = bq_utils.count_number_of_rows_in_bq_table( @@ -229,6 +229,9 @@ def get_nodes_preprocessing_spec( """ Define preprocessing specifications for user and item nodes. + Note: No node features are provided since LightGCN learns node embeddings + directly from node IDs rather than from input features. + Returns: dict[NodeDataReference, NodeDataPreprocessingSpec]: Mapping of node data references to their preprocessing specs. 
@@ -245,7 +248,7 @@ def get_nodes_preprocessing_spec( ) user_feature_spec_fn = build_ingestion_feature_spec_fn( - fixed_int_fields=[NODE_ID_COLUMN, DEGREE_COLUMN], + fixed_int_fields=[NODE_ID_COLUMN], ) user_preprocessing_fn = build_passthrough_transform_preprocessing_fn() @@ -256,7 +259,7 @@ def get_nodes_preprocessing_spec( user_node_data_ref ] = NodeDataPreprocessingSpec( identifier_output=user_node_output_id, - features_outputs=[DEGREE_COLUMN], + features_outputs=[], # No features - LightGCN uses only node IDs labels_outputs=[], # No labels for unsupervised tasks feature_spec_fn=user_feature_spec_fn, preprocessing_fn=user_preprocessing_fn, @@ -270,7 +273,7 @@ def get_nodes_preprocessing_spec( ) item_feature_spec_fn = build_ingestion_feature_spec_fn( - fixed_int_fields=[NODE_ID_COLUMN, DEGREE_COLUMN], + fixed_int_fields=[NODE_ID_COLUMN], ) item_preprocessing_fn = build_passthrough_transform_preprocessing_fn() @@ -281,7 +284,7 @@ def get_nodes_preprocessing_spec( item_node_data_ref ] = NodeDataPreprocessingSpec( identifier_output=item_node_output_id, - features_outputs=[DEGREE_COLUMN], + features_outputs=[], # No features - LightGCN uses only node IDs labels_outputs=[], # No labels for unsupervised tasks feature_spec_fn=item_feature_spec_fn, preprocessing_fn=item_preprocessing_fn, From 429af5ecd1ed177876a5c734f5148a9a286275ce Mon Sep 17 00:00:00 2001 From: swong3 Date: Thu, 18 Dec 2025 23:25:21 +0000 Subject: [PATCH 5/5] Added query building function --- .../gowalla_data_preprocessor_config.py | 37 ++++++------------- 1 file changed, 12 insertions(+), 25 deletions(-) diff --git a/examples/id_embeddings/gowalla_data_preprocessor_config.py b/examples/id_embeddings/gowalla_data_preprocessor_config.py index 9f65a9101..bc76405ef 100644 --- a/examples/id_embeddings/gowalla_data_preprocessor_config.py +++ b/examples/id_embeddings/gowalla_data_preprocessor_config.py @@ -52,6 +52,7 @@ # Column names NODE_ID_COLUMN: Final[str] = "node_id" +PLACEHOLDER_FEATURE_COLUMN: Final[str] = "placeholder_feature" SRC_COLUMN: Final[str] = "from_user_id" DST_COLUMN: Final[str] = "to_item_id" @@ -67,6 +68,10 @@ def _build_node_table_query( The query aggregates nodes from both edge tables to ensure all nodes (even those only appearing in test edges) are included. + Note: A placeholder feature column is added to satisfy GiGL's data preprocessor + requirements, which expects at least one feature column in addition to node_id. + This column is not used by the LightGCN model. 
+ Args: node_id_column (str): The column name in the edge table representing this node type (e.g., SRC_COLUMN for user nodes, DST_COLUMN for item nodes) @@ -74,16 +79,18 @@ def _build_node_table_query( test_edge_table (str): Full BigQuery path to the test edge table Returns: - str: SQL query that creates a node table with node_id column + str: SQL query that creates a node table with node_id and placeholder_feature columns """ return f""" SELECT - {node_id_column} AS {NODE_ID_COLUMN} + {node_id_column} AS {NODE_ID_COLUMN}, + 0 AS {PLACEHOLDER_FEATURE_COLUMN} FROM `{train_edge_table}` UNION DISTINCT SELECT - {node_id_column} AS {NODE_ID_COLUMN} + {node_id_column} AS {NODE_ID_COLUMN}, + 0 AS {PLACEHOLDER_FEATURE_COLUMN} FROM `{test_edge_table}` """ @@ -248,7 +255,7 @@ def get_nodes_preprocessing_spec( ) user_feature_spec_fn = build_ingestion_feature_spec_fn( - fixed_int_fields=[NODE_ID_COLUMN], + fixed_int_fields=[NODE_ID_COLUMN, PLACEHOLDER_FEATURE_COLUMN], ) user_preprocessing_fn = build_passthrough_transform_preprocessing_fn() @@ -273,7 +280,7 @@ def get_nodes_preprocessing_spec( ) item_feature_spec_fn = build_ingestion_feature_spec_fn( - fixed_int_fields=[NODE_ID_COLUMN], + fixed_int_fields=[NODE_ID_COLUMN, PLACEHOLDER_FEATURE_COLUMN], ) item_preprocessing_fn = build_passthrough_transform_preprocessing_fn() @@ -437,23 +444,3 @@ def get_edges_preprocessing_spec( logger.info("All edge preprocessing specs defined (4 edge types: forward and reverse for train and test)") return edge_data_ref_to_preprocessing_specs - - -def get_gowalla_preprocessor_config( - train_edge_table: str, test_edge_table: str, **kwargs: Any -) -> GowallaDataPreprocessorConfig: - """ - Factory function to create a GowallaDataPreprocessorConfig instance. - - Args: - train_edge_table (str): Full BigQuery path to the training edge table - test_edge_table (str): Full BigQuery path to the test edge table - **kwargs: Additional configuration arguments - - Returns: - GowallaDataPreprocessorConfig: Configured preprocessor instance - - """ - return GowallaDataPreprocessorConfig( - train_edge_table=train_edge_table, test_edge_table=test_edge_table, **kwargs - )
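
Usage note: with the factory function removed in [PATCH 5/5], callers construct the config class directly. A minimal sketch of how it might be instantiated, assuming the examples/id_embeddings module is importable and using placeholder BigQuery table paths (the project/dataset/table names below are hypothetical, not values from this change):

    from gowalla_data_preprocessor_config import GowallaDataPreprocessorConfig

    # Placeholder table paths -- substitute the real Gowalla train/test edge tables.
    config = GowallaDataPreprocessorConfig(
        train_edge_table="my-project.my_dataset.gowalla_train_edges",
        test_edge_table="my-project.my_dataset.gowalla_test_edges",
    )
    # prepare_for_pipeline(...) builds the user/item node tables in BigQuery,
    # after which get_nodes_preprocessing_spec() and get_edges_preprocessing_spec()
    # supply the specs consumed by the GiGL data preprocessor.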