From 618b23b67f263068e60155877196bf72bf014818 Mon Sep 17 00:00:00 2001 From: Marcus Goldschmidt Date: Fri, 30 May 2025 19:06:38 -0400 Subject: [PATCH 1/2] add integration test helper for connectors --- pkg/test/integration_wrapper.go | 182 ++++++++++ pkg/test/integration_wrapper_test.go | 86 +++++ .../grpc/test/bufconn/bufconn.go | 318 ++++++++++++++++++ vendor/modules.txt | 1 + 4 files changed, 587 insertions(+) create mode 100644 pkg/test/integration_wrapper.go create mode 100644 pkg/test/integration_wrapper_test.go create mode 100644 vendor/google.golang.org/grpc/test/bufconn/bufconn.go diff --git a/pkg/test/integration_wrapper.go b/pkg/test/integration_wrapper.go new file mode 100644 index 000000000..e49cd97ce --- /dev/null +++ b/pkg/test/integration_wrapper.go @@ -0,0 +1,182 @@ +package test + +import ( + "context" + "log" + "net" + "os" + "testing" + + connectorV2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/connectorbuilder" + "github.com/conductorone/baton-sdk/pkg/dotc1z" + "github.com/conductorone/baton-sdk/pkg/dotc1z/manager" + "github.com/conductorone/baton-sdk/pkg/sync" + "github.com/conductorone/baton-sdk/pkg/types" + "github.com/conductorone/baton-sdk/pkg/ugrpc" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "go.opentelemetry.io/otel/propagation" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/test/bufconn" +) + +const bufSize = 1024 + +type inMemoryConnectorClient struct { + connectorV2.ResourceTypesServiceClient + connectorV2.ResourcesServiceClient + connectorV2.ResourceGetterServiceClient + connectorV2.EntitlementsServiceClient + connectorV2.GrantsServiceClient + connectorV2.ConnectorServiceClient + connectorV2.AssetServiceClient + connectorV2.GrantManagerServiceClient + connectorV2.ResourceManagerServiceClient + connectorV2.ResourceDeleterServiceClient + connectorV2.AccountManagerServiceClient + connectorV2.CredentialManagerServiceClient + connectorV2.EventServiceClient + connectorV2.TicketsServiceClient + connectorV2.ActionServiceClient +} + +type IntegrationTestWrapper struct { + Client types.ConnectorClient + // Does not expose the underlying syncer directly, but provides a method to perform synchronization. + // sync.Syncer handle c1z file internally, will override the previous one when call Sync then Close + syncer sync.Syncer + c1zPath string + manager manager.Manager +} + +func NewIntegrationTestWrapper(ctx context.Context, t *testing.T, connector interface{}) *IntegrationTestWrapper { + srv, err := connectorbuilder.NewConnector(ctx, connector) + require.NoError(t, err) + + tempPath, err := os.CreateTemp("", "baton-integration-test-*.c1z") + require.NoError(t, err) + + t.Cleanup(func() { + err := os.Remove(tempPath.Name()) + require.NoError(t, err) + }) + + lis := bufconn.Listen(bufSize) + s := grpc.NewServer( + grpc.Creds(insecure.NewCredentials()), + grpc.ChainUnaryInterceptor(ugrpc.UnaryServerInterceptor(ctx)...), + grpc.ChainStreamInterceptor(ugrpc.StreamServerInterceptors(ctx)...), + grpc.StatsHandler( + otelgrpc.NewServerHandler( + otelgrpc.WithPropagators( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ), + ), + ), + ) + + connectorV2.RegisterConnectorServiceServer(s, srv) + connectorV2.RegisterGrantsServiceServer(s, srv) + connectorV2.RegisterEntitlementsServiceServer(s, srv) + connectorV2.RegisterResourcesServiceServer(s, srv) + connectorV2.RegisterResourceTypesServiceServer(s, srv) + connectorV2.RegisterAssetServiceServer(s, srv) + connectorV2.RegisterEventServiceServer(s, srv) + connectorV2.RegisterResourceGetterServiceServer(s, srv) + connectorV2.RegisterGrantManagerServiceServer(s, srv) + connectorV2.RegisterResourceManagerServiceServer(s, srv) + connectorV2.RegisterResourceDeleterServiceServer(s, srv) + connectorV2.RegisterAccountManagerServiceServer(s, srv) + connectorV2.RegisterCredentialManagerServiceServer(s, srv) + + go func() { + if err := s.Serve(lis); err != nil { + log.Fatalf("Server exited with error: %v", err) + } + }() + + t.Cleanup(func() { + s.Stop() + }) + + bufDialer := func(ctx context.Context, s string) (net.Conn, error) { + return lis.DialContext(ctx) + } + + cc, err := grpc.NewClient( + "passthrough://bufnet", + grpc.WithContextDialer(bufDialer), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + require.NoError(t, err) + + client := &inMemoryConnectorClient{ + ResourceTypesServiceClient: connectorV2.NewResourceTypesServiceClient(cc), + ResourcesServiceClient: connectorV2.NewResourcesServiceClient(cc), + EntitlementsServiceClient: connectorV2.NewEntitlementsServiceClient(cc), + GrantsServiceClient: connectorV2.NewGrantsServiceClient(cc), + ConnectorServiceClient: connectorV2.NewConnectorServiceClient(cc), + AssetServiceClient: connectorV2.NewAssetServiceClient(cc), + GrantManagerServiceClient: connectorV2.NewGrantManagerServiceClient(cc), + ResourceManagerServiceClient: connectorV2.NewResourceManagerServiceClient(cc), + ResourceDeleterServiceClient: connectorV2.NewResourceDeleterServiceClient(cc), + AccountManagerServiceClient: connectorV2.NewAccountManagerServiceClient(cc), + CredentialManagerServiceClient: connectorV2.NewCredentialManagerServiceClient(cc), + EventServiceClient: connectorV2.NewEventServiceClient(cc), + TicketsServiceClient: connectorV2.NewTicketsServiceClient(cc), + ActionServiceClient: connectorV2.NewActionServiceClient(cc), + ResourceGetterServiceClient: connectorV2.NewResourceGetterServiceClient(cc), + } + + syncer, err := sync.NewSyncer( + ctx, + client, + sync.WithC1ZPath(tempPath.Name()), + sync.WithProgressHandler(func(s *sync.Progress) { + t.Logf("Progress: %v", s) + }), + ) + require.NoError(t, err) + + m, err := manager.New(ctx, tempPath.Name()) + require.NoError(t, err) + + return &IntegrationTestWrapper{ + Client: client, + syncer: syncer, + manager: m, + c1zPath: tempPath.Name(), + } +} + +func (w *IntegrationTestWrapper) Manager() manager.Manager { + return w.manager +} + +func (w *IntegrationTestWrapper) LoadC1Z(ctx context.Context, t *testing.T) *dotc1z.C1File { + c1z, err := w.manager.LoadC1Z(ctx) + require.NoError(t, err) + require.NotNil(t, c1z) + + return c1z +} + +// Sync performs a synchronization operation using the provided syncer. +func (w *IntegrationTestWrapper) Sync(ctx context.Context) error { + err := w.syncer.Sync(ctx) + if err != nil { + return err + } + + err = w.syncer.Close(ctx) + if err != nil { + return err + } + + return nil +} diff --git a/pkg/test/integration_wrapper_test.go b/pkg/test/integration_wrapper_test.go new file mode 100644 index 000000000..0983e42e3 --- /dev/null +++ b/pkg/test/integration_wrapper_test.go @@ -0,0 +1,86 @@ +package test + +import ( + "context" + "testing" + + v2 "github.com/conductorone/baton-sdk/pb/c1/connector/v2" + "github.com/conductorone/baton-sdk/pkg/annotations" + "github.com/conductorone/baton-sdk/pkg/connectorbuilder" + "github.com/conductorone/baton-sdk/pkg/pagination" + "github.com/stretchr/testify/require" +) + +var mockResourceType = &v2.ResourceType{ + Id: "mock_resource_type", + Description: "mock resource type", + DisplayName: "mock resource type", + Traits: []v2.ResourceType_Trait{ + v2.ResourceType_TRAIT_UNSPECIFIED, + }, +} + +type mockConnector struct{} + +func (m mockConnector) Metadata(ctx context.Context) (*v2.ConnectorMetadata, error) { + return &v2.ConnectorMetadata{}, nil +} + +func (m mockConnector) Validate(ctx context.Context) (annotations.Annotations, error) { + return annotations.Annotations{}, nil +} + +func (m mockConnector) ResourceSyncers(ctx context.Context) []connectorbuilder.ResourceSyncer { + return []connectorbuilder.ResourceSyncer{ + &mockResource{ + resourceType: mockResourceType, + resources: []*v2.Resource{ + { + Id: &v2.ResourceId{ + Resource: "1", + ResourceType: mockResourceType.Id, + }, + DisplayName: "Mock Resource 1", + }, + }, + }, + } +} + +type mockResource struct { + resourceType *v2.ResourceType + resources []*v2.Resource +} + +func (m *mockResource) ResourceType(ctx context.Context) *v2.ResourceType { + return m.resourceType +} + +func (m *mockResource) List(ctx context.Context, parentResourceID *v2.ResourceId, pToken *pagination.Token) ([]*v2.Resource, string, annotations.Annotations, error) { + return m.resources, "", nil, nil +} + +func (m *mockResource) Entitlements(ctx context.Context, resource *v2.Resource, pToken *pagination.Token) ([]*v2.Entitlement, string, annotations.Annotations, error) { + return []*v2.Entitlement{}, "", nil, nil +} + +func (m *mockResource) Grants(ctx context.Context, resource *v2.Resource, pToken *pagination.Token) ([]*v2.Grant, string, annotations.Annotations, error) { + return []*v2.Grant{}, "", nil, nil +} + +func TestIntegrationTestWrapper_Sync(t *testing.T) { + ctx := context.Background() + + wrapper := NewIntegrationTestWrapper(ctx, t, &mockConnector{}) + require.NotNil(t, wrapper) + + err := wrapper.Sync(ctx) + require.NoError(t, err) + + z := wrapper.LoadC1Z(ctx, t) + + resources, err := z.ListResources(ctx, &v2.ResourcesServiceListResourcesRequest{}) + require.NoError(t, err) + + require.Len(t, resources.List, 1) +} diff --git a/vendor/google.golang.org/grpc/test/bufconn/bufconn.go b/vendor/google.golang.org/grpc/test/bufconn/bufconn.go new file mode 100644 index 000000000..e6eb4feeb --- /dev/null +++ b/vendor/google.golang.org/grpc/test/bufconn/bufconn.go @@ -0,0 +1,318 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package bufconn provides a net.Conn implemented by a buffer and related +// dialing and listening functionality. +package bufconn + +import ( + "context" + "fmt" + "io" + "net" + "sync" + "time" +) + +// Listener implements a net.Listener that creates local, buffered net.Conns +// via its Accept and Dial method. +type Listener struct { + mu sync.Mutex + sz int + ch chan net.Conn + done chan struct{} +} + +// Implementation of net.Error providing timeout +type netErrorTimeout struct { + error +} + +func (e netErrorTimeout) Timeout() bool { return true } +func (e netErrorTimeout) Temporary() bool { return false } + +var errClosed = fmt.Errorf("closed") +var errTimeout net.Error = netErrorTimeout{error: fmt.Errorf("i/o timeout")} + +// Listen returns a Listener that can only be contacted by its own Dialers and +// creates buffered connections between the two. +func Listen(sz int) *Listener { + return &Listener{sz: sz, ch: make(chan net.Conn), done: make(chan struct{})} +} + +// Accept blocks until Dial is called, then returns a net.Conn for the server +// half of the connection. +func (l *Listener) Accept() (net.Conn, error) { + select { + case <-l.done: + return nil, errClosed + case c := <-l.ch: + return c, nil + } +} + +// Close stops the listener. +func (l *Listener) Close() error { + l.mu.Lock() + defer l.mu.Unlock() + select { + case <-l.done: + // Already closed. + break + default: + close(l.done) + } + return nil +} + +// Addr reports the address of the listener. +func (l *Listener) Addr() net.Addr { return addr{} } + +// Dial creates an in-memory full-duplex network connection, unblocks Accept by +// providing it the server half of the connection, and returns the client half +// of the connection. +func (l *Listener) Dial() (net.Conn, error) { + return l.DialContext(context.Background()) +} + +// DialContext creates an in-memory full-duplex network connection, unblocks Accept by +// providing it the server half of the connection, and returns the client half +// of the connection. If ctx is Done, returns ctx.Err() +func (l *Listener) DialContext(ctx context.Context) (net.Conn, error) { + p1, p2 := newPipe(l.sz), newPipe(l.sz) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-l.done: + return nil, errClosed + case l.ch <- &conn{p1, p2}: + return &conn{p2, p1}, nil + } +} + +type pipe struct { + mu sync.Mutex + + // buf contains the data in the pipe. It is a ring buffer of fixed capacity, + // with r and w pointing to the offset to read and write, respectively. + // + // Data is read between [r, w) and written to [w, r), wrapping around the end + // of the slice if necessary. + // + // The buffer is empty if r == len(buf), otherwise if r == w, it is full. + // + // w and r are always in the range [0, cap(buf)) and [0, len(buf)]. + buf []byte + w, r int + + wwait sync.Cond + rwait sync.Cond + + // Indicate that a write/read timeout has occurred + wtimedout bool + rtimedout bool + + wtimer *time.Timer + rtimer *time.Timer + + closed bool + writeClosed bool +} + +func newPipe(sz int) *pipe { + p := &pipe{buf: make([]byte, 0, sz)} + p.wwait.L = &p.mu + p.rwait.L = &p.mu + + p.wtimer = time.AfterFunc(0, func() {}) + p.rtimer = time.AfterFunc(0, func() {}) + return p +} + +func (p *pipe) empty() bool { + return p.r == len(p.buf) +} + +func (p *pipe) full() bool { + return p.r < len(p.buf) && p.r == p.w +} + +func (p *pipe) Read(b []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + // Block until p has data. + for { + if p.closed { + return 0, io.ErrClosedPipe + } + if !p.empty() { + break + } + if p.writeClosed { + return 0, io.EOF + } + if p.rtimedout { + return 0, errTimeout + } + + p.rwait.Wait() + } + wasFull := p.full() + + n = copy(b, p.buf[p.r:len(p.buf)]) + p.r += n + if p.r == cap(p.buf) { + p.r = 0 + p.buf = p.buf[:p.w] + } + + // Signal a blocked writer, if any + if wasFull { + p.wwait.Signal() + } + + return n, nil +} + +func (p *pipe) Write(b []byte) (n int, err error) { + p.mu.Lock() + defer p.mu.Unlock() + if p.closed { + return 0, io.ErrClosedPipe + } + for len(b) > 0 { + // Block until p is not full. + for { + if p.closed || p.writeClosed { + return 0, io.ErrClosedPipe + } + if !p.full() { + break + } + if p.wtimedout { + return 0, errTimeout + } + + p.wwait.Wait() + } + wasEmpty := p.empty() + + end := cap(p.buf) + if p.w < p.r { + end = p.r + } + x := copy(p.buf[p.w:end], b) + b = b[x:] + n += x + p.w += x + if p.w > len(p.buf) { + p.buf = p.buf[:p.w] + } + if p.w == cap(p.buf) { + p.w = 0 + } + + // Signal a blocked reader, if any. + if wasEmpty { + p.rwait.Signal() + } + } + return n, nil +} + +func (p *pipe) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + p.closed = true + // Signal all blocked readers and writers to return an error. + p.rwait.Broadcast() + p.wwait.Broadcast() + return nil +} + +func (p *pipe) closeWrite() error { + p.mu.Lock() + defer p.mu.Unlock() + p.writeClosed = true + // Signal all blocked readers and writers to return an error. + p.rwait.Broadcast() + p.wwait.Broadcast() + return nil +} + +type conn struct { + io.Reader + io.Writer +} + +func (c *conn) Close() error { + err1 := c.Reader.(*pipe).Close() + err2 := c.Writer.(*pipe).closeWrite() + if err1 != nil { + return err1 + } + return err2 +} + +func (c *conn) SetDeadline(t time.Time) error { + c.SetReadDeadline(t) + c.SetWriteDeadline(t) + return nil +} + +func (c *conn) SetReadDeadline(t time.Time) error { + p := c.Reader.(*pipe) + p.mu.Lock() + defer p.mu.Unlock() + p.rtimer.Stop() + p.rtimedout = false + if !t.IsZero() { + p.rtimer = time.AfterFunc(time.Until(t), func() { + p.mu.Lock() + defer p.mu.Unlock() + p.rtimedout = true + p.rwait.Broadcast() + }) + } + return nil +} + +func (c *conn) SetWriteDeadline(t time.Time) error { + p := c.Writer.(*pipe) + p.mu.Lock() + defer p.mu.Unlock() + p.wtimer.Stop() + p.wtimedout = false + if !t.IsZero() { + p.wtimer = time.AfterFunc(time.Until(t), func() { + p.mu.Lock() + defer p.mu.Unlock() + p.wtimedout = true + p.wwait.Broadcast() + }) + } + return nil +} + +func (*conn) LocalAddr() net.Addr { return addr{} } +func (*conn) RemoteAddr() net.Addr { return addr{} } + +type addr struct{} + +func (addr) Network() string { return "bufconn" } +func (addr) String() string { return "bufconn" } diff --git a/vendor/modules.txt b/vendor/modules.txt index 9d28012cf..d17d499a6 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -651,6 +651,7 @@ google.golang.org/grpc/serviceconfig google.golang.org/grpc/stats google.golang.org/grpc/status google.golang.org/grpc/tap +google.golang.org/grpc/test/bufconn # google.golang.org/protobuf v1.36.5 ## explicit; go 1.21 google.golang.org/protobuf/encoding/protojson From 82158c83c89f1d8ba39e853a81e93a841846eeaf Mon Sep 17 00:00:00 2001 From: Marcus Goldschmidt Date: Mon, 2 Jun 2025 16:30:41 -0400 Subject: [PATCH 2/2] fix manager close for test --- pkg/test/integration_wrapper.go | 13 ++++++++++--- pkg/test/integration_wrapper_test.go | 5 +++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/pkg/test/integration_wrapper.go b/pkg/test/integration_wrapper.go index e49cd97ce..4eabf3683 100644 --- a/pkg/test/integration_wrapper.go +++ b/pkg/test/integration_wrapper.go @@ -2,7 +2,6 @@ package test import ( "context" - "log" "net" "os" "testing" @@ -22,7 +21,7 @@ import ( "google.golang.org/grpc/test/bufconn" ) -const bufSize = 1024 +const bufSize = 1024 * 1024 // 1MB buffer size for the in-memory connection type inMemoryConnectorClient struct { connectorV2.ResourceTypesServiceClient @@ -58,6 +57,9 @@ func NewIntegrationTestWrapper(ctx context.Context, t *testing.T, connector inte tempPath, err := os.CreateTemp("", "baton-integration-test-*.c1z") require.NoError(t, err) + err = tempPath.Close() + require.NoError(t, err) + t.Cleanup(func() { err := os.Remove(tempPath.Name()) require.NoError(t, err) @@ -96,7 +98,7 @@ func NewIntegrationTestWrapper(ctx context.Context, t *testing.T, connector inte go func() { if err := s.Serve(lis); err != nil { - log.Fatalf("Server exited with error: %v", err) + t.Errorf("Server exited with error: %v", err) } }() @@ -146,6 +148,11 @@ func NewIntegrationTestWrapper(ctx context.Context, t *testing.T, connector inte m, err := manager.New(ctx, tempPath.Name()) require.NoError(t, err) + t.Cleanup(func() { + err := m.Close(ctx) + require.NoError(t, err) + }) + return &IntegrationTestWrapper{ Client: client, syncer: syncer, diff --git a/pkg/test/integration_wrapper_test.go b/pkg/test/integration_wrapper_test.go index 0983e42e3..3dd086ae3 100644 --- a/pkg/test/integration_wrapper_test.go +++ b/pkg/test/integration_wrapper_test.go @@ -79,6 +79,11 @@ func TestIntegrationTestWrapper_Sync(t *testing.T) { z := wrapper.LoadC1Z(ctx, t) + t.Cleanup(func() { + err := z.Close() + require.NoError(t, err) + }) + resources, err := z.ListResources(ctx, &v2.ResourcesServiceListResourcesRequest{}) require.NoError(t, err)