-
Notifications
You must be signed in to change notification settings - Fork 96
Expand file tree
/
Copy pathmain.py
More file actions
216 lines (179 loc) · 6.74 KB
/
main.py
File metadata and controls
216 lines (179 loc) · 6.74 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
from linebot.models import (
MessageEvent, TextSendMessage
)
from linebot.exceptions import (
InvalidSignatureError
)
from linebot.aiohttp_async_http_client import AiohttpAsyncHttpClient
from linebot import (
AsyncLineBotApi, WebhookParser
)
from fastapi import Request, FastAPI, HTTPException
import os
import sys
from io import BytesIO
import aiohttp
import uuid
from google.cloud import storage
# Import LangChain components with Vertex AI
from langchain_google_vertexai import ChatVertexAI
from langchain.schema.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
# get channel_secret and channel_access_token from your environment variable
channel_secret = os.getenv('ChannelSecret', None)
channel_access_token = os.getenv('ChannelAccessToken', None)
imgage_prompt = '''
Describe this image with scientific detail, reply in zh-TW:
'''
# Vertex AI needs a project ID and possibly authentication
google_project_id = os.getenv('GOOGLE_PROJECT_ID')
# Location for Vertex AI resources, e.g., "us-central1"
google_location = os.getenv('GOOGLE_LOCATION', 'us-central1')
# Google Cloud Storage bucket for image uploads
google_storage_bucket = os.getenv('GOOGLE_STORAGE_BUCKET', None)
if channel_secret is None:
print('Specify ChannelSecret as environment variable.')
sys.exit(1)
if channel_access_token is None:
print('Specify ChannelAccessToken as environment variable.')
sys.exit(1)
if google_project_id is None:
print('Specify GOOGLE_PROJECT_ID as environment variable.')
sys.exit(1)
if google_storage_bucket is None:
print('Specify GOOGLE_STORAGE_BUCKET as environment variable.')
sys.exit(1)
# Initialize the FastAPI app for LINEBot
app = FastAPI()
session = aiohttp.ClientSession()
async_http_client = AiohttpAsyncHttpClient(session)
line_bot_api = AsyncLineBotApi(channel_access_token, async_http_client)
parser = WebhookParser(channel_secret)
# Using a single, powerful multimodal model for both text and images.
# gemini-2.0-flash is a powerful, cost-effective model for multimodal tasks.
model = ChatVertexAI(
model_name="gemini-2.0-flash",
project=google_project_id,
location=google_location,
# Increased token limit for detailed image descriptions
max_output_tokens=2048
)
def upload_to_gcs(file_stream, file_name, bucket_name):
"""Uploads a file to the bucket."""
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(file_name)
blob.upload_from_file(file_stream, content_type='image/jpeg')
# Return the GCS URI
return f"gs://{bucket_name}/{file_name}"
except Exception as e:
print(f"Error uploading to GCS: {e}")
return None
def delete_from_gcs(bucket_name, blob_name):
"""Deletes a blob from the bucket."""
try:
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
blob.delete()
print(f"Blob {blob_name} deleted from bucket {bucket_name}.")
except Exception as e:
print(f"Error deleting from GCS: {e}")
@app.post("/")
async def handle_callback(request: Request):
signature = request.headers['X-Line-Signature']
# get request body as text
body = await request.body()
body = body.decode()
try:
events = parser.parse(body, signature)
except InvalidSignatureError:
raise HTTPException(status_code=400, detail="Invalid signature")
for event in events:
if not isinstance(event, MessageEvent):
continue
if (event.message.type == "text"):
# Process text message using LangChain with Vertex AI
msg = event.message.text
user_id = event.source.user_id
print(f"Received message: {msg} from user: {user_id}")
response = generate_text_with_langchain(
f'{msg}, reply in zh-TW:'
)
reply_msg = TextSendMessage(text=response)
await line_bot_api.reply_message(
event.reply_token,
reply_msg
)
elif (event.message.type == "image"):
user_id = event.source.user_id
print(f"Received image from user: {user_id}")
message_content = await line_bot_api.get_message_content(
event.message.id
)
# Asynchronously read all content chunks into a byte string
image_bytes = b''
async for chunk in message_content.iter_content():
image_bytes += chunk
# Create an in-memory binary stream from the bytes
image_stream = BytesIO(image_bytes)
# Reset the stream's pointer to the beginning for the upload
image_stream.seek(0)
file_name = f"{uuid.uuid4()}.jpg"
gcs_uri = None
# Default error message
response = "抱歉,處理您的圖片時發生錯誤。"
try:
gcs_uri = upload_to_gcs(
image_stream, file_name, google_storage_bucket)
if gcs_uri:
print(f"Image uploaded to {gcs_uri}")
response = generate_image_description(gcs_uri)
finally:
# Clean up the GCS file if it was uploaded
if gcs_uri:
delete_from_gcs(google_storage_bucket, file_name)
reply_msg = TextSendMessage(text=response)
await line_bot_api.reply_message(
event.reply_token,
reply_msg
)
else:
continue
return 'OK'
def generate_text_with_langchain(prompt):
"""
Generate a text completion using LangChain with Vertex AI model.
"""
# Create a chat prompt template with system instructions
prompt_template = ChatPromptTemplate.from_messages([
SystemMessage(
content="You are a helpful assistant that responds in "
"Traditional Chinese (zh-TW)."
),
HumanMessage(content=prompt)
])
# Format the prompt and call the model
formatted_prompt = prompt_template.format_messages()
response = model.invoke(formatted_prompt)
return response.content
def generate_image_description(image_uri):
"""
Generate a description for an image using LangChain with Vertex AI.
"""
# The prompt is already defined globally as imgage_prompt
message = HumanMessage(
content=[
{
"type": "text",
"text": imgage_prompt
},
{
"type": "image_url",
"image_url": {"url": image_uri}
},
]
)
response = model.invoke([message])
return response.content