diff --git a/.github/workflows/docker-publish.yaml b/.github/workflows/docker-publish.yaml new file mode 100644 index 0000000..b99e6f2 --- /dev/null +++ b/.github/workflows/docker-publish.yaml @@ -0,0 +1,52 @@ +name: Build and push Docker image + +on: + push: + branches: + - '**' + tags: + - 'v*' + +permissions: + contents: read + packages: write + +jobs: + build-and-push: + runs-on: ubuntu-latest + + steps: + - name: Checkout + uses: actions/checkout@v4 + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Log in to GHCR + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Compute image tags + id: vars + run: | + # Lowercase the repo for GHCR + IMAGE="ghcr.io/${GITHUB_REPOSITORY,,}" + + if [[ "${GITHUB_REF_TYPE}" == "tag" ]]; then + TAG_NAME="${GITHUB_REF_NAME}" # e.g. v1.2.3 + echo "tags=${IMAGE}:${TAG_NAME},${IMAGE}:latest" >> $GITHUB_OUTPUT + else + SHORT_SHA=$(echo "${GITHUB_SHA}" | cut -c1-7) + echo "tags=${IMAGE}:sha-${SHORT_SHA}" >> $GITHUB_OUTPUT + fi + + - name: Build and push + uses: docker/build-push-action@v6 + with: + context: . + file: ./Dockerfile + push: true + tags: ${{ steps.vars.outputs.tags }} diff --git a/README.md b/README.md index 6c8790b..4d14546 100644 --- a/README.md +++ b/README.md @@ -2,33 +2,129 @@ The tool exports the event costs from the Canton participant nodes and process them. -### Work in progress - -This tool only logs the costs to the standard output. I'm working on other exporters. - ### Important The tool heavily relies on the Kubernetes API because it gets the logs from the pod directly running the Canton participant node. -### Installation +### Usage -```shell -docker build -t {LOCATION_WHERE_THE_CLUSTER_CAN_PULL_FROM}/cantcost:latest . -docker push {LOCATION_WHERE_THE_CLUSTER_CAN_PULL_FROM}/cantcost:latest +Each DEBUG EventCost log lines has a trace_id field. This is also returned as part of the ledger's transaction submission response's traceparent. You can connect them together to get more insights about the cost of a specific transaction. + +### Exporters + +The concept of the exporters that you can define how you want to export the cost events. That can be HTTP, Database write, or just stdout. Right now the only exporter implemented is HTTP. But we are open to discuss other use-cases. + +#### HTTP Exporter + +The HTTP exporter sends the cost events as JSON payload to a defined HTTP endpoint. You can configure the endpoint URL and Authorization header. + +The payload structure is as follows (we removed the recipients for simplicity): + +```json +{ + "@timestamp": "2025-12-03T17:06:09.044Z", + "message": "", + "logger_name": "c.d.c.s.t.TrafficStateController:participant=participant/psid=IndexedPhysicalSynchronizer(global-domain::1220be58c29e::34-0,2)", + "thread_name": "canton-env-ec-1840", + "level": "DEBUG", + "span_id": "38a01e31b05296ed", + "span_parent_id": "9ab6e7e46d7807a7", + "trace_id": "8400687f8dbbef675fb7b6e4661f461d", + "span_name": "SequencerClient.sendAsync", + "cost_details": { + "event_cost": 14097, + "cost_multiplier": 4, + "group_to_members_size": { + "0": 14 + }, + "envelopes_cost": [ + { + "write_cost": 2483, + "read_cost": 13, + "final_cost": 2496, + "recipients": [ + { + "type": "MediatorGroupRecipient", + "member": "", + "group_id": 0 + } + ] + }, + { + "write_cost": 146, + "read_cost": 1, + "final_cost": 147, + "recipients": [] + }, + { + "write_cost": 3468, + "read_cost": 4, + "final_cost": 3472, + "recipients": [] + }, + { + "write_cost": 2440, + "read_cost": 3, + "final_cost": 2443, + "recipients": [] + }, + { + "write_cost": 2741, + "read_cost": 5, + "final_cost": 2746, + "recipients": [] + }, + { + "write_cost": 2788, + "read_cost": 5, + "final_cost": 2793, + "recipients": [] + } + ] + } +} ``` -After the image push you should change the values in the zarf/deployment/devnet/manifest.yaml file. +You can get more details about the cost event structure from the internal/parser/parser_test.go file. + +To set up the HTTP exporter you need to set the following environment variables: + +- EXPORTER_TYPE=http +- HTTP_EXPORTER_URL= +- HTTP_EXPORTER_AUTH_HEADER= -- Change the image location in the `spec.containers.image` field. +### Message + +The message is the raw log line from the Canton participant node. You can get it in the exporter if you switch this environment variable: + +- INCLUDE_MESSAGE=true + +### Installation + +#### Deploy it + +You should change the values in the zarf/deployment/devnet/manifest.yaml file. Note: this is just an example, because the Kubernetes service account needs proper RBAC permissions to read the pod logs. + +- Change the image location in the `spec.containers.image` field. This can be a predefined one from us or your own build. - Change the namespace everywhere for your desired namespace. - Change the TARGET_DEPLOYMENT environment variable to the deployment name of your Canton participant node. +- Set up an exporter properly. ```shell kubectl apply -f zarf/deployment/devnet/manifest.yaml ``` +#### Build it for yourself + +```shell +docker build -t {LOCATION_WHERE_THE_CLUSTER_CAN_PULL_FROM}/cantcost:latest . +docker push {LOCATION_WHERE_THE_CLUSTER_CAN_PULL_FROM}/cantcost:latest +``` + + ### Project layout - internal/catcher: Setups a pod log streamer and call the callback to process a log line one by one. - internal/parser: Parses the log lines and extract the cost events. This is the tricky part, because the log lines are Scala object serialized and wrapped into structured JSON logging. +- internal/exporter: Defines the exporter interface and HTTP exporter implementation. - bin/main.go: The main entry point of the application. Everything glues together here. You can change the export logic here in the callback function. diff --git a/bin/main.go b/bin/main.go index 3e8615b..e523842 100644 --- a/bin/main.go +++ b/bin/main.go @@ -2,14 +2,13 @@ package main import ( "context" - "fmt" "log/slog" "os" "strings" - "time" "github.com/DLC-link/cantcost/internal/catcher" "github.com/DLC-link/cantcost/internal/env" + "github.com/DLC-link/cantcost/internal/exporters" "github.com/DLC-link/cantcost/internal/parser" slogcontext "github.com/PumpkinSeed/slog-context" ) @@ -25,6 +24,20 @@ func main() { ), ) env.Print() + + var exporter = exporters.New() + if env.GetExporterType() == "http" { + httpExporter := exporters.NewHTTPExporter( + env.GetHTTPExporterURL(), + env.GetHTTPExporterAuthHeader(), + env.GetHTTPExporterBatchSize(), + ) + exporter.AddExporter(httpExporter) + slog.Info("HTTP exporter configured", + slog.String("url", env.GetHTTPExporterURL()), + ) + } + ctx := context.Background() err := catcher.Stream(ctx, func(ctx context.Context, line string) error { if strings.Contains(strings.ToLower(line), "eventcost") { @@ -33,12 +46,10 @@ func main() { slog.ErrorContext(ctx, "Failed to parse log line", slog.Any("error", err)) return err } - fmt.Printf("%s TraceID: %s, EventCostDetails: TotalCost=%d, GroupToMembersSize=%v\n", - parsedLine.Timestamp.Format(time.TimeOnly), - parsedLine.TraceID, - parsedLine.CostDetails.EventCost, - parsedLine.CostDetails.GroupToMembersSize) - fmt.Println("----------------------------") + if err := exporter.Export(ctx, &parsedLine); err != nil { + slog.ErrorContext(ctx, "Failed to export parsed line", slog.Any("error", err)) + return err + } } return nil }) diff --git a/internal/env/env.go b/internal/env/env.go index a831103..0e34b82 100644 --- a/internal/env/env.go +++ b/internal/env/env.go @@ -3,12 +3,19 @@ package env import ( "log/slog" "os" + "strconv" ) const ( - targetDeployment = "TARGET_DEPLOYMENT" - targetContainer = "TARGET_CONTAINER" - targetNamespace = "TARGET_NAMESPACE" + targetDeployment = "TARGET_DEPLOYMENT" + targetContainer = "TARGET_CONTAINER" + targetNamespace = "TARGET_NAMESPACE" + exporterType = "EXPORTER_TYPE" + httpExporterURL = "HTTP_EXPORTER_URL" + httpExporterAuthHeader = "HTTP_EXPORTER_AUTH_HEADER" + httpExporterBatchSize = "HTTP_EXPORTER_BATCH_SIZE" + + incluseMessage = "INCLUDE_MESSAGE" logLevel = "LOG_LEVEL" ) @@ -53,6 +60,47 @@ func GetTargetNamespace() string { return "default" } +func GetExporterType() string { + if v := os.Getenv(exporterType); v != "" { + return v + } + return "http" +} + +func GetHTTPExporterURL() string { + if v := os.Getenv(httpExporterURL); v != "" { + return v + } + return "" +} + +func GetHTTPExporterAuthHeader() string { + if v := os.Getenv(httpExporterAuthHeader); v != "" { + return v + } + return "" +} + +func GetHTTPExporterBatchSize() int { + if v := os.Getenv(httpExporterBatchSize); v != "" { + strconvV, err := strconv.Atoi(v) + if err == nil { + return strconvV + } + } + return 10 +} + +func GetIncludeMessage() bool { + if v := os.Getenv(incluseMessage); v != "" { + boolV, err := strconv.ParseBool(v) + if err == nil { + return boolV + } + } + return false +} + func Print() { slog.Info("Environment Variables") slog.Info("TARGET_DEPLOYMENT", slog.String("value", GetTargetDeployment())) diff --git a/internal/exporters/http.go b/internal/exporters/http.go new file mode 100644 index 0000000..a7df19c --- /dev/null +++ b/internal/exporters/http.go @@ -0,0 +1,79 @@ +package exporters + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "log/slog" + "net/http" + "sync" + + "github.com/DLC-link/cantcost/internal/parser" +) + +var _ Exporter = (*HTTP)(nil) + +type HTTP struct { + URL string `json:"url"` + AuthorizationHeader string `json:"authorization_header"` + BatchSize int `json:"batch_size,omitempty"` + + batch []*parser.Line + mutex *sync.Mutex +} + +type HTTPRequest struct { + Count int `json:"count"` + Lines []*parser.MessageLine `json:"lines"` +} + +func NewHTTPExporter(url string, authHeader string, batchSize int) *HTTP { + if batchSize <= 0 { + batchSize = 10 + } + return &HTTP{ + URL: url, + AuthorizationHeader: authHeader, + BatchSize: batchSize, + batch: make([]*parser.Line, 0), + mutex: &sync.Mutex{}, + } +} + +func (h *HTTP) Export(ctx context.Context, line *parser.Line) error { + var request = HTTPRequest{ + Count: 1, + Lines: []*parser.MessageLine{line.ToMessageLine()}, + } + + data, err := json.Marshal(request) + if err != nil { + return err + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, h.URL, bytes.NewBuffer(data)) + if err != nil { + return err + } + req.Header.Add("Authorization", h.AuthorizationHeader) + req.Header.Add("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + if resp.Body != nil { + defer resp.Body.Close() + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + if resp.StatusCode == http.StatusUnprocessableEntity { + slog.ErrorContext(ctx, "HTTP exporter received 422 Unprocessable Entity. Check if the log line format matches the expected schema.", slog.String("data", string(data))) + } + return errors.New("failed to export log line, status code: " + resp.Status) + } + + return nil +} diff --git a/internal/exporters/iface.go b/internal/exporters/iface.go new file mode 100644 index 0000000..4a256d8 --- /dev/null +++ b/internal/exporters/iface.go @@ -0,0 +1,34 @@ +package exporters + +import ( + "context" + + "github.com/DLC-link/cantcost/internal/parser" +) + +type Exporter interface { + Export(ctx context.Context, line *parser.Line) error +} + +type Exporters struct { + exporters []Exporter +} + +func New(exporters ...Exporter) *Exporters { + return &Exporters{ + exporters: exporters, + } +} + +func (e *Exporters) Export(ctx context.Context, line *parser.Line) error { + for _, exporter := range e.exporters { + if err := exporter.Export(ctx, line); err != nil { + return err + } + } + return nil +} + +func (e *Exporters) AddExporter(exporter Exporter) { + e.exporters = append(e.exporters, exporter) +} diff --git a/internal/parser/parser.go b/internal/parser/parser.go index 8f41e2f..cae72f8 100644 --- a/internal/parser/parser.go +++ b/internal/parser/parser.go @@ -7,29 +7,31 @@ import ( "strconv" "strings" "time" + + "github.com/DLC-link/cantcost/internal/env" ) // Recipient represents either a MemberRecipient or MediatorGroupRecipient type Recipient struct { - Type string // "MemberRecipient" or "MediatorGroupRecipient" - Member string // For MemberRecipient (e.g., "PAR::iBTC-validator-1::1220fa8543db...") - GroupID int // For MediatorGroupRecipient + Type string `json:"type"` + Member string `json:"member"` + GroupID int `json:"group_id"` } // EnvelopeCostDetails represents the cost details for an envelope type EnvelopeCostDetails struct { - WriteCost int - ReadCost int - FinalCost int - Recipients []Recipient + WriteCost int `json:"write_cost"` + ReadCost int `json:"read_cost"` + FinalCost int `json:"final_cost"` + Recipients []Recipient `json:"recipients"` } // EventCostDetails represents the parsed cost details from the log message type EventCostDetails struct { - EventCost int - CostMultiplier int - GroupToMembersSize map[int]int // MediatorGroupRecipient group -> member count - EnvelopesCost []EnvelopeCostDetails + EventCost int `json:"event_cost"` + CostMultiplier int `json:"cost_multiplier"` + GroupToMembersSize map[int]int `json:"group_to_members_size"` + EnvelopesCost []EnvelopeCostDetails `json:"envelopes_cost"` } type Line struct { @@ -51,6 +53,25 @@ type Line struct { CostDetails *EventCostDetails `json:"-"` } +type MessageLine struct { + // DockerTimestamp is the timestamp from the Docker log prefix + DockerTimestamp time.Time `json:"-"` + + // Fields from the JSON payload + Timestamp time.Time `json:"@timestamp"` + Message string `json:"message"` + LoggerName string `json:"logger_name"` + ThreadName string `json:"thread_name"` + Level string `json:"level"` + SpanID string `json:"span_id"` + SpanParentID string `json:"span_parent_id"` + TraceID string `json:"trace_id"` + SpanName string `json:"span_name"` + + // Parsed from Message + CostDetails *EventCostDetails `json:"cost_details"` +} + func ProcessLine(line string) (Line, error) { // Find the first space which separates the Docker timestamp from the JSON payload spaceIdx := strings.Index(line, " ") @@ -87,6 +108,25 @@ func ProcessLine(line string) (Line, error) { return l, nil } +func (l *Line) ToMessageLine() *MessageLine { + message := &MessageLine{ + DockerTimestamp: l.DockerTimestamp, + Timestamp: l.Timestamp, + LoggerName: l.LoggerName, + ThreadName: l.ThreadName, + Level: l.Level, + SpanID: l.SpanID, + SpanParentID: l.SpanParentID, + TraceID: l.TraceID, + SpanName: l.SpanName, + CostDetails: l.CostDetails, + } + if env.GetIncludeMessage() { + message.Message = l.Message + } + return message +} + func parseEventCostDetails(message string) (*EventCostDetails, error) { details := &EventCostDetails{ GroupToMembersSize: make(map[int]int), diff --git a/internal/parser/parser_test.go b/internal/parser/parser_test.go index d840677..c7ebaeb 100644 --- a/internal/parser/parser_test.go +++ b/internal/parser/parser_test.go @@ -1,6 +1,8 @@ package parser import ( + "encoding/json" + "fmt" "testing" ) @@ -81,5 +83,7 @@ func TestProcessLine(t *testing.T) { if l.CostDetails.CostMultiplier != line.expected.CostDetails.CostMultiplier { t.Errorf("CostMultiplier mismatch: got %d, want %d", l.CostDetails.CostMultiplier, line.expected.CostDetails.CostMultiplier) } + d, _ := json.Marshal(l.ToMessageLine()) + fmt.Println(string(d)) } } diff --git a/zarf/deployment/devnet/manifest.yaml b/zarf/deployment/devnet/manifest.yaml index 8988a5a..ce69101 100644 --- a/zarf/deployment/devnet/manifest.yaml +++ b/zarf/deployment/devnet/manifest.yaml @@ -16,7 +16,7 @@ spec: serviceAccountName: cantcost-sa containers: - name: cantcost - image: {image} + image: ghcr.io/dlc-link/cantcost:version imagePullPolicy: IfNotPresent env: # Deployment whose pods' logs you want to read @@ -27,6 +27,12 @@ spec: valueFrom: fieldRef: fieldPath: metadata.namespace + - name: EXPORTER_TYPE + value: "http" + - name: HTTP_EXPORTER_URL + value: "https://targetdomain/anything" + - name: HTTP_EXPORTER_AUTH_HEADER + value: "{secret}" --- apiVersion: v1 kind: ServiceAccount