Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
linechannel "github.com/nextlevelbuilder/goclaw/internal/channels/line"
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
"github.com/nextlevelbuilder/goclaw/internal/channels/zalo"
Expand Down Expand Up @@ -562,6 +563,7 @@ func runGateway() {
instanceLoader.RegisterFactory(channels.TypeZaloPersonal, zalopersonal.FactoryWithPendingStore(pgStores.PendingMessages))
instanceLoader.RegisterFactory(channels.TypeWhatsApp, whatsapp.Factory)
instanceLoader.RegisterFactory(channels.TypeSlack, slackchannel.FactoryWithPendingStore(pgStores.PendingMessages))
instanceLoader.RegisterFactory(channels.TypeLine, linechannel.Factory)
if err := instanceLoader.LoadAll(context.Background()); err != nil {
slog.Error("failed to load channel instances from DB", "error", err)
}
Expand Down
11 changes: 11 additions & 0 deletions cmd/gateway_channels_setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nextlevelbuilder/goclaw/internal/channels/discord"
"github.com/nextlevelbuilder/goclaw/internal/channels/feishu"
slackchannel "github.com/nextlevelbuilder/goclaw/internal/channels/slack"
linechannel "github.com/nextlevelbuilder/goclaw/internal/channels/line"
"github.com/nextlevelbuilder/goclaw/internal/channels/telegram"
"github.com/nextlevelbuilder/goclaw/internal/channels/whatsapp"
"github.com/nextlevelbuilder/goclaw/internal/channels/zalo"
Expand Down Expand Up @@ -95,6 +96,16 @@ func registerConfigChannels(cfg *config.Config, channelMgr *channels.Manager, ms
} else {
channelMgr.RegisterChannel(channels.TypeFeishu, f)
slog.Info("feishu/lark channel enabled (config)")

if cfg.Channels.Line.Enabled && cfg.Channels.Line.ChannelAccessToken != "" && instanceLoader == nil {
l, err := linechannel.New(cfg.Channels.Line, msgBus, pgStores.Pairing)
if err != nil {
slog.Error("failed to initialize line channel", "error", err)
} else {
channelMgr.RegisterChannel(channels.TypeLine, l)
slog.Info("line channel enabled (config)")
}
}
}
}
}
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
github.com/leaanthony/gosod v1.0.4 // indirect
github.com/leaanthony/slicer v1.6.0 // indirect
github.com/leaanthony/u v1.1.1 // indirect
github.com/line/line-bot-sdk-go/v7 v7.21.0 // indirect
github.com/lucasb-eyer/go-colorful v1.2.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@ github.com/leaanthony/u v1.1.1 h1:TUFjwDGlNX+WuwVEzDqQwC2lOv0P4uhTQw7CMFdiK7M=
github.com/leaanthony/u v1.1.1/go.mod h1:9+o6hejoRljvZ3BzdYlVL0JYCwtnAsVuN9pVTQcaRfI=
github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw=
github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/line/line-bot-sdk-go/v7 v7.21.0 h1:eeYMuAwaDV5DZNTRqDipNhzjT51HwEcM1PRPG+cqh4Y=
github.com/line/line-bot-sdk-go/v7 v7.21.0/go.mod h1:idpoxOZgtSd8JyhctMMpwg5LNgRAIL/QIxa5S0DXcMg=
github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY=
github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
Expand Down
1 change: 1 addition & 0 deletions internal/channels/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
TypeWhatsApp = "whatsapp"
TypeZaloOA = "zalo_oa"
TypeZaloPersonal = "zalo_personal"
TypeLine = "line"
)

// Channel defines the interface that all channel implementations must satisfy.
Expand Down
179 changes: 179 additions & 0 deletions internal/channels/line/audio.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package line

import (
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"time"

"github.com/line/line-bot-sdk-go/v7/linebot"
)

// audioContentTypeToExt maps LINE audio Content-Type headers to file extensions.
// Unknown types fall back to ".bin"; the km-meeting-pipeline.sh upload cron
// will skip non-audio extensions, so unknown files do not enter the pipeline.
var audioContentTypeToExt = map[string]string{
"audio/m4a": ".m4a",
"audio/mp4": ".m4a",
"audio/aac": ".aac",
"audio/mp3": ".mp3",
"audio/mpeg": ".mp3",
"audio/wav": ".wav",
"audio/wave": ".wav",
"audio/x-wav": ".wav",
"audio/ogg": ".ogg",
"audio/opus": ".opus",
}

