Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
442341c
analyzer skeleton
yampelo Nov 14, 2019
1b7f705
Begins working on rule system, adds field lookups.
yampelo Nov 15, 2019
47cd6ad
Adds tests for selecting a NodeWithProps in NetworkX
yampelo Nov 15, 2019
63e24b1
Adds conditionals for matching
yampelo Nov 15, 2019
7f78477
Fixes not_null wrapper
yampelo Nov 15, 2019
f82339e
Adds operator overloading to lookups
yampelo Nov 15, 2019
39c3dcf
Moves statements to work on nx.Graph objects instead of NetworkX Back…
yampelo Nov 15, 2019
3cbae4f
EdgeByProps: Adds statement to return subgraph that contains a matchi…
yampelo Nov 15, 2019
f2dc414
Filter node and return ancestors/descendants/all reachable.
yampelo Nov 15, 2019
024b7b5
Moves test graphs to fixture files
yampelo Nov 15, 2019
6adb2e1
ChainedStatement: Adds ability to perform statement1 | statement2
yampelo Nov 15, 2019
26ac070
FindProcess: adds process queries
yampelo Nov 15, 2019
46c3dad
Splits Node/Edge statements into separate files
yampelo Nov 16, 2019
56d1e2c
Adds statement chaining using >> or << operators
yampelo Nov 16, 2019
e01203d
adds intermediate statements, allowing to chain actions
yampelo Nov 16, 2019
349d2da
classmethod -> staticmethod
yampelo Nov 17, 2019
7a71a65
Analyzer: Class to execute statements
yampelo Nov 17, 2019
1fc6e3d
Fixes unit tests
yampelo Nov 17, 2019
db98490
Tests edges with tree structures graphs
yampelo Nov 17, 2019
f6b6027
Renames Statement as Query
yampelo Nov 17, 2019
e2205a5
Adds FindProcess.that_was_launched
yampelo Nov 17, 2019
22e7043
Adds query factory for Files
yampelo Nov 17, 2019
460e1d9
All queries can now be intermediary by default.
yampelo Nov 18, 2019
cfea7bd
FindFile: finishes file queries
yampelo Nov 18, 2019
55404f3
SummaryQuery: adds ability to summarize information gathered
yampelo Nov 18, 2019
b93049e
Merge branch 'master' into analyzer-framework
yampelo Nov 23, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added beagle/analyzers/__init__.py
Empty file.
52 changes: 52 additions & 0 deletions beagle/analyzers/base_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from typing import Any, Type, cast

import networkx as nx

from beagle.backends import Backend, NetworkX
from beagle.common import logger

from .queries.base_query import Query
from .queries.summary import SummaryQuery


class Analyzer(object):
    """Executes a chain of queries against a graph backend.

    Attributes
    ----------
    name : str
        Name of the analyzer, used in log messages.
    description : str
        Optional free-form description.
    score : int
        Optional score attached to the analyzer.
    query : Query
        The first query of the chain; execution follows ``downstream_query``
        links from here.
    """

    def __init__(self, name: str, query: Query, description: str = None, score: int = None):
        """Create an analyzer around a query chain.

        Parameters
        ----------
        name : str
            Name of the analyzer.
        query : Query
            Any query of the chain. The chain is rewound to its first query.
        description : str, optional
            Description of the analyzer, by default None
        score : int, optional
            Score of the analyzer, by default None
        """
        self.name = name
        self.description = description
        self.score = score

        # `a >> b` evaluates to `b`, so the query we are handed may be the end
        # of a chain. Follow the upstream links back to the start.
        while query.upstream_query is not None:
            query = query.upstream_query

        self.query: Query = query

    def run(self, backend: Backend) -> Any:
        """Run the analyzer against a backend instance.

        Parameters
        ----------
        backend : Backend
            The backend instance to run against. Only `NetworkX` is handled.

        Returns
        -------
        Any
            The result of `run_networkx` for a `NetworkX` backend, otherwise
            None.
        """
        if isinstance(backend, NetworkX):
            return self.run_networkx(backend.G)

        # Unsupported backends still yield None, but no longer silently.
        logger.warning(
            f"Analyzer {self.name} does not support backend {backend.__class__.__name__}"
        )
        return None

    def run_networkx(self, G: nx.Graph) -> nx.Graph:
        """Execute every query of the chain, in order, against a graph.

        Each query receives the graph returned by the previous query, except
        `SummaryQuery` instances, which receive a copy of the original graph.

        Parameters
        ----------
        G : nx.Graph
            The graph to analyze. It is copied before any query runs.

        Returns
        -------
        nx.Graph
            The output of the last query in the chain.
        """
        logger.info(f"Running analyzer {self.name}")

        # H is a copy of our original graph.
        H = G.copy()

        current_query = self.query

        while current_query is not None:
            if isinstance(current_query, SummaryQuery):
                # SummaryQueries get the original graph.
                H = current_query.execute_networkx(G.copy())
            else:
                H = current_query.execute_networkx(H)

            # Move on to the next query in the chain.
            current_query = current_query.downstream_query

        if len(H.nodes()) > 0:
            logger.info("Analyzer query returned a matching subgraph.")

        return H
