From 54eae85594b60fba7dc784243cace72df5858318 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:05:23 +0530 Subject: [PATCH 1/4] feat: added evalSLEEP --- core/eval.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/eval.go b/core/eval.go index 4ad153e..4802c09 100644 --- a/core/eval.go +++ b/core/eval.go @@ -304,6 +304,19 @@ func evalLATENCY(args []string) []byte { } } +func evalSLEEP(args []string)[]byte{ + if len(args)!=1{ + return Encode(errors.New("ERR wrong number of arguments for 'SLEEP' command"),false) + } + + DurationSec,err:=strconv.ParseInt(args[0],10,64) + if err!=nil{ + return Encode(errors.New("ERR value is not an integer or out of range"),false) + } + time.Sleep(time.Duration(durationSec)*time.Second) + return RESP_OK +} + // func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{ func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{ //It's job is like depending on what job is sent to us @@ -336,6 +349,10 @@ func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{ buf.Write(evalCLIENT(cmd.Args)) case "LATENCY": buf.Write(evalLATENCY(cmd.Args)) + case "LRU": + buf.Write(evalLRU(cmd.Args)) + case "SLEEP": + buf.Write(evalSLEEP(cmd.Args)) default: buf.Write(evalPING(cmd.Args)) } From 59cc1177934ee27dbeded002db48bed419b5878d Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:06:06 +0530 Subject: [PATCH 2/4] feat: added the logic of graceful shutdown using signals --- main.go | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 3119f30..f7405b4 100644 --- a/main.go +++ b/main.go @@ -4,8 +4,12 @@ package main import( "flag" "log" - +"os" +"os/signal" +"syscall" +"sync" "github.com/sharpsalt/Velox-In-Memory-Database/server" +"github.com/sharpsalt/Velox-In-Memory-Database/config" ) var config=&server.Config{} @@ -25,12 +29,32 @@ func setupFlag(){ func main(){ setupFlag() //we will setup the flags firt log.Println("hello!! is it really running") - err:=server.RunAsyncTCPServer(config) - if err!=nil{ - log.Println("Error starting server:", err) - return - } + // err:=server.RunAsyncTCPServer(config) + // if err!=nil{ + // log.Println("Error starting server:", err) + // return + // } /* I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously */ + var sigs cha os.Signal=make(chan os.Signal,1) + /* + we will listen to interrupts or signals is through a channel + so here we are creating a channel who accept of type os.Signal + + and then we are registering it to listen SIGTERM or SIGINT + */ + signal.Notify(sigs,syscall.SIGTERM,syscall.SIGINT) + var wg sync.WaitGroup + wg.Add(2) + + go server.RunAsyncTCPServer(&wg) + go server.WaitForSignal(&wg,sigs) + /* + We are spinning up 2 goroutine + + earlier we had only 1 thread, so currently we have 2thread, 1 is for actual signal(vo mera asynctcpserver) + and we will wait till completion + */ + wg.Wait() } From 007e9e6e060d5aa514d37d7e9df2f9bd7bb162d1 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:06:54 +0530 Subject: [PATCH 3/4] feat: Implemented WaitForSignal for signalling concept --- server/async_tcp.go | 63 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/server/async_tcp.go b/server/async_tcp.go index 3a5f11e..e768665 100644 --- a/server/async_tcp.go +++ b/server/async_tcp.go @@ -13,6 +13,34 @@ import( var con_client int=0 var cronFrequency time.Duration=1*time.Second //a cron frequency of 1s var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it ran +const EngineStatus_BUSY int32=1<<2 +const EngineStatus_WAITING=1<<1 +const EngineStatus_SHUTTING_DOWN int32=1<<3 + + +var eStatus int32=EngineStatus_WAITING + +func WaitForSignal(wg *sync.WaitGroup,sigs chan os.Signal){ + defer wg.Done() + <-sigs //this is a blcoking call like until we give a channel + //we would not be moving forward + + //if servers is busy cntinue to work + for atomic.LoadInt32(&eStatus)==EngineStatus_BUSY{ + } + + //CRITICAL to hanle + //we do not want server to ever go back to BUSY State + //when control flow is here + + //immediately set the status to be SHUTTING _DOWN + //the only place where we can set the status to be SHUTTING_DONW + atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN) + + //if server is in any other statem initiate a shutdown + core.ShutDown() + os.Exit(0) +} // readCommands reads RESP commands from a file descriptor and returns them func readCommands(c core.FDComm)([]*core.RedisCmd,error){ @@ -57,6 +85,12 @@ func RunAsyncTCPServer(cfg *Config) error{ log.Println("Starting an asynchronous TCP Server on", cfg.Host, cfg.Port) // since humlog linux based system use krrhe hai so // so we are using epoll + defer wg.Done() + defer func(){ + atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN) + }() + + log.Println("Starting an asynchronous TCP Server on ",config.Host,config.Port) max_clients := 20000 //create EPOLL Event Objects to hold events @@ -137,7 +171,7 @@ func RunAsyncTCPServer(cfg *Config) error{ return err } - for{ + for atomic.LoadInt32(&eStatus)!=EngineStatus_SHUTTING_DOWN{ //eralier we had infinite for loop so now we will chek it until the server is not shutting down /* The first thing which we do to execute this cron @@ -155,6 +189,10 @@ func RunAsyncTCPServer(cfg *Config) error{ core.DeleteExpiredKey() lastCronExecTime = time.Now() } + //Say,the Engine triggered SHUTTING down when the control flow is here-> + //Current: Engine status==WAITING + //Update: Engine status==SHUTTING_DOWN + //Then we have to exit (handled in Signal handler) @@ -162,13 +200,28 @@ func RunAsyncTCPServer(cfg *Config) error{ //that why i am invoking EPOLL wait //see if any FD is ready for an IO - nevents, e := syscall.EpollWait(epollFD, events[:], -1) + nevents, e := syscall.EpollWait(epollFD, events[:], -1) //EpollWait mtlb some file descriptor is ready for IO //EpollWait will monitor if any IO is ready, and put it in buffer(evenets buffer), if none is there then the call wouldget blocked if e != nil{ continue } + //Here, we do not want server to go back from SHUTTING_DOWN + //to BUSY + //If the engine status==SHUTTING_DOWN over here-> we have to exit + //hence the only legal transition is from WAITING to BUSY + //if that does not happen then we can exit + + //mark engine as BUSY only when it is in the waiting stats + if !atomic.CompareAndSwapInt32(&eStatus,EngineStatus_WAITING,EngineStatus_BUSY){ + //if swap unsuccessful then the exitsing status is not WAITING, but something + switch eStatus{ + case EngineStatus_SHUTTING_DOWN: + return nil + } + } + for i := 0; i < nevents; i++{ //if the socket server itself is ready for an IO if int(events[i].Fd) == serverFD{//each events has a File descriptors @@ -204,7 +257,13 @@ func RunAsyncTCPServer(cfg *Config) error{ respond(cmds, comm) } } + + //mark engine as WAITING + //no contention as the signal handler is blocked until + //the engine is BUSY + atomic.StoreInt32(&eStatus Date: Fri, 24 Apr 2026 23:07:22 +0530 Subject: [PATCH 4/4] feat: Implemented ShutDown --- core/events.go | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 core/events.go diff --git a/core/events.go b/core/events.go new file mode 100644 index 0000000..f591a9d --- /dev/null +++ b/core/events.go @@ -0,0 +1,6 @@ +package core + +func Shutdown(){ + //this function internally invokes BGREWRITEAOF, it would take the in-memory hash table and dump in it AOF File(Append only file) + evalBGREWRITEAOF([]string{}) +} \ No newline at end of file