Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 0 additions & 12 deletions core/events.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,6 @@
package core

import "log"

// InitStore replays the AOF file on startup to restore persisted data
// Call this once before accepting any client connections
func InitStore() {
log.Println("Initializing store — replaying AOF if available...")
ReplayAOF()
log.Println("Store initialized, total keys:", StoreLen())
}

func Shutdown(){
//this function internally invokes BGREWRITEAOF, it would take the in-memory hash table and dump in it AOF File(Append only file)
log.Println("Shutting down — dumping AOF...")
evalBGREWRITEAOF([]string{})
log.Println("Shutdown complete")
}
9 changes: 9 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@
package main

import(
"flag"
"log"
"os"
"os/signal"
"syscall"
"sync"
"github.com/sharpsalt/Velox-In-Memory-Database/server"
"github.com/sharpsalt/Velox-In-Memory-Database/config"
"flag"
"log"
"os"
Expand Down Expand Up @@ -40,6 +48,7 @@ func main(){
/*
I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously
*/
var sigs cha os.Signal=make(chan os.Signal,1)
var sigs chan os.Signal=make(chan os.Signal,1)
/*
we will listen to interrupts or signals is through a channel
Expand Down
32 changes: 32 additions & 0 deletions server/async_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@
var cronFrequency time.Duration=1*time.Second //a cron frequency of 1s
var lastCronExecTime time.Time=time.Now() //and we are maintaining, last time it ran
const EngineStatus_BUSY int32=1<<2
const EngineStatus_WAITING=1<<1
const EngineStatus_SHUTTING_DOWN int32=1<<3


var eStatus int32=EngineStatus_WAITING

func WaitForSignal(wg *sync.WaitGroup,sigs chan os.Signal){
defer wg.Done()
<-sigs //this is a blcoking call like until we give a channel
//we would not be moving forward

//if servers is busy cntinue to work
for atomic.LoadInt32(&eStatus)==EngineStatus_BUSY{
}

//CRITICAL to hanle
//we do not want server to ever go back to BUSY State
//when control flow is here

//immediately set the status to be SHUTTING _DOWN
//the only place where we can set the status to be SHUTTING_DONW
atomic.StoreInt32(&eStatus,EngineStatus_SHUTTING_DOWN)

//if server is in any other statem initiate a shutdown
core.ShutDown()
os.Exit(0)
}
const EngineStatus_WAITING int32=1<<1
const EngineStatus_SHUTTING_DOWN int32=1<<3
const EngineStatus_TRANSACTION int32=1<<4
Expand Down Expand Up @@ -212,6 +239,8 @@
//that why i am invoking EPOLL wait

//see if any FD is ready for an IO
nevents, e := syscall.EpollWait(epollFD, events[:], -1) //EpollWait mtlb some file descriptor is ready for IO
//EpollWait will monitor if any IO is ready, and put it in buffer(evenets buffer), if none is there then the call wouldget blocked
// #10 FIX: 100ms timeout instead of -1 (blocking forever)
// With -1, the cron job (DeleteExpiredKey) would NEVER run because
// EpollWait blocks until an IO event arrives. With 100ms timeout,
Expand Down Expand Up @@ -283,9 +312,12 @@
//mark engine as WAITING
//no contention as the signal handler is blocked until
//the engine is BUSY
atomic.StoreInt32(&eStatus<EngineStatus_WAITING)
}
return nil
atomic.StoreInt32(&eStatus, EngineStatus_WAITING)
}
return nil

Check failure on line 320 in server/async_tcp.go

View workflow job for this annotation

GitHub Actions / build

syntax error: non-declaration statement outside function body
}

// respondAsync calls EvalAndRespond for the async server path
Expand Down
Loading