diff --git a/cmd/gateway.go b/cmd/gateway.go index ff39bb2ee..222f35dd9 100644 --- a/cmd/gateway.go +++ b/cmd/gateway.go @@ -129,6 +129,8 @@ func runGateway() { defer traceCollector.Stop() // OTel OTLP export: compiled via build tags. Build with 'go build -tags otel' to enable. initOTelExporter(context.Background(), cfg, traceCollector) + // LangSmith export: compiled via build tags. Build with 'go build -tags langsmith' to enable. + initLangSmithExporter(context.Background(), cfg, traceCollector) } if snapshotWorker != nil { defer snapshotWorker.Stop() diff --git a/cmd/gateway_langsmith.go b/cmd/gateway_langsmith.go new file mode 100644 index 000000000..647e76489 --- /dev/null +++ b/cmd/gateway_langsmith.go @@ -0,0 +1,42 @@ +//go:build langsmith + +package cmd + +import ( + "context" + "log/slog" + + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/tracing" + "github.com/nextlevelbuilder/goclaw/internal/tracing/langsmithexport" +) + +// initLangSmithExporter creates and wires the LangSmith exporter when +// the API key is configured. Only compiled with -tags langsmith. +func initLangSmithExporter(_ context.Context, cfg *config.Config, collector *tracing.Collector) { + if collector == nil { + return + } + if cfg.LangSmith.APIKey == "" { + slog.Debug("LangSmith export available but not enabled (set LANGSMITH_API_KEY)") + return + } + + exp, err := langsmithexport.New(langsmithexport.Config{ + APIKey: cfg.LangSmith.APIKey, + Project: cfg.LangSmith.Project, + APIUrl: cfg.LangSmith.APIUrl, + }) + if err != nil { + slog.Warn("failed to create LangSmith exporter", "error", err) + return + } + + collector.AddExporter(exp) + + project := cfg.LangSmith.Project + if project == "" { + project = "default" + } + slog.Info("LangSmith export enabled", "project", project) +} diff --git a/cmd/gateway_langsmith_noop.go b/cmd/gateway_langsmith_noop.go new file mode 100644 index 000000000..1ee35567b --- /dev/null +++ b/cmd/gateway_langsmith_noop.go @@ -0,0 +1,15 @@ +//go:build !langsmith + +package cmd + +import ( + "context" + + "github.com/nextlevelbuilder/goclaw/internal/config" + "github.com/nextlevelbuilder/goclaw/internal/tracing" +) + +// initLangSmithExporter is a no-op when built without the "langsmith" tag. +// Build with `go build -tags langsmith` to enable LangSmith export. +func initLangSmithExporter(_ context.Context, _ *config.Config, _ *tracing.Collector) { +} diff --git a/go.mod b/go.mod index 456ae5f06..773c081d9 100644 --- a/go.mod +++ b/go.mod @@ -22,11 +22,11 @@ require ( github.com/titanous/json5 v1.0.0 github.com/wailsapp/wails/v2 v2.11.0 github.com/zalando/go-keyring v0.2.8 - go.opentelemetry.io/otel v1.40.0 + go.opentelemetry.io/otel v1.42.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 - go.opentelemetry.io/otel/sdk v1.40.0 - go.opentelemetry.io/otel/trace v1.40.0 + go.opentelemetry.io/otel/sdk v1.42.0 + go.opentelemetry.io/otel/trace v1.42.0 golang.org/x/time v0.14.0 modernc.org/sqlite v1.47.0 tailscale.com v1.94.2 @@ -84,6 +84,7 @@ require ( github.com/jsimonetti/rtnetlink v1.4.0 // indirect github.com/labstack/echo/v4 v4.13.3 // indirect github.com/labstack/gommon v0.4.2 // indirect + github.com/langchain-ai/langsmith-go v0.2.2 // indirect github.com/leaanthony/go-ansi-parser v1.6.1 // indirect github.com/leaanthony/gosod v1.0.4 // indirect github.com/leaanthony/slicer v1.6.0 // indirect @@ -114,6 +115,10 @@ require ( github.com/tailscale/peercred v0.0.0-20250107143737-35a0c7bd7edc // indirect github.com/tailscale/web-client-prebuilt v0.0.0-20250124233751-d4cd19a26976 // indirect github.com/tailscale/wireguard-go v0.0.0-20250716170648-1d0488a3d7da // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect github.com/tkrajina/go-reflector v0.5.8 // indirect github.com/valyala/fasttemplate v1.2.2 // indirect github.com/wailsapp/go-webview2 v1.0.22 // indirect @@ -154,7 +159,7 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/klauspost/compress v1.18.2 // indirect + github.com/klauspost/compress v1.18.4 // indirect github.com/klauspost/cpuid/v2 v2.2.9 // indirect github.com/lib/pq v1.10.9 github.com/mark3labs/mcp-go v0.44.0 @@ -172,7 +177,7 @@ require ( github.com/ysmood/leakless v0.9.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 // indirect - go.opentelemetry.io/otel/metric v1.40.0 // indirect + go.opentelemetry.io/otel/metric v1.42.0 // indirect go.opentelemetry.io/proto/otlp v1.9.0 // indirect golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect golang.org/x/crypto v0.47.0 // indirect @@ -183,6 +188,6 @@ require ( golang.org/x/text v0.33.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260128011058-8636f8732409 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 // indirect - google.golang.org/grpc v1.78.0 // indirect + google.golang.org/grpc v1.79.3 // indirect google.golang.org/protobuf v1.36.11 // indirect ) diff --git a/go.sum b/go.sum index 00c05f1d6..65a627f69 100644 --- a/go.sum +++ b/go.sum @@ -251,6 +251,8 @@ github.com/jsimonetti/rtnetlink v1.4.0 h1:Z1BF0fRgcETPEa0Kt0MRk3yV5+kF1FWTni6KUF github.com/jsimonetti/rtnetlink v1.4.0/go.mod h1:5W1jDvWdnthFJ7fxYX1GMK07BUpI4oskfOqvPteYS6E= github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk= github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= +github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c= +github.com/klauspost/compress v1.18.4/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= github.com/kortschak/wol v0.0.0-20200729010619-da482cc4850a h1:+RR6SqnTkDLWyICxS1xpjCi/3dhyV+TgZwA6Ww3KncQ= @@ -267,6 +269,8 @@ github.com/labstack/echo/v4 v4.13.3 h1:pwhpCPrTl5qry5HRdM5FwdXnhXSLSY+WE+YQSeCaa github.com/labstack/echo/v4 v4.13.3/go.mod h1:o90YNEeQWjDozo584l7AwhJMHN0bOC4tAfg+Xox9q5g= github.com/labstack/gommon v0.4.2 h1:F8qTUNXgG1+6WQmqoUWnz8WiEU60mXVVw0P4ht1WRA0= github.com/labstack/gommon v0.4.2/go.mod h1:QlUFxVM+SNXhDL/Z7YhocGIBYOiwB0mXm1+1bAPHPyU= +github.com/langchain-ai/langsmith-go v0.2.2 h1:WVNUR9dhnuieIaXrKxmZWva6TX1DV/RHCVgc67wbAbs= +github.com/langchain-ai/langsmith-go v0.2.2/go.mod h1:xdfOA0EBT7KF9ylz+gGq8EM6srRkv2PtpazQ+4oraWk= github.com/leaanthony/debme v1.2.1 h1:9Tgwf+kjcrbMQ4WnPcEIUcQuIZYqdWftzZkBr+i/oOc= github.com/leaanthony/debme v1.2.1/go.mod h1:3V+sCm5tYAgQymvSOfYQ5Xx2JCr+OXiD9Jkw3otUjiA= github.com/leaanthony/go-ansi-parser v1.6.1 h1:xd8bzARK3dErqkPFtoF9F3/HgN8UQk0ed1YDKpEz01A= @@ -415,6 +419,16 @@ github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e h1:zOGKqN5D5hHhiYUp github.com/tailscale/xnet v0.0.0-20240729143630-8497ac4dab2e/go.mod h1:orPd6JZXXRyuDusYilywte7k094d7dycXXU5YnWsrwg= github.com/tc-hib/winres v0.3.1 h1:CwRjEGrKdbi5CvZ4ID+iyVhgyfatxFoizjPhzez9Io4= github.com/tc-hib/winres v0.3.1/go.mod h1:C/JaNhH3KBvhNKVbvdlDWkbMDO9H4fKKDaN7/07SSuk= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= github.com/titanous/json5 v1.0.0 h1:hJf8Su1d9NuI/ffpxgxQfxh/UiBFZX7bMPid0rIL/7s= github.com/titanous/json5 v1.0.0/go.mod h1:7JH1M8/LHKc6cyP5o5g3CSaRj+mBrIimTxzpvmckH8c= github.com/tkrajina/go-reflector v0.5.8 h1:yPADHrwmUbMq4RGEyaOUpz2H90sRsETNVpjzo3DLVQQ= @@ -475,6 +489,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0 h1:ssfIgGN go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.64.0/go.mod h1:GQ/474YrbE4Jx8gZ4q5I4hrhUzM6UPzyrqJYV2AqPoQ= go.opentelemetry.io/otel v1.40.0 h1:oA5YeOcpRTXq6NN7frwmwFR0Cn3RhTVZvXsP4duvCms= go.opentelemetry.io/otel v1.40.0/go.mod h1:IMb+uXZUKkMXdPddhwAHm6UfOwJyh4ct1ybIlV14J0g= +go.opentelemetry.io/otel v1.42.0 h1:lSQGzTgVR3+sgJDAU/7/ZMjN9Z+vUip7leaqBKy4sho= +go.opentelemetry.io/otel v1.42.0/go.mod h1:lJNsdRMxCUIWuMlVJWzecSMuNjE7dOYyWlqOXWkdqCc= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0 h1:QKdN8ly8zEMrByybbQgv8cWBcdAarwmIPZ6FThrWXJs= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.40.0/go.mod h1:bTdK1nhqF76qiPoCCdyFIV+N/sRHYXYCTQc+3VCi3MI= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.40.0 h1:DvJDOPmSWQHWywQS6lKL+pb8s3gBLOZUtw4N+mavW1I= @@ -483,12 +499,19 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0 h1:wVZXI go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.40.0/go.mod h1:khvBS2IggMFNwZK/6lEeHg/W57h/IX6J4URh57fuI40= go.opentelemetry.io/otel/metric v1.40.0 h1:rcZe317KPftE2rstWIBitCdVp89A2HqjkxR3c11+p9g= go.opentelemetry.io/otel/metric v1.40.0/go.mod h1:ib/crwQH7N3r5kfiBZQbwrTge743UDc7DTFVZrrXnqc= +go.opentelemetry.io/otel/metric v1.42.0 h1:2jXG+3oZLNXEPfNmnpxKDeZsFI5o4J+nz6xUlaFdF/4= +go.opentelemetry.io/otel/metric v1.42.0/go.mod h1:RlUN/7vTU7Ao/diDkEpQpnz3/92J9ko05BIwxYa2SSI= go.opentelemetry.io/otel/sdk v1.40.0 h1:KHW/jUzgo6wsPh9At46+h4upjtccTmuZCFAc9OJ71f8= go.opentelemetry.io/otel/sdk v1.40.0/go.mod h1:Ph7EFdYvxq72Y8Li9q8KebuYUr2KoeyHx0DRMKrYBUE= +go.opentelemetry.io/otel/sdk v1.42.0 h1:LyC8+jqk6UJwdrI/8VydAq/hvkFKNHZVIWuslJXYsDo= +go.opentelemetry.io/otel/sdk v1.42.0/go.mod h1:rGHCAxd9DAph0joO4W6OPwxjNTYWghRWmkHuGbayMts= go.opentelemetry.io/otel/sdk/metric v1.40.0 h1:mtmdVqgQkeRxHgRv4qhyJduP3fYJRMX4AtAlbuWdCYw= go.opentelemetry.io/otel/sdk/metric v1.40.0/go.mod h1:4Z2bGMf0KSK3uRjlczMOeMhKU2rhUqdWNoKcYrtcBPg= +go.opentelemetry.io/otel/sdk/metric v1.42.0 h1:D/1QR46Clz6ajyZ3G8SgNlTJKBdGp84q9RKCAZ3YGuA= go.opentelemetry.io/otel/trace v1.40.0 h1:WA4etStDttCSYuhwvEa8OP8I5EWu24lkOzp+ZYblVjw= go.opentelemetry.io/otel/trace v1.40.0/go.mod h1:zeAhriXecNGP/s2SEG3+Y8X9ujcJOTqQ5RgdEJcawiA= +go.opentelemetry.io/otel/trace v1.42.0 h1:OUCgIPt+mzOnaUTpOQcBiM/PLQ/Op7oq6g4LenLmOYY= +go.opentelemetry.io/otel/trace v1.42.0/go.mod h1:f3K9S+IFqnumBkKhRJMeaZeNk9epyhnCmQh/EysQCdc= go.opentelemetry.io/proto/otlp v1.9.0 h1:l706jCMITVouPOqEnii2fIAuO3IVGBRPV5ICjceRb/A= go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= @@ -559,6 +582,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409 h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20260128011058-8636f8732409/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= google.golang.org/grpc v1.78.0 h1:K1XZG/yGDJnzMdd/uZHAkVqJE+xIDOcmdSFZkBUicNc= google.golang.org/grpc v1.78.0/go.mod h1:I47qjTo4OKbMkjA/aOOwxDIiPSBofUtQUI5EfpWvW7U= +google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= +google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/internal/config/config.go b/internal/config/config.go index e995915de..c0499d7ba 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,6 +51,7 @@ type Config struct { Tts TtsConfig `json:"tts"` Cron CronConfig `json:"cron"` Telemetry TelemetryConfig `json:"telemetry"` + LangSmith LangSmithConfig `json:"langsmith"` Tailscale TailscaleConfig `json:"tailscale"` Bindings []AgentBinding `json:"bindings,omitempty"` mu sync.RWMutex @@ -316,6 +317,15 @@ type TelemetryConfig struct { ModelPricing map[string]*ModelPricing `json:"model_pricing,omitempty"` // cost per model, key = "provider/model" or just "model" } +// LangSmithConfig configures the LangSmith tracing exporter. +// When APIKey is set (or LANGSMITH_API_KEY env var), spans are exported to +// LangSmith as runs for AI-specific observability. +type LangSmithConfig struct { + APIKey string `json:"api_key,omitempty"` // LangSmith API key (required to enable) + Project string `json:"project,omitempty"` // project name (default: "default") + APIUrl string `json:"api_url,omitempty"` // API URL override (default: LangSmith cloud) +} + // CronConfig configures the cron job system. type CronConfig struct { MaxRetries int `json:"max_retries,omitempty"` // max retry attempts on failure (default 3, 0 = no retry) @@ -389,6 +399,7 @@ func (c *Config) ReplaceFrom(src *Config) { c.Tts = src.Tts c.Cron = src.Cron c.Telemetry = src.Telemetry + c.LangSmith = src.LangSmith c.Tailscale = src.Tailscale c.Bindings = src.Bindings } diff --git a/internal/config/config_load.go b/internal/config/config_load.go index 38b3319e4..fb64cda62 100644 --- a/internal/config/config_load.go +++ b/internal/config/config_load.go @@ -203,6 +203,11 @@ func (c *Config) applyEnvOverrides() { c.Telemetry.Insecure = v == "true" || v == "1" } + // LangSmith (uses standard LangSmith env var names) + envStr("LANGSMITH_API_KEY", &c.LangSmith.APIKey) + envStr("LANGSMITH_PROJECT", &c.LangSmith.Project) + envStr("LANGSMITH_ENDPOINT", &c.LangSmith.APIUrl) + // Owner IDs from env (comma-separated, whitespace-trimmed) if v := os.Getenv("GOCLAW_OWNER_IDS"); v != "" { var ids []string diff --git a/internal/config/config_secrets.go b/internal/config/config_secrets.go index 6e6cf938b..62cfa4a2a 100644 --- a/internal/config/config_secrets.go +++ b/internal/config/config_secrets.go @@ -64,6 +64,9 @@ func (c *Config) MaskedCopy() *Config { // Mask Tailscale auth key maskNonEmpty(&cp.Tailscale.AuthKey) + // Mask LangSmith API key + maskNonEmpty(&cp.LangSmith.APIKey) + return cp } @@ -113,6 +116,9 @@ func (c *Config) StripSecrets() { // Tailscale auth key c.Tailscale.AuthKey = "" + + // LangSmith API key + c.LangSmith.APIKey = "" } // StripMaskedSecrets strips only fields that still contain the mask value "***". @@ -168,6 +174,9 @@ func (c *Config) StripMaskedSecrets() { // Tailscale auth key stripIfMasked(&c.Tailscale.AuthKey) + + // LangSmith API key + stripIfMasked(&c.LangSmith.APIKey) } // ApplyDBSecrets overlays secrets from the config_secrets table onto the config. diff --git a/internal/tracing/collector.go b/internal/tracing/collector.go index a6ba4ec21..d99775979 100644 --- a/internal/tracing/collector.go +++ b/internal/tracing/collector.go @@ -21,16 +21,31 @@ const ( ) // SpanExporter is implemented by backends that receive span data alongside -// the PostgreSQL store (e.g. OpenTelemetry OTLP). Keeping this as an -// interface lets the OTel dependency live in a separate sub-package that can -// be swapped out by commenting one import line. +// the PostgreSQL store (e.g. OpenTelemetry OTLP, LangSmith). Keeping this +// as an interface lets each dependency live in a separate sub-package gated +// by build tags. type SpanExporter interface { ExportSpans(ctx context.Context, spans []store.SpanData) Shutdown(ctx context.Context) error } -// spanUpdate represents a deferred span field update, buffered alongside new -// spans and applied during the same flush cycle (after batch INSERT). +// SpanUpdateExporter is an optional extension of SpanExporter for exporters +// that need to receive two-phase span updates (e.g. LangSmith RunUpdate). +// Exporters that implement this interface will receive deferred span updates +// during the flush cycle, after the initial spans have been exported. +type SpanUpdateExporter interface { + ExportSpanUpdates(ctx context.Context, updates []SpanUpdate) +} + +// SpanUpdate is the exported form of a deferred span field update, passed to +// SpanUpdateExporter implementations during the flush cycle. +type SpanUpdate struct { + SpanID uuid.UUID + TraceID uuid.UUID + Updates map[string]any +} + +// spanUpdate is the internal buffered form used by the collector channel. type spanUpdate struct { SpanID uuid.UUID TraceID uuid.UUID @@ -55,8 +70,8 @@ type Collector struct { dirtyTraces map[uuid.UUID]struct{} dirtyTracesMu sync.Mutex - verbose bool // when true, LLM spans include full input messages - exporter SpanExporter // optional external exporter (nil = disabled) + verbose bool // when true, LLM spans include full input messages + exporters []SpanExporter // optional external exporters (OTel, LangSmith, etc.) // OnFlush is called after each flush cycle with the trace IDs that had // their aggregates updated. Used to broadcast realtime trace events. @@ -91,10 +106,16 @@ func (c *Collector) PreviewMaxLen() int { return previewMaxLen } -// SetExporter attaches an external span exporter (e.g. OpenTelemetry OTLP). -// When set, spans are exported to the external backend during each flush cycle. +// SetExporter replaces all exporters with a single one (backward compat). +// Prefer AddExporter for multi-exporter setups. func (c *Collector) SetExporter(exp SpanExporter) { - c.exporter = exp + c.exporters = []SpanExporter{exp} +} + +// AddExporter appends an exporter to the list. Multiple exporters receive +// the same batch of spans during each flush cycle (fan-out). +func (c *Collector) AddExporter(exp SpanExporter) { + c.exporters = append(c.exporters, exp) } // Start begins the background flush loop. @@ -109,12 +130,14 @@ func (c *Collector) Stop() { close(c.stopCh) c.wg.Wait() - // Shutdown external exporter (flushes remaining spans) - if c.exporter != nil { + // Shutdown all external exporters (flushes remaining spans). + if len(c.exporters) > 0 { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - if err := c.exporter.Shutdown(ctx); err != nil { - slog.Warn("tracing: span exporter shutdown failed", "error", err) + for _, exp := range c.exporters { + if err := exp.Shutdown(ctx); err != nil { + slog.Warn("tracing: span exporter shutdown failed", "error", err) + } } } @@ -263,9 +286,9 @@ done: slog.Debug("tracing: flushed spans", "count", len(spans)) } - // Export to external backend (non-blocking — errors logged, not propagated) - if c.exporter != nil { - c.exporter.ExportSpans(ctx, spans) + // Export to external backends (non-blocking — errors logged, not propagated). + for _, exp := range c.exporters { + exp.ExportSpans(ctx, spans) } } @@ -290,6 +313,17 @@ doneUpdates: } } slog.Debug("tracing: applied span updates", "count", len(updates)) + + // Fan out span updates to exporters that support two-phase tracing. + exported := make([]SpanUpdate, len(updates)) + for i, u := range updates { + exported[i] = SpanUpdate{SpanID: u.SpanID, TraceID: u.TraceID, Updates: u.Updates} + } + for _, exp := range c.exporters { + if sue, ok := exp.(SpanUpdateExporter); ok { + sue.ExportSpanUpdates(ctx, exported) + } + } } // Update aggregates for dirty traces diff --git a/internal/tracing/langsmithexport/exporter.go b/internal/tracing/langsmithexport/exporter.go new file mode 100644 index 000000000..37c34d0e8 --- /dev/null +++ b/internal/tracing/langsmithexport/exporter.go @@ -0,0 +1,282 @@ +package langsmithexport + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/google/uuid" + langsmith "github.com/langchain-ai/langsmith-go" + + "github.com/nextlevelbuilder/goclaw/internal/store" + "github.com/nextlevelbuilder/goclaw/internal/tracing" +) + +const ( + // pendingTTL is how long we keep "running" spans in the pending map + // before cleaning them up as orphans. + pendingTTL = 5 * time.Minute + pendingPruneFreq = 2 * time.Minute +) + +// pendingEntry stores metadata needed when a root span's two-phase update arrives. +// Only used for root spans; child spans use childBuf instead. +type pendingEntry struct { + createdAt time.Time + dottedOrder string + langsmithRunID uuid.UUID // may differ from span ID (root spans use TraceID) +} + +// runInfo tracks a LangSmith run's remapped ID and dotted_order so child +// spans can build proper hierarchy and parent references. +type runInfo struct { + langsmithID uuid.UUID + dottedOrder string + createdAt time.Time +} + +// childBufEntry stores a pre-built RunCreate for a child "running" span. +// Instead of sending POST+PATCH (which fails when PATCH arrives in a +// different sink batch without parent_run_id), we buffer until the +// update arrives, then send a single complete POST. +type childBufEntry struct { + rc *langsmith.RunCreate + createdAt time.Time +} + +// Config configures the LangSmith exporter. +type Config struct { + APIKey string // required + Project string // default: "default" + APIUrl string // default: LangSmith cloud +} + +// Exporter sends GoClaw spans to LangSmith as runs via the multipart +// ingestion API. Implements tracing.SpanExporter and tracing.SpanUpdateExporter. +type Exporter struct { + client *langsmith.TracingClient + + // pending tracks root "running" spans (two-phase POST+PATCH). + // Key: GoClaw spanID, value: pendingEntry. + pending sync.Map + + // childBuf buffers child "running" spans for deferred single-shot POST. + // Key: GoClaw spanID, value: childBufEntry. + childBuf sync.Map + + // runMap tracks every emitted run's LangSmith ID and dotted_order so + // child spans arriving in later batches can build proper hierarchy. + // Key: GoClaw spanID, value: runInfo. + runMap sync.Map + + pruneOnce sync.Once + pruneWg sync.WaitGroup + stopCh chan struct{} +} + +// New creates a LangSmith exporter with the given config. +func New(cfg Config) (*Exporter, error) { + if cfg.Project == "" { + cfg.Project = "default" + } + + opts := []langsmith.TracingOption{ + langsmith.WithTracingAPIKey(cfg.APIKey), + langsmith.WithTracingProject(cfg.Project), + } + if cfg.APIUrl != "" { + opts = append(opts, langsmith.WithTracingAPIURL(cfg.APIUrl)) + } + + client, err := langsmith.NewTracingClient(context.Background(), opts...) + if err != nil { + return nil, err + } + + return &Exporter{ + client: client, + stopCh: make(chan struct{}), + }, nil +} + +// ExportSpans converts GoClaw SpanData to LangSmith runs and sends them. +// Called by the Collector during flush alongside the PostgreSQL batch insert. +func (e *Exporter) ExportSpans(ctx context.Context, spans []store.SpanData) { + if e == nil || len(spans) == 0 { + return + } + + e.startPruneLoop() + + // First pass: register root span ID mappings so children in the same + // batch can resolve their parent's LangSmith ID and dotted_order. + for _, s := range spans { + if s.ParentSpanID == nil && s.ID != s.TraceID { + dottedOrder := formatDottedOrder(s.StartTime, s.TraceID) + e.runMap.Store(s.ID, runInfo{ + langsmithID: s.TraceID, + dottedOrder: dottedOrder, + createdAt: time.Now(), + }) + } + } + + for _, s := range spans { + // Resolve parent's LangSmith ID and dotted_order for hierarchy. + var parentDottedOrder string + var parentLangsmithID *uuid.UUID + if s.ParentSpanID != nil { + if ri, ok := e.runMap.Load(*s.ParentSpanID); ok { + info := ri.(runInfo) + parentDottedOrder = info.dottedOrder + parentLangsmithID = &info.langsmithID + } + } + + rc := spanToRunCreate(s, parentDottedOrder, parentLangsmithID) + + // Track this run for future children and two-phase updates. + e.runMap.Store(s.ID, runInfo{ + langsmithID: rc.ID, + dottedOrder: rc.DottedOrder, + createdAt: time.Now(), + }) + + if isRunningSpan(s) { + if s.ParentSpanID != nil { + // Child running span: buffer for deferred single-shot POST. + // Standalone PATCHes for children fail LangSmith validation + // because RunUpdate has no ParentRunID field. + e.childBuf.Store(s.ID, childBufEntry{ + rc: rc, + createdAt: time.Now(), + }) + continue + } + // Root running span: two-phase POST+PATCH is safe + // (run_id == trace_id, single-part dotted_order). + e.pending.Store(s.ID, pendingEntry{ + createdAt: time.Now(), + dottedOrder: rc.DottedOrder, + langsmithRunID: rc.ID, + }) + } + + if err := e.client.CreateRun(rc); err != nil { + slog.Warn("langsmith: failed to create run", + "span_id", s.ID, "name", s.Name, "error", err) + } + } +} + +// ExportSpanUpdates converts deferred span updates to LangSmith RunUpdates. +// Called by the Collector after DB span updates are applied. +func (e *Exporter) ExportSpanUpdates(ctx context.Context, updates []tracing.SpanUpdate) { + if e == nil || len(updates) == 0 { + return + } + + for _, u := range updates { + // Buffered child span: apply update and send a single complete POST. + if entry, ok := e.childBuf.LoadAndDelete(u.SpanID); ok { + if cb, ok := entry.(childBufEntry); ok { + applySpanUpdate(cb.rc, u) + if err := e.client.CreateRun(cb.rc); err != nil { + slog.Warn("langsmith: failed to create completed child run", + "span_id", u.SpanID, "error", err) + } + continue + } + } + + // Root span: send PATCH (run_id == trace_id, single-part DO → valid). + ru := spanUpdateToRunUpdate(u) + if entry, ok := e.pending.LoadAndDelete(u.SpanID); ok { + if pe, ok := entry.(pendingEntry); ok { + ru.DottedOrder = pe.dottedOrder + if pe.langsmithRunID != uuid.Nil { + ru.ID = pe.langsmithRunID + } + } + } + + if err := e.client.UpdateRun(ru); err != nil { + slog.Warn("langsmith: failed to update run", + "span_id", u.SpanID, "error", err) + } + } +} + +// Shutdown gracefully shuts down the LangSmith exporter, flushing remaining runs. +func (e *Exporter) Shutdown(ctx context.Context) error { + if e == nil { + return nil + } + close(e.stopCh) + e.pruneWg.Wait() + slog.Info("langsmith exporter shutting down") + e.client.Close() + return nil +} + +// startPruneLoop starts a background goroutine (once) to clean up orphaned +// pending entries that never received an update. +func (e *Exporter) startPruneLoop() { + e.pruneOnce.Do(func() { + e.pruneWg.Add(1) + go func() { + defer e.pruneWg.Done() + ticker := time.NewTicker(pendingPruneFreq) + defer ticker.Stop() + for { + select { + case <-ticker.C: + e.pruneStale() + case <-e.stopCh: + return + } + } + }() + }) +} + +// pruneStale removes expired entries from pending, childBuf, and runMap. +// Orphaned child buffers are flushed as incomplete runs (better than lost). +func (e *Exporter) pruneStale() { + cutoff := time.Now().Add(-pendingTTL) + var pruned int + e.pending.Range(func(key, value any) bool { + if pe, ok := value.(pendingEntry); ok && pe.createdAt.Before(cutoff) { + e.pending.Delete(key) + pruned++ + } + return true + }) + // Flush orphaned child buffers as incomplete runs. + e.childBuf.Range(func(key, value any) bool { + if cb, ok := value.(childBufEntry); ok && cb.createdAt.Before(cutoff) { + e.childBuf.Delete(key) + if err := e.client.CreateRun(cb.rc); err != nil { + slog.Warn("langsmith: failed to flush orphaned child run", "error", err) + } + pruned++ + } + return true + }) + e.runMap.Range(func(key, value any) bool { + if ri, ok := value.(runInfo); ok && ri.createdAt.Before(cutoff) { + e.runMap.Delete(key) + } + return true + }) + if pruned > 0 { + slog.Debug("langsmith: pruned stale entries", "count", pruned) + } +} + +// Compile-time interface checks. +var ( + _ tracing.SpanExporter = (*Exporter)(nil) + _ tracing.SpanUpdateExporter = (*Exporter)(nil) +) diff --git a/internal/tracing/langsmithexport/mapping.go b/internal/tracing/langsmithexport/mapping.go new file mode 100644 index 000000000..aa18cc8ff --- /dev/null +++ b/internal/tracing/langsmithexport/mapping.go @@ -0,0 +1,320 @@ +package langsmithexport + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + langsmith "github.com/langchain-ai/langsmith-go" + + "github.com/nextlevelbuilder/goclaw/internal/store" + "github.com/nextlevelbuilder/goclaw/internal/tracing" +) + +// mapRunType converts a GoClaw span type to a LangSmith run type. +func mapRunType(spanType string) string { + switch spanType { + case store.SpanTypeLLMCall: + return "llm" + case store.SpanTypeToolCall: + return "tool" + case store.SpanTypeAgent: + return "chain" + case store.SpanTypeEmbedding: + return "llm" + default: + return "chain" + } +} + +// formatDottedOrder generates a LangSmith dotted-order string from a timestamp +// and UUID. This format is required for proper run tree hierarchy in the UI. +// Format: YYYYMMDDTHHMMSSffffffZ +func formatDottedOrder(t time.Time, id uuid.UUID) string { + return fmt.Sprintf("%s%06dZ%s", + t.UTC().Format("20060102T150405"), + t.UTC().Nanosecond()/1000, + id.String(), + ) +} + +// spanToRunCreate converts a GoClaw SpanData into a LangSmith RunCreate. +// parentDottedOrder is the parent's dotted_order for building child hierarchy; +// empty for root runs. parentLangsmithID remaps ParentSpanID when the parent's +// LangSmith run ID differs from its GoClaw span ID (root span remapping). +func spanToRunCreate(s store.SpanData, parentDottedOrder string, parentLangsmithID *uuid.UUID) *langsmith.RunCreate { + // LangSmith requires root run_id == trace_id. For root spans (no parent), + // use TraceID as the run ID so both constraints are satisfied. + runID := s.ID + if s.ParentSpanID == nil { + runID = s.TraceID + } + + // Build dotted_order: root = "", child = ".". + segment := formatDottedOrder(s.StartTime, runID) + dottedOrder := segment + if parentDottedOrder != "" { + dottedOrder = parentDottedOrder + "." + segment + } + + rc := &langsmith.RunCreate{ + ID: runID, + TraceID: s.TraceID, + Name: s.Name, + RunType: mapRunType(s.SpanType), + StartTime: s.StartTime, + DottedOrder: dottedOrder, + } + + if s.ParentSpanID != nil { + if parentLangsmithID != nil { + rc.ParentRunID = parentLangsmithID + } else { + rc.ParentRunID = s.ParentSpanID + } + } + + // Build inputs based on span type. + rc.Inputs = buildInputs(s) + + // Build extra with invocation_params + metadata (matches langsmith-go convention). + rc.Extra = buildExtra(s) + + // If span already has end_time (single-shot complete span), set outputs too. + if s.EndTime != nil { + rc.EndTime = *s.EndTime + rc.Outputs = buildOutputs(s) + if s.Status == store.SpanStatusError && s.Error != "" { + rc.Error = s.Error + } + } + + return rc +} + +// buildInputs constructs the Inputs field following LangSmith conventions. +// LLM runs use {"messages": [...]}, tool runs use {"tool_name": ..., "input": ...}, +// chain/agent runs use {"input": ...}. +func buildInputs(s store.SpanData) map[string]any { + inputs := make(map[string]any) + switch s.SpanType { + case store.SpanTypeLLMCall: + if s.InputPreview != "" { + inputs["messages"] = s.InputPreview + } + case store.SpanTypeToolCall: + if s.ToolName != "" { + inputs["tool_name"] = s.ToolName + } + if s.InputPreview != "" { + inputs["input"] = s.InputPreview + } + default: + if s.InputPreview != "" { + inputs["input"] = s.InputPreview + } + } + return inputs +} + +// buildExtra constructs the Extra field following langsmith-go test conventions. +// Uses "invocation_params" for model config and "metadata" for additional context. +func buildExtra(s store.SpanData) map[string]any { + extra := make(map[string]any) + + // Model invocation params (matches langsmith-go integration test pattern). + if s.Model != "" || s.Provider != "" { + params := make(map[string]any) + if s.Model != "" { + params["model"] = s.Model + } + if s.Provider != "" { + params["provider"] = s.Provider + } + extra["invocation_params"] = params + } + + // Span metadata forwarded as-is. + if len(s.Metadata) > 0 { + var meta map[string]any + if err := json.Unmarshal(s.Metadata, &meta); err == nil { + extra["metadata"] = meta + } + } + + return extra +} + +// spanUpdateToRunUpdate converts a deferred span update into a LangSmith RunUpdate. +func spanUpdateToRunUpdate(u tracing.SpanUpdate) *langsmith.RunUpdate { + ru := &langsmith.RunUpdate{ + ID: u.SpanID, + TraceID: u.TraceID, + } + + outputs := make(map[string]any) + usage := make(map[string]any) + + for k, v := range u.Updates { + switch k { + case "end_time": + if t, ok := v.(time.Time); ok { + ru.EndTime = t + } + case "status": + // Mapped via error field below. + case "error": + if s, ok := v.(string); ok && s != "" { + ru.Error = s + } + case "output_preview": + if s, ok := v.(string); ok { + outputs["content"] = s + } + case "input_tokens": + if n, ok := toInt(v); ok { + usage["prompt_tokens"] = n + } + case "output_tokens": + if n, ok := toInt(v); ok { + usage["completion_tokens"] = n + } + case "finish_reason": + if s, ok := v.(string); ok { + outputs["finish_reason"] = s + } + case "total_cost": + if f, ok := toFloat(v); ok { + outputs["total_cost"] = f + } + } + } + + // Assemble usage following LangSmith convention (prompt_tokens/completion_tokens/total_tokens). + if len(usage) > 0 { + pt, _ := toInt(usage["prompt_tokens"]) + ct, _ := toInt(usage["completion_tokens"]) + usage["total_tokens"] = pt + ct + outputs["usage"] = usage + } + + if len(outputs) > 0 { + ru.Outputs = outputs + } + + return ru +} + +// buildOutputs extracts outputs from a completed SpanData. +func buildOutputs(s store.SpanData) map[string]any { + out := make(map[string]any) + if s.OutputPreview != "" { + out["content"] = s.OutputPreview + } + if s.InputTokens > 0 || s.OutputTokens > 0 { + out["usage"] = map[string]int{ + "prompt_tokens": s.InputTokens, + "completion_tokens": s.OutputTokens, + "total_tokens": s.InputTokens + s.OutputTokens, + } + } + if s.FinishReason != "" { + out["finish_reason"] = s.FinishReason + } + if s.TotalCost != nil { + out["total_cost"] = *s.TotalCost + } + return out +} + +// toInt converts various numeric types to int. +func toInt(v any) (int, bool) { + switch n := v.(type) { + case int: + return n, true + case int64: + return int(n), true + case float64: + return int(n), true + default: + return 0, false + } +} + +// toFloat converts various numeric types to float64. +func toFloat(v any) (float64, bool) { + switch n := v.(type) { + case float64: + return n, true + case float32: + return float64(n), true + case int: + return float64(n), true + case int64: + return float64(n), true + default: + return 0, false + } +} + +// applySpanUpdate applies a deferred span update to a buffered RunCreate, +// producing a complete single-shot POST. Used for child spans to avoid +// standalone PATCH operations (which lack parent_run_id and fail validation). +func applySpanUpdate(rc *langsmith.RunCreate, u tracing.SpanUpdate) { + outputs := make(map[string]any) + if rc.Outputs != nil { + for k, v := range rc.Outputs { + outputs[k] = v + } + } + usage := make(map[string]any) + + for k, v := range u.Updates { + switch k { + case "end_time": + if t, ok := v.(time.Time); ok { + rc.EndTime = t + } + case "error": + if s, ok := v.(string); ok && s != "" { + rc.Error = s + } + case "output_preview": + if s, ok := v.(string); ok { + outputs["content"] = s + } + case "input_tokens": + if n, ok := toInt(v); ok { + usage["prompt_tokens"] = n + } + case "output_tokens": + if n, ok := toInt(v); ok { + usage["completion_tokens"] = n + } + case "finish_reason": + if s, ok := v.(string); ok { + outputs["finish_reason"] = s + } + case "total_cost": + if f, ok := toFloat(v); ok { + outputs["total_cost"] = f + } + } + } + + if len(usage) > 0 { + pt, _ := toInt(usage["prompt_tokens"]) + ct, _ := toInt(usage["completion_tokens"]) + usage["total_tokens"] = pt + ct + outputs["usage"] = usage + } + if len(outputs) > 0 { + rc.Outputs = outputs + } +} + +// isRunningSpan returns true if the span is in "running" state (start phase of two-phase). +func isRunningSpan(s store.SpanData) bool { + return s.Status == store.SpanStatusRunning && s.EndTime == nil +}