forked from apurvsinghgautam/robin
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm.py
More file actions
166 lines (132 loc) · 6.67 KB
/
llm.py
File metadata and controls
166 lines (132 loc) · 6.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import re
import openai
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from llm_utils import _llm_config_map, _common_llm_params
from config import OPENAI_API_KEY, ANTHROPIC_API_KEY, GOOGLE_API_KEY
import warnings
warnings.filterwarnings("ignore")
def get_llm(model_choice):
    """
    Instantiate the LLM registered under ``model_choice``.

    The name is lower-cased before it is looked up in ``_llm_config_map``,
    so matching is case-insensitive. The constructor keyword arguments are
    ``_common_llm_params`` overlaid with the entry's ``constructor_params``;
    on a key clash the model-specific value wins.

    Args:
        model_choice: Name of the model to build.

    Returns:
        The object produced by calling the entry's ``class`` with the merged
        keyword arguments.

    Raises:
        ValueError: If no configuration exists for ``model_choice``. The
            message lists every supported model name.
    """
    entry = _llm_config_map.get(model_choice.lower())

    # Unknown model: fail early and tell the caller what is available.
    if entry is None:
        known_models = ", ".join(list(_llm_config_map.keys()))
        raise ValueError(
            f"Unsupported LLM model: '{model_choice}'. "
            f"Supported models (case-insensitive match) are: {known_models}"
        )

    constructor = entry["class"]
    overrides = entry["constructor_params"]

    # Start from the shared defaults, then let the model-specific settings
    # replace any keys they both define.
    kwargs = dict(_common_llm_params)
    kwargs.update(overrides)

    return constructor(**kwargs)
def refine_query(llm, user_input):
    """
    Have the LLM rewrite a user query for dark web search engines.

    A system prompt carrying the refinement rules and a user message
    carrying ``user_input`` are piped through ``llm`` and a
    ``StrOutputParser``.

    Args:
        llm: The model object placed in the middle of the chain.
        user_input: The raw query supplied by the user.

    Returns:
        Whatever the chain produces when invoked with the query.
    """
    instructions = """
You are a Cybercrime Threat Intelligence Expert. Your task is to refine the provided user query that needs to be sent to darkweb search engines.
Rules:
1. Analyze the user query and think about how it can be improved to use as search engine query
2. Refine the user query by adding or removing words so that it returns the best result from dark web search engines
3. Don't use any logical operators (AND, OR, etc.)
4. Output just the user query and nothing else
INPUT:
"""
    # The user's text is injected through the {query} placeholder rather
    # than being concatenated into the system prompt.
    messages = [("system", instructions), ("user", "{query}")]
    pipeline = ChatPromptTemplate(messages) | llm | StrOutputParser()
    return pipeline.invoke({"query": user_input})
def filter_results(llm, query, results):
    """
    Ask the LLM to pick the search results most relevant to ``query``.

    The results are rendered as numbered lines by
    ``_generate_final_string`` and sent to the model, which is instructed to
    answer with a comma-separated list of at most 20 indices. If the call
    fails with ``openai.RateLimitError`` it is retried once with the
    truncated rendering.

    The model's answer is treated as untrusted text: only integers that are
    valid 1-based positions in ``results`` are kept, duplicates are dropped,
    and at most 20 are used. If nothing usable can be extracted, the first
    20 results are returned instead of raising.

    Args:
        llm: The model object placed in the middle of the chain.
        query: The search query the results should match.
        results: List of result dicts (each is read via ``"link"`` and
            ``"title"`` by ``_generate_final_string``).

    Returns:
        A list of the selected entries from ``results`` (the original,
        non-truncated objects), in the order the model listed them. Empty
        when ``results`` is empty.
    """
    if not results:
        return []

    max_results = 20

    system_prompt = """
You are a Cybercrime Threat Intelligence Expert. You are given a dark web search query and a list of search results in the form of index, link and title.
Your task is select the Top 20 relevant results that best match the search query for user to investigate more.
Rule:
1. Output ONLY atmost top 20 indices (comma-separated list) no more than that that best match the input query
Search Query: {query}
Search Results:
"""
    final_str = _generate_final_string(results)
    prompt_template = ChatPromptTemplate(
        [("system", system_prompt), ("user", "{results}")]
    )
    chain = prompt_template | llm | StrOutputParser()

    try:
        result_indices = chain.invoke({"query": query, "results": final_str})
    except openai.RateLimitError as e:
        print(
            f"Rate limit error: {e} \n Truncating to Web titles only with 30 characters"
        )
        # Retry once with a much shorter rendering of the same results.
        final_str = _generate_final_string(results, truncate=True)
        result_indices = chain.invoke({"query": query, "results": final_str})

    selected = _parse_result_indices(result_indices, len(results), max_results)
    if not selected:
        # The model returned nothing usable; degrade gracefully rather than
        # crash the whole pipeline on a malformed answer.
        print(
            "Could not parse any valid result indices from the LLM response; "
            f"falling back to the first {max_results} results"
        )
        selected = list(range(1, min(len(results), max_results) + 1))

    # Map back onto the original (non-truncated) result objects.
    return [results[i - 1] for i in selected]


