Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 129 additions & 43 deletions pkg/dotc1z/sql_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,101 @@ const bulkPutParallelThreshold = 100
const insertChunkSize = 200
const maxPageSize = 10000

// rowIterator walks a query result one row at a time, decoding each row into a
// proto message of type T instead of building the whole result slice up front.
// Prefer it when a result set is large enough that materializing it is wasteful.
type rowIterator[T proto.Message] struct {
	// Inputs supplied when the iterator is created.
	rows          *sql.Rows              // open result set being consumed
	factory       func() T               // builds an empty message for each row
	unmarshalOpts proto.UnmarshalOptions // options used to decode each row's bytes
	pageSize      uint32                 // maximum number of rows handed to the caller

	// State that changes as iteration proceeds.
	current       T      // message decoded by the most recent successful Next
	count         uint32 // rows pulled from the result set so far
	lastRow       int    // first column of the most recently scanned row
	nextPageToken string // set once the row past pageSize has been seen
	done          bool   // true once iteration has stopped for any reason
	err           error  // failure that stopped iteration, if any
}

// Next moves the iterator forward by one row and decodes that row into a fresh
// message obtained from the factory.
// It reports true when Value holds a newly decoded row. It reports false once
// the result set is exhausted, the page limit has been reached, or scanning or
// decoding failed; consult Err to tell a failure apart from normal completion.
func (it *rowIterator[T]) Next() bool {
	if it.err != nil || it.done {
		return false
	}

	// stop records the terminal state; a nil cause means iteration ended normally.
	stop := func(cause error) bool {
		it.done = true
		it.err = cause
		return false
	}

	if !it.rows.Next() {
		return stop(it.rows.Err())
	}

	it.count++
	if it.count > it.pageSize {
		// This is the extra row past the page size: its presence signals that
		// more pages exist, so publish the token and stop without decoding it.
		it.nextPageToken = strconv.Itoa(it.lastRow + 1)
		return stop(nil)
	}

	var raw sql.RawBytes
	if scanErr := it.rows.Scan(&it.lastRow, &raw); scanErr != nil {
		return stop(scanErr)
	}

	it.current = it.factory()
	if decodeErr := it.unmarshalOpts.Unmarshal(raw, it.current); decodeErr != nil {
		return stop(decodeErr)
	}

	return true
}

// Value returns the message decoded by the most recent call to Next.
// The result is only meaningful when that call returned true.
func (it *rowIterator[T]) Value() T {
	row := it.current
	return row
}

// Err reports the error that stopped iteration, or nil if none occurred.
// The error may come from the underlying rows, from scanning a row, or from
// decoding a row's bytes.
func (it *rowIterator[T]) Err() error {
	failure := it.err
	return failure
}

// NextPageToken returns the token to request the following page with.
// It is the empty string unless iteration stopped because the row past the
// page size was reached, so read it only after Next has returned false.
func (it *rowIterator[T]) NextPageToken() string {
	token := it.nextPageToken
	return token
}

// Close releases the database resources held by the iterator and returns any
// error reported by the underlying rows. It must be called once iteration is
// finished, including when the caller stops early.
// Calling Close on a nil iterator, or on one with no result set attached, is a
// no-op so that a deferred Close can never panic.
func (it *rowIterator[T]) Close() error {
	if it == nil || it.rows == nil {
		return nil
	}
	return it.rows.Close()
}

// Collect drains the iterator and returns every remaining row as a slice.
// It bridges streaming and slice-based code: rows already consumed through
// Next are not included. On failure it returns a nil slice and the error.
func (it *rowIterator[T]) Collect() ([]T, error) {
	// Reserve room for the rows that can still fit in the page; the count may
	// already exceed the page size, so clamp the hint at zero.
	capHint := max(int(it.pageSize)-int(it.count), 0)
	out := make([]T, 0, capHint)

	for it.Next() {
		out = append(out, it.current)
	}

	if it.err != nil {
		return nil, it.err
	}
	return out, nil
}

// numWorkers is the worker-pool size used to limit goroutines: the current
// GOMAXPROCS setting clamped to the range [1, 4].
var numWorkers = max(1, min(runtime.GOMAXPROCS(0), 4))

