diff --git a/kv/kv.go b/kv/kv.go index 9e4e20e..f2468ba 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -13,23 +13,13 @@ import ( "github.com/dynamic-calm/mokv/api" "github.com/dynamic-calm/mokv/logger" - "github.com/dynamic-calm/mokv/store" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/rs/zerolog/log" "google.golang.org/protobuf/proto" ) -type KVI interface { - Get(key string) ([]byte, error) - Set(key string, value []byte) error - Delete(key string) error - List() <-chan []byte -} -type ServerProvider interface { - GetServers() ([]*api.Server, error) -} const ( // RaftRPC is the first byte sent on a connection to identify it as a Raft command @@ -75,15 +65,23 @@ type KVConfig struct { DataDir string } +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte +} + // KV is a distributed key-value store implementation using Raft consensus. type KV struct { cfg *KVConfig - store store.Storer + store Storer raft *raft.Raft } // New creates and initializes a new distributed KV. -func New(store store.Storer, cfg *KVConfig) (*KV, error) { +func New(store Storer, cfg *KVConfig) (*KV, error) { kv := &KV{ cfg: cfg, store: store, @@ -355,7 +353,7 @@ func (kv *KV) setupRaft(dataDir string) error { } type fsm struct { - kv store.Storer + kv Storer dataDir string } diff --git a/mokv/mokv.go b/mokv/mokv.go index 16c0be4..5d02e60 100644 --- a/mokv/mokv.go +++ b/mokv/mokv.go @@ -41,6 +41,14 @@ type Config struct { LogLevel string } +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte +} + // GetEnv defines a function signature for retrieving environment variables. type GetEnv func(string) string @@ -48,7 +56,7 @@ type GetEnv func(string) string type MOKV struct { cfg *Config getEnv GetEnv - kv kv.KVI + kv Storer meterProvider *metric.MeterProvider grpcServer *grpc.Server metricsServer *http.Server diff --git a/server/server.go b/server/server.go index f6b36b3..0b3e904 100644 --- a/server/server.go +++ b/server/server.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/dynamic-calm/mokv/api" - "github.com/dynamic-calm/mokv/kv" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/health" @@ -18,31 +18,34 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -type kvServer struct { - api.KVServer - KV kv.KVI - serverGetter kv.ServerProvider - logger zerolog.Logger +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte } -func NewServerGetter(kv kv.KVI) kv.ServerProvider { - return &kvServerGetter{kv: kv} +// ServerProvider defines the interface for gettings the servers +// in the cluster. +type ServerProvider interface { + GetServers() ([]*api.Server, error) } -type kvServerGetter struct { - kv kv.KVI +// KV combines storage and cluster awareness. +type KV interface { + Storer + ServerProvider } -func (kg *kvServerGetter) GetServers() ([]*api.Server, error) { - if provider, ok := kg.kv.(kv.ServerProvider); ok { - return provider.GetServers() - } - return nil, fmt.Errorf("kv store does not support getting servers") +type kvServer struct { + api.KVServer + kv KV } // New creates and configures a new gRPC server instance with logging middleware, // health checks, and the registered KV service. -func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { +func New(KV KV, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { logOpts := []logging.Option{ logging.WithLogOnEvents(logging.FinishCall), logging.WithLevels(logging.DefaultServerCodeToLevel), @@ -69,10 +72,8 @@ func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Serv healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) healthpb.RegisterHealthServer(s, healthSrv) - serverGetter := NewServerGetter(KV) srv := &kvServer{ - KV: KV, - serverGetter: serverGetter, + kv: KV, } api.RegisterKVServer(s, srv) return s @@ -80,7 +81,7 @@ func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Serv // Get retrieves a value for a specific key from the store. func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetResponse, error) { - value, err := s.KV.Get(req.Key) + value, err := s.kv.Get(req.Key) if err != nil { return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err() } @@ -89,8 +90,8 @@ func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetRespon // Set stores a key-value pair in the store. func (s *kvServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetResponse, error) { - if err := s.KV.Set(req.Key, req.Value); err != nil { - s.logger.Error().Err(err).Str("key", req.Key).Msg("set operation failed") + if err := s.kv.Set(req.Key, req.Value); err != nil { + log.Error().Err(err).Str("key", req.Key).Msg("set operation failed") return &api.SetResponse{Ok: false}, status.Errorf(codes.Internal, "failed to set key: %v", err) } return &api.SetResponse{Ok: true}, nil @@ -98,7 +99,7 @@ func (s *kvServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetRespon // Delete removes a key from the store. func (s *kvServer) Delete(ctx context.Context, req *api.DeleteRequest) (*api.DeleteResponse, error) { - if err := s.KV.Delete(req.Key); err != nil { + if err := s.kv.Delete(req.Key); err != nil { return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err() } return &api.DeleteResponse{Ok: true}, nil @@ -106,7 +107,7 @@ func (s *kvServer) Delete(ctx context.Context, req *api.DeleteRequest) (*api.Del // List streams all existing key-value pairs from the store to the client. func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[api.GetResponse]) error { - for value := range s.KV.List() { + for value := range s.kv.List() { select { case <-stream.Context().Done(): return stream.Context().Err() @@ -122,7 +123,7 @@ func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[ap // GetServers returns the list of nodes in the cluster. func (s *kvServer) GetServers(ctx context.Context, req *emptypb.Empty) (*api.GetServersResponse, error) { - servers, err := s.serverGetter.GetServers() + servers, err := s.kv.GetServers() if err != nil { return nil, err } diff --git a/server/server_test.go b/server/server_test.go index 9210fb7..aa07aaa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -21,6 +21,14 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +type testKV struct { + *store.Store +} + +func (t *testKV) GetServers() ([]*api.Server, error) { + return nil, nil +} + // setupTestServer creates and starts a test server, returning cleanup function func setupTestServer(t *testing.T) (api.KVClient, func()) { t.Helper() @@ -30,7 +38,7 @@ func setupTestServer(t *testing.T) (api.KVClient, func()) { t.Fatalf("failed to create listener: %v", err) } - st := store.New() + st := &testKV{Store: store.New()} srv := server.New(st, log.Logger) diff --git a/store/store.go b/store/store.go index 103ad35..fc0967d 100644 --- a/store/store.go +++ b/store/store.go @@ -5,14 +5,6 @@ import ( "sync" ) -// Storer defines the interface for a the key-value storage. -type Storer interface { - Get(key string) ([]byte, error) - Set(key string, value []byte) error - Delete(key string) error - List() <-chan []byte -} - // Store is a concurrent, in-memory implementation of Storer using a sync.Map. type Store struct { m sync.Map