From 4dc5c77f2f64f34c62c64c47c6d5c33bc48a39ba Mon Sep 17 00:00:00 2001 From: Nilushan Costa Date: Fri, 20 Mar 2026 16:33:43 +0530 Subject: [PATCH] fix: calculate value of total field in OpenObserve logs, traces and spans response using a count query Signed-off-by: Nilushan Costa --- observability-logs-openobserve/README.md | 8 +- .../helm/Chart.yaml | 4 +- .../internal/api/gen/models.gen.go | 2 +- .../internal/api/gen/server.gen.go | 84 +++++++------- .../internal/openobserve/client.go | 37 +++++- .../internal/openobserve/client_test.go | 50 +++++++++ .../internal/openobserve/queries.go | 105 ++++++++++++++++++ observability-tracing-openobserve/README.md | 6 +- .../helm/Chart.yaml | 4 +- .../internal/api/gen/models.gen.go | 8 +- .../internal/api/gen/server.gen.go | 71 +++++------- .../internal/openobserve/client.go | 41 ++++++- .../internal/openobserve/client_test.go | 66 +++++++++++ .../internal/openobserve/queries.go | 73 ++++++++++++ 14 files changed, 450 insertions(+), 109 deletions(-) diff --git a/observability-logs-openobserve/README.md b/observability-logs-openobserve/README.md index 026ce0f..bcf8857 100644 --- a/observability-logs-openobserve/README.md +++ b/observability-logs-openobserve/README.md @@ -38,7 +38,7 @@ helm upgrade --install observability-logs-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \ --create-namespace \ --namespace openchoreo-observability-plane \ - --version 0.4.1 + --version 0.4.2 ``` To switch to HA mode, disable the standalone chart and enable the distributed chart: @@ -47,7 +47,7 @@ To switch to HA mode, disable the standalone chart and enable the distributed ch helm upgrade --install observability-logs-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \ --namespace openchoreo-observability-plane \ - --version 0.4.1 \ + --version 0.4.2 \ --reuse-values \ --set openobserve-standalone.enabled=false \ --set openobserve.enabled=true @@ -66,7 +66,7 @@ to start collecting logs from the cluster and publish them to OpenObserve: helm upgrade observability-logs-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \ --namespace openchoreo-observability-plane \ - --version 0.4.1 \ + --version 0.4.2 \ --reuse-values \ --set fluent-bit.enabled=true ``` @@ -81,7 +81,7 @@ helm upgrade --install observability-logs-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-logs-openobserve \ --create-namespace \ --namespace openchoreo-observability-plane \ - --version 0.4.1 \ + --version 0.4.2 \ --set fluent-bit.enabled=true \ --set openobserve-standalone.enabled=false \ --set openObserveSetup.enabled=false \ diff --git a/observability-logs-openobserve/helm/Chart.yaml b/observability-logs-openobserve/helm/Chart.yaml index cadc37a..655cde2 100644 --- a/observability-logs-openobserve/helm/Chart.yaml +++ b/observability-logs-openobserve/helm/Chart.yaml @@ -5,8 +5,8 @@ apiVersion: v2 name: observability-logs-openobserve description: A Helm chart for OpenChoreo Logs Module for OpenObserve type: application -version: 0.4.1 -appVersion: "0.4.1" +version: 0.4.2 +appVersion: "0.4.2" keywords: - openobserve - openchoreo diff --git a/observability-logs-openobserve/internal/api/gen/models.gen.go b/observability-logs-openobserve/internal/api/gen/models.gen.go index 8da565d..26c81de 100644 --- a/observability-logs-openobserve/internal/api/gen/models.gen.go +++ b/observability-logs-openobserve/internal/api/gen/models.gen.go @@ -314,7 +314,7 @@ type LogsQueryResponse struct { // TookMs The time taken to query the logs in milliseconds TookMs *int `json:"tookMs,omitempty"` - // Total The total number of logs queried + // Total The total number of matching log entries, capped at 1000 Total *int `json:"total,omitempty"` } diff --git a/observability-logs-openobserve/internal/api/gen/server.gen.go b/observability-logs-openobserve/internal/api/gen/server.gen.go index dd29210..08e5efa 100644 --- a/observability-logs-openobserve/internal/api/gen/server.gen.go +++ b/observability-logs-openobserve/internal/api/gen/server.gen.go @@ -863,48 +863,48 @@ func (sh *strictHandler) Health(w http.ResponseWriter, r *http.Request) { // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+waWXMUufmvqJRUbVLVnhkDedh5M8awTryYtSE8gCulaX3TrbVaaktqm8Hl/57S0deM", - "ei7MkQovMG5J331L9ziVRSkFCKPx9B7rNIeCuJ9HHJS5qDhcwE0F2thvpZIlKMPA7UiloMwwKVaXQJAZ", - "B2p/UtCpYqXfh9/nYHJQyOSAiMWAVMUBMY3qIwk2ixLwFM+k5EAEfkgwEwbULeGr8N7mgOpVJOfIsAKQ", - "keimArVAc7mMqQWvjWIis9At4cRIFYder1qolYY4TBBVgacfcGZwgjNjP3Hj/nGrNzjBAm7wVQS7yRXo", - "XHIaR98so1vCK1hLRYAtqmIGysK+Y4LKuzhgv7afzB4SrOCmYsqq+ANuVRcQdjTWEW+X11YScvYnpMZS", - "W4AhlBgSs7RgpO/YgJjOSxDHuVQgUbMZvTt90fCFEzyXqiAGT3FVMRozBBC3TElRbImos31nVIIUEEdg", - "V5xWNtqt3alLkq4B5JZXoaHjixjAUkmri214D1t35HvJbpwQunz0SFjRR9K3g5gJaVkpL4++AXn6okx5", - "qzcSwSdIK+Ndi8sMzYgG6oWmN7LiEayStLStMfGG1KQTRmMcdcKwLqXQ8DMO/4zDHSP8GUX/H6Po1oGv", - "AKNYGifEr/UtLnxbin21W6Vl9Z9Kk8wKsoBCqkX4M+ZQXyXmrobXeMR8D7NcyuvhoFmAdpQPSMYt9lV+", - "50HGVK4NMZWOw/JrQ6BqyeoqTUE7WSslVUSgg6wykdn8cLkQ6TC7JK0TxCqFfg0Zcg0C2R9DUTVVQIxL", - "DVVJ618izYnI3G8KHOzXmDVwoo0lEeiRGYiwrABtSFHWsrJHkF6INCZyS9pzkl6DoKcDjjbzy+j0Bfqb", - "9bC5kgWSM22T1IxxZhb1lr9vFyrs9zOZsZTwIZzcLzucNnRsCXlXA1pSjHaCtZGDMB5VQMx6jusQfyaz", - "E2G8s/bNhsMt8EFOkV+OaVtmw6dq14uc6yavaMByq42ByAyBIzwZSnWvBzNENNe5tGFyYlAGwtYbQGtM", - "MXK/JKMOIdmY9FIpDGEC1DBvzZZdGeok460k103e+6Paq0zYW35NRt+Kwzb/78hfKekwgn9VM1ACDGhU", - "Sron6F2qliWEO+DyJcpWsqrLmV3Z2bNg2tMCYqGwyT3bpqZO5GnQ2ZR4YLfi3cLvJRCV5pepLGFz0b6F", - "H60vcTeIf3PH7iHFetUTW7oM1yCusjmWdMCQ3DJKJYVupgaF7H/MtcrwiRQlt0jPn18e/Pvw4OzgyZN4", - "Hhmo7n6rCiIOFBBqu9yAs01ILYLfmdZMZKjmHs0ZcKrRL9oQZd6yAn5BRFD0Cwjq/oqRYZjha7ntYA6p", - "fEZoPXC1xRWpTC4V++yzu1QzRikI28ZK81JWwk9FxJyz1NRjN0H4pZPcyQ6l5JnM9B+2LB8c9wZGB9gR", - "1PevwUFWWpk1zpFgzgoWqsI5qbjB08PJJIlVAOQTK6oC+dbaImMGCm1bCQWmUlYyYY+DMUlwwUT4s0Fs", - "pZT5zpzL7MwWMY5DB8uz6pXx4uT5u1c4waevX57jBL8/uniNE3xycXF+EZ8h+A9EKbJw6mM3FZx6qEZV", - "YCs95+xvckV03B11PxpIAedzPP1wj/+qYI6n+C/jdmQ/DvP6cTSWPCTrD72X6nrO5V3vzJVrJpU5VxRU", - "TyVOHTimFbsfSXtgWf21IElzMiq3xqcGK2Bl9javpRDW4koak+5L/Wq9ewyFNy4zPVj0akcyA4pCpzev", - "OLcctPptrG8rRTc1+7LZWa1vBapW/zAkawtGyuvf9XBaDG1jM0QwNb9MoIJxzjSkUtBON9/xPiPN0DDR", - "LXX8vCvDCKxYSFthMKayvdqU71EuxLx1haH1mf4ugLioRF3P7Z/uH9w8eC7DBNqQ1NQU2Pzc1mxvgRQW", - "+7KcmEZHb06RLiFlc5YSN4SgMGcCtJOahapIanylR6wUM5uSCSWlAYWKShvEbL62xc9HYaQbQWe2HkR3", - "zOQOSoeS81BRjJArK+v6IiWcO4za5bFSMmFsSvkovEk707O5viCCZN0WXPtKJajYEVcPHUolbxkFimbe", - "JwpJKw6jjzZDcZZCCCFBXEclSXNAT0YTmzYUx1OcG1Pq6Xh8d3c3Im55JFU2Dmf1+Oz0+OT15cnBk9Fk", - "lJuCd+oNvMJzPe+wYQwdBfkdvTnFCb4Fpb1KDkeT0SQM4gUpGZ7ip6PJ6KntronJnYGNScnGt4djK5Nx", - "M+ErpY4Mdf5opdeIKVLU+cE+k+KU1ocsmdibImjzXNJFbWUgHCJSljyYzPhP7QdbPq5tinordc5D3+hD", - "mlYhzDumn0wmXwN/SCSOgL7kzgYTxkOCn21FTVPJ9upujDu18X41bjCxTp1qE/hWvPf7gwjfp+KWcEaR", - "aiE/mxw+Erc1cFt2B8aNvHbFdM1Ur95+PLbeLYF9Nnn6SDxdVi4ao4/VZPI0/bT47H4AyolGQqISlGNV", - "ugR9y+DOu6Oco2aAgOZSJuhNaKxnRCWoKTLQjHy2ifykM3yhtlyWpf3dkVzbmjye2F52Yf7jS6y+7RZ/", - "7Ymvw0Csc3pMw/bQkQePAnyLoCoKYqNoJ1xasogtJj+4SICv7MYQeQkvc3I49lckY5eEhiPwsRvYIyJ6", - "d7xiu0jsDzf3z18pHq88M9oqHh8+Lv7YFUpEi0etEMNVyJ7B+WuGy1+/Hf6OPAhXQOgCwSemjd7eYb+h", - "f9XO0LuICW525G8cNzja+N7+ZyPng/c1DibSsr5w3/f0On+473VfqRTZ0/TDLd8PaPrPvovpC2nQ3I3i", - "fkSrr41xrdUnOINI+ngFZsmKh9qdFTN+Bebb2XDvedR6ZSkwisHtT/P9HzFfZ4IbbLckihRgQGk3R9vh", - "MRCzO2xvi+u3RLiO8Hi5COnWgstTk6sEl1XEgd65ZxLxTLDJg/zZH7P8+u45KLw/+eGc+Ifzn9oC9yp6", - "6mdKg/3Fb0RQDhoZxbIMVPN0q80TJOh30Mw9iO6brS+w9FXXD5DQTNIFSolAM+uOC/TPy/PXyI8yE0Q0", - "omw+B2X722WKNSrIAmkQtLOpJAsuCdUjHJ2LfmP/WX7sNug7QaEod0L/6T4b3WcvA4/6Vw6Em9wSHS21", - "jnNIrx1Av3Hp1ddy2zBa9SMP/wtNrX+b0L5Ka+/FPXmLrR5nroj80lOPmEY1HKf1p19ApH8v2aNRliD8", - "hd4UpVII8A8cw+O4tc/vWiCVeCxWW0hLxuU1nVrVd8wmaPLKAQ0f71cHxHUpS3h7adGWMW5u9JDcD6dQ", - "f5nhRnqR88F2VyF0iY4dDNQ/XD38NwAA//8gQ0OrXDYAAA==", + "H4sIAAAAAAAC/+xa2XPbuBn/VzBoZ7adoSU5SR9Wb47jZN1646ydNA+JpwMRn0isQYAGQDuKx/97Bwcv", + "CdQV5+g0L4lMgN99/PAR9ziVRSkFCKPx9B7rNIeCuJ9HHJS5qDhcwE0F2thnpZIlKMPA7UiloMwwKVaX", + "QJAZB2p/UtCpYqXfh9/nYHJQyOSAiOWAVMUBMY3qVxJsFiXgKZ5JyYEI/JBgJgyoW8JX6b3NAdWrSM6R", + "YQUgI9FNBWqB5nKZU0teG8VEZqlbwYmRKk69XrVUKw1xmiCqAk8/4MzgBGfGPuLG/eNWb3CCBdzgqwh3", + "kyvQueQ0zr5ZRreEV7BWikBbVMUMlKV9xwSVd3HCfm0/mz0kWMFNxZR18Qfcui4w7HisY96urq0l5OxP", + "SI2VtgBDKDEkFmkhSN+xATOdlyCOc6lAomYzenf6otELJ3guVUEMnuKqYjQWCCBumZKi2JJRZ/vOrAQp", + "IM7ArjivbIxbu1OXJF1DyC2vUkPHFzGCpZLWF9voHrbuqPdS3DgjdPXoibDij6QfB7EQ0rJS3h79APLy", + "RZXyUW8kgk+QVsanFpcZmhEN1BtNb1TFM1gVaWlbE+KNqEmnjMY06pRhXUqh4Wcd/lmHO0H4s4r+P1bR", + "rQtfAUaxNC6IX+tHXHi2VPvqtErL6j+VJpk1ZAGFVIvwZyyhvkrNXS2v8Yr5Hma5lNfDRbMA7SQfsIxb", + "7Lv8zpOMuVwbYiodp+XXhkjVltVVmoJ2tlZKqohBB1VlIrP94XIh0mF1SVo3iFUJ/Roy5BoEsj+Gqmqq", + "gBjXGqqS1r9EmhORud8UONinsWjgRBsrItAjM1BhWQHakKKsbWVfQXoh0pjJrWjPSXoNgp4OJNrML6PT", + "F+hvNsPmShZIzrRtUjPGmVnUW/6+Xamwz89kxlLCh3hyv+x42tKxJeVdA2jJMdoZ1lYOwnjUAbHoOa5L", + "/JnMToTxydoPGw63wAc1RX455m2ZDb9Vp17kvW7zihYst9oEiMwQOMGToVb3erBDRHudaxsmJwZlICze", + "AFpzion7JR11iMnGppdKYQgToIZ1a7bsqlCnGW9luW7z3p/VXjBhb/s1HX0rDdv+v6N+paTDDP5VzUAJ", + "MKBRKemepHdBLUsMd+DlIcpWtqrhzK7q7AmY9oyAWClses+2ralTeRp2tiUe2K14t/J7CUSl+WUqS9gM", + "2rfIo/UQd4P5N5/YPaXYWfXEQpdhDOKQzbGkA4HkllEqKXQ7NShk/2PuqAyfSFFyy/T8+eXBvw8Pzg6e", + "PIn3kQF091tVEHGggFB7yg0824bUMvidac1Ehmrt0ZwBpxr9og1R5i0r4BdEBEW/gKDur5gYhhm+VtsO", + "59DKZ4TWA1cLrkhlcqnYZ9/dpZoxSkHYY6w0L2Ul/FREzDlLTT12E4RfOsud7AAlz2Sm/7CwfHDcGxQd", + "UEdQf34NCbJylFmTHAnmrGABFc5JxQ2eHk4mSQwBkE+sqArkj9aWGTNQaHuUUGAqZS0T9jgakwQXTIQ/", + "G8bWSpk/mXOZnVkQ4zR0tLyq3hkvTp6/e4UTfPr65TlO8Puji9c4wScXF+cX8RmCf0CUIgvnPnZTwamn", + "alQFFum5ZH+TK6Lj6aj71UAKOJ/j6Yd7/FcFczzFfxm3I/txmNePo7XkIVn/0nuprudc3vXeuXKHSWXO", + "FQXVc4lzB455xe5H0r6w7P7akKR5M2q3JqcGEbAye4fXUglreSVNSPetfrU+PYbKG5eZHgS92onMgKJw", + "0ptXnFsNWv820beVoxvMvhx21utbkardP0zJxoKR8vp3PdwWw7GxGSKYWl8mUME4ZxpSKWjnNN/JPiPN", + "0DDRLXXyvCAmzW1BrlsvA52glJQlUEQMsske4RErdSuKx1y51/Hle8CIWBavKLQeAdwFEheVqHHe/jDg", + "wc2J5zJMpg1JTS2B7dstlnsLpLDcl+3ENDp6c4p0CSmbs5S44QSFOROgndUsVUVS4xEgsVbMbGQQSkoD", + "ChWVNojZPm5B0UdhpBtNZxYnojtmckelI8l5QBoj5OBmjTtSwrnjqF1/KyUTxraaj8KHugtziwEKIkjW", + "PZprj2CCi51w9TCiVPKWUaBo5nOlkLTiMPpoOxdnKYTSEsx1VJI0B/RkZGO7UhxPcW5Mqafj8d3d3Yi4", + "5ZFU2Ti8q8dnp8cnry9PDp6MJqPcFLyDQ/CKzvUcxJY3dBTsd/TmFCf4FpT2LjkcTUaTMKAXpGR4ip+O", + "JqOn9tRNTO4CbExKNr49HFubjJvJXyl1ZNjzR2u9xkwRsOcH/kyKU1q/ZMXEPhRBm+eSLuooA+EYkbLk", + "IWTGf2o/8PL1blM1XME/D/2gD+1bhfLvlH4ymXwN/qHBOAH6ljsbbCQPCX62lTQNwu3hcYw7mHk/7BtC", + "rINfbWPfSvf+uSGi96m4JZxRpFrKzyaHj6RtTdzC8aC4kdcOZNdK9XD446n1bonss8nTR9LpsnLVGH2s", + "JpOn6afFZ/cDUE40EhKVoJyq0jXuWwZ3Ph3lHDWDBTSXMkFvwoF7RlSCGvCBZuSzbfAnnaEMtTBalvZ3", + "x3LtkeXxzPayS/MfXxL17Sny1575OgrETlSPGdieOvLkUaBvGVRFQWwV7ZRLKxaxIPODqwT4ym4MlZfw", + "MieHY//pZOya0HAFPnaDfERE79uv2K4S+5eb79JfqR6vXD/aqh4fPi7/2KeViBePWiOGTyR7FuevWS5/", + "/Xb8O/YgXAGhCwSfmDZ6+4T9hvlVJ0PvA01IsyP/JXJDoo3v7X+2cj74XONgIkfZF+75nlnnX+5n3VeC", + "InuGfvj69wOG/rPvEvpCGjR3I7ofMerrYFwb9QnOINI+XoFZiuKh485KGL8C8+1iuHdtar2zFBjF4PZn", + "+P6PhK8LwQ2xWxJFCjCgtJuv7XBJiNkd9myL6ztGuK7weBmEdLHg8tTkKsFlFUmgd+76RLwTbMog/+6P", + "Cb++ew8K91J+uCT+4fKnjsC9QE99fWnwfPEbEZSDRkaxLAPVXOlq+wQJ/h0Mc0+ie5frCyJ9NfUDJTST", + "dIFSItDMpuMC/fPy/DXyo8wEEY0om89B2fPtssQaFWSBNAja2VSSBZeE6hGOzkW/cf4sX4IbzJ3gUJQ7", + "o/9Mn43ps1eAR/MrB8JNboWOQq3jHNJrR9BvXLoNtnxsGK3mkaf/haHW/5rQ3lZrv5d78RZbXdpcMfml", + "lx4xjWo6zutPv0BIf4+yJ6MsQfgPfVOUSiHAX3wMl+bWXstriVTisVRtKS0Fl/d0al3fCZvgyStHNDy8", + "Xx0Q11CW8PajRQtj3NzoIbkfbqH+Y4Yb6UXeD7G7SqErdOzFIP3D1cN/AwAA//+wJDwhdDYAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/observability-logs-openobserve/internal/openobserve/client.go b/observability-logs-openobserve/internal/openobserve/client.go index f081f3c..0d4876a 100644 --- a/observability-logs-openobserve/internal/openobserve/client.go +++ b/observability-logs-openobserve/internal/openobserve/client.go @@ -184,6 +184,19 @@ func (c *Client) executeSearchQuery(ctx context.Context, queryJSON []byte) (*Ope return &openObserveResp, nil } +// extractTotalCount extracts the total count from a count query response. +// The response is expected to have hits[0].total as the count value. +func extractTotalCount(resp *OpenObserveResponse) int { + if len(resp.Hits) > 0 { + if total, ok := resp.Hits[0]["total"]; ok { + if v, ok := total.(float64); ok { + return int(v) + } + } + } + return 0 +} + func (c *Client) GetComponentLogs(ctx context.Context, params ComponentLogsParams) (*ComponentLogsResult, error) { queryJSON, err := generateComponentLogsQuery(params, c.stream, c.logger) if err != nil { @@ -209,9 +222,19 @@ func (c *Client) GetComponentLogs(ctx context.Context, params ComponentLogsParam logs = append(logs, entry) } + // Execute a separate count query to get the true total number of matching logs + countQueryJSON, err := generateComponentLogsCountQuery(params, c.stream, c.logger) + if err != nil { + return nil, fmt.Errorf("failed to generate component logs count query: %w", err) + } + countResp, err := c.executeSearchQuery(ctx, countQueryJSON) + if err != nil { + return nil, fmt.Errorf("failed to execute component logs count query: %w", err) + } + return &ComponentLogsResult{ Logs: logs, - TotalCount: openObserveResp.Total, + TotalCount: extractTotalCount(countResp), Took: openObserveResp.Took, }, nil } @@ -239,9 +262,19 @@ func (c *Client) GetWorkflowLogs(ctx context.Context, params WorkflowLogsParams) logs = append(logs, entry) } + // Execute a separate count query to get the true total number of matching workflow logs + countQueryJSON, err := generateWorkflowLogsCountQuery(params, c.stream, c.logger) + if err != nil { + return nil, fmt.Errorf("failed to generate workflow logs count query: %w", err) + } + countResp, err := c.executeSearchQuery(ctx, countQueryJSON) + if err != nil { + return nil, fmt.Errorf("failed to execute workflow logs count query: %w", err) + } + return &WorkflowLogsResult{ Logs: logs, - TotalCount: openObserveResp.Total, + TotalCount: extractTotalCount(countResp), Took: openObserveResp.Took, }, nil } diff --git a/observability-logs-openobserve/internal/openobserve/client_test.go b/observability-logs-openobserve/internal/openobserve/client_test.go index 98ca975..084b72e 100644 --- a/observability-logs-openobserve/internal/openobserve/client_test.go +++ b/observability-logs-openobserve/internal/openobserve/client_test.go @@ -4,14 +4,38 @@ package openobserve import ( + "bytes" "context" "encoding/json" + "io" "net/http" "net/http/httptest" + "strings" "testing" "time" ) +// isCountQuery checks if the request body contains a count query (size=0 and SELECT count). +func isCountQuery(r *http.Request) bool { + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + return false + } + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + var body map[string]interface{} + if err := json.Unmarshal(bodyBytes, &body); err != nil { + return false + } + query, ok := body["query"].(map[string]interface{}) + if !ok { + return false + } + size, _ := query["size"].(float64) + sql, _ := query["sql"].(string) + return size == 0 && strings.Contains(strings.ToLower(sql), "count") +} + func newTestClient(serverURL string) *Client { return NewClient(serverURL, "default", "default", "admin", "token", testLogger()) } @@ -50,6 +74,19 @@ func TestGetComponentLogs(t *testing.T) { t.Error("missing or incorrect basic auth") } + if isCountQuery(r) { + resp := OpenObserveResponse{ + Took: 1, + Total: 1, + Hits: []map[string]interface{}{ + {"total": float64(2)}, + }, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + return + } + resp := OpenObserveResponse{ Took: 42, Total: 2, @@ -137,6 +174,19 @@ func TestGetComponentLogs_ServerError(t *testing.T) { func TestGetWorkflowLogs(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isCountQuery(r) { + resp := OpenObserveResponse{ + Took: 1, + Total: 1, + Hits: []map[string]interface{}{ + {"total": float64(1)}, + }, + } + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(resp) + return + } + resp := OpenObserveResponse{ Took: 10, Total: 1, diff --git a/observability-logs-openobserve/internal/openobserve/queries.go b/observability-logs-openobserve/internal/openobserve/queries.go index a9098d1..c3d7d69 100644 --- a/observability-logs-openobserve/internal/openobserve/queries.go +++ b/observability-logs-openobserve/internal/openobserve/queries.go @@ -119,6 +119,111 @@ func parseDurationMinutes(duration string) (int, error) { } } +// generateComponentLogsCountQuery generates a count query to get the true total of matching component logs. +func generateComponentLogsCountQuery(params ComponentLogsParams, stream string, logger *slog.Logger) ([]byte, error) { + if params.Namespace == "" { + return nil, fmt.Errorf("namespace is required for component log queries") + } + + var conditions []string + + conditions = append(conditions, "kubernetes_labels_openchoreo_dev_namespace = '"+escapeSQLString(params.Namespace)+"'") + + if params.ProjectID != "" { + conditions = append(conditions, "kubernetes_labels_openchoreo_dev_project_uid = '"+escapeSQLString(params.ProjectID)+"'") + } + if params.EnvironmentID != "" { + conditions = append(conditions, "kubernetes_labels_openchoreo_dev_environment_uid = '"+escapeSQLString(params.EnvironmentID)+"'") + } + if len(params.ComponentIDs) > 0 { + componentConditions := make([]string, len(params.ComponentIDs)) + for i, id := range params.ComponentIDs { + componentConditions[i] = "kubernetes_labels_openchoreo_dev_component_uid = '" + escapeSQLString(id) + "'" + } + conditions = append(conditions, "("+strings.Join(componentConditions, " OR ")+")") + } + if params.SearchPhrase != "" { + conditions = append(conditions, "log LIKE '%"+escapeSQLString(params.SearchPhrase)+"%'") + } + if len(params.LogLevels) > 0 { + levelConditions := make([]string, len(params.LogLevels)) + for i, level := range params.LogLevels { + levelConditions[i] = "logLevel = '" + escapeSQLString(level) + "'" + } + conditions = append(conditions, "("+strings.Join(levelConditions, " OR ")+")") + } + + sql := "SELECT count(*) as total FROM " + quoteIdentifier(stream) + if len(conditions) > 0 { + sql += " WHERE " + strings.Join(conditions, " AND ") + } + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "sql": sql, + "start_time": params.StartTime.UnixMicro(), + "end_time": params.EndTime.UnixMicro(), + "from": 0, + "size": 0, + }, + } + + if logger.Enabled(nil, slog.LevelDebug) { + if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil { + fmt.Printf("Generated count query for component logs:\n") + fmt.Println(string(prettyJSON)) + } + } + + return json.Marshal(query) +} + +// generateWorkflowLogsCountQuery generates a count query to get the true total of matching workflow logs. +func generateWorkflowLogsCountQuery(params WorkflowLogsParams, stream string, logger *slog.Logger) ([]byte, error) { + var conditions []string + + if params.Namespace != "" { + conditions = append(conditions, "kubernetes_namespace_name = 'workflows-"+escapeSQLString(params.Namespace)+"'") + } + if params.WorkflowRunName != "" { + conditions = append(conditions, "kubernetes_labels_workflows_argoproj_io_workflow = '"+escapeSQLString(params.WorkflowRunName)+"'") + } + if params.SearchPhrase != "" { + conditions = append(conditions, "log LIKE '%"+escapeSQLString(params.SearchPhrase)+"%'") + } + if len(params.LogLevels) > 0 { + levelConditions := make([]string, len(params.LogLevels)) + for i, level := range params.LogLevels { + levelConditions[i] = "logLevel = '" + escapeSQLString(level) + "'" + } + conditions = append(conditions, "("+strings.Join(levelConditions, " OR ")+")") + } + + sql := "SELECT count(*) as total FROM " + quoteIdentifier(stream) + if len(conditions) > 0 { + sql += " WHERE " + strings.Join(conditions, " AND ") + } + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "sql": sql, + "start_time": params.StartTime.UnixMicro(), + "end_time": params.EndTime.UnixMicro(), + "from": 0, + "size": 0, + }, + } + + if logger.Enabled(nil, slog.LevelDebug) { + if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil { + fmt.Printf("Generated count query for workflow logs:\n") + fmt.Println(string(prettyJSON)) + } + } + + return json.Marshal(query) +} + // generateAlertConfig generates an OpenObserve alert configuration as JSON func generateAlertConfig(params LogAlertParams, streamName string, logger *slog.Logger) ([]byte, error) { query := fmt.Sprintf( diff --git a/observability-tracing-openobserve/README.md b/observability-tracing-openobserve/README.md index b79770f..5052a4b 100644 --- a/observability-tracing-openobserve/README.md +++ b/observability-tracing-openobserve/README.md @@ -37,7 +37,7 @@ helm upgrade --install observability-tracing-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-tracing-openobserve \ --create-namespace \ --namespace openchoreo-observability-plane \ - --version 0.2.0 + --version 0.2.1 ``` To switch to HA mode, disable the standalone chart and enable the distributed chart: @@ -46,7 +46,7 @@ To switch to HA mode, disable the standalone chart and enable the distributed ch helm upgrade --install observability-tracing-openobserve \ oci://ghcr.io/openchoreo/helm-charts/observability-tracing-openobserve \ --namespace openchoreo-observability-plane \ - --version 0.2.0 \ + --version 0.2.1 \ --reuse-values \ --set openobserve-standalone.enabled=false \ --set openobserve.enabled=true @@ -61,6 +61,6 @@ Refer to the [openobserve Helm chart documentation](https://github.com/openobser > oci://ghcr.io/openchoreo/helm-charts/observability-tracing-openobserve \ > --create-namespace \ > --namespace openchoreo-observability-plane \ -> --version 0.2.0 \ +> --version 0.2.1 \ > --set openobserve-standalone.enabled=false >``` diff --git a/observability-tracing-openobserve/helm/Chart.yaml b/observability-tracing-openobserve/helm/Chart.yaml index ac5835d..5f15598 100644 --- a/observability-tracing-openobserve/helm/Chart.yaml +++ b/observability-tracing-openobserve/helm/Chart.yaml @@ -5,8 +5,8 @@ apiVersion: v2 name: observability-tracing-openobserve description: A Helm chart for OpenChoreo Tracing Module for OpenObserve type: application -version: 0.2.0 -appVersion: "0.2.0" +version: 0.2.1 +appVersion: "0.2.1" keywords: - openobserve - openchoreo diff --git a/observability-tracing-openobserve/internal/api/gen/models.gen.go b/observability-tracing-openobserve/internal/api/gen/models.gen.go index 06d4200..d6e6a25 100644 --- a/observability-tracing-openobserve/internal/api/gen/models.gen.go +++ b/observability-tracing-openobserve/internal/api/gen/models.gen.go @@ -7,10 +7,6 @@ import ( "time" ) -const ( - BearerAuthScopes = "BearerAuth.Scopes" -) - // Defines values for ErrorResponseTitle. const ( BadRequest ErrorResponseTitle = "badRequest" @@ -116,7 +112,7 @@ type TraceSpansListResponse struct { // TookMs The time taken to query the spans in milliseconds TookMs *int `json:"tookMs,omitempty"` - // Total The total number of spans + // Total The total number of matching spans, capped at 1000 Total *int `json:"total,omitempty"` } @@ -125,7 +121,7 @@ type TracesListResponse struct { // TookMs The time taken to query the traces in milliseconds TookMs *int `json:"tookMs,omitempty"` - // Total The total number of traces + // Total The total number of matching traces, capped at 1000 Total *int `json:"total,omitempty"` // Traces The list of traces diff --git a/observability-tracing-openobserve/internal/api/gen/server.gen.go b/observability-tracing-openobserve/internal/api/gen/server.gen.go index 4e68091..edb7539 100644 --- a/observability-tracing-openobserve/internal/api/gen/server.gen.go +++ b/observability-tracing-openobserve/internal/api/gen/server.gen.go @@ -50,12 +50,6 @@ type MiddlewareFunc func(http.Handler) http.Handler // QueryTraces operation middleware func (siw *ServerInterfaceWrapper) QueryTraces(w http.ResponseWriter, r *http.Request) { - ctx := r.Context() - - ctx = context.WithValue(ctx, BearerAuthScopes, []string{}) - - r = r.WithContext(ctx) - handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { siw.Handler.QueryTraces(w, r) })) @@ -81,12 +75,6 @@ func (siw *ServerInterfaceWrapper) QuerySpansForTrace(w http.ResponseWriter, r * return } - ctx := r.Context() - - ctx = context.WithValue(ctx, BearerAuthScopes, []string{}) - - r = r.WithContext(ctx) - handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { siw.Handler.QuerySpansForTrace(w, r, traceId) })) @@ -121,12 +109,6 @@ func (siw *ServerInterfaceWrapper) GetSpanDetailsForTrace(w http.ResponseWriter, return } - ctx := r.Context() - - ctx = context.WithValue(ctx, BearerAuthScopes, []string{}) - - r = r.WithContext(ctx) - handler := http.Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { siw.Handler.GetSpanDetailsForTrace(w, r, traceId, spanId) })) @@ -634,33 +616,32 @@ func (sh *strictHandler) Health(w http.ResponseWriter, r *http.Request) { // Base64 encoded, gzipped, json marshaled Swagger object var swaggerSpec = []string{ - "H4sIAAAAAAAC/+xZTXPbNhP+Kxi870wutCTbaQ+6Oc5H3SZxEqvTQ+LDilyJiEiAAUA5ikf/vbMAKZEU", - "6MixnZl0fLIpYBf78exD7PKaxyovlERpDR9fcxOnmIP797ReuEDQcXoRqwLp90KrArUV6HZtxOnBrmgL", - "N1YLOefriKNcCq1k3rcuIUdTQIzB1UKrzxiHJNcR1/ilFBoTPv7YUHMZ1VvV1MmuI/5Ca6U/oCmUNAEP", - "ErQgMv+fibUorFCSj/kkRYYkynI0BubII45fIS8yUv9GGCPknNVmsJnALDHsibGg7UTk+ISBTNgTlIl7", - "4lEgPKT+VCV40+mxSpDNtMqZmhrUS9SM/oi4bdD5s4uD1wdHR6FzrLAZ7umhLHMK6RSSD/ilRGN5xEsJ", - "pU2VFt8w4RGfKT0VSYKSR1xIi1pCduEsc6FuJKGRr520TDTEeFGAfO4SYPozBNZqMS2tfxIWcxN2ZruR", - "qRmzKTJTAFnZ1rfAVVh+gatacKMqFM8lZGVPPN3SHkpCEal+AK1hRc9JqYFUv+1xt15vOsuEZBKkMhgr", - "mRifrRwsH1Oqfn+6tYQyN0fty9RhtAchMmFW5NgJ6UZtAhYPaEMoUgVoYpAC5FkSVu93eNvPnod0aDSq", - "1DGe3A0GTGMGFhNmlfutVvvLwsPcENUbwklLfwnZI7gQMulkOqjhLfThhcj4uxpqkuyxnpZ/FHQ3co15", - "LYztZxo6pgdUmTCWrPFboi3+Om+Tx5p9MGzuhaxfDpvdsrZKLd70oMcda2GBkljsS4l6tTHBEIxykWVi", - "i6Nd2Fhloeey45aYLPMp6ibSu0p6C+w7xfUjjlmn974981rDWvzSjRSwEb8nDnD6Hp4E3DF7s4BWqsEB", - "vct1tfZuqIsxWKmnqvSdQaBU2zik8DSd2I3Frev2dvFwu/sozWcwzGlubT9K6ji3P2v0l+R7KqX6Jr+D", - "1Fvhx1Xl3vHKRC6q1M6gzCwfH45GUeCYHL6KvMwbCXdlRTyg0ZaauLXa43SMIp4LWT0GgdDuV/+vccbH", - "/H/Dbac7rNrcYbDHJRVK23OdoG454IznIR9oP1Mk0A1W3U/BRjLQHv0AeG+TjE63vD1rSyDtqO120S6s", - "camFXV1Q7Dx8niFo1CelTelp6p5e1gb9+c+E1LrdfFytbo1LrS34mhQLOVN+lCAt+HZfunLh5wXK01Rp", - "VGyCkLueqBMbYdjJuzNmCozFTMSeVxOcCYnGBYq0aogtsylYBq7GqHGHBAqLmuWlsUxQE52jtJ+kVcxh", - "SYNFdiVs6rQ0LDmvuvDBJ0JmJmKs3naV0ScFxCmyo8GIOmedVb6a8XB4dXU1ALc8UHo+rGTN8PXZ6Yu3", - "Fy8OjgajQWrzrNGz852TYSoyYVdsUjlyUjly8u6MR3yJ2vjYHA5GgxFpUgVKKAQf8+PBaHBM/Q7Y1CVw", - "CIUYLg8hK1I4HPr32tBDi7hCmQA5v/cvZ/9idnMJClBgNkFE4/JBrOnFJvWbU3tGeqaSVZ36akYERZFV", - "eRx+NnRiPZX6XjEHCG/dxr7VJfqG0l1RXAiORqN7tqB1C3IWdEDrQ0dhFpgwU8YxGjMrs8zd/57eo0Ht", - "0VfAljO5hEwkTNcBo/MPf975fzcnS+7w4593+MvNHGsd8d9+btj91Iz5sRnzczPHsmWeA5Vfq86INmFu", - "iL2rErqkzcHyva5uKuuhuzjtV87+jjVTumJIvG1lu+b6pdKT6gpTgIYcLWoy+poLOopoh0c1T9YXqm6B", - "Ro0gd19kl/9p5tgdUASg4zY9kscjeexBHjtVfRceufYznTXZP8cAk7xCy/zHFDd3BT/oaZ/eZo5XaBsf", - "AB6cPaKgpmpUdWsaemgm6H4W6eGCTcgfKeGREm6ihH3KM0gOKUJm02+9dX+aYrxwVwW/k7pVW5rmUKPZ", - "cPVdI/5wwvyOpdWZ5ztL3KRj86HUG7naZ8wSqDhvPBOG1Xpcro/vYKT7BNu2sY7ZFOIFymRMXazE2DW3", - "MxCZ+wwbGiLseFvK+/J3q6k5DeDjj5dNlPksspgw0QBUldzLdUe2PUP4eEnUWslch9omptFqgUvIGMqk", - "UEJas6XyCrZE9G3ZplUhwcq89eX63wAAAP//Ugk4HhEhAAA=", + "H4sIAAAAAAAC/+xZX1PbuhL/KhrdO9MXkwTovQ95o/TPYU5b2sJ5OuVhY2+wii250jo0ZfLdz6xkJ04i", + "01CgMz3DU2Jrtf/3J+36RqamrIxGTU6Ob6RLcyzB/z1uF84QbJqfpaZCfl9ZU6ElhZ5quZ0faM4k0pFV", + "+lIuEol6pqzRZd+6hhJdBSlGVytrvmAa27lIpMWvtbKYyfHfHTYXSUtqJn7vIpGvrDX2E7rKaBexIEMC", + "VYR/LrWqImW0HMvzHAXyVlGic3CJMpH4DcqqYPbvlHNKX4pWDTFVWGROPHMEls5Vic8E6Ew8Q535J5lE", + "3MPsj02Gt0lPTYZiak0pzMShnaEV/KPSdYVOX5ztvd07OIjJIUUF7mihrkt26QSyT/i1RkcykbWGmnJj", + "1XfMZCKnxk5UlqGWiVSa0Goozrxm3tWdIHTitRWWcwspnlWgX/oAuP4IAZFVk5rCkyIsXdyYFaEwU0E5", + "ClcBa7nO7wrn8f1XOG83LlnF/DmDou7xp1/agUnMI80LsBbm/JzVFpj1+x5z2/WusUJpoUEbh6nRmQvR", + "KoHkmEP1/+crTThyl2hDmfoc7ckQnQlSJW64dMk2A8I9Joh5qgLLCFKBPsni7ANF0P3kZYyHRWdqm+LR", + "/dJAWCyAMBNk/LuW7W+bHu4Wr97iTl76U+mejVdKZxuRjnJ4D335wmD8Qw4tSPZoz8s/m3S3Yo17qxz1", + "Iw2L6UmqQjlibQJJssq/jdPkqWYfLTd3yqzfLjc3y5qMuXrXkz1eLMEVakaxrzXa+VIFx2lUqqJQqzza", + "ThsyBD2XHb8kdF1O0LJtJVCa8x3Hc09EClWFmQAS+6PRKMK9t/J+UHU/YzF5vo9mcmC/g82JDKS3g0ZD", + "82Co4fk9Pmx4MTvjhjWmgxq9y2199xK05Rut7WNTh14iUtzLMC7LoWvEti/uXOl384en7gPBEME4Cvq1", + "3UBsw7jdcaa/Vj9yjbV3/61MvVP++HLd2V+FKlUT2inUBcnx/miURMSU8E2VddkJuC8rBgiLVFtG44bG", + "8xglslS6eYwmwnqH+1+LUzmW/xmueuNh0xgPo10xszCWTm2Gds0Ar7yM2cD0wvCGTWe1HRgsd0Yaqp9I", + "3rsEY6O/XslaAci617b7buah9NSEIYEmCI289mktTyvUx7mxaMQ5Qum7nQ0blBNHH06EqzBVU5UG/Mtw", + "qjQ6bxBztZCSoBxIgK8Fxm7IoCK0oqwdCcXtcYmaPmsywsfcAqG4VpR7Lh1NTpv+evCZM6hQKTbHVaP0", + "UQVpjuJgwAdBbQs5ljlR5cbD4fX19QD88sDYy2Gz1w3fnhy/en/2au9gMBrkVBadblxuSYaJKhTNxXlj", + "yFFjyNGHE5nIGVoXfLM/GA1GzMlUqKFSciwPB6PBIXcyQLmv0yFUajjbh6LKYX8Yzp9hSAGuaeMiIPox", + "nK7hZPUTB3ZQZOrAgODjwegWtp23J5wNyPHCZPM29M30B6qqaOI4/OJYYjtv+lHRRYBpsZ6jZGsMraK/", + "Y3gXHIxGD6zB2jXGa7CRtMF17GaFmXB1mqJz07oo/M3u+QMqtD7UiuhyomdQqEzY1mEsf//Xyf+rOzPy", + "wg9/nfDXywnVIpH/+7VuD/MwEQZiIkzEmM7VZQlcfmt1xtgLl45RtimhCyaOlu9Nc6NYDP0FZ7dyDneh", + "qbENQuJdK9u3za+NPW+uGhVYKJHQstI3UrEohh2ZtDjZXnw2CzTpOHnzwLn4VyPH9ughkjqe6Ak8nsBj", + "B/DYqur74MhNmNYsWP9LjCDJGyQRPpP4iSqEEc669HXkeIPUGe0/OnokUU7NEOrOMPTYSLD5waMHC5Yu", + "f4KEJ0i4DRJ2Kc8oOOQIBeXfe+v+OMf0yl8VAiV3lVS77vCh23D1XSP+8JvlPUtrY1LvNfETieUn0KDk", + "fJdxSKTigvJCOdHy8bE+vIeS/uPquo6tzyaQXqHOxtzFakx9czsFVfgPrLFmf8vaWj+UvStO63kV4iZS", + "zoJOCjXhvPBMm5c3sU5IWCSrcAaFQJ1VRmlyK3RuMpGxe31vV2xsYyN/cbH4JwAA//9pX8pfviAAAA==", } // GetSwagger returns the content of the embedded swagger specification file diff --git a/observability-tracing-openobserve/internal/openobserve/client.go b/observability-tracing-openobserve/internal/openobserve/client.go index c6408a3..f029f8b 100644 --- a/observability-tracing-openobserve/internal/openobserve/client.go +++ b/observability-tracing-openobserve/internal/openobserve/client.go @@ -249,9 +249,19 @@ func (c *Client) GetTraces(ctx context.Context, params TracesQueryParams) (*Trac traces = append(traces, agg.entry) } + // Execute a separate count query to get the true total number of matching traces + countQueryJSON, err := generateTracesCountQuery(params, c.stream, c.logger) + if err != nil { + return nil, fmt.Errorf("failed to generate traces count query: %w", err) + } + countResp, err := c.executeSearchQuery(ctx, countQueryJSON) + if err != nil { + return nil, fmt.Errorf("failed to execute traces count query: %w", err) + } + return &TracesResult{ Traces: traces, - Total: len(traces), + Total: extractTotalCount(countResp), TookMs: openObserveResp.Took, }, nil } @@ -274,9 +284,19 @@ func (c *Client) GetSpans(ctx context.Context, params TracesQueryParams) (*Spans spans = append(spans, entry) } + // Execute a separate count query to get the true total number of matching spans + countQueryJSON, err := generateSpansCountQuery(params, c.stream, c.logger) + if err != nil { + return nil, fmt.Errorf("failed to generate spans count query: %w", err) + } + countResp, err := c.executeSearchQuery(ctx, countQueryJSON) + if err != nil { + return nil, fmt.Errorf("failed to execute spans count query: %w", err) + } + return &SpansResult{ Spans: spans, - Total: openObserveResp.Total, + Total: extractTotalCount(countResp), TookMs: openObserveResp.Took, }, nil } @@ -304,6 +324,23 @@ func (c *Client) GetSpanDetail(ctx context.Context, params TracesQueryParams) (* }, nil } +// extractTotalCount extracts the total count from a count query response. +// The response is expected to have hits[0].total as the count value. +func extractTotalCount(resp *OpenObserveResponse) int { + if len(resp.Hits) > 0 { + if total, ok := resp.Hits[0]["total"]; ok { + switch v := total.(type) { + case json.Number: + n, _ := v.Int64() + return int(n) + case float64: + return int(v) + } + } + } + return 0 +} + // parseSpanEntry converts a raw OpenObserve hit into a SpanEntry func parseSpanEntry(hit map[string]interface{}) SpanEntry { entry := SpanEntry{} diff --git a/observability-tracing-openobserve/internal/openobserve/client_test.go b/observability-tracing-openobserve/internal/openobserve/client_test.go index aeba60e..b02d79a 100644 --- a/observability-tracing-openobserve/internal/openobserve/client_test.go +++ b/observability-tracing-openobserve/internal/openobserve/client_test.go @@ -4,9 +4,11 @@ package openobserve import ( + "bytes" "context" "encoding/json" "fmt" + "io" "log/slog" "net/http" "net/http/httptest" @@ -44,6 +46,28 @@ func TestNewClient(t *testing.T) { } } +// isCountQuery checks if the request body contains a count query (size=0 and SELECT count). +// It reads the body and replaces it so subsequent reads still work. +func isCountQuery(r *http.Request) bool { + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + return false + } + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + var body map[string]interface{} + if err := json.Unmarshal(bodyBytes, &body); err != nil { + return false + } + query, ok := body["query"].(map[string]interface{}) + if !ok { + return false + } + size, _ := query["size"].(float64) + sql, _ := query["sql"].(string) + return size == 0 && strings.Contains(strings.ToLower(sql), "count") +} + func TestGetTraces(t *testing.T) { startNs := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC).UnixNano() endNs := time.Date(2025, 1, 1, 12, 1, 0, 0, time.UTC).UnixNano() @@ -64,6 +88,21 @@ func TestGetTraces(t *testing.T) { t.Error("missing or incorrect basic auth") } + if isCountQuery(r) { + // Return count response for the count query + resp := OpenObserveResponse{ + Took: 1, + Total: 1, + Hits: []map[string]interface{}{ + {"total": json.Number("2")}, + }, + } + w.Header().Set("Content-Type", "application/json") + data, _ := json.Marshal(resp) + w.Write(data) + return + } + resp := OpenObserveResponse{ Took: 42, Total: 3, @@ -178,6 +217,19 @@ func TestGetTraces_ServerError(t *testing.T) { func TestGetTraces_EmptyResponse(t *testing.T) { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isCountQuery(r) { + resp := OpenObserveResponse{ + Took: 1, + Total: 1, + Hits: []map[string]interface{}{ + {"total": json.Number("0")}, + }, + } + w.Header().Set("Content-Type", "application/json") + data, _ := json.Marshal(resp) + w.Write(data) + return + } resp := OpenObserveResponse{ Took: 1, Total: 0, @@ -212,6 +264,20 @@ func TestGetSpans(t *testing.T) { endNs := time.Date(2025, 1, 1, 12, 0, 1, 0, time.UTC).UnixNano() server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if isCountQuery(r) { + resp := OpenObserveResponse{ + Took: 1, + Total: 1, + Hits: []map[string]interface{}{ + {"total": json.Number("2")}, + }, + } + w.Header().Set("Content-Type", "application/json") + data, _ := json.Marshal(resp) + w.Write(data) + return + } + resp := OpenObserveResponse{ Took: 10, Total: 2, diff --git a/observability-tracing-openobserve/internal/openobserve/queries.go b/observability-tracing-openobserve/internal/openobserve/queries.go index 2e19932..e4927f1 100644 --- a/observability-tracing-openobserve/internal/openobserve/queries.go +++ b/observability-tracing-openobserve/internal/openobserve/queries.go @@ -184,6 +184,79 @@ func generateSpanDetailQuery(params TracesQueryParams, stream string, logger *sl return json.Marshal(query) } +// generateTracesCountQuery generates a count query to get the true total number of matching traces. +func generateTracesCountQuery(params TracesQueryParams, stream string, logger *slog.Logger) ([]byte, error) { + safeStream, err := validateSQLIdentifier(stream) + if err != nil { + return nil, fmt.Errorf("invalid stream identifier: %w", err) + } + + sql := fmt.Sprintf( + "SELECT count(distinct trace_id) as total FROM %s", + safeStream, + ) + + conditions := buildFilterConditions(params) + if len(conditions) > 0 { + sql += " WHERE " + strings.Join(conditions, " AND ") + } + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "sql": sql, + "start_time": params.StartTime.UnixMicro(), + "end_time": params.EndTime.UnixMicro(), + "from": 0, + "size": 0, + }, + } + + if logger.Enabled(nil, slog.LevelDebug) { + if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil { + fmt.Printf("Generated count query for traces:\n") + fmt.Println(string(prettyJSON)) + } + } + + return json.Marshal(query) +} + +// generateSpansCountQuery generates a count query to get the true total number of matching spans for a trace. +func generateSpansCountQuery(params TracesQueryParams, stream string, logger *slog.Logger) ([]byte, error) { + safeStream, err := validateSQLIdentifier(stream) + if err != nil { + return nil, fmt.Errorf("invalid stream identifier: %w", err) + } + + conditions := []string{ + "trace_id = '" + escapeSQLString(params.TraceID) + "'", + } + + sql := fmt.Sprintf( + "SELECT count(*) as total FROM %s WHERE %s", + safeStream, strings.Join(conditions, " AND "), + ) + + query := map[string]interface{}{ + "query": map[string]interface{}{ + "sql": sql, + "start_time": params.StartTime.UnixMicro(), + "end_time": params.EndTime.UnixMicro(), + "from": 0, + "size": 0, + }, + } + + if logger.Enabled(nil, slog.LevelDebug) { + if prettyJSON, err := json.MarshalIndent(query, "", " "); err == nil { + fmt.Printf("Generated count query for spans (trace=%s):\n", params.TraceID) + fmt.Println(string(prettyJSON)) + } + } + + return json.Marshal(query) +} + // buildFilterConditions builds SQL WHERE conditions from the scope filter parameters. func buildFilterConditions(params TracesQueryParams) []string { var conditions []string