// extensionForAudio returns a file extension for the given audio Content-Type.
// Returns ".bin" and false for unknown types so the caller can log a warning.
func extensionForAudio(contentType string) (string, bool) {
if ext, ok := audioContentTypeToExt[contentType]; ok {
return ext, true
}
return ".bin", false
}

// senderShort returns up to 8 leading chars from the LINE user ID.
// Falls back to "unknown" if userID is empty (group messages where the
// sender ID is unavailable).
func senderShort(userID string) string {
if userID == "" {
return "unknown"
}
if len(userID) <= 8 {
return userID
}
return userID[:8]
}

// audioSidecar is the JSON written next to each ingested audio file so the
// km pipeline can preserve LINE provenance through the rest of the pipeline.
type audioSidecar struct {
SourceType string `json:"source_type"`
LineUserID string `json:"line_user_id"`
LineChatID string `json:"line_chat_id"`
LineMessageID string `json:"line_message_id"`
OriginalContentType string `json:"original_content_type"`
ReceivedAt string `json:"received_at"`
}

// downloadAudioContent fetches a LINE audio message body to a temp file and
// returns the file path plus the response Content-Type. The caller is
// responsible for moving / renaming the file out of the temp area.
func (c *Channel) downloadAudioContent(messageID string) (string, string, error) {
resp, err := c.bot.GetMessageContent(messageID).Do()
if err != nil {
return "", "", fmt.Errorf("get audio content: %w", err)
}
defer resp.Content.Close()

tmp, err := os.CreateTemp("", "line-audio-*")
if err != nil {
return "", "", fmt.Errorf("create temp file: %w", err)
}

if _, err := io.Copy(tmp, resp.Content); err != nil {
tmp.Close()
os.Remove(tmp.Name())
return "", "", fmt.Errorf("write audio: %w", err)
}
if err := tmp.Close(); err != nil {
os.Remove(tmp.Name())
return "", "", fmt.Errorf("close temp file: %w", err)
}

return tmp.Name(), resp.ContentType, nil
}

// ingestLineAudio downloads an AudioMessage, names it per the meeting-pipeline
// convention, and moves it into /data/km/meetings/inbox/. A sidecar JSON is
// written alongside so the downstream cron knows the LINE provenance.
//
// This function deliberately does NOT call HandleMessage — audio messages have
// no text content for the GoClaw agent and would just confuse it. The
// downstream km-meeting-pipeline.sh cron will pick up the file and run it
// through ffmpeg compression + nlm transcription independently.
func (c *Channel) ingestLineAudio(msg *linebot.AudioMessage, userID, chatID string) error {
tmpPath, contentType, err := c.downloadAudioContent(msg.ID)
if err != nil {
return fmt.Errorf("download: %w", err)
}

ext, known := extensionForAudio(contentType)
if !known {
slog.Warn("LINE audio: unknown content type, falling back to .bin",
"content_type", contentType, "message_id", msg.ID)
}

now := time.Now()
fileName := fmt.Sprintf("%s_%s_line_%s%s",
now.Format("20060102"),
now.Format("150405"),
senderShort(userID),
ext,
)
finalPath := filepath.Join(meetingsInboxDir, fileName)

if err := os.MkdirAll(meetingsInboxDir, 0o755); err != nil {
os.Remove(tmpPath)
return fmt.Errorf("ensure inbox dir: %w", err)
}

if err := os.Rename(tmpPath, finalPath); err != nil {
// Cross-device rename can fail (e.g. /tmp on tmpfs, /data on disk).
// Fall back to copy + remove.
if cerr := copyFile(tmpPath, finalPath); cerr != nil {
os.Remove(tmpPath)
return fmt.Errorf("move to inbox: %w", cerr)
}
os.Remove(tmpPath)
}

sidecar := audioSidecar{
SourceType: "line_audio",
LineUserID: userID,
LineChatID: chatID,
LineMessageID: msg.ID,
OriginalContentType: contentType,
ReceivedAt: now.UTC().Format(time.RFC3339),
}
sidecarPath := filepath.Join(meetingsInboxDir, fileName+".source.json")
if data, jerr := json.MarshalIndent(sidecar, "", " "); jerr == nil {
if werr := os.WriteFile(sidecarPath, data, 0o644); werr != nil {
slog.Warn("LINE audio: failed to write sidecar",
"err", werr, "path", sidecarPath)
}
}

slog.Info("LINE audio: ingested",
"file", fileName,
"content_type", contentType,
"sender", senderShort(userID),
"chat", chatID,
)
return nil
}

