From 8eaab38800f8b60a7bd39f7e92a18e544b5604d4 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:05:23 +0530 Subject: [PATCH 1/7] feat: added evalSLEEP --- core/eval.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/eval.go b/core/eval.go index 4ad153e..4802c09 100644 --- a/core/eval.go +++ b/core/eval.go @@ -304,6 +304,19 @@ func evalLATENCY(args []string) []byte { } } +func evalSLEEP(args []string)[]byte{ + if len(args)!=1{ + return Encode(errors.New("ERR wrong number of arguments for 'SLEEP' command"),false) + } + + DurationSec,err:=strconv.ParseInt(args[0],10,64) + if err!=nil{ + return Encode(errors.New("ERR value is not an integer or out of range"),false) + } + time.Sleep(time.Duration(durationSec)*time.Second) + return RESP_OK +} + // func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{ func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{ //It's job is like depending on what job is sent to us @@ -336,6 +349,10 @@ func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{ buf.Write(evalCLIENT(cmd.Args)) case "LATENCY": buf.Write(evalLATENCY(cmd.Args)) + case "LRU": + buf.Write(evalLRU(cmd.Args)) + case "SLEEP": + buf.Write(evalSLEEP(cmd.Args)) default: buf.Write(evalPING(cmd.Args)) } From 3ed849f008ae2e10d0f0dd2f619e3acd1073f064 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:06:06 +0530 Subject: [PATCH 2/7] feat: added the logic of graceful shutdown using signals --- main.go | 36 ++++++++++++++++++++++++++++++------ 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 3119f30..f7405b4 100644 --- a/main.go +++ b/main.go @@ -4,8 +4,12 @@ package main import( "flag" "log" - +"os" +"os/signal" +"syscall" +"sync" "github.com/sharpsalt/Velox-In-Memory-Database/server" +"github.com/sharpsalt/Velox-In-Memory-Database/config" ) var config=&server.Config{} @@ -25,12 +29,32 @@ func setupFlag(){ func main(){ setupFlag() //we will setup the flags firt log.Println("hello!! is it really running") - err:=server.RunAsyncTCPServer(config) - if err!=nil{ - log.Println("Error starting server:", err) - return - } + // err:=server.RunAsyncTCPServer(config) + // if err!=nil{ + // log.Println("Error starting server:", err) + // return + // } /* I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously */ + var sigs cha os.Signal=make(chan os.Signal,1) + /* + we will listen to interrupts or signals is through a channel + so here we are creating a channel who accept of type os.Signal + + and then we are registering it to listen SIGTERM or SIGINT + */ + signal.Notify(sigs,syscall.SIGTERM,syscall.SIGINT) + var wg sync.WaitGroup + wg.Add(2) + + go server.RunAsyncTCPServer(&wg) + go server.WaitForSignal(&wg,sigs) + /* + We are spinning up 2 goroutine + + earlier we had only 1 thread, so currently we have 2thread, 1 is for actual signal(vo mera asynctcpserver) + and we will wait till completion + */ + wg.Wait() } From e63a1e4938c2a5859d620405c7f3599f2b293f24 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Fri, 24 Apr 2026 23:06:54 +0530 Subject: [PATCH 3/7] feat: Implemented WaitForSignal for signalling concept --- server/async_tcp.go | 63 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 61 insertions(+), 2 deletions(-) diff --git a/server/async_tcp.go b/server/async_tcp.go index 3a5f11e..e768665 100644 --- a/server/async_tcp.go +++ b/server/async_tcp.go @@ -13,6 +13,34 @@ import( var con_client int=0 var cronFrequency time.Duration=1*time.Second //a cron frequency of 1s var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it ran +const EngineStatus_BUSY int32=1<<2 +const EngineStatus_WAITING=1<<1 +const EngineStatus_SHUTTING_DOWN int32=1<<3 + + +var eStatus int32=EngineStatus_WAITING + +func WaitForSignal(wg *sync.WaitGroup,sigs chan os.Signal){ + defer wg.Done() + <-sigs //this is a blcoking call like until we give a channel + //we would not be moving forward + + //if servers is busy cntinue to work + for atomic.LoadInt32(&eStatus)==EngineStatus_BUSY{ + } + + //CRITICAL to hanle + //we do not want server to ever go back to BUSY State + //when control flow is here + + //immediately set the status to be SHUTTING _DOWN + //the only place where we can set the status to be SHUTTING_DONW + atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN) + + //if server is in any other statem initiate a shutdown + core.ShutDown() + os.Exit(0) +} // readCommands reads RESP commands from a file descriptor and returns them func readCommands(c core.FDComm)([]*core.RedisCmd,error){ @@ -57,6 +85,12 @@ func RunAsyncTCPServer(cfg *Config) error{ log.Println("Starting an asynchronous TCP Server on", cfg.Host, cfg.Port) // since humlog linux based system use krrhe hai so // so we are using epoll + defer wg.Done() + defer func(){ + atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN) + }() + + log.Println("Starting an asynchronous TCP Server on ",config.Host,config.Port) max_clients := 20000 //create EPOLL Event Objects to hold events @@ -137,7 +171,7 @@ func RunAsyncTCPServer(cfg *Config) error{ return err } - for{ + for atomic.LoadInt32(&eStatus)!=EngineStatus_SHUTTING_DOWN{ //eralier we had infinite for loop so now we will chek it until the server is not shutting down /* The first thing which we do to execute this cron @@ -155,6 +189,10 @@ func RunAsyncTCPServer(cfg *Config) error{ core.DeleteExpiredKey() lastCronExecTime = time.Now() } + //Say,the Engine triggered SHUTTING down when the control flow is here-> + //Current: Engine status==WAITING + //Update: Engine status==SHUTTING_DOWN + //Then we have to exit (handled in Signal handler) @@ -162,13 +200,28 @@ func RunAsyncTCPServer(cfg *Config) error{ //that why i am invoking EPOLL wait //see if any FD is ready for an IO - nevents, e := syscall.EpollWait(epollFD, events[:], -1) + nevents, e := syscall.EpollWait(epollFD, events[:], -1) //EpollWait mtlb some file descriptor is ready for IO //EpollWait will monitor if any IO is ready, and put it in buffer(evenets buffer), if none is there then the call wouldget blocked if e != nil{ continue } + //Here, we do not want server to go back from SHUTTING_DOWN + //to BUSY + //If the engine status==SHUTTING_DOWN over here-> we have to exit + //hence the only legal transition is from WAITING to BUSY + //if that does not happen then we can exit + + //mark engine as BUSY only when it is in the waiting stats + if !atomic.CompareAndSwapInt32(&eStatus,EngineStatus_WAITING,EngineStatus_BUSY){ + //if swap unsuccessful then the exitsing status is not WAITING, but something + switch eStatus{ + case EngineStatus_SHUTTING_DOWN: + return nil + } + } + for i := 0; i < nevents; i++{ //if the socket server itself is ready for an IO if int(events[i].Fd) == serverFD{//each events has a File descriptors @@ -204,7 +257,13 @@ func RunAsyncTCPServer(cfg *Config) error{ respond(cmds, comm) } } + + //mark engine as WAITING + //no contention as the signal handler is blocked until + //the engine is BUSY + atomic.StoreInt32(&eStatus Date: Fri, 24 Apr 2026 23:07:22 +0530 Subject: [PATCH 4/7] feat: Implemented ShutDown --- core/events.go | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 core/events.go diff --git a/core/events.go b/core/events.go new file mode 100644 index 0000000..f591a9d --- /dev/null +++ b/core/events.go @@ -0,0 +1,6 @@ +package core + +func Shutdown(){ + //this function internally invokes BGREWRITEAOF, it would take the in-memory hash table and dump in it AOF File(Append only file) + evalBGREWRITEAOF([]string{}) +} \ No newline at end of file From 550785a7e738b17d4a0da36b62e6fd5354eefa98 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Sat, 25 Apr 2026 01:09:44 +0530 Subject: [PATCH 5/7] feat: Implemented TxnBegin, TxnExec, TxnDiscard, TxnQueue --- core/comm.go | 68 +++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/core/comm.go b/core/comm.go index 9124ded..2dbfecb 100644 --- a/core/comm.go +++ b/core/comm.go @@ -1,21 +1,73 @@ package core -import "syscall" +import ( + "bytes" + "syscall" + "io" + "fmt" +) -//basically we reimplemented read and write interface -type FDComm struct{ - Fd int +type Client struct{ + io.ReadWriter + Fd int + enqueue RedisCmds + isTxn bool } +//basically we reimplemented read and write interface +// type FDComm struct{ +// Fd int +// } +/*this this FDComm which we have previously written is now changed to client +it has still same interface , since we don;t have to just read/write , we also do have to enqueue command , also i ahve added ki wo command transaction hai ki nahi + +so any client is connected to this then a unique object will be created for this client +*/ + //Since we have no socket connection so i have to make system call of write over FD with the byest that i have got -func (f FDComm) Write(b []byte)(int,error){ - return syscall.Write(f.Fd,b) +func (c Client) Write(b []byte)(int,error){ + return syscall.Write(c.Fd,b) } //everything else remains the same -func (f FDComm) Read(b []byte)(int,error){ - return syscall.Read(f.Fd,b) +func (c Client) Read(b []byte)(int,error){ + return syscall.Read(c.Fd,b) } +func (c *Client) TxnBegin(){ + c.isTxn=true +} + +func (c *Client) TxnExec() []byte{ + var out []byte + buf:=bytes.NewBuffer(out) + + buf.WriteString(frm.Sprintf("*%d\r\n",len(c.cqueue))) + + for _,_cmd:=range c.cqueue{ + buf.Write(executeCommand(_cmd,c)) + } + + c.cqueue=make(RedisCmd,0) + c.isTxn=false + return buf.Bytes() +} + +func (c *Client) TxnDiscard(){ + c.cqueue=make(RedisCmds,0) + c.isTxn=false +} + +func (c *Client) TxnQueue(cmd *RedisCmd){ + c.cqueue=append(c.cqueue,cmd) +} + +func NewClient(fd int) *Client{ + return &Client{ + Fd: fd, + enqueue: amke(Rediscmds,0) + } +} + From 9eaf95bc26017db2becd69f131476f7781ab06b6 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Sat, 25 Apr 2026 01:10:44 +0530 Subject: [PATCH 6/7] feat: Implemented evalMULTI and added cases for transaction based EXEC, and DISCARD --- core/eval.go | 105 ++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/core/eval.go b/core/eval.go index 4802c09..d74f05b 100644 --- a/core/eval.go +++ b/core/eval.go @@ -17,6 +17,13 @@ var RESP_ZERO []byte=[]byte(":0\r\n") var RESP_ONE []byte=[]byte(":1\r\n") var RESP_MINUS_1 []byte=[]byte(":-1\r\n") var RESP_MINUS_2 []byte=[]byte(":-2\r\n") +var RESP_QUEUED []byte=[]byte("+QUEUED\r\n") + +var txnCommands map[string]bool + +func init(){ + txnCommands=map[string]bool{"EXEC":true,"DISCARD":true} +} // func evalPING(args []string,c net.Conn)error{ func evalPING(args []string) []byte{ @@ -317,50 +324,94 @@ func evalSLEEP(args []string)[]byte{ return RESP_OK } -// func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{ -func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{ +func evalMULTI(agrs []string)[]byte{ + return RESP_OK +} + + +func executeCommand(cmds *RedisCmd, c *Client) []byte{ //It's job is like depending on what job is sent to us //we trigger the corresponding eval function - - var response []byte - buf := bytes.NewBuffer(response) // this is where we are buffering all - //our logic didn't chnaged, but the way we are consuming has changed - for _, cmd := range cmds{ switch cmd.Cmd{ case "PING": - buf.Write(evalPING(cmd.Args)) + evalPING(cmd.Args) case "SET": - buf.Write(evalSET(cmd.Args)) + evalSET(cmd.Args) case "GET": - buf.Write(evalGET(cmd.Args)) + evalGET(cmd.Args) case "TTL": - buf.Write(evalTTL(cmd.Args)) + evalTTL(cmd.Args) case "DEL": - buf.Write(evalDEL(cmd.Args)) + evalDEL(cmd.Args) case "EXPIRE": - buf.Write(evalEXPIRE(cmd.Args)) + evalEXPIRE(cmd.Args) case "BGREWRITEAOF": - buf.Write(evalBGREWRITEAOF(cmd.Args)) + evalBGREWRITEAOF(cmd.Args) case "INCR": - buf.Write(evalINCR(cmd.Args)) + evalINCR(cmd.Args) case "INFO": - buf.Write(evalINFO(cmd.Args)) + evalINFO(cmd.Args) case "CLIENT": - buf.Write(evalCLIENT(cmd.Args)) + evalCLIENT(cmd.Args) case "LATENCY": - buf.Write(evalLATENCY(cmd.Args)) + evalLATENCY(cmd.Args) case "LRU": - buf.Write(evalLRU(cmd.Args)) + evalLRU(cmd.Args) case "SLEEP": - buf.Write(evalSLEEP(cmd.Args)) + evalSLEEP(cmd.Args) + case "MULTI": + c.TxnBegin() + return evalMULTI(cmd.Args) + case "EXEC": + if !c.isTxn{ + return Encode(errors.New("ERR EXEC without MULTI"),false) + } + return c.TxnExec() + case "DISCARD": + if !c.isTxn{ + return Encode(errors.New("ERR DISCARD without MULTI"),false) + } + c.TxnDiscard() + return RESP_OK default: - buf.Write(evalPING(cmd.Args)) + return evalPING(cmd.Args) + } +} + +func executeCommandToBuffer(cmd *RedisCmd,buf *bytes.Buffer,c *Client){ + buf.Write(executeCommand(cmd,c)) +} + +// func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{ +func EvalAndRespond(cmds []*RedisCmd, c Client){ + //It's job is like depending on what job is sent to us + //we trigger the corresponding eval function + + var response []byte + buf := bytes.NewBuffer(response) // this is where we are buffering all + //our logic didn't chnaged, but the way we are consuming has changed + for _, cmd := range cmds{ + //if txn is not in progress , then we ca simply + //execute the command and add the response to the buffer + if !c.isTxn{ + executeCommandToBuffer(cmd,buf,c) + continue + } + + //if the txn is in progress, we enqueue the command + //and add the queued response to the buffer + if !txnCommands[cmd.Cmd]{ + //if the command is queuabe the enqueue + c.TxnQueue(cmd) + buf.Write(RESP_QUEUED) + }else{ + //if txn is active and the command is non-queuable + //ex: EXEC,DISCARD + //we execute the command and gather the response in buffer + executeCommandToBuffer(cmd,buf,c) } - /* - Earlier we used to return and like we used to pass io.ReadWriter but now instead of that the eval function that we ahev si returning the output - but here we are putting it in buffer - */ } - _, err := c.Write(buf.Bytes()) - return err + // _, err := c.Write(buf.Bytes()) + // return err + c.Write(buf.Bytes()) } From 956fde67a2d45d9411815498969ef78506afb305 Mon Sep 17 00:00:00 2001 From: Srijan Verma Date: Sat, 25 Apr 2026 01:11:41 +0530 Subject: [PATCH 7/7] feat: map of object for connected Client --- server/async_tcp.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/server/async_tcp.go b/server/async_tcp.go index e768665..615d0d7 100644 --- a/server/async_tcp.go +++ b/server/async_tcp.go @@ -16,9 +16,18 @@ var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it const EngineStatus_BUSY int32=1<<2 const EngineStatus_WAITING=1<<1 const EngineStatus_SHUTTING_DOWN int32=1<<3 +const EngineStatus_TRANSACTION int32=1<<4 var eStatus int32=EngineStatus_WAITING +var connectedClients map[int]*core.Client //here we decalred the object +/** +Every Time when a client is getting connected to us, we will get a file descriptor, this key is that particular fiile descriptor +**/ + +func init(){ + connectedClients=make(map[int]*core.Client) //here we added memory to that +} func WaitForSignal(wg *sync.WaitGroup,sigs chan os.Signal){ defer wg.Done() @@ -235,7 +244,8 @@ func RunAsyncTCPServer(cfg *Config) error{ //increase the number of concurrent clients count con_client++ - syscall.SetNonblock(fd, true) + connectedClients[fd]=core.NewClient(fd) + syscall.SetNonblock(serverFD, true) //add this new TCP connetion to be monitored var socketClientEvent syscall.EpollEvent = syscall.EpollEvent{ @@ -247,11 +257,16 @@ func RunAsyncTCPServer(cfg *Config) error{ } }else{ //if it is not my server which means that some client that is already connected to the server, then do somthing - comm := core.FDComm{Fd: int(events[i].Fd)} + // comm := core.FDComm{Fd: int(events[i].Fd)} + comm:=connectedClients[int(events[i].Fd)] + if comm==nil{ + continue + } cmds, err := readCommands(comm) //instead of passing 1 command, we will pass many commands if err != nil{ syscall.Close(int(events[i].Fd)) con_client -= 1 + delete(connectedClients,int(events[i].Fd)) continue } respond(cmds, comm)