diff --git a/core/events.go b/core/events.go index 85a7dbb..f591a9d 100644 --- a/core/events.go +++ b/core/events.go @@ -1,18 +1,6 @@ package core -import "log" - -// InitStore replays the AOF file on startup to restore persisted data -// Call this once before accepting any client connections -func InitStore() { - log.Println("Initializing store — replaying AOF if available...") - ReplayAOF() - log.Println("Store initialized, total keys:", StoreLen()) -} - func Shutdown(){ //this function internally invokes BGREWRITEAOF, it would take the in-memory hash table and dump in it AOF File(Append only file) - log.Println("Shutting down — dumping AOF...") evalBGREWRITEAOF([]string{}) - log.Println("Shutdown complete") } \ No newline at end of file diff --git a/main.go b/main.go index 9d5bfd9..fb5b46e 100644 --- a/main.go +++ b/main.go @@ -2,6 +2,14 @@ package main import( +"flag" +"log" +"os" +"os/signal" +"syscall" +"sync" +"github.com/sharpsalt/Velox-In-Memory-Database/server" +"github.com/sharpsalt/Velox-In-Memory-Database/config" "flag" "log" "os" @@ -40,6 +48,7 @@ func main(){ /* I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously */ + var sigs cha os.Signal=make(chan os.Signal,1) var sigs chan os.Signal=make(chan os.Signal,1) /* we will listen to interrupts or signals is through a channel diff --git a/server/async_tcp.go b/server/async_tcp.go index 1b8de6e..5746b79 100644 --- a/server/async_tcp.go +++ b/server/async_tcp.go @@ -18,6 +18,33 @@ var con_client int=0 var cronFrequency time.Duration=1*time.Second //a cron frequency of 1s var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it ran const EngineStatus_BUSY int32=1<<2 +const EngineStatus_WAITING=1<<1 +const EngineStatus_SHUTTING_DOWN int32=1<<3 + + +var eStatus int32=EngineStatus_WAITING + +func WaitForSignal(wg *sync.WaitGroup,sigs chan os.Signal){ + defer wg.Done() + <-sigs //this is a blcoking call like until we give a channel + //we would not be moving forward + + //if servers is busy cntinue to work + for atomic.LoadInt32(&eStatus)==EngineStatus_BUSY{ + } + + //CRITICAL to hanle + //we do not want server to ever go back to BUSY State + //when control flow is here + + //immediately set the status to be SHUTTING _DOWN + //the only place where we can set the status to be SHUTTING_DONW + atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN) + + //if server is in any other statem initiate a shutdown + core.ShutDown() + os.Exit(0) +} const EngineStatus_WAITING int32=1<<1 const EngineStatus_SHUTTING_DOWN int32=1<<3 const EngineStatus_TRANSACTION int32=1<<4 @@ -212,6 +239,8 @@ func RunAsyncTCPServer(wg *sync.WaitGroup) error{ //that why i am invoking EPOLL wait //see if any FD is ready for an IO + nevents, e := syscall.EpollWait(epollFD, events[:], -1) //EpollWait mtlb some file descriptor is ready for IO + //EpollWait will monitor if any IO is ready, and put it in buffer(evenets buffer), if none is there then the call wouldget blocked // #10 FIX: 100ms timeout instead of -1 (blocking forever) // With -1, the cron job (DeleteExpiredKey) would NEVER run because // EpollWait blocks until an IO event arrives. With 100ms timeout, @@ -283,6 +312,9 @@ func RunAsyncTCPServer(wg *sync.WaitGroup) error{ //mark engine as WAITING //no contention as the signal handler is blocked until //the engine is BUSY + atomic.StoreInt32(&eStatus