diff --git a/cmd/gateway.go b/cmd/gateway.go index ff39bb2ee..ceded7826 100644 --- a/cmd/gateway.go +++ b/cmd/gateway.go @@ -21,6 +21,7 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/channels/discord" "github.com/nextlevelbuilder/goclaw/internal/channels/feishu" slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack" + linechannel "github.com/nextlevelbuilder/goclaw/internal/channels/line" "github.com/nextlevelbuilder/goclaw/internal/channels/telegram" "github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp" "github.com/nextlevelbuilder/goclaw/internal/channels/zalo" @@ -562,6 +563,7 @@ func runGateway() { instanceLoader.RegisterFactory(channels.TypeZaloPersonal, zalopersonal.FactoryWithPendingStore(pgStores.PendingMessages)) instanceLoader.RegisterFactory(channels.TypeWhatsApp, whatsapp.Factory) instanceLoader.RegisterFactory(channels.TypeSlack, slackchannel.FactoryWithPendingStore(pgStores.PendingMessages)) + instanceLoader.RegisterFactory(channels.TypeLine, linechannel.Factory) if err := instanceLoader.LoadAll(context.Background()); err != nil { slog.Error("failed to load channel instances from DB", "error", err) } diff --git a/cmd/gateway_channels_setup.go b/cmd/gateway_channels_setup.go index c60b34b06..2dc6fec1d 100644 --- a/cmd/gateway_channels_setup.go +++ b/cmd/gateway_channels_setup.go @@ -14,6 +14,7 @@ import ( "github.com/nextlevelbuilder/goclaw/internal/channels/discord" "github.com/nextlevelbuilder/goclaw/internal/channels/feishu" slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack" + linechannel "github.com/nextlevelbuilder/goclaw/internal/channels/line" "github.com/nextlevelbuilder/goclaw/internal/channels/telegram" "github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp" "github.com/nextlevelbuilder/goclaw/internal/channels/zalo" @@ -95,6 +96,16 @@ func registerConfigChannels(cfg *config.Config, channelMgr *channels.Manager, ms } else { channelMgr.RegisterChannel(channels.TypeFeishu, f) slog.Info("feishu/lark channel enabled (config)") + + if cfg.Channels.Line.Enabled && cfg.Channels.Line.ChannelAccessToken != "" && instanceLoader == nil { + l, err := linechannel.New(cfg.Channels.Line, msgBus, pgStores.Pairing) + if err != nil { + slog.Error("failed to initialize line channel", "error", err) + } else { + channelMgr.RegisterChannel(channels.TypeLine, l) + slog.Info("line channel enabled (config)") + } + } } } } diff --git a/go.mod b/go.mod index 456ae5f06..4051175f5 100644 --- a/go.mod +++ b/go.mod @@ -88,6 +88,7 @@ require ( github.com/leaanthony/gosod v1.0.4 // indirect github.com/leaanthony/slicer v1.6.0 // indirect github.com/leaanthony/u v1.1.1 // indirect + github.com/line/line-bot-sdk-go/v7 v7.21.0 // indirect github.com/lucasb-eyer/go-colorful v1.2.0 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index 00c05f1d6..8ddfd1c08 100644 --- a/go.sum +++ b/go.sum @@ -279,6 +279,8 @@ github.com/leaanthony/u v1.1.1 h1:TUFjwDGlNX+WuwVEzDqQwC2lOv0P4uhTQw7CMFdiK7M= github.com/leaanthony/u v1.1.1/go.mod h1:9+o6hejoRljvZ3BzdYlVL0JYCwtnAsVuN9pVTQcaRfI= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/line/line-bot-sdk-go/v7 v7.21.0 h1:eeYMuAwaDV5DZNTRqDipNhzjT51HwEcM1PRPG+cqh4Y= +github.com/line/line-bot-sdk-go/v7 v7.21.0/go.mod h1:idpoxOZgtSd8JyhctMMpwg5LNgRAIL/QIxa5S0DXcMg= github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= diff --git a/internal/channels/channel.go b/internal/channels/channel.go index c4bf45703..d861ce09d 100644 --- a/internal/channels/channel.go +++ b/internal/channels/channel.go @@ -63,6 +63,7 @@ const ( TypeWhatsApp = "whatsapp" TypeZaloOA = "zalo_oa" TypeZaloPersonal = "zalo_personal" + TypeLine = "line" ) // Channel defines the interface that all channel implementations must satisfy. diff --git a/internal/channels/line/audio.go b/internal/channels/line/audio.go new file mode 100644 index 000000000..12ce3fd36 --- /dev/null +++ b/internal/channels/line/audio.go @@ -0,0 +1,179 @@ +package line + +import ( + "encoding/json" + "fmt" + "io" + "log/slog" + "os" + "path/filepath" + "time" + + "github.com/line/line-bot-sdk-go/v7/linebot" +) + +// audioContentTypeToExt maps LINE audio Content-Type headers to file extensions. +// Unknown types fall back to ".bin"; the km-meeting-pipeline.sh upload cron +// will skip non-audio extensions, so unknown files do not enter the pipeline. +var audioContentTypeToExt = map[string]string{ + "audio/m4a": ".m4a", + "audio/mp4": ".m4a", + "audio/aac": ".aac", + "audio/mp3": ".mp3", + "audio/mpeg": ".mp3", + "audio/wav": ".wav", + "audio/wave": ".wav", + "audio/x-wav": ".wav", + "audio/ogg": ".ogg", + "audio/opus": ".opus", +} + +// extensionForAudio returns a file extension for the given audio Content-Type. +// Returns ".bin" and false for unknown types so the caller can log a warning. +func extensionForAudio(contentType string) (string, bool) { + if ext, ok := audioContentTypeToExt[contentType]; ok { + return ext, true + } + return ".bin", false +} + +// senderShort returns up to 8 leading chars from the LINE user ID. +// Falls back to "unknown" if userID is empty (group messages where the +// sender ID is unavailable). +func senderShort(userID string) string { + if userID == "" { + return "unknown" + } + if len(userID) <= 8 { + return userID + } + return userID[:8] +} + +// audioSidecar is the JSON written next to each ingested audio file so the +// km pipeline can preserve LINE provenance through the rest of the pipeline. +type audioSidecar struct { + SourceType string `json:"source_type"` + LineUserID string `json:"line_user_id"` + LineChatID string `json:"line_chat_id"` + LineMessageID string `json:"line_message_id"` + OriginalContentType string `json:"original_content_type"` + ReceivedAt string `json:"received_at"` +} + +// downloadAudioContent fetches a LINE audio message body to a temp file and +// returns the file path plus the response Content-Type. The caller is +// responsible for moving / renaming the file out of the temp area. +func (c *Channel) downloadAudioContent(messageID string) (string, string, error) { + resp, err := c.bot.GetMessageContent(messageID).Do() + if err != nil { + return "", "", fmt.Errorf("get audio content: %w", err) + } + defer resp.Content.Close() + + tmp, err := os.CreateTemp("", "line-audio-*") + if err != nil { + return "", "", fmt.Errorf("create temp file: %w", err) + } + + if _, err := io.Copy(tmp, resp.Content); err != nil { + tmp.Close() + os.Remove(tmp.Name()) + return "", "", fmt.Errorf("write audio: %w", err) + } + if err := tmp.Close(); err != nil { + os.Remove(tmp.Name()) + return "", "", fmt.Errorf("close temp file: %w", err) + } + + return tmp.Name(), resp.ContentType, nil +} + +// ingestLineAudio downloads an AudioMessage, names it per the meeting-pipeline +// convention, and moves it into /data/km/meetings/inbox/. A sidecar JSON is +// written alongside so the downstream cron knows the LINE provenance. +// +// This function deliberately does NOT call HandleMessage — audio messages have +// no text content for the GoClaw agent and would just confuse it. The +// downstream km-meeting-pipeline.sh cron will pick up the file and run it +// through ffmpeg compression + nlm transcription independently. +func (c *Channel) ingestLineAudio(msg *linebot.AudioMessage, userID, chatID string) error { + tmpPath, contentType, err := c.downloadAudioContent(msg.ID) + if err != nil { + return fmt.Errorf("download: %w", err) + } + + ext, known := extensionForAudio(contentType) + if !known { + slog.Warn("LINE audio: unknown content type, falling back to .bin", + "content_type", contentType, "message_id", msg.ID) + } + + now := time.Now() + fileName := fmt.Sprintf("%s_%s_line_%s%s", + now.Format("20060102"), + now.Format("150405"), + senderShort(userID), + ext, + ) + finalPath := filepath.Join(meetingsInboxDir, fileName) + + if err := os.MkdirAll(meetingsInboxDir, 0o755); err != nil { + os.Remove(tmpPath) + return fmt.Errorf("ensure inbox dir: %w", err) + } + + if err := os.Rename(tmpPath, finalPath); err != nil { + // Cross-device rename can fail (e.g. /tmp on tmpfs, /data on disk). + // Fall back to copy + remove. + if cerr := copyFile(tmpPath, finalPath); cerr != nil { + os.Remove(tmpPath) + return fmt.Errorf("move to inbox: %w", cerr) + } + os.Remove(tmpPath) + } + + sidecar := audioSidecar{ + SourceType: "line_audio", + LineUserID: userID, + LineChatID: chatID, + LineMessageID: msg.ID, + OriginalContentType: contentType, + ReceivedAt: now.UTC().Format(time.RFC3339), + } + sidecarPath := filepath.Join(meetingsInboxDir, fileName+".source.json") + if data, jerr := json.MarshalIndent(sidecar, "", " "); jerr == nil { + if werr := os.WriteFile(sidecarPath, data, 0o644); werr != nil { + slog.Warn("LINE audio: failed to write sidecar", + "err", werr, "path", sidecarPath) + } + } + + slog.Info("LINE audio: ingested", + "file", fileName, + "content_type", contentType, + "sender", senderShort(userID), + "chat", chatID, + ) + return nil +} + +// copyFile is a fallback for cross-device rename failures. +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + if _, err := io.Copy(out, in); err != nil { + return err + } + return out.Sync() +} diff --git a/internal/channels/line/channel.go b/internal/channels/line/channel.go new file mode 100644 index 000000000..16fd6d613 --- /dev/null +++ b/internal/channels/line/channel.go @@ -0,0 +1,89 @@ +package line + +import ( + "context" + "log/slog" + "net/http" + "sync" + "time" + + "github.com/line/line-bot-sdk-go/v7/linebot" + + "github.com/nextlevelbuilder/goclaw/internal/bus" + "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/store" +) + +// replyTokenEntry caches a reply token with its receive time. +type replyTokenEntry struct { + token string + receivedAt time.Time +} + +// Channel implements the LINE Messaging API channel. +type Channel struct { + *channels.BaseChannel + bot *linebot.Client + cfg config.LineConfig + pairingService store.PairingStore + replyTokens sync.Map // chatID → replyTokenEntry +} + +// New creates a new LINE channel. +func New(cfg config.LineConfig, msgBus *bus.MessageBus, pairingSvc store.PairingStore) (*Channel, error) { + bot, err := linebot.New(cfg.ChannelSecret, cfg.ChannelAccessToken) + if err != nil { + return nil, err + } + + base := channels.NewBaseChannel("line", msgBus, cfg.AllowFrom) + base.ValidatePolicy(cfg.DMPolicy, cfg.GroupPolicy) + + return &Channel{ + BaseChannel: base, + bot: bot, + cfg: cfg, + pairingService: pairingSvc, + }, nil +} + +// Type returns the channel type. +func (c *Channel) Type() string { return "line" } + +// Start begins listening. Webhook mode — nothing to poll. +func (c *Channel) Start(_ context.Context) error { + c.SetRunning(true) + slog.Info("LINE channel started (webhook mode)") + return nil +} + +// Stop shuts down the channel. +func (c *Channel) Stop(_ context.Context) error { + c.SetRunning(false) + slog.Info("LINE channel stopped") + return nil +} + +// WebhookHandler returns the HTTP path and handler for LINE webhook callbacks. +func (c *Channel) WebhookHandler() (string, http.Handler) { + return "/webhook/line", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + events, err := c.bot.ParseRequest(r) + if err != nil { + if err == linebot.ErrInvalidSignature { + slog.Warn("LINE webhook: invalid signature") + w.WriteHeader(http.StatusBadRequest) + } else { + slog.Error("LINE webhook: parse error", "err", err) + w.WriteHeader(http.StatusInternalServerError) + } + return + } + + for _, event := range events { + go c.handleEvent(event) + } + + w.WriteHeader(http.StatusOK) + }) +} diff --git a/internal/channels/line/constants.go b/internal/channels/line/constants.go new file mode 100644 index 000000000..9b4f4979d --- /dev/null +++ b/internal/channels/line/constants.go @@ -0,0 +1,19 @@ +package line + +const ( + maxTextLength = 5000 + maxReplyMessages = 5 + replyTokenTTL = 25 // seconds, buffer before 30s expiry + loadingSeconds = 60 + loadingAPIURL = "https://api.line.me/v2/bot/chat/loading/start" + + // km-meeting-pipeline integration: where AudioMessage and ingest-gdrive + // downloads land. The km-meeting-pipeline.sh upload cron picks files up + // from here. + meetingsInboxDir = "/data/km/meetings/inbox" + + // Default location of km-meeting-pipeline.sh on the production VPS. + // Override at runtime via KM_MEETING_PIPELINE_SCRIPT env var + // (used by dev hosts and integration tests). + meetingsPipelineScript = "/home/ubuntu/odoo_dev/esmith-specs/scripts/km-meeting-pipeline.sh" +) diff --git a/internal/channels/line/factory.go b/internal/channels/line/factory.go new file mode 100644 index 000000000..ba23c3b0f --- /dev/null +++ b/internal/channels/line/factory.go @@ -0,0 +1,70 @@ +package line + +import ( + "encoding/json" + "fmt" + + "github.com/nextlevelbuilder/goclaw/internal/bus" + "github.com/nextlevelbuilder/goclaw/internal/channels" + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/store" +) + +// lineCreds maps the credentials JSON from the channel_instances table. +type lineCreds struct { + ChannelAccessToken string `json:"channel_access_token"` + ChannelSecret string `json:"channel_secret"` +} + +// lineInstanceConfig maps the non-secret config JSONB from the channel_instances table. +type lineInstanceConfig struct { + AllowFrom []string `json:"allow_from,omitempty"` + DMPolicy string `json:"dm_policy,omitempty"` + GroupPolicy string `json:"group_policy,omitempty"` +} + +// Factory creates a LINE channel from DB instance data. +func Factory(name string, creds json.RawMessage, cfg json.RawMessage, + msgBus *bus.MessageBus, pairingSvc store.PairingStore) (channels.Channel, error) { + + var c lineCreds + if len(creds) > 0 { + if err := json.Unmarshal(creds, &c); err != nil { + return nil, fmt.Errorf("decode line credentials: %w", err) + } + } + if c.ChannelAccessToken == "" || c.ChannelSecret == "" { + return nil, fmt.Errorf("line channel_access_token and channel_secret are required") + } + + var ic lineInstanceConfig + if len(cfg) > 0 { + if err := json.Unmarshal(cfg, &ic); err != nil { + return nil, fmt.Errorf("decode line config: %w", err) + } + } + + lineCfg := config.LineConfig{ + Enabled: true, + ChannelAccessToken: c.ChannelAccessToken, + ChannelSecret: c.ChannelSecret, + AllowFrom: ic.AllowFrom, + DMPolicy: ic.DMPolicy, + GroupPolicy: ic.GroupPolicy, + } + + ch, err := New(lineCfg, msgBus, pairingSvc) + if err != nil { + return nil, err + } + ch.SetName(name) + return ch, nil +} + +// FactoryWithPendingStore returns a ChannelFactory with persistent history support. +func FactoryWithPendingStore(_ store.PendingMessageStore) channels.ChannelFactory { + return func(name string, creds json.RawMessage, cfg json.RawMessage, + msgBus *bus.MessageBus, pairingSvc store.PairingStore) (channels.Channel, error) { + return Factory(name, creds, cfg, msgBus, pairingSvc) + } +} diff --git a/internal/channels/line/format.go b/internal/channels/line/format.go new file mode 100644 index 000000000..ee791aef6 --- /dev/null +++ b/internal/channels/line/format.go @@ -0,0 +1,38 @@ +package line + +import "strings" + +// splitMessage splits text into chunks of at most maxLen bytes, +// preferring to break at newline boundaries. +func splitMessage(text string, maxLen int) []string { + if len(text) <= maxLen { + return []string{text} + } + + var parts []string + for len(text) > 0 { + if len(text) <= maxLen { + parts = append(parts, text) + break + } + // Try to split at the last newline within maxLen. + cut := maxLen + if idx := strings.LastIndex(text[:maxLen], "\n"); idx > 0 { + cut = idx + 1 + } + parts = append(parts, text[:cut]) + text = text[cut:] + } + return parts +} + +// formatForLINE removes markdown formatting that LINE does not render. +func formatForLINE(text string) string { + // Remove bold markers: **text** → text + text = strings.ReplaceAll(text, "**", "") + // Remove italic markers: *text* → text (single asterisks remaining after bold removal) + // Skip — single asterisks are ambiguous and may be intentional list bullets. + // Remove inline code: `text` → text + text = strings.ReplaceAll(text, "`", "") + return text +} diff --git a/internal/channels/line/gdrive.go b/internal/channels/line/gdrive.go new file mode 100644 index 000000000..cd851ce91 --- /dev/null +++ b/internal/channels/line/gdrive.go @@ -0,0 +1,167 @@ +package line + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "log/slog" + "os" + "os/exec" + "regexp" +) + +// gdriveURLRegex finds GDrive shared file URLs anywhere in a text body. +// Captures everything from "http(s)://drive.google.com/file/d/{id}" up to +// the next whitespace, so query strings like ?usp=drivesdk are included +// in the URL passed to the script. +// +// The script does its own file_id extraction; we forward the full URL so +// any user-facing error message can quote what they actually pasted. +var gdriveURLRegex = regexp.MustCompile( + `https?://drive\.google\.com/file/d/[A-Za-z0-9_-]+(?:[/?][^\s]*)?`, +) + +// extractGdriveURLs returns all GDrive URLs found in a message body. +// Returns nil (not empty slice) when none are found so callers can +// early-return cheaply on the common no-link case. +func extractGdriveURLs(text string) []string { + if text == "" { + return nil + } + return gdriveURLRegex.FindAllString(text, -1) +} + +// ingestGdriveResult mirrors the JSON shape that km-meeting-pipeline.sh +// emits on stdout. Both success and failure are JSON; only the populated +// fields differ. +// +// Success: {"status":"ok","file":"...","method":"...","size":N,"file_id":"..."} +// Failure: {"error":"","file_id":"...","message":"..."} +type ingestGdriveResult struct { + Status string `json:"status,omitempty"` + Error string `json:"error,omitempty"` + FileID string `json:"file_id,omitempty"` + Message string `json:"message,omitempty"` + File string `json:"file,omitempty"` + Method string `json:"method,omitempty"` + Size int64 `json:"size,omitempty"` +} + +// gdriveErrorMessages maps error types from km-meeting-pipeline.sh to +// user-facing Traditional Chinese reply text. Unknown error types fall +// through to a generic message in replyGdriveError. +var gdriveErrorMessages = map[string]string{ + "invalid_url": "無法解析這個 GDrive 連結,請貼完整的 https://drive.google.com/file/d/... 連結", + "not_accessible": "無法存取此連結。請確認檔案已分享給 stanleykao72@gmail.com,或將分享權限改為「知道連結的人皆可檢視」", + "download_failed": "下載失敗,請稍後再試或聯絡管理員", +} + +// ingestGdriveLinks processes every GDrive URL in a LINE message body. +// Each URL is handed to km-meeting-pipeline.sh ingest-gdrive in sequence +// (not in parallel — the script does I/O-bound rclone/gdown work and +// running them concurrently buys little). On failure, replies to the +// user with an actionable Chinese message. Successes are deliberately +// silent (per design D Open Q1 — avoid LINE notification spam). +// +// Safe to call as `go c.ingestGdriveLinks(...)` from the webhook handler; +// the LINE event return path is not blocked by ingestion latency. +// +// Note on reply token race: the agent's HandleMessage path will also try +// to use the cached reply token. Whichever finishes first wins; the loser +// silently falls back to PushMessage. Both messages reach the user. +func (c *Channel) ingestGdriveLinks(text, chatID string) { + urls := extractGdriveURLs(text) + if len(urls) == 0 { + return + } + + scriptPath := getMeetingPipelineScript() + + for _, url := range urls { + slog.Info("LINE: ingesting GDrive link", "url", url, "chat", chatID) + result, err := runIngestGdrive(scriptPath, url) + if err != nil { + slog.Error("LINE: ingest-gdrive runner error", + "err", err, "url", url, "chat", chatID) + c.replyGdriveError(chatID, "download_failed", err.Error()) + continue + } + if result.Status == "ok" { + slog.Info("LINE: GDrive ingest success", + "file", result.File, + "method", result.Method, + "size", result.Size, + "chat", chatID, + ) + continue + } + // Non-ok result: result.Error is one of invalid_url / + // not_accessible / download_failed. + slog.Warn("LINE: GDrive ingest failed", + "error", result.Error, + "message", result.Message, + "file_id", result.FileID, + "chat", chatID, + ) + c.replyGdriveError(chatID, result.Error, result.Message) + } +} + +// runIngestGdrive executes the meeting-pipeline script and parses its +// stdout JSON. The script's contract is "always emit JSON on stdout, even +// on exit code 1", so a non-zero exit is not necessarily a runner error. +func runIngestGdrive(scriptPath, url string) (*ingestGdriveResult, error) { + cmd := exec.Command("bash", scriptPath, "ingest-gdrive", url) + out, runErr := cmd.Output() + + // Even on exit 1, the script should have written JSON to stdout. + // Only when stdout is empty do we treat runErr as authoritative. + if len(bytes.TrimSpace(out)) == 0 { + if runErr != nil { + var ee *exec.ExitError + if errors.As(runErr, &ee) { + return nil, fmt.Errorf("script exited %d with no stdout (stderr: %s)", + ee.ExitCode(), string(ee.Stderr)) + } + return nil, fmt.Errorf("script execution failed: %w", runErr) + } + return nil, errors.New("script returned empty stdout") + } + + var result ingestGdriveResult + if jerr := json.Unmarshal(bytes.TrimSpace(out), &result); jerr != nil { + return nil, fmt.Errorf("parse stdout JSON: %w (raw: %q)", jerr, string(out)) + } + return &result, nil +} + +// replyGdriveError sends a single LINE message explaining the failure. +// Uses the same reply-token-then-push fallback as the main agent send +// path via sendChunks. +func (c *Channel) replyGdriveError(chatID, errType, detail string) { + msg, ok := gdriveErrorMessages[errType] + if !ok { + msg = "GDrive 連結處理失敗:" + errType + } + // For generic download failures, append a short technical detail so + // the user has something to forward to maintainers. Cap length to + // keep the LINE bubble readable. + if errType == "download_failed" && detail != "" && len(detail) < 200 { + msg = msg + "\n(技術細節:" + detail + ")" + } + + if err := c.sendChunks(chatID, []string{msg}); err != nil { + slog.Error("LINE: failed to send GDrive error reply", + "err", err, "chat", chatID, "error_type", errType) + } +} + +// getMeetingPipelineScript returns the absolute path of km-meeting-pipeline.sh, +// honoring the KM_MEETING_PIPELINE_SCRIPT env var override. +func getMeetingPipelineScript() string { + if v := os.Getenv("KM_MEETING_PIPELINE_SCRIPT"); v != "" { + return v + } + return meetingsPipelineScript +} diff --git a/internal/channels/line/gdrive_test.go b/internal/channels/line/gdrive_test.go new file mode 100644 index 000000000..7685207fe --- /dev/null +++ b/internal/channels/line/gdrive_test.go @@ -0,0 +1,89 @@ +package line + +import ( + "reflect" + "testing" +) + +func TestExtractGdriveURLs(t *testing.T) { + cases := []struct { + name string + in string + want []string + }{ + { + name: "single view URL", + in: "https://drive.google.com/file/d/1-quJG9lECE2LvznAvzWQ8D7CDSwAob9H/view", + want: []string{"https://drive.google.com/file/d/1-quJG9lECE2LvznAvzWQ8D7CDSwAob9H/view"}, + }, + { + name: "view + usp=sharing", + in: "https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR/view?usp=sharing", + want: []string{"https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR/view?usp=sharing"}, + }, + { + name: "view + usp=drivesdk", + in: "https://drive.google.com/file/d/1-quJG9lECE2LvznAvzWQ8D7CDSwAob9H/view?usp=drivesdk", + want: []string{"https://drive.google.com/file/d/1-quJG9lECE2LvznAvzWQ8D7CDSwAob9H/view?usp=drivesdk"}, + }, + { + name: "edit URL", + in: "https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR-stu_VWXYZ/edit", + want: []string{"https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR-stu_VWXYZ/edit"}, + }, + { + name: "URL inside Chinese text", + in: "你看一下 https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR/view 這個會議錄音", + want: []string{"https://drive.google.com/file/d/1abc_DEF-ghi_JKL-mno_PQR/view"}, + }, + { + name: "http (not https)", + in: "http://drive.google.com/file/d/1abcDEFghiJKLmnoPQRstuVWX/view", + want: []string{"http://drive.google.com/file/d/1abcDEFghiJKLmnoPQRstuVWX/view"}, + }, + { + name: "two URLs in one message", + in: "上午會議 https://drive.google.com/file/d/1AAAAAAAAAAAAAAAAAAAAAAAA/view " + + "下午會議 https://drive.google.com/file/d/1BBBBBBBBBBBBBBBBBBBBBBBB/view", + want: []string{ + "https://drive.google.com/file/d/1AAAAAAAAAAAAAAAAAAAAAAAA/view", + "https://drive.google.com/file/d/1BBBBBBBBBBBBBBBBBBBBBBBB/view", + }, + }, + { + name: "URL ends at newline", + in: "Top line https://drive.google.com/file/d/1XXXXXXXXXXXXXXXXXXXXXXXX/view\n" + + "Other content", + want: []string{"https://drive.google.com/file/d/1XXXXXXXXXXXXXXXXXXXXXXXX/view"}, + }, + { + name: "non-GDrive URL", + in: "https://example.com/file/d/12345", + want: nil, + }, + { + name: "GDrive folder URL (not file)", + in: "https://drive.google.com/drive/folders/1abc", + want: nil, + }, + { + name: "empty input", + in: "", + want: nil, + }, + { + name: "no URL at all", + in: "hello world", + want: nil, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := extractGdriveURLs(tc.in) + if !reflect.DeepEqual(got, tc.want) { + t.Errorf("extractGdriveURLs(%q)\n got: %v\n want: %v", tc.in, got, tc.want) + } + }) + } +} diff --git a/internal/channels/line/handlers.go b/internal/channels/line/handlers.go new file mode 100644 index 000000000..ec360ec15 --- /dev/null +++ b/internal/channels/line/handlers.go @@ -0,0 +1,158 @@ +package line + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log/slog" + "net/http" + "os" + "path/filepath" + "time" + + "github.com/line/line-bot-sdk-go/v7/linebot" +) + +// handleEvent dispatches a single LINE webhook event. +func (c *Channel) handleEvent(event *linebot.Event) { + if event.Type != linebot.EventTypeMessage { + return + } + + // Determine sender and chat IDs. + var userID, chatID, peerKind string + switch event.Source.Type { + case linebot.EventSourceTypeUser: + userID = event.Source.UserID + chatID = event.Source.UserID + peerKind = "direct" + case linebot.EventSourceTypeGroup: + userID = event.Source.UserID + chatID = event.Source.GroupID + peerKind = "group" + case linebot.EventSourceTypeRoom: + userID = event.Source.UserID + chatID = event.Source.RoomID + peerKind = "group" + default: + return + } + + senderID := "line:" + userID + + // Policy check. + if !c.CheckPolicy(peerKind, c.cfg.DMPolicy, c.cfg.GroupPolicy, senderID) { + slog.Debug("LINE: message rejected by policy", "sender", senderID, "peerKind", peerKind) + return + } + + // Send loading animation (best-effort). + go c.sendLoadingAnimation(chatID) + + // Cache reply token. + c.replyTokens.Store(chatID, replyTokenEntry{ + token: event.ReplyToken, + receivedAt: time.Now(), + }) + + var text string + var mediaFiles []string + + switch msg := event.Message.(type) { + case *linebot.TextMessage: + text = msg.Text + // Process any GDrive shared links in the message body asynchronously. + // The agent still sees the original text via HandleMessage below — a + // user message like "請整理 https://drive..." should still get an + // agent acknowledgment, with the actual file ingestion happening in + // parallel. Successes are silent; failures reply via LINE. + go c.ingestGdriveLinks(msg.Text, chatID) + case *linebot.ImageMessage: + path, err := c.downloadContent(msg.ID) + if err != nil { + slog.Error("LINE: failed to download image", "err", err) + return + } + mediaFiles = append(mediaFiles, path) + case *linebot.AudioMessage: + // Audio messages bypass the agent and go straight to the + // km-meeting-pipeline inbox. The downstream cron handles ffmpeg + // compression and nlm transcription independently. + if err := c.ingestLineAudio(msg, userID, chatID); err != nil { + slog.Error("LINE: failed to ingest audio", "err", err, "message_id", msg.ID) + } + return + default: + // Unsupported message type — ignore. + return + } + + metadata := map[string]string{ + "reply_token": event.ReplyToken, + } + + c.HandleMessage(senderID, chatID, text, mediaFiles, metadata, peerKind) +} + +// downloadContent downloads message content to a temp file and returns the path. +func (c *Channel) downloadContent(messageID string) (string, error) { + resp, err := c.bot.GetMessageContent(messageID).Do() + if err != nil { + return "", fmt.Errorf("get message content: %w", err) + } + defer resp.Content.Close() + + tmpFile, err := os.CreateTemp("", "line-media-*") + if err != nil { + return "", fmt.Errorf("create temp file: %w", err) + } + defer tmpFile.Close() + + if _, err := io.Copy(tmpFile, resp.Content); err != nil { + os.Remove(tmpFile.Name()) + return "", fmt.Errorf("write media: %w", err) + } + + // Rename with proper extension based on content type. + ext := ".jpg" // default + if ct := resp.ContentType; ct != "" { + switch { + case ct == "image/png": + ext = ".png" + case ct == "image/gif": + ext = ".gif" + } + } + finalPath := tmpFile.Name() + ext + if err := os.Rename(tmpFile.Name(), finalPath); err != nil { + return tmpFile.Name(), nil // fallback to original name + } + return filepath.Clean(finalPath), nil +} + +// sendLoadingAnimation sends a loading indicator to the chat via LINE API. +func (c *Channel) sendLoadingAnimation(chatID string) { + body, _ := json.Marshal(map[string]interface{}{ + "chatId": chatID, + "loadingSeconds": loadingSeconds, + }) + + req, err := http.NewRequest(http.MethodPost, loadingAPIURL, bytes.NewReader(body)) + if err != nil { + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+c.cfg.ChannelAccessToken) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + slog.Warn("LINE: loading animation request failed", "err", err) + return + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + slog.Warn("LINE: loading animation error", "status", resp.StatusCode, "body", string(respBody)) + } +} diff --git a/internal/channels/line/send.go b/internal/channels/line/send.go new file mode 100644 index 000000000..c3b5cbc15 --- /dev/null +++ b/internal/channels/line/send.go @@ -0,0 +1,76 @@ +package line + +import ( + "context" + "log/slog" + "time" + + "github.com/line/line-bot-sdk-go/v7/linebot" + + "github.com/nextlevelbuilder/goclaw/internal/bus" +) + +// Send delivers an outbound message to a LINE chat. +func (c *Channel) Send(_ context.Context, msg bus.OutboundMessage) error { + chatID := msg.ChatID + text := formatForLINE(msg.Content) + + if text == "" && len(msg.Media) == 0 { + return nil + } + + // Send text messages. + if text != "" { + chunks := splitMessage(text, maxTextLength) + if err := c.sendChunks(chatID, chunks); err != nil { + return err + } + } + + // Send media attachments. + for _, m := range msg.Media { + imgMsg := linebot.NewImageMessage(m.URL, m.URL) + if _, err := c.bot.PushMessage(chatID, imgMsg).Do(); err != nil { + slog.Error("LINE: failed to push media", "chatID", chatID, "err", err) + } + } + + return nil +} + +// sendChunks sends text chunks, using reply token if available and fresh. +func (c *Channel) sendChunks(chatID string, chunks []string) error { + // Try reply token first. + if entry, ok := c.replyTokens.LoadAndDelete(chatID); ok { + e := entry.(replyTokenEntry) + if time.Since(e.receivedAt).Seconds() < float64(replyTokenTTL) { + // Build up to maxReplyMessages. + var msgs []linebot.SendingMessage + for i, chunk := range chunks { + if i >= maxReplyMessages { + break + } + msgs = append(msgs, linebot.NewTextMessage(chunk)) + } + if _, err := c.bot.ReplyMessage(e.token, msgs...).Do(); err != nil { + slog.Warn("LINE: reply failed, falling back to push", "err", err) + } else { + // If we sent all chunks via reply, done. + if len(chunks) <= maxReplyMessages { + return nil + } + // Push remaining chunks. + chunks = chunks[maxReplyMessages:] + } + } + } + + // Push remaining chunks one by one. + for _, chunk := range chunks { + if _, err := c.bot.PushMessage(chatID, linebot.NewTextMessage(chunk)).Do(); err != nil { + slog.Error("LINE: push failed", "chatID", chatID, "err", err) + return err + } + } + return nil +} diff --git a/internal/config/config_channels.go b/internal/config/config_channels.go index db310a851..a1898cda5 100644 --- a/internal/config/config_channels.go +++ b/internal/config/config_channels.go @@ -20,6 +20,7 @@ type ChannelsConfig struct { Zalo ZaloConfig `json:"zalo"` ZaloPersonal ZaloPersonalConfig `json:"zalo_personal"` Feishu FeishuConfig `json:"feishu"` + Line LineConfig `json:"line"` PendingCompaction *PendingCompactionConfig `json:"pending_compaction,omitempty"` // global pending message compaction settings } @@ -509,3 +510,13 @@ func MergeChannelGroupQuotas(cfg *Config) { } } } + +// LineConfig defines configuration for the LINE Messaging API channel. +type LineConfig struct { + Enabled bool `json:"enabled"` + ChannelAccessToken string `json:"channel_access_token"` + ChannelSecret string `json:"channel_secret"` + AllowFrom FlexibleStringSlice `json:"allow_from"` + DMPolicy string `json:"dm_policy,omitempty"` + GroupPolicy string `json:"group_policy,omitempty"` +} diff --git a/internal/http/channel_instances.go b/internal/http/channel_instances.go index 6a92ae467..4ebc1c8ae 100644 --- a/internal/http/channel_instances.go +++ b/internal/http/channel_instances.go @@ -540,7 +540,7 @@ func (h *ChannelInstancesHandler) handleResolveContacts(w http.ResponseWriter, r // isValidChannelType checks if the channel type is supported. func isValidChannelType(ct string) bool { switch ct { - case "telegram", "discord", "slack", "whatsapp", "zalo_oa", "zalo_personal", "feishu": + case "telegram", "discord", "slack", "whatsapp", "zalo_oa", "zalo_personal", "feishu", "line": return true } return false