Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions observability-logs-openobserve/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ helm upgrade --install observability-logs-openobserve \
oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \
--create-namespace \
--namespace openchoreo-observability-plane \
--version 0.4.1
--version 0.4.2
```

To switch to HA mode, disable the standalone chart and enable the distributed chart:
Expand All @@ -47,7 +47,7 @@ To switch to HA mode, disable the standalone chart and enable the distributed ch
helm upgrade --install observability-logs-openobserve \
oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \
--namespace openchoreo-observability-plane \
--version 0.4.1 \
--version 0.4.2 \
--reuse-values \
--set openobserve-standalone.enabled=false \
--set openobserve.enabled=true
Expand All @@ -66,7 +66,7 @@ to start collecting logs from the cluster and publish them to OpenObserve:
helm upgrade observability-logs-openobserve \
oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \
--namespace openchoreo-observability-plane \
--version 0.4.1 \
--version 0.4.2 \
--reuse-values \
--set fluent-bit.enabled=true
```
Expand All @@ -81,7 +81,7 @@ helm upgrade --install observability-logs-openobserve \
oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \
--create-namespace \
--namespace openchoreo-observability-plane \
--version 0.4.1 \
--version 0.4.2 \
--set fluent-bit.enabled=true \
--set openobserve-standalone.enabled=false \
--set openObserveSetup.enabled=false \
Expand Down
4 changes: 2 additions & 2 deletions observability-logs-openobserve/helm/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ apiVersion: v2
name: observability-logs-openobserve
description: A Helm chart for OpenChoreo Logs Module for OpenObserve
type: application
version: 0.4.1
appVersion: "0.4.1"
version: 0.4.2
appVersion: "0.4.2"
keywords:
- openobserve
- openchoreo
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

84 changes: 42 additions & 42 deletions observability-logs-openobserve/internal/api/gen/server.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions observability-logs-openobserve/internal/openobserve/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,19 @@ func (c *Client) executeSearchQuery(ctx context.Context, queryJSON []byte) (*Ope
return &openObserveResp, nil
}

// extractTotalCount extracts the total count from a count query response.
// The response is expected to have hits[0].total as the count value.
func extractTotalCount(resp *OpenObserveResponse) int {
if len(resp.Hits) > 0 {
if total, ok := resp.Hits[0]["total"]; ok {
if v, ok := total.(float64); ok {
return int(v)
}
}
}
return 0
}

func (c *Client) GetComponentLogs(ctx context.Context, params ComponentLogsParams) (*ComponentLogsResult, error) {
queryJSON, err := generateComponentLogsQuery(params, c.stream, c.logger)
if err != nil {
Expand All @@ -209,9 +222,19 @@ func (c *Client) GetComponentLogs(ctx context.Context, params ComponentLogsParam
logs = append(logs, entry)
}

// Execute a separate count query to get the true total number of matching logs
countQueryJSON, err := generateComponentLogsCountQuery(params, c.stream, c.logger)
if err != nil {
return nil, fmt.Errorf("failed to generate component logs count query: %w", err)
}
countResp, err := c.executeSearchQuery(ctx, countQueryJSON)
if err != nil {
return nil, fmt.Errorf("failed to execute component logs count query: %w", err)
}

return &ComponentLogsResult{
Logs: logs,
TotalCount: openObserveResp.Total,
TotalCount: extractTotalCount(countResp),
Took: openObserveResp.Took,
}, nil
}
Expand Down Expand Up @@ -239,9 +262,19 @@ func (c *Client) GetWorkflowLogs(ctx context.Context, params WorkflowLogsParams)
logs = append(logs, entry)
}

// Execute a separate count query to get the true total number of matching workflow logs
countQueryJSON, err := generateWorkflowLogsCountQuery(params, c.stream, c.logger)
if err != nil {
return nil, fmt.Errorf("failed to generate workflow logs count query: %w", err)
}
countResp, err := c.executeSearchQuery(ctx, countQueryJSON)
if err != nil {
return nil, fmt.Errorf("failed to execute workflow logs count query: %w", err)
}

return &WorkflowLogsResult{
Logs: logs,
TotalCount: openObserveResp.Total,
TotalCount: extractTotalCount(countResp),
Took: openObserveResp.Took,
}, nil
}
Expand Down
50 changes: 50 additions & 0 deletions observability-logs-openobserve/internal/openobserve/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,38 @@
package openobserve

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"
)

// isCountQuery checks if the request body contains a count query (size=0 and SELECT count).
func isCountQuery(r *http.Request) bool {
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
return false
}
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

var body map[string]interface{}
if err := json.Unmarshal(bodyBytes, &body); err != nil {
return false
}
query, ok := body["query"].(map[string]interface{})
if !ok {
return false
}
size, _ := query["size"].(float64)
sql, _ := query["sql"].(string)
return size == 0 && strings.Contains(strings.ToLower(sql), "count")
}

func newTestClient(serverURL string) *Client {
return NewClient(serverURL, "default", "default", "admin", "token", testLogger())
}
Expand Down Expand Up @@ -50,6 +74,19 @@ func TestGetComponentLogs(t *testing.T) {
t.Error("missing or incorrect basic auth")
}

