Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 11 additions & 13 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,13 @@ import (

"github.com/dynamic-calm/mokv/api"
"github.com/dynamic-calm/mokv/logger"
"github.com/dynamic-calm/mokv/store"
"github.com/hashicorp/raft"
raftboltdb "github.com/hashicorp/raft-boltdb/v2"
"github.com/rs/zerolog/log"
"google.golang.org/protobuf/proto"
)

type KVI interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
List() <-chan []byte
}

type ServerProvider interface {
GetServers() ([]*api.Server, error)
}

const (
// RaftRPC is the first byte sent on a connection to identify it as a Raft command
Expand Down Expand Up @@ -75,15 +65,23 @@ type KVConfig struct {
DataDir string
}

// Storer defines the interface for a the key-value storage.
type Storer interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
List() <-chan []byte
}

// KV is a distributed key-value store implementation using Raft consensus.
type KV struct {
cfg *KVConfig
store store.Storer
store Storer
raft *raft.Raft
}

// New creates and initializes a new distributed KV.
func New(store store.Storer, cfg *KVConfig) (*KV, error) {
func New(store Storer, cfg *KVConfig) (*KV, error) {
kv := &KV{
cfg: cfg,
store: store,
Expand Down Expand Up @@ -355,7 +353,7 @@ func (kv *KV) setupRaft(dataDir string) error {
}

type fsm struct {
kv store.Storer
kv Storer
dataDir string
}

Expand Down
10 changes: 9 additions & 1 deletion mokv/mokv.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,22 @@ type Config struct {
LogLevel string
}

// Storer defines the interface for a the key-value storage.
type Storer interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
List() <-chan []byte
}

// GetEnv defines a function signature for retrieving environment variables.
type GetEnv func(string) string

// MOKV represents the main server instance orchestrating gRPC, Raft, and Discovery services.
type MOKV struct {
cfg *Config
getEnv GetEnv
kv kv.KVI
kv Storer
meterProvider *metric.MeterProvider
grpcServer *grpc.Server
metricsServer *http.Server
Expand Down
51 changes: 26 additions & 25 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ import (
"fmt"

"github.com/dynamic-calm/mokv/api"
"github.com/dynamic-calm/mokv/kv"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/selector"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
Expand All @@ -18,31 +18,34 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

type kvServer struct {
api.KVServer
KV kv.KVI
serverGetter kv.ServerProvider
logger zerolog.Logger
// Storer defines the interface for a the key-value storage.
type Storer interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
List() <-chan []byte
}

func NewServerGetter(kv kv.KVI) kv.ServerProvider {
return &kvServerGetter{kv: kv}
// ServerProvider defines the interface for gettings the servers
// in the cluster.
type ServerProvider interface {
GetServers() ([]*api.Server, error)
}

type kvServerGetter struct {
kv kv.KVI
// KV combines storage and cluster awareness.
type KV interface {
Storer
ServerProvider
}

func (kg *kvServerGetter) GetServers() ([]*api.Server, error) {
if provider, ok := kg.kv.(kv.ServerProvider); ok {
return provider.GetServers()
}
return nil, fmt.Errorf("kv store does not support getting servers")
type kvServer struct {
api.KVServer
kv KV
}

// New creates and configures a new gRPC server instance with logging middleware,
// health checks, and the registered KV service.
func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server {
func New(KV KV, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Server {
logOpts := []logging.Option{
logging.WithLogOnEvents(logging.FinishCall),
logging.WithLevels(logging.DefaultServerCodeToLevel),
Expand All @@ -69,18 +72,16 @@ func New(KV kv.KVI, logger zerolog.Logger, opts ...grpc.ServerOption) *grpc.Serv
healthSrv.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
healthpb.RegisterHealthServer(s, healthSrv)

serverGetter := NewServerGetter(KV)
srv := &kvServer{
KV: KV,
serverGetter: serverGetter,
kv: KV,
}
api.RegisterKVServer(s, srv)
return s
}

// Get retrieves a value for a specific key from the store.
func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetResponse, error) {
value, err := s.KV.Get(req.Key)
value, err := s.kv.Get(req.Key)
if err != nil {
return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err()
}
Expand All @@ -89,24 +90,24 @@ func (s *kvServer) Get(ctx context.Context, req *api.GetRequest) (*api.GetRespon

// Set stores a key-value pair in the store.
func (s *kvServer) Set(ctx context.Context, req *api.SetRequest) (*api.SetResponse, error) {
if err := s.KV.Set(req.Key, req.Value); err != nil {
s.logger.Error().Err(err).Str("key", req.Key).Msg("set operation failed")
if err := s.kv.Set(req.Key, req.Value); err != nil {
log.Error().Err(err).Str("key", req.Key).Msg("set operation failed")
return &api.SetResponse{Ok: false}, status.Errorf(codes.Internal, "failed to set key: %v", err)
}
return &api.SetResponse{Ok: true}, nil
}

// Delete removes a key from the store.
func (s *kvServer) Delete(ctx context.Context, req *api.DeleteRequest) (*api.DeleteResponse, error) {
if err := s.KV.Delete(req.Key); err != nil {
if err := s.kv.Delete(req.Key); err != nil {
return nil, status.New(codes.NotFound, s.notFoundMsg(req.Key)).Err()
}
return &api.DeleteResponse{Ok: true}, nil
}

// List streams all existing key-value pairs from the store to the client.
func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[api.GetResponse]) error {
for value := range s.KV.List() {
for value := range s.kv.List() {
select {
case <-stream.Context().Done():
return stream.Context().Err()
Expand All @@ -122,7 +123,7 @@ func (s *kvServer) List(req *emptypb.Empty, stream grpc.ServerStreamingServer[ap

// GetServers returns the list of nodes in the cluster.
func (s *kvServer) GetServers(ctx context.Context, req *emptypb.Empty) (*api.GetServersResponse, error) {
servers, err := s.serverGetter.GetServers()
servers, err := s.kv.GetServers()
if err != nil {
return nil, err
}
Expand Down
10 changes: 9 additions & 1 deletion server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ import (
"google.golang.org/protobuf/types/known/emptypb"
)

type testKV struct {
*store.Store
}

func (t *testKV) GetServers() ([]*api.Server, error) {
return nil, nil
}

// setupTestServer creates and starts a test server, returning cleanup function
func setupTestServer(t *testing.T) (api.KVClient, func()) {
t.Helper()
Expand All @@ -30,7 +38,7 @@ func setupTestServer(t *testing.T) (api.KVClient, func()) {
t.Fatalf("failed to create listener: %v", err)
}

st := store.New()
st := &testKV{Store: store.New()}

srv := server.New(st, log.Logger)

Expand Down
8 changes: 0 additions & 8 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,6 @@ import (
"sync"
)

// Storer defines the interface for a the key-value storage.
type Storer interface {
Get(key string) ([]byte, error)
Set(key string, value []byte) error
Delete(key string) error
List() <-chan []byte
}

// Store is a concurrent, in-memory implementation of Storer using a sync.Map.
type Store struct {
m sync.Map
Expand Down