def _parse_result_indices(raw, total, limit):
    """
    Extract valid, unique 1-based indices from the model's free-text answer.

    Args:
        raw: The text returned by the chain.
        total: Number of available results; indices outside ``1..total`` are
            discarded (this also rejects ``0``, which would otherwise wrap
            around to the last element).
        limit: Maximum number of indices to return.

    Returns:
        A list of ints in first-seen order, without duplicates, of length at
        most ``limit``.
    """
    picked = []
    seen = set()
    for token in re.findall(r"\d+", str(raw)):
        index = int(token)
        if index < 1 or index > total or index in seen:
            continue
        seen.add(index)
        picked.append(index)
        if len(picked) >= limit:
            break
    return picked
def _generate_final_string(results, truncate=False):
"""
Generate a formatted string from the search results for LLM processing.
"""
if truncate:
# Use only the first 35 characters of the title
max_title_length = 30
# Do not use link at all
max_link_length = 0
final_str = []
for i, res in enumerate(results):
# Truncate link at .onion for display
truncated_link = re.sub(r"(?<=\.onion).*", "", res["link"])
title = re.sub(r"[^0-9a-zA-Z\-\.]", " ", res["title"])
if truncated_link == "" and title == "":
continue
if truncate:
# Truncate title to max_title_length characters
title = (
title[:max_title_length] + "..."
if len(title) > max_title_length
else title
)
# Truncate link to max_link_length characters
truncated_link = (
truncated_link[:max_link_length] + "..."
if len(truncated_link) > max_link_length
else truncated_link
)
final_str.append(f"{i+1}. {truncated_link} - {title}")
return "\n".join(s for s in final_str)
def generate_summary(llm, query, content):
    """
    Produce an investigative summary of scraped content for a query.

    The system prompt (rules plus the required output format, with the
    query substituted into it) and a user message carrying ``content`` are
    piped through ``llm`` and a ``StrOutputParser``.

    Args:
        llm: The model object placed in the middle of the chain.
        query: The search query; fills the ``{query}`` placeholder in the
            system prompt.
        content: The collected text to analyse; sent as the user message.

    Returns:
        Whatever the chain produces when invoked.
    """
    analyst_brief = """
You are an Cybercrime Threat Intelligence Expert tasked with generating context-based technical investigative insights from dark web osint search engine results.
Rules:
1. Analyze the Darkweb OSINT data provided using links and their raw text.
2. Output the Source Links referenced for the analysis.
3. Provide a detailed, contextual, evidence-based technical analysis of the data.
4. Provide intellgience artifacts along with their context visible in the data.
5. The artifacts can include indicators like name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, TTPs, etc.
6. Generate 3-5 key insights based on the data.
7. Each insight should be specific, actionable, context-based, and data-driven.
8. Include suggested next steps and queries for investigating more on the topic.
9. Be objective and analytical in your assessment.
10. Ignore not safe for work texts from the analysis
Output Format:
1. Input Query: {query}
2. Source Links Referenced for Analysis - this heading will include all source links used for the analysis
3. Investigation Artifacts - this heading will include all technical artifacts identified including name, email, phone, cryptocurrency addresses, domains, darkweb markets, forum names, threat actor information, malware names, etc.
4. Key Insights
5. Next Steps - this includes next investigative steps including search queries to search more on a specific artifacts for example or any other topic.
Format your response in a structured way with clear section headings.
INPUT:
"""
    messages = [("system", analyst_brief), ("user", "{content}")]
    pipeline = ChatPromptTemplate(messages) | llm | StrOutputParser()

    # Both placeholders are filled at invoke time: {query} in the system
    # message and {content} in the user message.
    variables = {"query": query, "content": content}
    return pipeline.invoke(variables)