Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.4.0] - 2026-03-06

### Added

- `datasets`: Added `WithCollections(...)` and `WithCollectionIDs(...)` query options for datapoint queries.

### Changed

- `datasets`: Updated `client.Datapoints.GetInto`, `client.Datapoints.Query`, and `client.Datapoints.QueryInto` to take a `datasetID` as the primary identifier, with collection filtering now configured via query options.

## [0.3.2] - 2026-02-26

### Added
Expand Down Expand Up @@ -67,7 +77,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Added support for Tilebox Observability, including logging and tracing helpers.
- Added examples for using the library.

[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...HEAD
[Unreleased]: https://github.com/tilebox/tilebox-go/compare/v0.4.0...HEAD
[0.4.0]: https://github.com/tilebox/tilebox-go/compare/v0.3.2...v0.4.0
[0.3.2]: https://github.com/tilebox/tilebox-go/compare/v0.3.1...v0.3.2
[0.3.1]: https://github.com/tilebox/tilebox-go/compare/v0.3.0...v0.3.1
[0.3.0]: https://github.com/tilebox/tilebox-go/compare/v0.2.0...v0.3.0
Expand Down
43 changes: 30 additions & 13 deletions datasets/v1/datapoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/google/uuid"
"github.com/paulmach/orb"
"github.com/samber/lo"
datasetsv1 "github.com/tilebox/tilebox-go/protogen/datasets/v1"
tileboxv1 "github.com/tilebox/tilebox-go/protogen/tilebox/v1"
"github.com/tilebox/tilebox-go/query"
Expand All @@ -24,12 +25,13 @@ type DatapointClient interface {
// Example usage:
//
// var datapoint v1.Sentinel1Sar
// err = client.Datapoints.GetInto(ctx, collectionIDs, datapointID, &datapoint)
GetInto(ctx context.Context, collectionIDs []uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error
// err = client.Datapoints.GetInto(ctx, datasetID, datapointID, &datapoint)
GetInto(ctx context.Context, datasetID uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error

// Query datapoints from one or more collections of the same dataset.
//
// Options:
// - WithCollections / WithCollectionIDs: specifies the collections to query. If no collections are specified, all collections of the dataset will be queried. (Optional)
// - WithTemporalExtent: specifies the time or data point interval for which data should be loaded. (Required)
// - WithSpatialExtent: specifies the spatial extent for which data should be loaded. (Optional)
// - WithSkipData: can be used to skip the actual data when loading datapoints, only returning required datapoint fields. (Optional)
Expand All @@ -39,7 +41,7 @@ type DatapointClient interface {
//
// Example usage:
//
// for datapointBytes, err := range client.Datapoints.Query(ctx, collectionIDs, WithTemporalExtent(timeInterval), WithSpatialExtent(geometry)) {
// for datapointBytes, err := range client.Datapoints.Query(ctx, datasetID, WithCollectionIDs(collectionID), WithTemporalExtent(timeInterval), WithSpatialExtent(geometry)) {
// if err != nil {
// // handle error
// }
Expand All @@ -51,8 +53,8 @@ type DatapointClient interface {
// // do something with the datapoint
// }
//
// Documentation: https://docs.tilebox.com/datasets/query
Query(ctx context.Context, collectionIDs []uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error]
// Documentation: https://docs.tilebox.com/datasets/query/querying-data
Query(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error]

// QueryInto queries datapoints from one or more collections of the same dataset into a slice of datapoints of a
// compatible proto.Message type.
Expand All @@ -62,8 +64,8 @@ type DatapointClient interface {
// Example usage:
//
// var datapoints []*v1.Sentinel1Sar
// err := client.Datapoints.QueryInto(ctx, collectionIDs, &datapoints, WithTemporalExtent(timeInterval))
QueryInto(ctx context.Context, collectionIDs []uuid.UUID, datapoints any, options ...QueryOption) error
// err := client.Datapoints.QueryInto(ctx, datasetID, &datapoints, WithTemporalExtent(timeInterval))
QueryInto(ctx context.Context, datasetID uuid.UUID, datapoints any, options ...QueryOption) error

// Ingest datapoints into a collection.
//
Expand Down Expand Up @@ -100,6 +102,7 @@ type queryOptions struct {
temporalExtent query.TemporalExtent
spatialExtent query.SpatialExtent
skipData bool
collectionIDs []uuid.UUID
}

// QueryOption is an interface for configuring a Query request.
Expand Down Expand Up @@ -128,6 +131,20 @@ func WithSpatialExtentFilter(spatialExtent query.SpatialExtent) QueryOption {
}
}

// WithCollections specifies the collections to query. If no collections are
// specified, all collections of the dataset will be queried.
func WithCollections(collections ...*Collection) QueryOption {
	// Project each collection down to its ID and delegate to WithCollectionIDs,
	// so both options share a single code path for configuring the query.
	collectionID := func(collection *Collection, _ int) uuid.UUID {
		return collection.ID
	}
	return WithCollectionIDs(lo.Map(collections, collectionID)...)
}

// WithCollectionIDs specifies the collection IDs to query. If no collections
// are specified, all collections of the dataset will be queried.
func WithCollectionIDs(collectionIDs ...uuid.UUID) QueryOption {
	return func(cfg *queryOptions) {
		// Accumulate rather than overwrite, so the option can be supplied
		// multiple times and the IDs are merged together.
		for _, id := range collectionIDs {
			cfg.collectionIDs = append(cfg.collectionIDs, id)
		}
	}
}

// WithSkipData skips the data when querying datapoints.
// It is an optional flag for omitting the actual datapoint data from the response.
// If set, only the required datapoint fields will be returned.
Expand All @@ -139,15 +156,15 @@ func WithSkipData() QueryOption {
}
}

func (d datapointClient) GetInto(ctx context.Context, collectionIDs []uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error {
func (d datapointClient) GetInto(ctx context.Context, datasetID uuid.UUID, datapointID uuid.UUID, datapoint proto.Message, options ...QueryOption) error {
cfg := &queryOptions{
skipData: false,
}
for _, option := range options {
option(cfg)
}

rawDatapoint, err := d.dataAccessService.QueryByID(ctx, collectionIDs, datapointID, cfg.skipData)
rawDatapoint, err := d.dataAccessService.QueryByID(ctx, datasetID, cfg.collectionIDs, datapointID, cfg.skipData)
if err != nil {
return err
}
Expand All @@ -160,7 +177,7 @@ func (d datapointClient) GetInto(ctx context.Context, collectionIDs []uuid.UUID,
return nil
}

func (d datapointClient) Query(ctx context.Context, collectionIDs []uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error] {
func (d datapointClient) Query(ctx context.Context, datasetID uuid.UUID, options ...QueryOption) iter.Seq2[[]byte, error] {
cfg := &queryOptions{
skipData: false,
}
Expand Down Expand Up @@ -202,7 +219,7 @@ func (d datapointClient) Query(ctx context.Context, collectionIDs []uuid.UUID, o
}

for {
datapointsMessage, err := d.dataAccessService.Query(ctx, collectionIDs, filters, page, cfg.skipData)
datapointsMessage, err := d.dataAccessService.Query(ctx, datasetID, cfg.collectionIDs, filters, page, cfg.skipData)
if err != nil {
yield(nil, err)
return
Expand All @@ -222,7 +239,7 @@ func (d datapointClient) Query(ctx context.Context, collectionIDs []uuid.UUID, o
}
}

func (d datapointClient) QueryInto(ctx context.Context, collectionIDs []uuid.UUID, datapoints any, options ...QueryOption) error {
func (d datapointClient) QueryInto(ctx context.Context, datasetID uuid.UUID, datapoints any, options ...QueryOption) error {
err := validateDatapoints(datapoints)
if err != nil {
return err // already a nice validation error
Expand All @@ -231,7 +248,7 @@ func (d datapointClient) QueryInto(ctx context.Context, collectionIDs []uuid.UUI
slice := reflect.Indirect(reflect.ValueOf(datapoints))
datapointType := slice.Type().Elem().Elem()

rawDatapoints, err := Collect(d.Query(ctx, collectionIDs, options...))
rawDatapoints, err := Collect(d.Query(ctx, datasetID, options...))
if err != nil {
return err
}
Expand Down
82 changes: 69 additions & 13 deletions datasets/v1/datapoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type mockDataAccessService struct {
n int
}

func (m mockDataAccessService) Query(_ context.Context, _ []uuid.UUID, _ *datasetsv1.QueryFilters, _ *tileboxv1.Pagination, _ bool) (*datasetsv1.QueryResultPage, error) {
func (m mockDataAccessService) Query(_ context.Context, _ uuid.UUID, _ []uuid.UUID, _ *datasetsv1.QueryFilters, _ *tileboxv1.Pagination, _ bool) (*datasetsv1.QueryResultPage, error) {
data := make([][]byte, m.n)
for i := range m.n {
datapoint := examplesv1.Sentinel2Msi_builder{
Expand Down Expand Up @@ -68,6 +68,8 @@ func (m mockDataAccessService) Query(_ context.Context, _ []uuid.UUID, _ *datase

func Test_QueryOptions(t *testing.T) {
now := time.Now()
collectionID1 := uuid.New()
collectionID2 := uuid.New()
colorado := orb.Polygon{
{{-109.05, 37.09}, {-102.06, 37.09}, {-102.06, 41.59}, {-109.05, 41.59}, {-109.05, 37.09}},
}
Expand All @@ -92,6 +94,27 @@ func Test_QueryOptions(t *testing.T) {
),
},
},
{
name: "with collection ids",
options: []QueryOption{
WithCollectionIDs(collectionID1, collectionID2),
},
want: queryOptions{
collectionIDs: []uuid.UUID{collectionID1, collectionID2},
},
},
{
name: "with collections",
options: []QueryOption{
WithCollections(
&Collection{ID: collectionID1},
&Collection{ID: collectionID2},
),
},
want: queryOptions{
collectionIDs: []uuid.UUID{collectionID1, collectionID2},
},
},
{
name: "with spatial extent",
options: []QueryOption{
Expand Down Expand Up @@ -145,7 +168,7 @@ func Test_datapointClient_GetInto(t *testing.T) {

t.Run("GetInto", func(t *testing.T) {
var datapoint examplesv1.Sentinel2Msi
err := client.Datapoints.GetInto(ctx, []uuid.UUID{collection.ID}, datapointID, &datapoint)
err := client.Datapoints.GetInto(ctx, dataset.ID, datapointID, &datapoint, WithCollectionIDs(collection.ID))
require.NoError(t, err)

assert.Equal(t, "01941f29-c650-202f-6495-c71dd2118fb1", uuid.Must(uuid.FromBytes(datapoint.GetId().GetUuid())).String())
Expand All @@ -155,7 +178,7 @@ func Test_datapointClient_GetInto(t *testing.T) {

t.Run("GetInto WithSkipData", func(t *testing.T) {
var datapoint examplesv1.Sentinel2Msi
err := client.Datapoints.GetInto(ctx, []uuid.UUID{collection.ID}, datapointID, &datapoint, WithSkipData())
err := client.Datapoints.GetInto(ctx, dataset.ID, datapointID, &datapoint, WithCollectionIDs(collection.ID), WithSkipData())
require.NoError(t, err)

assert.Equal(t, "01941f29-c650-202f-6495-c71dd2118fb1", uuid.Must(uuid.FromBytes(datapoint.GetId().GetUuid())).String())
Expand All @@ -167,6 +190,7 @@ func Test_datapointClient_QueryInto(t *testing.T) {
ctx := context.Background()
client := NewDatapointClient(10)

datasetID := uuid.New()
collectionID := uuid.New()
timeInterval := query.NewTimeInterval(time.Now(), time.Now())

Expand Down Expand Up @@ -220,7 +244,7 @@ func Test_datapointClient_QueryInto(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := client.QueryInto(ctx, []uuid.UUID{tt.args.collectionID}, tt.args.datapoints, WithTemporalExtent(tt.args.interval))
err := client.QueryInto(ctx, datasetID, tt.args.datapoints, WithCollectionIDs(tt.args.collectionID), WithTemporalExtent(tt.args.interval))
if tt.wantErr != "" {
// we wanted an error, let's check if we got one
require.Error(t, err, "expected an error, got none")
Expand All @@ -246,22 +270,53 @@ func Benchmark_QueryInto(b *testing.B) {
ctx := context.Background()
client := NewDatapointClient(1000)

datasetID := uuid.New()
collectionID := uuid.New()
timeInterval := query.NewTimeInterval(time.Now(), time.Now())

var datapoints []*examplesv1.Sentinel2Msi
b.Run("CollectAs", func(b *testing.B) {
for range b.N {
err := client.QueryInto(ctx, []uuid.UUID{collectionID}, &datapoints, WithTemporalExtent(timeInterval))
err := client.QueryInto(ctx, datasetID, &datapoints, WithCollectionIDs(collectionID), WithTemporalExtent(timeInterval))
require.NoError(b, err)
}
})
resultQueryInto = datapoints
}

func Test_datapointClient_Query(t *testing.T) {
// Test_datapointClient_QueryDataset verifies querying datapoints by dataset ID
// alone (no collection filter), replayed against the recorded "query" fixture.
// It covers both the full-data path and the WithSkipData path.
func Test_datapointClient_QueryDataset(t *testing.T) {
	ctx := context.Background()
	// Replay client serves pre-recorded responses from the "query" cassette,
	// so the expected counts and values below are pinned to that recording.
	client := NewReplayClient(t, "query")

	dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
	require.NoError(t, err)

	// One-hour window at the start of 2025, matching the recorded query.
	jan2025 := time.Date(2025, time.January, 1, 0, 0, 0, 0, time.UTC)
	timeInterval := query.NewTimeInterval(jan2025, jan2025.Add(1*time.Hour))

	t.Run("CollectAs", func(t *testing.T) {
		// Query the whole dataset (all collections) and decode into typed protos.
		datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, dataset.ID, WithTemporalExtent(timeInterval)))
		require.NoError(t, err)

		// Expected size and first-element fields come from the replay fixture.
		assert.Len(t, datapoints, 1459)
		assert.Equal(t, "01941f29-c650-202f-6495-c71dd2118fb1", uuid.Must(uuid.FromBytes(datapoints[0].GetId().GetUuid())).String())
		assert.Equal(t, "2025-01-01 00:00:19.024 +0000 UTC", datapoints[0].GetTime().AsTime().String())
		assert.Equal(t, "S2B_MSIL1C_20250101T000019_N0511_R073_T57QWV_20250101T010340.SAFE", datapoints[0].GetGranuleName())
	})

	t.Run("CollectAs WithSkipData", func(t *testing.T) {
		// With WithSkipData only required fields (id, time) are returned,
		// so payload fields like the granule name must be empty.
		datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, dataset.ID, WithTemporalExtent(timeInterval), WithSkipData()))
		require.NoError(t, err)

		assert.Len(t, datapoints, 1459)
		assert.Equal(t, "01941f29-c650-202f-6495-c71dd2118fb1", uuid.Must(uuid.FromBytes(datapoints[0].GetId().GetUuid())).String())
		assert.Empty(t, datapoints[0].GetGranuleName())
	})
}

func Test_datapointClient_QueryCollections(t *testing.T) {
ctx := context.Background()
client := NewReplayClient(t, "load")
client := NewReplayClient(t, "query_collections")

dataset, err := client.Datasets.Get(ctx, "open_data.copernicus.sentinel2_msi")
require.NoError(t, err)
Expand All @@ -274,7 +329,7 @@ func Test_datapointClient_Query(t *testing.T) {
timeInterval := query.NewTimeInterval(jan2025, jan2025.Add(1*time.Hour))

t.Run("CollectAs", func(t *testing.T) {
datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, []uuid.UUID{collection.ID}, WithTemporalExtent(timeInterval)))
datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, dataset.ID, WithCollectionIDs(collection.ID), WithTemporalExtent(timeInterval)))
require.NoError(t, err)

assert.Len(t, datapoints, 437)
Expand All @@ -284,7 +339,7 @@ func Test_datapointClient_Query(t *testing.T) {
})

t.Run("CollectAs WithSkipData", func(t *testing.T) {
datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, []uuid.UUID{collection.ID}, WithTemporalExtent(timeInterval), WithSkipData()))
datapoints, err := CollectAs[*examplesv1.Sentinel2Msi](client.Datapoints.Query(ctx, dataset.ID, WithCollectionIDs(collection.ID), WithTemporalExtent(timeInterval), WithSkipData()))
require.NoError(t, err)

assert.Len(t, datapoints, 437)
Expand Down Expand Up @@ -323,7 +378,7 @@ func NewMockDatapointClient(tb testing.TB, n int) DatapointClient {
}
}

func (s *mockService) Query(_ context.Context, _ []uuid.UUID, _ ...QueryOption) iter.Seq2[[]byte, error] {
func (s *mockService) Query(_ context.Context, _ uuid.UUID, _ ...QueryOption) iter.Seq2[[]byte, error] {
return func(yield func([]byte, error) bool) {
for _, data := range s.data {
if !yield(data, nil) {
Expand All @@ -340,6 +395,7 @@ var result []*examplesv1.Sentinel2Msi
// It is used to benchmark the cost of reflection and proto.Marshal inside CollectAs
func BenchmarkCollectAs(b *testing.B) {
ctx := context.Background()
datasetID := uuid.New() // dummy dataset ID
collectionID := uuid.New() // dummy collection ID
queryInterval := query.NewEmptyTimeInterval() // dummy time interval

Expand All @@ -349,15 +405,15 @@ func BenchmarkCollectAs(b *testing.B) {
var r []*examplesv1.Sentinel2Msi // used to avoid the compiler optimizing the output
b.Run("CollectAs", func(b *testing.B) {
for range b.N {
data := client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, WithTemporalExtent(queryInterval))
data := client.Datapoints.Query(ctx, datasetID, WithCollectionIDs(collectionID), WithTemporalExtent(queryInterval))
r, _ = CollectAs[*examplesv1.Sentinel2Msi](data)
}
})
result = r

b.Run("Marshal and no reflection", func(b *testing.B) {
for range b.N {
data := client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, WithTemporalExtent(queryInterval))
data := client.Datapoints.Query(ctx, datasetID, WithCollectionIDs(collectionID), WithTemporalExtent(queryInterval))
datapoints := make([]*examplesv1.Sentinel2Msi, 0, 1000)
for datapoint, err := range data {
if err != nil {
Expand All @@ -376,7 +432,7 @@ func BenchmarkCollectAs(b *testing.B) {

b.Run("No marshal and no reflection", func(b *testing.B) {
for range b.N {
data := client.Datapoints.Query(ctx, []uuid.UUID{collectionID}, WithTemporalExtent(queryInterval))
data := client.Datapoints.Query(ctx, datasetID, WithCollectionIDs(collectionID), WithTemporalExtent(queryInterval))
datapoints := make([][]byte, 0, 1000)
for datapoint, err := range data {
if err != nil {
Expand Down
Loading