From db22746f48d4096aa12a0dc6b94d047775276f37 Mon Sep 17 00:00:00 2001 From: Mateo Presa Castro Date: Sun, 7 Dec 2025 09:15:55 +0100 Subject: [PATCH 1/3] refactor: move store interface outside store package --- kv/kv.go | 15 +++++++++++---- store/store.go | 8 -------- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 9e4e20e..d5376f2 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -13,7 +13,6 @@ import ( "github.com/dynamic-calm/mokv/api" "github.com/dynamic-calm/mokv/logger" - "github.com/dynamic-calm/mokv/store" "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/rs/zerolog/log" @@ -75,15 +74,23 @@ type KVConfig struct { DataDir string } +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte +} + // KV is a distributed key-value store implementation using Raft consensus. type KV struct { cfg *KVConfig - store store.Storer + store Storer raft *raft.Raft } // New creates and initializes a new distributed KV. -func New(store store.Storer, cfg *KVConfig) (*KV, error) { +func New(store Storer, cfg *KVConfig) (*KV, error) { kv := &KV{ cfg: cfg, store: store, @@ -355,7 +362,7 @@ func (kv *KV) setupRaft(dataDir string) error { } type fsm struct { - kv store.Storer + kv Storer dataDir string } diff --git a/store/store.go b/store/store.go index 103ad35..fc0967d 100644 --- a/store/store.go +++ b/store/store.go @@ -5,14 +5,6 @@ import ( "sync" ) -// Storer defines the interface for a the key-value storage. -type Storer interface { - Get(key string) ([]byte, error) - Set(key string, value []byte) error - Delete(key string) error - List() <-chan []byte -} - // Store is a concurrent, in-memory implementation of Storer using a sync.Map. type Store struct { m sync.Map From f10b547a329f882005ec4e26980cc3ee5782ed18 Mon Sep 17 00:00:00 2001 From: Mateo Presa Castro Date: Sun, 7 Dec 2025 09:29:26 +0100 Subject: [PATCH 2/3] refactor: triplicate Storer interface --- kv/kv.go | 7 ------- mokv/mokv.go | 10 +++++++++- server/server.go | 16 ++++++++++++---- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index d5376f2..a1771a1 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -19,13 +19,6 @@ import ( "google.golang.org/protobuf/proto" ) -type KVI interface { - Get(key string) ([]byte, error) - Set(key string, value []byte) error - Delete(key string) error - List() <-chan []byte -} - type ServerProvider interface { GetServers() ([]*api.Server, error) } diff --git a/mokv/mokv.go b/mokv/mokv.go index 16c0be4..5d02e60 100644 --- a/mokv/mokv.go +++ b/mokv/mokv.go @@ -41,6 +41,14 @@ type Config struct { LogLevel string } +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte +} + // GetEnv defines a function signature for retrieving environment variables. type GetEnv func(string) string @@ -48,7 +56,7 @@ type GetEnv func(string) string type MOKV struct { cfg *Config getEnv GetEnv - kv kv.KVI + kv Storer meterProvider *metric.MeterProvider grpcServer *grpc.Server metricsServer *http.Server diff --git a/server/server.go b/server/server.go index f6b36b3..53d7a13 100644 --- a/server/server.go +++ b/server/server.go @@ -18,19 +18,27 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +// Storer defines the interface for a the key-value storage. +type Storer interface { + Get(key string) ([]byte, error) + Set(key string, value []byte) error + Delete(key string) error + List() <-chan []byte +} + type kvServer struct { api.KVServer - KV kv.KVI + KV Storer serverGetter kv.ServerProvider logger zerolog.Logger } -func NewServerGetter(kv kv.KVI) kv.ServerProvider { +func NewServerGetter(kv Storer) kv.ServerProvider { return &kvServerGetter{kv: kv} } type kvServerGetter struct { - kv kv.KVI + kv Storer } func (kg *kvServerGetter) GetServers() ([]*api.Server, error) { @@ -42,7 +50,7 @@ func (kg *kvServerGetter) GetServers() ([]*api.Server, error) { // New creates and configures a new gRPC server instance with logging middleware, // health checks, and the registered KV service. -func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { +func New(KV Storer, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { logOpts := []logging.Option{ logging.WithLogOnEvents(logging.FinishCall), logging.WithLevels(logging.DefaultServerCodeToLevel), From 82f272d65b09859bfee819afb757c8265313da0e Mon Sep 17 00:00:00 2001 From: Mateo Presa Castro Date: Sun, 7 Dec 2025 09:50:34 +0100 Subject: [PATCH 3/3] refactor: server getter --- kv/kv.go | 4 +--- server/server.go | 47 ++++++++++++++++++------------------------- server/server_test.go | 10 ++++++++- 3 files changed, 30 insertions(+), 31 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index a1771a1..f2468ba 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -19,9 +19,7 @@ import ( "google.golang.org/protobuf/proto" ) -type ServerProvider interface { - GetServers() ([]*api.Server, error) -} + const ( // RaftRPC is the first byte sent on a connection to identify it as a Raft command diff --git a/server/server.go b/server/server.go index 53d7a13..0b3e904 100644 --- a/server/server.go +++ b/server/server.go @@ -5,11 +5,11 @@ import ( "fmt" "github.com/dynamic-calm/mokv/api" - "github.com/dynamic-calm/mokv/kv" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector" "github.com/rs/zerolog" + "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/health" @@ -26,31 +26,26 @@ type Storer interface { List() <-chan []byte } -type kvServer struct { - api.KVServer - KV Storer - serverGetter kv.ServerProvider - logger zerolog.Logger -} - -func NewServerGetter(kv Storer) kv.ServerProvider { - return &kvServerGetter{kv: kv} +// ServerProvider defines the interface for gettings the servers +// in the cluster. +type ServerProvider interface { + GetServers() ([]*api.Server, error) } -type kvServerGetter struct { - kv Storer +// KV combines storage and cluster awareness. +type KV interface { + Storer + ServerProvider } -func (kg *kvServerGetter) GetServers() ([]*api.Server, error) { - if provider, ok := kg.kv.(kv.ServerProvider); ok { - return provider.GetServers() - } - return nil, fmt.Errorf("kv store does not support getting servers") +type kvServer struct { + api.KVServer + kv KV } // New creates and configures a new gRPC server instance with logging middleware, // health checks, and the registered KV service. -func New(KV Storer, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { +func New(KV KV, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server { logOpts := []logging.Option{ logging.WithLogOnEvents(logging.FinishCall), logging.WithLevels(logging.DefaultServerCodeToLevel), @@ -77,10 +72,8 @@ func New(KV Storer, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Serv healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) healthpb.RegisterHealthServer(s, healthSrv) - serverGetter := NewServerGetter(KV) srv := &kvServer{ - KV: KV, - serverGetter: serverGetter, + kv: KV, } api.RegisterKVServer(s, srv) return s @@ -88,7 +81,7 @@ func New(KV Storer, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Serv // Get retrieves a value for a specific key from the store. func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetResponse, error) { - value, err := s.KV.Get(req.Key) + value, err := s.kv.Get(req.Key) if err != nil { return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err() } @@ -97,8 +90,8 @@ func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetRespon // Set stores a key-value pair in the store. func (s *kvServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetResponse, error) { - if err := s.KV.Set(req.Key, req.Value); err != nil { - s.logger.Error().Err(err).Str("key", req.Key).Msg("set operation failed") + if err := s.kv.Set(req.Key, req.Value); err != nil { + log.Error().Err(err).Str("key", req.Key).Msg("set operation failed") return &api.SetResponse{Ok: false}, status.Errorf(codes.Internal, "failed to set key: %v", err) } return &api.SetResponse{Ok: true}, nil @@ -106,7 +99,7 @@ func (s *kvServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetRespon // Delete removes a key from the store. func (s *kvServer) Delete(ctx context.Context, req *api.DeleteRequest) (*api.DeleteResponse, error) { - if err := s.KV.Delete(req.Key); err != nil { + if err := s.kv.Delete(req.Key); err != nil { return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err() } return &api.DeleteResponse{Ok: true}, nil @@ -114,7 +107,7 @@ func (s *kvServer) Delete(ctx context.Context, req *api.DeleteRequest) (*api.Del // List streams all existing key-value pairs from the store to the client. func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[api.GetResponse]) error { - for value := range s.KV.List() { + for value := range s.kv.List() { select { case <-stream.Context().Done(): return stream.Context().Err() @@ -130,7 +123,7 @@ func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[ap // GetServers returns the list of nodes in the cluster. func (s *kvServer) GetServers(ctx context.Context, req *emptypb.Empty) (*api.GetServersResponse, error) { - servers, err := s.serverGetter.GetServers() + servers, err := s.kv.GetServers() if err != nil { return nil, err } diff --git a/server/server_test.go b/server/server_test.go index 9210fb7..aa07aaa 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -21,6 +21,14 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +type testKV struct { + *store.Store +} + +func (t *testKV) GetServers() ([]*api.Server, error) { + return nil, nil +} + // setupTestServer creates and starts a test server, returning cleanup function func setupTestServer(t *testing.T) (api.KVClient, func()) { t.Helper() @@ -30,7 +38,7 @@ func setupTestServer(t *testing.T) (api.KVClient, func()) { t.Fatalf("failed to create listener: %v", err) } - st := store.New() + st := &testKV{Store: store.New()} srv := server.New(st, log.Logger)