From a9041659efa62dfb4f77dd667a6901219ee7c9ee Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 18 Jun 2024 14:24:19 +0100 Subject: [PATCH 1/7] adding MimeNode type --- .../ragstack_knowledge_store/__init__.py | 3 +- .../ragstack_knowledge_store/graph_store.py | 113 +++++++++++------- 2 files changed, 74 insertions(+), 42 deletions(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/__init__.py b/libs/knowledge-store/ragstack_knowledge_store/__init__.py index 63f45f4d6..05274f28c 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/__init__.py +++ b/libs/knowledge-store/ragstack_knowledge_store/__init__.py @@ -1,5 +1,5 @@ from .embedding_model import EmbeddingModel -from .graph_store import GraphStore, Node, SetupMode, TextNode +from .graph_store import GraphStore, Node, SetupMode, TextNode, MimeNode from .knowledge_store import KnowledgeStore __all__ = [ @@ -9,4 +9,5 @@ "Node", "SetupMode", "TextNode", + "MimeNode", ] diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 0b5858e22..769457f65 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -41,6 +41,17 @@ class TextNode(Node): """Text contained by the node.""" +@dataclass +class MimeNode(Node): + mime_type: str = None + """Type of content, e.g. text/plain or image/png.""" + + mime_content: str = None + """Encoded content""" + + mime_encoding: str = None + """Encoding format""" + class SetupMode(Enum): SYNC = 1 ASYNC = 2 @@ -326,52 +337,72 @@ def add_nodes( self, nodes: Iterable[Node] = None, ) -> Iterable[str]: - texts = [] - metadatas = [] - for node in nodes: - if not isinstance(node, TextNode): - raise ValueError("Only adding TextNode is supported at the moment") - texts.append(node.text) - metadatas.append(node.metadata) - - text_embeddings = self._embedding.embed_texts(texts) - + # Organize nodes by MIME type + mime_buckets = {} ids = [] - tag_to_new_sources: Dict[str, List[Tuple[str, str]]] = {} - tag_to_new_targets: Dict[str, Dict[str, Tuple[str, List[float]]]] = {} + # Prepare nodes based on their type + for node in nodes: + if isinstance(node, MimeNode): + main_mime_type = node.mime_type.split('/')[0] # Split and take the first part, e.g., "image" from "image/png" + if main_mime_type not in mime_buckets: + mime_buckets[main_mime_type] = [] + mime_buckets[main_mime_type].append(node) + elif isinstance(node, TextNode): + if 'text' not in mime_buckets: + mime_buckets['text'] = [] + mime_buckets['text'].append(node) + else: + raise ValueError("Unsupported node type") + + # Process each MIME bucket + embeddings_dict = {} + for mime_type, nodes_list in mime_buckets.items(): + function_mime_type = 'document' if mime_type == 'text' else mime_type + method_name = f"embed_{function_mime_type}s" + if hasattr(self._embedding, method_name): + texts = [node.mime_content if isinstance(node, MimeNode) else node.text for node in nodes_list] + embeddings_dict[mime_type] = getattr(self._embedding, method_name)(texts) + else: + # If no bulk method, try to call a singular method for each content + singular_method_name = f"embed_{function_mime_type}" + if hasattr(self._embedding, singular_method_name): + embeddings = [] + for node in nodes_list: + embedding = getattr(self._embedding, singular_method_name)(node.mime_content if isinstance(node, MimeNode) else node.text) + embeddings.append(embedding) + embeddings_dict[mime_type] = embeddings + else: + raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}") + # Step 1: Add the nodes, collecting the tags and new sources / targets. + tag_to_new_sources = {} + tag_to_new_targets = {} with self._concurrent_queries() as cq: - tuples = zip(texts, text_embeddings, metadatas) - for text, text_embedding, metadata in tuples: - if CONTENT_ID not in metadata: - metadata[CONTENT_ID] = secrets.token_hex(8) - id = metadata[CONTENT_ID] - ids.append(id) - - link_to_tags = set() # link to these tags - link_from_tags = set() # link from these tags - - for tag in get_link_tags(metadata): - tag_str = f"{tag.kind}:{tag.tag}" - if tag.direction == "incoming" or tag.direction == "bidir": - # An incom`ing link should be linked *from* nodes with the given tag. - link_from_tags.add(tag_str) - tag_to_new_targets.setdefault(tag_str, dict())[id] = ( - tag.kind, - text_embedding, - ) - if tag.direction == "outgoing" or tag.direction == "bidir": - link_to_tags.add(tag_str) - tag_to_new_sources.setdefault(tag_str, list()).append( - (tag.kind, id) - ) - - cq.execute( - self._insert_passage, - (id, text, text_embedding, link_to_tags, link_from_tags), - ) + for mime_type, embeddings in embeddings_dict.items(): + for node, embedding in zip(mime_buckets[mime_type], embeddings): + if CONTENT_ID not in node.metadata: + node.metadata[CONTENT_ID] = secrets.token_hex(8) + node_id = node.metadata[CONTENT_ID] + ids.append(node_id) + + link_to_tags = set() + link_from_tags = set() + + for tag in get_link_tags(node.metadata): + tag_str = f"{tag.kind}:{tag.tag}" + if tag.direction in ["incoming", "bidir"]: + link_from_tags.add(tag_str) + tag_to_new_targets.setdefault(tag_str, {})[node_id] = (tag.kind, embedding) + if tag.direction in ["outgoing", "bidir"]: + link_to_tags.add(tag_str) + tag_to_new_sources.setdefault(tag_str, []).append((tag.kind, node_id)) + + cq.execute( + self._insert_passage, + (node_id, node.mime_content if isinstance(node, MimeNode) else node.text, embedding, link_to_tags, link_from_tags), + ) # Step 2: Query information about those tags to determine the edges to add. # Add edges as needed. From bb802e5bfe1cddbc92ac3e0d4aa481d85543b6e8 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 19 Jun 2024 17:06:52 +0100 Subject: [PATCH 2/7] consolidating MIME headers to Node, adding Cassandra --- .../ragstack_knowledge_store/__init__.py | 3 +- .../ragstack_knowledge_store/graph_store.py | 43 +++++++++---------- .../graph_store/__init__.py | 4 +- .../ragstack_langchain/graph_store/base.py | 21 ++++++--- .../graph_store/cassandra.py | 12 ++---- 5 files changed, 41 insertions(+), 42 deletions(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/__init__.py b/libs/knowledge-store/ragstack_knowledge_store/__init__.py index 05274f28c..63f45f4d6 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/__init__.py +++ b/libs/knowledge-store/ragstack_knowledge_store/__init__.py @@ -1,5 +1,5 @@ from .embedding_model import EmbeddingModel -from .graph_store import GraphStore, Node, SetupMode, TextNode, MimeNode +from .graph_store import GraphStore, Node, SetupMode, TextNode from .knowledge_store import KnowledgeStore __all__ = [ @@ -9,5 +9,4 @@ "Node", "SetupMode", "TextNode", - "MimeNode", ] diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 769457f65..a2ff339b9 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -34,24 +34,22 @@ class Node: """Metadata for the node. May contain information used to link this node with other nodes.""" + content: str = None + """Encoded content""" -@dataclass -class TextNode(Node): - text: str = None - """Text contained by the node.""" - - -@dataclass -class MimeNode(Node): mime_type: str = None """Type of content, e.g. text/plain or image/png.""" - mime_content: str = None - """Encoded content""" - mime_encoding: str = None """Encoding format""" +@dataclass +class TextNode(Node): + text: str = None + """Text contained by the node.""" + + mime_type = "text/plain" + class SetupMode(Enum): SYNC = 1 ASYNC = 2 @@ -343,37 +341,36 @@ def add_nodes( # Prepare nodes based on their type for node in nodes: - if isinstance(node, MimeNode): + if isinstance(node, TextNode): + if 'text' not in mime_buckets: + mime_buckets['text'] = [] + mime_buckets['text'].append(node) + if isinstance(node, Node) and node.mime_type: main_mime_type = node.mime_type.split('/')[0] # Split and take the first part, e.g., "image" from "image/png" if main_mime_type not in mime_buckets: mime_buckets[main_mime_type] = [] mime_buckets[main_mime_type].append(node) - elif isinstance(node, TextNode): - if 'text' not in mime_buckets: - mime_buckets['text'] = [] - mime_buckets['text'].append(node) else: raise ValueError("Unsupported node type") # Process each MIME bucket embeddings_dict = {} for mime_type, nodes_list in mime_buckets.items(): - function_mime_type = 'document' if mime_type == 'text' else mime_type - method_name = f"embed_{function_mime_type}s" + method_name = f"embed_{mime_type}s" if hasattr(self._embedding, method_name): - texts = [node.mime_content if isinstance(node, MimeNode) else node.text for node in nodes_list] + texts = [node.text if isinstance(node, TextNode) else node.content for node in nodes_list] embeddings_dict[mime_type] = getattr(self._embedding, method_name)(texts) else: # If no bulk method, try to call a singular method for each content - singular_method_name = f"embed_{function_mime_type}" + singular_method_name = f"embed_{mime_type}" if hasattr(self._embedding, singular_method_name): embeddings = [] for node in nodes_list: - embedding = getattr(self._embedding, singular_method_name)(node.mime_content if isinstance(node, MimeNode) else node.text) + embedding = getattr(self._embedding, singular_method_name)(node.text if isinstance(node, TextNode) else node.content) embeddings.append(embedding) embeddings_dict[mime_type] = embeddings else: - raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}") + raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}.") # Step 1: Add the nodes, collecting the tags and new sources / targets. @@ -401,7 +398,7 @@ def add_nodes( cq.execute( self._insert_passage, - (node_id, node.mime_content if isinstance(node, MimeNode) else node.text, embedding, link_to_tags, link_from_tags), + (node_id, node.text if isinstance(node, TextNode) else node.content, embedding, link_to_tags, link_from_tags), ) # Step 2: Query information about those tags to determine the edges to add. diff --git a/libs/langchain/ragstack_langchain/graph_store/__init__.py b/libs/langchain/ragstack_langchain/graph_store/__init__.py index eb90fc83e..cbf307c94 100644 --- a/libs/langchain/ragstack_langchain/graph_store/__init__.py +++ b/libs/langchain/ragstack_langchain/graph_store/__init__.py @@ -1,4 +1,4 @@ -from .base import GraphStore, Node, TextNode +from .base import GraphStore, Node from .cassandra import CassandraGraphStore -__all__ = ["CassandraGraphStore", "GraphStore", "Node", "TextNode"] +__all__ = ["CassandraGraphStore", "GraphStore", "Node"] diff --git a/libs/langchain/ragstack_langchain/graph_store/base.py b/libs/langchain/ragstack_langchain/graph_store/base.py index c28696cf8..336414e3e 100644 --- a/libs/langchain/ragstack_langchain/graph_store/base.py +++ b/libs/langchain/ragstack_langchain/graph_store/base.py @@ -39,11 +39,14 @@ class Node(Serializable): """Metadata for the node. May contain information used to link this node with other nodes.""" + content: str = None + """Encoded content""" -class TextNode(Node): - text: str - """Text contained by the node.""" + mime_type: str = None + """Type of content, e.g. text/plain or image/png.""" + mime_encoding: str = None + """Encoding format""" def _texts_to_nodes( texts: Iterable[str], @@ -61,10 +64,11 @@ def _texts_to_nodes( _id = next(ids_it) if ids_it else None except StopIteration: raise ValueError("texts iterable longer than ids") - yield TextNode( + yield Node( id=_id, metadata=_metadata, - text=text, + mime_type="text/plain", + content=text, ) if ids and _has_next(ids_it): raise ValueError("ids iterable longer than texts") @@ -81,10 +85,13 @@ def _documents_to_nodes( _id = next(ids_it) if ids_it else None except StopIteration: raise ValueError("documents iterable longer than ids") - yield TextNode( + + yield Node( id=_id, metadata=doc.metadata, - text=doc.page_content, + mime_type=doc.metadata.get('mime_type', 'text/plain'), + mime_encoding=doc.metadata.get('mime_encoding', None), + content=doc.page_content, ) if ids and _has_next(ids_it): raise ValueError("ids iterable longer than documents") diff --git a/libs/langchain/ragstack_langchain/graph_store/cassandra.py b/libs/langchain/ragstack_langchain/graph_store/cassandra.py index fe2fe5e63..3e0428053 100644 --- a/libs/langchain/ragstack_langchain/graph_store/cassandra.py +++ b/libs/langchain/ragstack_langchain/graph_store/cassandra.py @@ -11,7 +11,7 @@ from langchain_core.documents import Document from langchain_core.embeddings import Embeddings -from .base import GraphStore, Node, TextNode +from .base import GraphStore, Node from ragstack_knowledge_store import EmbeddingModel, graph_store @@ -96,14 +96,10 @@ def add_nodes( nodes: Iterable[Node] = None, **kwargs: Any, ): - _nodes = [] for node in nodes: - if not isinstance(node, TextNode): - raise ValueError("Only adding TextNode is supported at the moment") - _nodes.append( - graph_store.TextNode(id=node.id, text=node.text, metadata=node.metadata) - ) - return self.store.add_nodes(_nodes) + if not isinstance(node, Node): + raise ValueError("Only adding Node is supported at the moment") + return self.store.add_nodes(nodes) @classmethod def from_texts( From 8fee3fef18f0d184eef57f4c763510dd00e0ab00 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Tue, 18 Jun 2024 14:24:19 +0100 Subject: [PATCH 3/7] adding MimeNode type --- .../ragstack_knowledge_store/__init__.py | 3 +- .../ragstack_knowledge_store/graph_store.py | 113 +++++++++++------- 2 files changed, 74 insertions(+), 42 deletions(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/__init__.py b/libs/knowledge-store/ragstack_knowledge_store/__init__.py index 63f45f4d6..05274f28c 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/__init__.py +++ b/libs/knowledge-store/ragstack_knowledge_store/__init__.py @@ -1,5 +1,5 @@ from .embedding_model import EmbeddingModel -from .graph_store import GraphStore, Node, SetupMode, TextNode +from .graph_store import GraphStore, Node, SetupMode, TextNode, MimeNode from .knowledge_store import KnowledgeStore __all__ = [ @@ -9,4 +9,5 @@ "Node", "SetupMode", "TextNode", + "MimeNode", ] diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 0b5858e22..769457f65 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -41,6 +41,17 @@ class TextNode(Node): """Text contained by the node.""" +@dataclass +class MimeNode(Node): + mime_type: str = None + """Type of content, e.g. text/plain or image/png.""" + + mime_content: str = None + """Encoded content""" + + mime_encoding: str = None + """Encoding format""" + class SetupMode(Enum): SYNC = 1 ASYNC = 2 @@ -326,52 +337,72 @@ def add_nodes( self, nodes: Iterable[Node] = None, ) -> Iterable[str]: - texts = [] - metadatas = [] - for node in nodes: - if not isinstance(node, TextNode): - raise ValueError("Only adding TextNode is supported at the moment") - texts.append(node.text) - metadatas.append(node.metadata) - - text_embeddings = self._embedding.embed_texts(texts) - + # Organize nodes by MIME type + mime_buckets = {} ids = [] - tag_to_new_sources: Dict[str, List[Tuple[str, str]]] = {} - tag_to_new_targets: Dict[str, Dict[str, Tuple[str, List[float]]]] = {} + # Prepare nodes based on their type + for node in nodes: + if isinstance(node, MimeNode): + main_mime_type = node.mime_type.split('/')[0] # Split and take the first part, e.g., "image" from "image/png" + if main_mime_type not in mime_buckets: + mime_buckets[main_mime_type] = [] + mime_buckets[main_mime_type].append(node) + elif isinstance(node, TextNode): + if 'text' not in mime_buckets: + mime_buckets['text'] = [] + mime_buckets['text'].append(node) + else: + raise ValueError("Unsupported node type") + + # Process each MIME bucket + embeddings_dict = {} + for mime_type, nodes_list in mime_buckets.items(): + function_mime_type = 'document' if mime_type == 'text' else mime_type + method_name = f"embed_{function_mime_type}s" + if hasattr(self._embedding, method_name): + texts = [node.mime_content if isinstance(node, MimeNode) else node.text for node in nodes_list] + embeddings_dict[mime_type] = getattr(self._embedding, method_name)(texts) + else: + # If no bulk method, try to call a singular method for each content + singular_method_name = f"embed_{function_mime_type}" + if hasattr(self._embedding, singular_method_name): + embeddings = [] + for node in nodes_list: + embedding = getattr(self._embedding, singular_method_name)(node.mime_content if isinstance(node, MimeNode) else node.text) + embeddings.append(embedding) + embeddings_dict[mime_type] = embeddings + else: + raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}") + # Step 1: Add the nodes, collecting the tags and new sources / targets. + tag_to_new_sources = {} + tag_to_new_targets = {} with self._concurrent_queries() as cq: - tuples = zip(texts, text_embeddings, metadatas) - for text, text_embedding, metadata in tuples: - if CONTENT_ID not in metadata: - metadata[CONTENT_ID] = secrets.token_hex(8) - id = metadata[CONTENT_ID] - ids.append(id) - - link_to_tags = set() # link to these tags - link_from_tags = set() # link from these tags - - for tag in get_link_tags(metadata): - tag_str = f"{tag.kind}:{tag.tag}" - if tag.direction == "incoming" or tag.direction == "bidir": - # An incom`ing link should be linked *from* nodes with the given tag. - link_from_tags.add(tag_str) - tag_to_new_targets.setdefault(tag_str, dict())[id] = ( - tag.kind, - text_embedding, - ) - if tag.direction == "outgoing" or tag.direction == "bidir": - link_to_tags.add(tag_str) - tag_to_new_sources.setdefault(tag_str, list()).append( - (tag.kind, id) - ) - - cq.execute( - self._insert_passage, - (id, text, text_embedding, link_to_tags, link_from_tags), - ) + for mime_type, embeddings in embeddings_dict.items(): + for node, embedding in zip(mime_buckets[mime_type], embeddings): + if CONTENT_ID not in node.metadata: + node.metadata[CONTENT_ID] = secrets.token_hex(8) + node_id = node.metadata[CONTENT_ID] + ids.append(node_id) + + link_to_tags = set() + link_from_tags = set() + + for tag in get_link_tags(node.metadata): + tag_str = f"{tag.kind}:{tag.tag}" + if tag.direction in ["incoming", "bidir"]: + link_from_tags.add(tag_str) + tag_to_new_targets.setdefault(tag_str, {})[node_id] = (tag.kind, embedding) + if tag.direction in ["outgoing", "bidir"]: + link_to_tags.add(tag_str) + tag_to_new_sources.setdefault(tag_str, []).append((tag.kind, node_id)) + + cq.execute( + self._insert_passage, + (node_id, node.mime_content if isinstance(node, MimeNode) else node.text, embedding, link_to_tags, link_from_tags), + ) # Step 2: Query information about those tags to determine the edges to add. # Add edges as needed. From 1b81c2cf2c39a37785081e6979938aae374c25b7 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Wed, 19 Jun 2024 17:06:52 +0100 Subject: [PATCH 4/7] consolidating MIME headers to Node, adding Cassandra --- .../ragstack_knowledge_store/__init__.py | 3 +- .../ragstack_knowledge_store/graph_store.py | 43 +++++++++---------- .../graph_store/__init__.py | 4 +- .../ragstack_langchain/graph_store/base.py | 21 ++++++--- .../graph_store/cassandra.py | 12 ++---- 5 files changed, 41 insertions(+), 42 deletions(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/__init__.py b/libs/knowledge-store/ragstack_knowledge_store/__init__.py index 05274f28c..63f45f4d6 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/__init__.py +++ b/libs/knowledge-store/ragstack_knowledge_store/__init__.py @@ -1,5 +1,5 @@ from .embedding_model import EmbeddingModel -from .graph_store import GraphStore, Node, SetupMode, TextNode, MimeNode +from .graph_store import GraphStore, Node, SetupMode, TextNode from .knowledge_store import KnowledgeStore __all__ = [ @@ -9,5 +9,4 @@ "Node", "SetupMode", "TextNode", - "MimeNode", ] diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 769457f65..a2ff339b9 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -34,24 +34,22 @@ class Node: """Metadata for the node. May contain information used to link this node with other nodes.""" + content: str = None + """Encoded content""" -@dataclass -class TextNode(Node): - text: str = None - """Text contained by the node.""" - - -@dataclass -class MimeNode(Node): mime_type: str = None """Type of content, e.g. text/plain or image/png.""" - mime_content: str = None - """Encoded content""" - mime_encoding: str = None """Encoding format""" +@dataclass +class TextNode(Node): + text: str = None + """Text contained by the node.""" + + mime_type = "text/plain" + class SetupMode(Enum): SYNC = 1 ASYNC = 2 @@ -343,37 +341,36 @@ def add_nodes( # Prepare nodes based on their type for node in nodes: - if isinstance(node, MimeNode): + if isinstance(node, TextNode): + if 'text' not in mime_buckets: + mime_buckets['text'] = [] + mime_buckets['text'].append(node) + if isinstance(node, Node) and node.mime_type: main_mime_type = node.mime_type.split('/')[0] # Split and take the first part, e.g., "image" from "image/png" if main_mime_type not in mime_buckets: mime_buckets[main_mime_type] = [] mime_buckets[main_mime_type].append(node) - elif isinstance(node, TextNode): - if 'text' not in mime_buckets: - mime_buckets['text'] = [] - mime_buckets['text'].append(node) else: raise ValueError("Unsupported node type") # Process each MIME bucket embeddings_dict = {} for mime_type, nodes_list in mime_buckets.items(): - function_mime_type = 'document' if mime_type == 'text' else mime_type - method_name = f"embed_{function_mime_type}s" + method_name = f"embed_{mime_type}s" if hasattr(self._embedding, method_name): - texts = [node.mime_content if isinstance(node, MimeNode) else node.text for node in nodes_list] + texts = [node.text if isinstance(node, TextNode) else node.content for node in nodes_list] embeddings_dict[mime_type] = getattr(self._embedding, method_name)(texts) else: # If no bulk method, try to call a singular method for each content - singular_method_name = f"embed_{function_mime_type}" + singular_method_name = f"embed_{mime_type}" if hasattr(self._embedding, singular_method_name): embeddings = [] for node in nodes_list: - embedding = getattr(self._embedding, singular_method_name)(node.mime_content if isinstance(node, MimeNode) else node.text) + embedding = getattr(self._embedding, singular_method_name)(node.text if isinstance(node, TextNode) else node.content) embeddings.append(embedding) embeddings_dict[mime_type] = embeddings else: - raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}") + raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}.") # Step 1: Add the nodes, collecting the tags and new sources / targets. @@ -401,7 +398,7 @@ def add_nodes( cq.execute( self._insert_passage, - (node_id, node.mime_content if isinstance(node, MimeNode) else node.text, embedding, link_to_tags, link_from_tags), + (node_id, node.text if isinstance(node, TextNode) else node.content, embedding, link_to_tags, link_from_tags), ) # Step 2: Query information about those tags to determine the edges to add. diff --git a/libs/langchain/ragstack_langchain/graph_store/__init__.py b/libs/langchain/ragstack_langchain/graph_store/__init__.py index eb90fc83e..cbf307c94 100644 --- a/libs/langchain/ragstack_langchain/graph_store/__init__.py +++ b/libs/langchain/ragstack_langchain/graph_store/__init__.py @@ -1,4 +1,4 @@ -from .base import GraphStore, Node, TextNode +from .base import GraphStore, Node from .cassandra import CassandraGraphStore -__all__ = ["CassandraGraphStore", "GraphStore", "Node", "TextNode"] +__all__ = ["CassandraGraphStore", "GraphStore", "Node"] diff --git a/libs/langchain/ragstack_langchain/graph_store/base.py b/libs/langchain/ragstack_langchain/graph_store/base.py index c28696cf8..336414e3e 100644 --- a/libs/langchain/ragstack_langchain/graph_store/base.py +++ b/libs/langchain/ragstack_langchain/graph_store/base.py @@ -39,11 +39,14 @@ class Node(Serializable): """Metadata for the node. May contain information used to link this node with other nodes.""" + content: str = None + """Encoded content""" -class TextNode(Node): - text: str - """Text contained by the node.""" + mime_type: str = None + """Type of content, e.g. text/plain or image/png.""" + mime_encoding: str = None + """Encoding format""" def _texts_to_nodes( texts: Iterable[str], @@ -61,10 +64,11 @@ def _texts_to_nodes( _id = next(ids_it) if ids_it else None except StopIteration: raise ValueError("texts iterable longer than ids") - yield TextNode( + yield Node( id=_id, metadata=_metadata, - text=text, + mime_type="text/plain", + content=text, ) if ids and _has_next(ids_it): raise ValueError("ids iterable longer than texts") @@ -81,10 +85,13 @@ def _documents_to_nodes( _id = next(ids_it) if ids_it else None except StopIteration: raise ValueError("documents iterable longer than ids") - yield TextNode( + + yield Node( id=_id, metadata=doc.metadata, - text=doc.page_content, + mime_type=doc.metadata.get('mime_type', 'text/plain'), + mime_encoding=doc.metadata.get('mime_encoding', None), + content=doc.page_content, ) if ids and _has_next(ids_it): raise ValueError("ids iterable longer than documents") diff --git a/libs/langchain/ragstack_langchain/graph_store/cassandra.py b/libs/langchain/ragstack_langchain/graph_store/cassandra.py index fe2fe5e63..3e0428053 100644 --- a/libs/langchain/ragstack_langchain/graph_store/cassandra.py +++ b/libs/langchain/ragstack_langchain/graph_store/cassandra.py @@ -11,7 +11,7 @@ from langchain_core.documents import Document from langchain_core.embeddings import Embeddings -from .base import GraphStore, Node, TextNode +from .base import GraphStore, Node from ragstack_knowledge_store import EmbeddingModel, graph_store @@ -96,14 +96,10 @@ def add_nodes( nodes: Iterable[Node] = None, **kwargs: Any, ): - _nodes = [] for node in nodes: - if not isinstance(node, TextNode): - raise ValueError("Only adding TextNode is supported at the moment") - _nodes.append( - graph_store.TextNode(id=node.id, text=node.text, metadata=node.metadata) - ) - return self.store.add_nodes(_nodes) + if not isinstance(node, Node): + raise ValueError("Only adding Node is supported at the moment") + return self.store.add_nodes(nodes) @classmethod def from_texts( From 1222740ab6d236481db232baaa99fefec8fb2391 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Thu, 20 Jun 2024 14:29:35 +0100 Subject: [PATCH 5/7] incorporate EmbeddingModel abstraction --- .../embedding_model.py | 52 ++++++++++++++++--- .../ragstack_knowledge_store/graph_store.py | 10 ++-- .../graph_store/cassandra.py | 29 +++-------- .../graph_store/embedding_adapter.py | 12 +++++ 4 files changed, 70 insertions(+), 33 deletions(-) create mode 100644 libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py diff --git a/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py b/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py index 549814976..105e48973 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py +++ b/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py @@ -1,22 +1,60 @@ -from abc import ABC, abstractmethod -from typing import List - +from abc import ABC +from typing import List, Any, Optional class EmbeddingModel(ABC): """Embedding model.""" - @abstractmethod + def __init__(self, embeddings:Any, other_methods:Optional[List[str]]=None): + self.embeddings = embeddings + self.method_cache = {} + + if other_methods is None: + other_methods = [] + + base_methods = ['embed_texts', 'aembed_texts', 'embed_query', 'aembed_query'] + extended_methods = ['embed_images', 'aembed_images', 'embed_image', 'aembed_image'] + + for method_name in base_methods: + self.method_cache[method_name] = True + + for method_name in extended_methods + other_methods: + self.method_cache[method_name] = hasattr(embeddings, method_name) + + def does_implement(self, method_name: str) -> bool: + """Check if the method is implemented.""" + return self.method_cache.get(method_name, False) + + def implements(self) -> List[str]: + """List of methods that are implemented""" + return [method for method, implemented in self.method_cache.items() if implemented] + + def invoke(self, method_name: str, *args, **kwargs): + """Invoke a synchronous method if it's implemented.""" + if self.does_implement(method_name): + return getattr(self.embeddings, method_name)(*args, **kwargs) + else: + raise NotImplementedError(f"{self.embeddings.__class__} does not implement {method_name}") + + async def ainvoke(self, method_name: str, *args, **kwargs): + """Invoke an asynchronous method if it's implemented.""" + if self.does_implement(method_name): + return await getattr(self.embeddings, method_name)(*args, **kwargs) + else: + raise NotImplementedError(f"{self.embeddings.__class__} does not implement {method_name}") + def embed_texts(self, texts: List[str]) -> List[List[float]]: """Embed texts.""" + return self.invoke('embed_texts', texts) - @abstractmethod def embed_query(self, text: str) -> List[float]: """Embed query text.""" + return self.invoke('embed_query', text) - @abstractmethod async def aembed_texts(self, texts: List[str]) -> List[List[float]]: """Embed texts.""" + return await self.ainvoke('aembed_texts', texts) - @abstractmethod async def aembed_query(self, text: str) -> List[float]: """Embed query text.""" + return await self.ainvoke('aembed_query', text) + diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index a2ff339b9..7e53f2f46 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -357,20 +357,20 @@ def add_nodes( embeddings_dict = {} for mime_type, nodes_list in mime_buckets.items(): method_name = f"embed_{mime_type}s" - if hasattr(self._embedding, method_name): + if self._embedding.does_implement(method_name): texts = [node.text if isinstance(node, TextNode) else node.content for node in nodes_list] - embeddings_dict[mime_type] = getattr(self._embedding, method_name)(texts) + embeddings_dict[mime_type] = self._embedding.invoke(method_name, texts) else: # If no bulk method, try to call a singular method for each content singular_method_name = f"embed_{mime_type}" - if hasattr(self._embedding, singular_method_name): + if self._embedding.does_implement(singular_method_name): embeddings = [] for node in nodes_list: - embedding = getattr(self._embedding, singular_method_name)(node.text if isinstance(node, TextNode) else node.content) + embedding = self._embedding.invoke(singular_method_name, node.text if isinstance(node, TextNode) else node.content) embeddings.append(embedding) embeddings_dict[mime_type] = embeddings else: - raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}.") + raise NotImplementedError(f"No embedding method available for MIME type: {mime_type}, implemented methods: {self._embedding.implements()}.") # Step 1: Add the nodes, collecting the tags and new sources / targets. diff --git a/libs/langchain/ragstack_langchain/graph_store/cassandra.py b/libs/langchain/ragstack_langchain/graph_store/cassandra.py index 3e0428053..ebc799c1d 100644 --- a/libs/langchain/ragstack_langchain/graph_store/cassandra.py +++ b/libs/langchain/ragstack_langchain/graph_store/cassandra.py @@ -12,25 +12,8 @@ from langchain_core.embeddings import Embeddings from .base import GraphStore, Node -from ragstack_knowledge_store import EmbeddingModel, graph_store - - -class _EmbeddingModelAdapter(EmbeddingModel): - def __init__(self, embeddings: Embeddings): - self.embeddings = embeddings - - def embed_texts(self, texts: List[str]) -> List[List[float]]: - return self.embeddings.embed_documents(texts) - - def embed_query(self, text: str) -> List[float]: - return self.embeddings.embed_query(text) - - async def aembed_texts(self, texts: List[str]) -> List[List[float]]: - return await self.embeddings.aembed_documents(texts) - - async def aembed_query(self, text: str) -> List[float]: - return await self.embeddings.aembed_query(text) - +from .embedding_adapter import EmbeddingAdapter +from ragstack_knowledge_store import graph_store def _row_to_document(row) -> Document: return Document( @@ -78,7 +61,7 @@ def __init__( _setup_mode = getattr(graph_store.SetupMode, setup_mode.name) self.store = graph_store.GraphStore( - embedding=_EmbeddingModelAdapter(embedding), + embedding=EmbeddingAdapter(embedding), node_table=node_table, edge_table=edge_table, session=session, @@ -96,10 +79,14 @@ def add_nodes( nodes: Iterable[Node] = None, **kwargs: Any, ): + _nodes = [] for node in nodes: if not isinstance(node, Node): raise ValueError("Only adding Node is supported at the moment") - return self.store.add_nodes(nodes) + _nodes.append( + graph_store.Node(id=node.id, content=node.content, mime_type=node.mime_type, mime_encoding=node.mime_encoding, metadata=node.metadata) + ) + return self.store.add_nodes(_nodes) @classmethod def from_texts( diff --git a/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py b/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py new file mode 100644 index 000000000..c98968786 --- /dev/null +++ b/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py @@ -0,0 +1,12 @@ +from typing import List +from ragstack_knowledge_store import EmbeddingModel + +class EmbeddingAdapter(EmbeddingModel): + def __init__(self, embeddings): + super().__init__(embeddings, other_methods=['embed_documents,aembed_documents']) + + def embed_texts(self, texts: List[str]) -> List[List[float]]: + return self.invoke('embed_documents', texts) + + async def aembed_texts(self, texts: List[str]) -> List[List[float]]: + return await self.ainvoke('aembed_documents', texts) From f950df1d792fde6ac5e554ae21854cea6605f7c5 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Thu, 20 Jun 2024 16:41:45 +0100 Subject: [PATCH 6/7] fixing text embeddings --- .../embedding_model.py | 43 +++++++++++-------- .../graph_store/embedding_adapter.py | 9 ++-- 2 files changed, 28 insertions(+), 24 deletions(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py b/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py index 105e48973..24f5429e3 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py +++ b/libs/knowledge-store/ragstack_knowledge_store/embedding_model.py @@ -4,43 +4,50 @@ class EmbeddingModel(ABC): """Embedding model.""" - def __init__(self, embeddings:Any, other_methods:Optional[List[str]]=None): + def __init__(self, embeddings: Any, method_map: Optional[dict] = None, other_methods: Optional[List[str]] = None): self.embeddings = embeddings - self.method_cache = {} - - if other_methods is None: - other_methods = [] + self.method_name = {} + method_map = method_map if method_map else {} + other_methods = other_methods if other_methods else [] base_methods = ['embed_texts', 'aembed_texts', 'embed_query', 'aembed_query'] extended_methods = ['embed_images', 'aembed_images', 'embed_image', 'aembed_image'] + + # Combining all method names, including those mapped + all_methods = set(base_methods + extended_methods + other_methods + list(method_map.values())) - for method_name in base_methods: - self.method_cache[method_name] = True - - for method_name in extended_methods + other_methods: - self.method_cache[method_name] = hasattr(embeddings, method_name) + for method in all_methods: + mapped_method = method_map.get(method) + if hasattr(embeddings, method): + self.method_name[method] = method + elif hasattr(embeddings, mapped_method) if mapped_method else False: + self.method_name[method] = mapped_method + else: + self.method_name[method] = None def does_implement(self, method_name: str) -> bool: """Check if the method is implemented.""" - return self.method_cache.get(method_name, False) + return self.method_name.get(method_name) is not None def implements(self) -> List[str]: """List of methods that are implemented""" - return [method for method, implemented in self.method_cache.items() if implemented] + return [method for method, impl in self.method_name.items() if impl is not None] def invoke(self, method_name: str, *args, **kwargs): """Invoke a synchronous method if it's implemented.""" - if self.does_implement(method_name): - return getattr(self.embeddings, method_name)(*args, **kwargs) + target_method = self.method_name.get(method_name) + if target_method and hasattr(self.embeddings, target_method): + return getattr(self.embeddings, target_method)(*args, **kwargs) else: - raise NotImplementedError(f"{self.embeddings.__class__} does not implement {method_name}") + raise NotImplementedError(f"{self.embeddings.__class__.__name__} does not implement {target_method}") async def ainvoke(self, method_name: str, *args, **kwargs): """Invoke an asynchronous method if it's implemented.""" - if self.does_implement(method_name): - return await getattr(self.embeddings, method_name)(*args, **kwargs) + target_method = self.method_name.get(method_name) + if target_method and hasattr(self.embeddings, target_method): + return await getattr(self.embeddings, target_method)(*args, **kwargs) else: - raise NotImplementedError(f"{self.embeddings.__class__} does not implement {method_name}") + raise NotImplementedError(f"{self.embeddings.__class__.__name__} does not implement {target_method}") def embed_texts(self, texts: List[str]) -> List[List[float]]: """Embed texts.""" diff --git a/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py b/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py index c98968786..ed4dec78e 100644 --- a/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py +++ b/libs/langchain/ragstack_langchain/graph_store/embedding_adapter.py @@ -3,10 +3,7 @@ class EmbeddingAdapter(EmbeddingModel): def __init__(self, embeddings): - super().__init__(embeddings, other_methods=['embed_documents,aembed_documents']) + super().__init__(embeddings, + method_map={'embed_texts': 'embed_documents', + 'aembed_texts': 'aembed_documents'}) - def embed_texts(self, texts: List[str]) -> List[List[float]]: - return self.invoke('embed_documents', texts) - - async def aembed_texts(self, texts: List[str]) -> List[List[float]]: - return await self.ainvoke('aembed_documents', texts) From 3046ce7fd97cbca7ab320c82e726dbf3b9707b50 Mon Sep 17 00:00:00 2001 From: Phil Miesle Date: Mon, 24 Jun 2024 10:55:39 +0100 Subject: [PATCH 7/7] preserve incoming node_id --- libs/knowledge-store/ragstack_knowledge_store/graph_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py index 7e53f2f46..8054aed88 100644 --- a/libs/knowledge-store/ragstack_knowledge_store/graph_store.py +++ b/libs/knowledge-store/ragstack_knowledge_store/graph_store.py @@ -380,7 +380,7 @@ def add_nodes( for mime_type, embeddings in embeddings_dict.items(): for node, embedding in zip(mime_buckets[mime_type], embeddings): if CONTENT_ID not in node.metadata: - node.metadata[CONTENT_ID] = secrets.token_hex(8) + node.metadata[CONTENT_ID] = node.id if node.id else secrets.token_hex(8) node_id = node.metadata[CONTENT_ID] ids.append(node_id)