// copyFile is a fallback for cross-device rename failures.
func copyFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()

out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()

if _, err := io.Copy(out, in); err != nil {
return err
}
return out.Sync()
}
89 changes: 89 additions & 0 deletions internal/channels/line/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package line

import (
"context"
"log/slog"
"net/http"
"sync"
"time"

"github.com/line/line-bot-sdk-go/v7/linebot"

"github.com/nextlevelbuilder/goclaw/internal/bus"
"github.com/nextlevelbuilder/goclaw/internal/channels"
"github.com/nextlevelbuilder/goclaw/internal/config"
"github.com/nextlevelbuilder/goclaw/internal/store"
)

// replyTokenEntry caches a reply token with its receive time.
type replyTokenEntry struct {
token string
receivedAt time.Time
}

// Channel implements the LINE Messaging API channel.
type Channel struct {
*channels.BaseChannel
bot *linebot.Client
cfg config.LineConfig
pairingService store.PairingStore
replyTokens sync.Map // chatID → replyTokenEntry
}

// New creates a new LINE channel.
func New(cfg config.LineConfig, msgBus *bus.MessageBus, pairingSvc store.PairingStore) (*Channel, error) {
bot, err := linebot.New(cfg.ChannelSecret, cfg.ChannelAccessToken)
if err != nil {
return nil, err
}

base := channels.NewBaseChannel("line", msgBus, cfg.AllowFrom)
base.ValidatePolicy(cfg.DMPolicy, cfg.GroupPolicy)

return &Channel{
BaseChannel: base,
bot: bot,
cfg: cfg,
pairingService: pairingSvc,
}, nil
}

// Type returns the channel type.
func (c *Channel) Type() string { return "line" }

// Start begins listening. Webhook mode — nothing to poll.
func (c *Channel) Start(_ context.Context) error {
c.SetRunning(true)
slog.Info("LINE channel started (webhook mode)")
return nil
}

// Stop shuts down the channel.
func (c *Channel) Stop(_ context.Context) error {
c.SetRunning(false)
slog.Info("LINE channel stopped")
return nil
}

// WebhookHandler returns the HTTP path and handler for LINE webhook callbacks.
func (c *Channel) WebhookHandler() (string, http.Handler) {
return "/webhook/line", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
events, err := c.bot.ParseRequest(r)
if err != nil {
if err == linebot.ErrInvalidSignature {
slog.Warn("LINE webhook: invalid signature")
w.WriteHeader(http.StatusBadRequest)
} else {
slog.Error("LINE webhook: parse error", "err", err)
w.WriteHeader(http.StatusInternalServerError)
}
return
}

for _, event := range events {
go c.handleEvent(event)
}

w.WriteHeader(http.StatusOK)
})
}
19 changes: 19 additions & 0 deletions internal/channels/line/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package line

const (
maxTextLength = 5000
maxReplyMessages = 5
replyTokenTTL = 25 // seconds, buffer before 30s expiry
loadingSeconds = 60
loadingAPIURL = "https://api.line.me/v2/bot/chat/loading/start"

// km-meeting-pipeline integration: where AudioMessage and ingest-gdrive
// downloads land. The km-meeting-pipeline.sh upload cron picks files up
// from here.
meetingsInboxDir = "/data/km/meetings/inbox"

// Default location of km-meeting-pipeline.sh on the production VPS.
// Override at runtime via KM_MEETING_PIPELINE_SCRIPT env var
// (used by dev hosts and integration tests).
meetingsPipelineScript = "/home/ubuntu/odoo_dev/esmith-specs/scripts/km-meeting-pipeline.sh"
)
Loading