if isCountQuery(r) {
resp := OpenObserveResponse{
Took: 1,
Total: 1,
Hits: []map[string]interface{}{
{"total": float64(2)},
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
return
}

resp := OpenObserveResponse{
Took: 42,
Total: 2,
Expand Down Expand Up @@ -137,6 +174,19 @@ func TestGetComponentLogs_ServerError(t *testing.T) {

func TestGetWorkflowLogs(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if isCountQuery(r) {
resp := OpenObserveResponse{
Took: 1,
Total: 1,
Hits: []map[string]interface{}{
{"total": float64(1)},
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
return
}

resp := OpenObserveResponse{
Took: 10,
Total: 1,
Expand Down
105 changes: 105 additions & 0 deletions observability-logs-openobserve/internal/openobserve/queries.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,111 @@ func parseDurationMinutes(duration string) (int, error) {
}
}

// generateComponentLogsCountQuery generates a count query to get the true total of matching component logs.
func generateComponentLogsCountQuery(params ComponentLogsParams, stream string, logger *slog.Logger) ([]byte, error) {
if params.Namespace == "" {
return nil, fmt.Errorf("namespace is required for component log queries")
}

var conditions []string

conditions = append(conditions, "kubernetes_labels_openchoreo_dev_namespace = '"+escapeSQLString(params.Namespace)+"'")

if params.ProjectID != "" {
conditions = append(conditions, "kubernetes_labels_openchoreo_dev_project_uid = '"+escapeSQLString(params.ProjectID)+"'")
}
if params.EnvironmentID != "" {
conditions = append(conditions, "kubernetes_labels_openchoreo_dev_environment_uid = '"+escapeSQLString(params.EnvironmentID)+"'")
}
if len(params.ComponentIDs) > 0 {
componentConditions := make([]string, len(params.ComponentIDs))
for i, id := range params.ComponentIDs {
componentConditions[i] = "kubernetes_labels_openchoreo_dev_component_uid = '" + escapeSQLString(id) + "'"
}
conditions = append(conditions, "("+strings.Join(componentConditions, " OR ")+")")
}
if params.SearchPhrase != "" {
conditions = append(conditions, "log LIKE '%"+escapeSQLString(params.SearchPhrase)+"%'")
}
if len(params.LogLevels) > 0 {
levelConditions := make([]string, len(params.LogLevels))
for i, level := range params.LogLevels {
levelConditions[i] = "logLevel = '" + escapeSQLString(level) + "'"
}
conditions = append(conditions, "("+strings.Join(levelConditions, " OR ")+")")
}

sql := "SELECT count(*) as total FROM " + quoteIdentifier(stream)
if len(conditions) > 0 {
sql += " WHERE " + strings.Join(conditions, " AND ")
}

query := map[string]interface{}{
"query": map[string]interface{}{
"sql": sql,
"start_time": params.StartTime.UnixMicro(),
"end_time": params.EndTime.UnixMicro(),
"from": 0,
"size": 0,
},
}

if logger.Enabled(nil, slog.LevelDebug) {
if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil {
fmt.Printf("Generated count query for component logs:\n")
fmt.Println(string(prettyJSON))
}
}

return json.Marshal(query)
}

// generateWorkflowLogsCountQuery generates a count query to get the true total of matching workflow logs.
func generateWorkflowLogsCountQuery(params WorkflowLogsParams, stream string, logger *slog.Logger) ([]byte, error) {
var conditions []string

if params.Namespace != "" {
conditions = append(conditions, "kubernetes_namespace_name = 'workflows-"+escapeSQLString(params.Namespace)+"'")
}
if params.WorkflowRunName != "" {
conditions = append(conditions, "kubernetes_labels_workflows_argoproj_io_workflow = '"+escapeSQLString(params.WorkflowRunName)+"'")
}
if params.SearchPhrase != "" {
conditions = append(conditions, "log LIKE '%"+escapeSQLString(params.SearchPhrase)+"%'")
}
if len(params.LogLevels) > 0 {
levelConditions := make([]string, len(params.LogLevels))
for i, level := range params.LogLevels {
levelConditions[i] = "logLevel = '" + escapeSQLString(level) + "'"
}
conditions = append(conditions, "("+strings.Join(levelConditions, " OR ")+")")
}

sql := "SELECT count(*) as total FROM " + quoteIdentifier(stream)
if len(conditions) > 0 {
sql += " WHERE " + strings.Join(conditions, " AND ")
}

query := map[string]interface{}{
"query": map[string]interface{}{
"sql": sql,
"start_time": params.StartTime.UnixMicro(),
"end_time": params.EndTime.UnixMicro(),
"from": 0,
"size": 0,
},
}

if logger.Enabled(nil, slog.LevelDebug) {
if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil {
fmt.Printf("Generated count query for workflow logs:\n")
fmt.Println(string(prettyJSON))
}
}

return json.Marshal(query)
}

// generateAlertConfig generates an OpenObserve alert configuration as JSON
func generateAlertConfig(params LogAlertParams, streamName string, logger *slog.Logger) ([]byte, error) {
query := fmt.Sprintf(
Expand Down
Loading
Loading