Expand Down Expand Up @@ -105,20 +200,21 @@ func (c *C1File) throttledWarnSlowQuery(ctx context.Context, query string, durat
}
}

// listConnectorObjects uses a connector list request to fetch the corresponding data from the local db.
// It returns a slice of typed proto messages constructed via the provided factory function.
func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) ([]T, string, error) {
ctx, span := tracer.Start(ctx, "C1File.listConnectorObjects")
// streamConnectorObjects returns an iterator for memory-efficient streaming of query results.
// The caller MUST call Close() on the iterator when done.
// Use this instead of listConnectorObjects when processing large result sets or when early termination is needed.
func streamConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) (*rowIterator[T], error) {
ctx, span := tracer.Start(ctx, "C1File.streamConnectorObjects")
defer span.End()

err := c.validateDb(ctx)
if err != nil {
return nil, "", err
return nil, err
}

annoSyncID, err := annotations.GetSyncIdFromAnnotations(req.GetAnnotations())
if err != nil {
return nil, "", fmt.Errorf("error getting sync id from annotations for list request: %w", err)
return nil, fmt.Errorf("error getting sync id from annotations for list request: %w", err)
}

var reqSyncID string
Expand Down Expand Up @@ -197,7 +293,7 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table
// Use cached sync run to avoid N+1 queries during pagination
latestSyncRun, err := c.getCachedViewSyncRun(ctx)
if err != nil {
return nil, "", err
return nil, err
}

if latestSyncRun != nil {
Expand All @@ -223,7 +319,7 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table

query, args, err := q.ToSQL()
if err != nil {
return nil, "", err
return nil, err
}

// Start timing the query execution
Expand All @@ -232,51 +328,41 @@ func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, table
// Execute the query
rows, err := c.db.QueryContext(ctx, query, args...)
if err != nil {
return nil, "", err
return nil, err
}
defer rows.Close()

// Calculate the query duration
// Calculate the query duration and log slow queries
queryDuration := time.Since(queryStartTime)

// If the query took longer than the threshold, log a warning (rate-limited)
if queryDuration > c.slowQueryThreshold {
c.throttledWarnSlowQuery(ctx, query, queryDuration)
}

var unmarshalerOptions = proto.UnmarshalOptions{
Merge: true,
DiscardUnknown: true,
}
var count uint32 = 0
lastRow := 0
var data sql.RawBytes
var ret []T
for rows.Next() {
count++
if count > pageSize {
break
}
err := rows.Scan(&lastRow, &data)
if err != nil {
return nil, "", err
}
t := factory()
err = unmarshalerOptions.Unmarshal(data, t)
if err != nil {
return nil, "", err
}
ret = append(ret, t)
}
if rows.Err() != nil {
return nil, "", rows.Err()
return &rowIterator[T]{
rows: rows,
factory: factory,
unmarshalOpts: proto.UnmarshalOptions{
Merge: true,
DiscardUnknown: true,
},
pageSize: pageSize,
}, nil
}

// listConnectorObjects uses a connector list request to fetch the corresponding data from the local db.
// It returns a slice of typed proto messages constructed via the provided factory function.
func listConnectorObjects[T proto.Message](ctx context.Context, c *C1File, tableName string, req listRequest, factory func() T) ([]T, string, error) {
iter, err := streamConnectorObjects(ctx, c, tableName, req, factory)
if err != nil {
return nil, "", err
}
defer iter.Close()

nextPageToken := ""
if count > pageSize {
nextPageToken = strconv.Itoa(lastRow + 1)
ret, err := iter.Collect()
if err != nil {
return nil, "", err
}
return ret, nextPageToken, nil

return ret, iter.NextPageToken(), nil
}

var protoMarshaler = proto.MarshalOptions{Deterministic: false}
Expand Down
2 changes: 1 addition & 1 deletion pkg/dotc1z/sync_runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (c *C1File) ListSyncRuns(ctx context.Context, pageToken string, pageSize ui
q = q.Order(goqu.C("id").Asc())
q = q.Limit(uint(pageSize + 1))

var ret []*syncRun
ret := make([]*syncRun, 0, pageSize)

query, args, err := q.ToSQL()
if err != nil {
Expand Down
Loading