24 changes: 24 additions & 0 deletions beagle/analyzers/queries/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from networkx import nx
from .base_query import Query, PropsDict
from .edge import EdgeByProps, EdgeByPropsAncestors, EdgeByPropsDescendants, EdgeByPropsReachable


def make_edge_query(
    edge_type: str,
    descendants=True,
    ancestors=False,
    reachable=False,
    edge_props: PropsDict = None,
) -> Query:
    """Build the edge query matching the requested traversal flags.

    The flags are checked in a fixed order: `reachable`, then `descendants`,
    then `ancestors`. Because `descendants` defaults to True, callers must pass
    ``descendants=False`` to get the ancestors-only or plain variant.

    Parameters
    ----------
    edge_type : str
        Passed through as the `edge_type` of the created query.
    descendants : bool, optional
        Return an `EdgeByPropsDescendants` query, by default True
    ancestors : bool, optional
        Return an `EdgeByPropsAncestors` query, by default False
    reachable : bool, optional
        Return an `EdgeByPropsReachable` query, by default False
    edge_props : PropsDict, optional
        Passed through as the `edge_props` of the created query. A new empty
        dict is used when omitted.

    Returns
    -------
    Query
        The query selected by the flags; `EdgeByProps` when all are False.
    """
    # Use a fresh dict per call: a `{}` default would be one shared object
    # handed to every query created without explicit props.
    if edge_props is None:
        edge_props = {}

    if reachable:
        return EdgeByPropsReachable(edge_type=edge_type, edge_props=edge_props)
    if descendants:
        return EdgeByPropsDescendants(edge_type=edge_type, edge_props=edge_props)
    if ancestors:
        return EdgeByPropsAncestors(edge_type=edge_type, edge_props=edge_props)
    return EdgeByProps(edge_type=edge_type, edge_props=edge_props)


class FactoryMixin(object):
    """Mixin to prevent Query Factories from calling execute methods."""

    def execute_networkx(self, G: nx.Graph):
        """Always raise: a factory is not itself an executable query.

        Parameters
        ----------
        G : nx.Graph
            Ignored.

        Raises
        ------
        UserWarning
            Always.
        """
        raise UserWarning("Query factories cannot be called directly")
203 changes: 203 additions & 0 deletions beagle/analyzers/queries/base_query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from typing import Any, Dict, List, Set, Tuple, Union

import networkx as nx

from beagle.nodes import Node

from .lookups import Exact, FieldLookup


# Property filters: maps a property name to the value it is tested against.
# A value may be a plain string, a FieldLookup, a nested dict of the same
# shape, or None.
PropsDict = Dict[str, Union[str, FieldLookup, Dict, None]]


def _str_to_exact(props: dict) -> Dict[str, Union[FieldLookup, Dict]]:
    """Normalize a props dict in place so every leaf is a lookup.

    Strings are wrapped in `Exact`, nested dicts are normalized recursively,
    and keys whose value is None are removed. Other values are left untouched.

    Parameters
    ----------
    props : dict
        The props dict to normalize. It is modified in place.

    Returns
    -------
    Dict[str, Union[FieldLookup, Dict]]
        The same `props` object, after normalization.
    """
    # Iterate over a snapshot: deleting a key while iterating the live
    # `items()` view raises "dictionary changed size during iteration".
    for key, value in list(props.items()):
        if isinstance(value, str):
            props[key] = Exact(value)
        elif isinstance(value, dict):
            props[key] = _str_to_exact(value)
        elif value is None:
            del props[key]

    return props


