diff --git a/pkg/dotc1z/sql_helpers.go b/pkg/dotc1z/sql_helpers.go index 9c3efa344..9ff762579 100644 --- a/pkg/dotc1z/sql_helpers.go +++ b/pkg/dotc1z/sql_helpers.go @@ -26,6 +26,101 @@ const bulkPutParallelThreshold = 100 const insertChunkSize = 200 const maxPageSize = 10000 +// rowIterator provides streaming access to query results without materializing the entire result slice. +// Use this for memory-efficient processing of large result sets. +type rowIterator[T proto.Message] struct { + rows *sql.Rows + factory func() T + unmarshalOpts proto.UnmarshalOptions + err error + current T + count uint32 + pageSize uint32 + lastRow int + nextPageToken string + done bool +} + +// Next advances the iterator to the next row. +// Returns true if a row is available, false when iteration is complete or an error occurred. +func (it *rowIterator[T]) Next() bool { + if it.done || it.err != nil { + return false + } + + if !it.rows.Next() { + it.done = true + it.err = it.rows.Err() + return false + } + + it.count++ + if it.count > it.pageSize { + // We've hit the extra row that indicates more pages exist + it.nextPageToken = strconv.Itoa(it.lastRow + 1) + it.done = true + return false + } + + var data sql.RawBytes + if err := it.rows.Scan(&it.lastRow, &data); err != nil { + it.err = err + it.done = true + return false + } + + it.current = it.factory() + if err := it.unmarshalOpts.Unmarshal(data, it.current); err != nil { + it.err = err + it.done = true + return false + } + + return true +} + +// Value returns the current row. Only valid after a successful Next() call. +func (it *rowIterator[T]) Value() T { + return it.current +} + +// Err returns any error encountered during iteration. +func (it *rowIterator[T]) Err() error { + return it.err +} + +// NextPageToken returns the pagination token for the next page, if any. +// Only valid after iteration completes (Next returns false). +func (it *rowIterator[T]) NextPageToken() string { + return it.nextPageToken +} + +// Close releases database resources. Must be called when done with iteration. +func (it *rowIterator[T]) Close() error { + return it.rows.Close() +} + +// Collect materializes all remaining rows into a slice. +// Useful when you want to use streaming for some operations but need a slice for others. +func (it *rowIterator[T]) Collect() ([]T, error) { + // Pre-allocate with remaining capacity estimate + remaining := int(it.pageSize) - int(it.count) + if remaining < 0 { + remaining = 0 + } + result := make([]T, 0, remaining) + + for it.Next() { + result = append(result, it.Value()) + } + + if err := it.Err(); err != nil { + return nil, err + } + + return result, nil +} + // Use worker pool to limit goroutines. var numWorkers = min(max(runtime.GOMAXPROCS(0), 1), 4) @@ -105,20 +200,21 @@ func (c *C1File) throttledWarnSlowQuery(ctx context.Context, query string, durat } } -// listConnectorObjects uses a connector list request to fetch the corresponding data from the local db. -// It returns a slice of typed proto messages constructed via the provided factory function. -func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) ([]T, string, error) { - ctx, span := tracer.Start(ctx, "C1File.listConnectorObjects") +// streamConnectorObjects returns an iterator for memory-efficient streaming of query results. +// The caller MUST call Close() on the iterator when done. +// Use this instead of listConnectorObjects when processing large result sets or when early termination is needed. +func streamConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) (*rowIterator[T], error) { + ctx, span := tracer.Start(ctx, "C1File.streamConnectorObjects") defer span.End() err := c.validateDb(ctx) if err != nil { - return nil, "", err + return nil, err } annoSyncID, err := annotations.GetSyncIdFromAnnotations(req.GetAnnotations()) if err != nil { - return nil, "", fmt.Errorf("error getting sync id from annotations for list request: %w", err) + return nil, fmt.Errorf("error getting sync id from annotations for list request: %w", err) } var reqSyncID string @@ -197,7 +293,7 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table // Use cached sync run to avoid N+1 queries during pagination latestSyncRun, err := c.getCachedViewSyncRun(ctx) if err != nil { - return nil, "", err + return nil, err } if latestSyncRun != nil { @@ -223,7 +319,7 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table query, args, err := q.ToSQL() if err != nil { - return nil, "", err + return nil, err } // Start timing the query execution @@ -232,51 +328,41 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table // Execute the query rows, err := c.db.QueryContext(ctx, query, args...) if err != nil { - return nil, "", err + return nil, err } - defer rows.Close() - // Calculate the query duration + // Calculate the query duration and log slow queries queryDuration := time.Since(queryStartTime) - - // If the query took longer than the threshold, log a warning (rate-limited) if queryDuration > c.slowQueryThreshold { c.throttledWarnSlowQuery(ctx, query, queryDuration) } - var unmarshalerOptions = proto.UnmarshalOptions{ - Merge: true, - DiscardUnknown: true, - } - var count uint32 = 0 - lastRow := 0 - var data sql.RawBytes - var ret []T - for rows.Next() { - count++ - if count > pageSize { - break - } - err := rows.Scan(&lastRow, &data) - if err != nil { - return nil, "", err - } - t := factory() - err = unmarshalerOptions.Unmarshal(data, t) - if err != nil { - return nil, "", err - } - ret = append(ret, t) - } - if rows.Err() != nil { - return nil, "", rows.Err() + return &rowIterator[T]{ + rows: rows, + factory: factory, + unmarshalOpts: proto.UnmarshalOptions{ + Merge: true, + DiscardUnknown: true, + }, + pageSize: pageSize, + }, nil +} + +// listConnectorObjects uses a connector list request to fetch the corresponding data from the local db. +// It returns a slice of typed proto messages constructed via the provided factory function. +func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) ([]T, string, error) { + iter, err := streamConnectorObjects(ctx, c, tableName, req, factory) + if err != nil { + return nil, "", err } + defer iter.Close() - nextPageToken := "" - if count > pageSize { - nextPageToken = strconv.Itoa(lastRow + 1) + ret, err := iter.Collect() + if err != nil { + return nil, "", err } - return ret, nextPageToken, nil + + return ret, iter.NextPageToken(), nil } var protoMarshaler = proto.MarshalOptions{Deterministic: false} diff --git a/pkg/dotc1z/sync_runs.go b/pkg/dotc1z/sync_runs.go index e2b4ee781..65d7ca6e7 100644 --- a/pkg/dotc1z/sync_runs.go +++ b/pkg/dotc1z/sync_runs.go @@ -240,7 +240,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui q = q.Order(goqu.C("id").Asc()) q = q.Limit(uint(pageSize + 1)) - var ret []*syncRun + ret := make([]*syncRun, 0, pageSize) query, args, err := q.ToSQL() if err != nil {