class Query(object):
    """Base class for queries that turn one graph into the next.

    Queries can be linked into a chain with ``>>`` / ``<<`` and combined into
    a `ChainedQuery` with ``|``.
    """

    def __init__(self):
        """A query takes as input a graph, executes, and returns the next graph.

        >>> G2 = query.execute_networkx(G)

        Attributes
        ----------
        result_nodes: Set[int]:
            The set of node IDs which create the subgraph returned by the query.
        result_edges: Set[Tuple[int, int, int]]:
            The set of (u, v, k) tuples representing the edges which created the subgraph.
        downstream_query: Query
            The query linked after this one, or None.
        upstream_query: Query
            The query linked before this one, or None.
        upstream_nodes: Set[int]
            Node IDs accumulated from the upstream query by `set_upstream_nodes`.
        upstream_edges: Set[Tuple[int, int, int]]
            Edges accumulated from the upstream query by `set_upstream_nodes`.
        """
        # The resulting node IDs
        self.result_nodes: Set[int] = set()

        # The resulting edge IDs
        self.result_edges: Set[Tuple[int, int, int]] = set()

        # The queries linked directly after and before this one.
        self.downstream_query: Query = None
        self.upstream_query: Query = None

        self.upstream_nodes: Set[int] = set()
        self.upstream_edges: Set[Tuple[int, int, int]] = set()

    def get_upstream_results(self) -> Tuple[Set[int], Set[Tuple[int, int, int]]]:
        """Return the `(result_nodes, result_edges)` of the upstream query.

        Requires `upstream_query` to be set.
        """
        return self.upstream_query.result_nodes, self.upstream_query.result_edges

    def set_upstream_nodes(self):  # pragma: no cover
        """Merge the upstream query's results into `upstream_nodes` / `upstream_edges`.

        Requires `upstream_query` to be set.
        """
        self.upstream_nodes |= self.upstream_query.result_nodes
        self.upstream_edges |= self.upstream_query.result_edges

    def _test_values_with_lookups(
        self,
        value_to_test: Union[Node, Dict[str, Any]],
        lookup_tests: Dict[str, Union[FieldLookup, Dict]],
    ) -> bool:
        """Tests a node or dictionary against a configuration of lookup_tests.

        Parameters
        ----------
        value_to_test : Union[Node, Dict[str, Any]]
            The node or dict to test.
        lookup_tests : Dict[str, Union[FieldLookup, Dict]]
            The lookups to apply, keyed by attribute name. A dict value is
            applied recursively to the nested attribute of the same name.

        Returns
        -------
        bool
            Did all of the tests pass?
        """

        # Auto pass if no tests.
        if not lookup_tests:
            return True

        # Auto fail on empty value (given we have tests)
        if not value_to_test:
            return False

        results: List[bool] = []

        for attr_name, lookup in lookup_tests.items():
            if isinstance(lookup, dict):
                # Recursively check props against nested entries (e.g. the hashes dict in Process)
                if isinstance(value_to_test, Node):  # pragma: no cover
                    nested_value = getattr(value_to_test, attr_name)
                else:
                    nested_value = value_to_test.get(attr_name, {})

                results.append(
                    self._test_values_with_lookups(
                        value_to_test=nested_value, lookup_tests=lookup
                    )
                )
            else:
                if isinstance(value_to_test, Node):
                    results.append(lookup.test(getattr(value_to_test, attr_name)))
                else:
                    results.append(lookup.test(value_to_test.get(attr_name)))

        # Every lookup must pass; `any` would accept a value matching only
        # one of several requested properties.
        return all(results)

    def execute_networkx(self, G: nx.Graph):  # pragma: no cover
        """Execute a query against a `networkx` graph."""
        raise NotImplementedError(f"NetworkX not supported for {self.__class__.__name__}")

    def __rshift__(self, other: "Query") -> "Query":
        """Implements Self >> Other == self.downstream_query = other

        Parameters
        ----------
        other : Query
            The other query to add.

        Returns
        -------
        Query
            `other`, so that further ``>>`` links continue from it.
        """
        self.downstream_query = other
        other.upstream_query = self
        return other

    def __lshift__(self, other: "Query") -> "Query":
        """Implements Self << Other == self.upstream_query = other

        Parameters
        ----------
        other : Query
            The other query to add.

        Returns
        -------
        Query
            `other`, so that further ``<<`` links continue from it.
        """
        other.downstream_query = self
        self.upstream_query = other
        return other

    def __or__(self, other: "Query") -> "ChainedQuery":
        """Allows queries to be combined through the `|` operator.
        The result of execution is the union of both subqueries.

        >>> query1 = Query(...)
        >>> query2 = Query(...)
        >>> chained = query1 | query2


        Parameters
        ----------
        other: Query
            The query to chain with.

        Returns
        -------
        ChainedQuery
            A chained query composed of both queries.
        """
        return ChainedQuery(self, other)


class ChainedQuery(Query):
    """A query whose output is the composition of several sub-queries' outputs."""

    def __init__(self, *args: Query):
        """Executes multiple queries, combining their outputs.

        Parameters
        ----------
        args: Query
            One or more queries
        """
        self.queries = args
        super().__init__()

    def execute_networkx(self, G: nx.Graph) -> nx.Graph:
        """Run every sub-query against the same graph and merge what they return.

        The `result_nodes` and `result_edges` of each sub-query are added to
        this query's own result sets.

        Parameters
        ----------
        G : nx.Graph
            Graph each sub-query is executed against.

        Returns
        -------
        nx.Graph
            The first sub-query's output composed, in order, with the outputs
            of the remaining sub-queries.
        """
        outputs = []

        for sub_query in self.queries:
            outputs.append(sub_query.execute_networkx(G))

            # Collect the sub-query's results once it has run.
            self.result_edges.update(sub_query.result_edges)
            self.result_nodes.update(sub_query.result_nodes)

        # Fold the outputs together, left to right.
        composed = outputs[0]
        for output in outputs[1:]:
            composed = nx.compose(composed, output)

        return composed
Loading