Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 60 additions & 8 deletions core/comm.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,73 @@
package core

import "syscall"
import (
"bytes"
"syscall"
"io"
"fmt"
)

//basically we reimplemented read and write interface
type FDComm struct{
Fd int
type Client struct{
io.ReadWriter
Fd int
enqueue RedisCmds
isTxn bool
}

//basically we reimplemented read and write interface
// type FDComm struct{
// Fd int
// }
/*this this FDComm which we have previously written is now changed to client
it has still same interface , since we don;t have to just read/write , we also do have to enqueue command , also i ahve added ki wo command transaction hai ki nahi

so any client is connected to this then a unique object will be created for this client
*/

//Since we have no socket connection so i have to make system call of write over FD with the byest that i have got
func (f FDComm) Write(b []byte)(int,error){
return syscall.Write(f.Fd,b)
func (c Client) Write(b []byte)(int,error){
return syscall.Write(c.Fd,b)
}
//everything else remains the same

func (f FDComm) Read(b []byte)(int,error){
return syscall.Read(f.Fd,b)
func (c Client) Read(b []byte)(int,error){
return syscall.Read(c.Fd,b)
}


func (c *Client) TxnBegin(){
c.isTxn=true
}

func (c *Client) TxnExec() []byte{
var out []byte
buf:=bytes.NewBuffer(out)

buf.WriteString(frm.Sprintf("*%d\r\n",len(c.cqueue)))

for _,_cmd:=range c.cqueue{
buf.Write(executeCommand(_cmd,c))
}

c.cqueue=make(RedisCmd,0)
c.isTxn=false
return buf.Bytes()
}

func (c *Client) TxnDiscard(){
c.cqueue=make(RedisCmds,0)
c.isTxn=false
}

func (c *Client) TxnQueue(cmd *RedisCmd){
c.cqueue=append(c.cqueue,cmd)
}

func NewClient(fd int) *Client{
return &Client{
Fd: fd,
enqueue: amke(Rediscmds,0)
}
}


118 changes: 93 additions & 25 deletions core/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ var RESP_ZERO []byte=[]byte(":0\r\n")
var RESP_ONE []byte=[]byte(":1\r\n")
var RESP_MINUS_1 []byte=[]byte(":-1\r\n")
var RESP_MINUS_2 []byte=[]byte(":-2\r\n")
var RESP_QUEUED []byte=[]byte("+QUEUED\r\n")

var txnCommands map[string]bool

func init(){
txnCommands=map[string]bool{"EXEC":true,"DISCARD":true}
}

// func evalPING(args []string,c net.Conn)error{
func evalPING(args []string) []byte{
Expand Down Expand Up @@ -304,46 +311,107 @@ func evalLATENCY(args []string) []byte {
}
}

// func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{
func EvalAndRespond(cmds []*RedisCmd, c io.ReadWriter) error{
func evalSLEEP(args []string)[]byte{
if len(args)!=1{
return Encode(errors.New("ERR wrong number of arguments for 'SLEEP' command"),false)
}

DurationSec,err:=strconv.ParseInt(args[0],10,64)
if err!=nil{
return Encode(errors.New("ERR value is not an integer or out of range"),false)
}
time.Sleep(time.Duration(durationSec)*time.Second)
return RESP_OK
}

func evalMULTI(agrs []string)[]byte{
return RESP_OK
}


func executeCommand(cmds *RedisCmd, c *Client) []byte{
//It's job is like depending on what job is sent to us
//we trigger the corresponding eval function

var response []byte
buf := bytes.NewBuffer(response) // this is where we are buffering all
//our logic didn't chnaged, but the way we are consuming has changed
for _, cmd := range cmds{
switch cmd.Cmd{
case "PING":
buf.Write(evalPING(cmd.Args))
evalPING(cmd.Args)
case "SET":
buf.Write(evalSET(cmd.Args))
evalSET(cmd.Args)
case "GET":
buf.Write(evalGET(cmd.Args))
evalGET(cmd.Args)
case "TTL":
buf.Write(evalTTL(cmd.Args))
evalTTL(cmd.Args)
case "DEL":
buf.Write(evalDEL(cmd.Args))
evalDEL(cmd.Args)
case "EXPIRE":
buf.Write(evalEXPIRE(cmd.Args))
evalEXPIRE(cmd.Args)
case "BGREWRITEAOF":
buf.Write(evalBGREWRITEAOF(cmd.Args))
evalBGREWRITEAOF(cmd.Args)
case "INCR":
buf.Write(evalINCR(cmd.Args))
evalINCR(cmd.Args)
case "INFO":
buf.Write(evalINFO(cmd.Args))
evalINFO(cmd.Args)
case "CLIENT":
buf.Write(evalCLIENT(cmd.Args))
evalCLIENT(cmd.Args)
case "LATENCY":
buf.Write(evalLATENCY(cmd.Args))
evalLATENCY(cmd.Args)
case "LRU":
evalLRU(cmd.Args)
case "SLEEP":
evalSLEEP(cmd.Args)
case "MULTI":
c.TxnBegin()
return evalMULTI(cmd.Args)
case "EXEC":
if !c.isTxn{
return Encode(errors.New("ERR EXEC without MULTI"),false)
}
return c.TxnExec()
case "DISCARD":
if !c.isTxn{
return Encode(errors.New("ERR DISCARD without MULTI"),false)
}
c.TxnDiscard()
return RESP_OK
default:
buf.Write(evalPING(cmd.Args))
return evalPING(cmd.Args)
}
}

func executeCommandToBuffer(cmd *RedisCmd,buf *bytes.Buffer,c *Client){
buf.Write(executeCommand(cmd,c))
}

// func EvalAndRespond(cmd *Rediscmd,c net.Conn)error{
func EvalAndRespond(cmds []*RedisCmd, c Client){
//It's job is like depending on what job is sent to us
//we trigger the corresponding eval function

var response []byte
buf := bytes.NewBuffer(response) // this is where we are buffering all
//our logic didn't chnaged, but the way we are consuming has changed
for _, cmd := range cmds{
//if txn is not in progress , then we ca simply
//execute the command and add the response to the buffer
if !c.isTxn{
executeCommandToBuffer(cmd,buf,c)
continue
}

//if the txn is in progress, we enqueue the command
//and add the queued response to the buffer
if !txnCommands[cmd.Cmd]{
//if the command is queuabe the enqueue
c.TxnQueue(cmd)
buf.Write(RESP_QUEUED)
}else{
//if txn is active and the command is non-queuable
//ex: EXEC,DISCARD
//we execute the command and gather the response in buffer
executeCommandToBuffer(cmd,buf,c)
}
/*
Earlier we used to return and like we used to pass io.ReadWriter but now instead of that the eval function that we ahev si returning the output
but here we are putting it in buffer
*/
}
_, err := c.Write(buf.Bytes())
return err
// _, err := c.Write(buf.Bytes())
// return err
c.Write(buf.Bytes())
}
6 changes: 6 additions & 0 deletions core/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package core

func Shutdown(){
//this function internally invokes BGREWRITEAOF, it would take the in-memory hash table and dump in it AOF File(Append only file)
evalBGREWRITEAOF([]string{})
}
36 changes: 30 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@ package main
import(
"flag"
"log"

"os"
"os/signal"
"syscall"
"sync"
"github.com/sharpsalt/Velox-In-Memory-Database/server"
"github.com/sharpsalt/Velox-In-Memory-Database/config"
)

var config=&server.Config{}
Expand All @@ -25,12 +29,32 @@ func setupFlag(){
func main(){
setupFlag() //we will setup the flags firt
log.Println("hello!! is it really running")
err:=server.RunAsyncTCPServer(config)
if err!=nil{
log.Println("Error starting server:", err)
return
}
// err:=server.RunAsyncTCPServer(config)
// if err!=nil{
// log.Println("Error starting server:", err)
// return
// }
/*
I will be running Synchronous TCP Server means i iwll be starting the TCP connection on give port synchronously
*/
var sigs cha os.Signal=make(chan os.Signal,1)
/*
we will listen to interrupts or signals is through a channel
so here we are creating a channel who accept of type os.Signal

and then we are registering it to listen SIGTERM or SIGINT
*/
signal.Notify(sigs,syscall.SIGTERM,syscall.SIGINT)
var wg sync.WaitGroup
wg.Add(2)

go server.RunAsyncTCPServer(&wg)
go server.WaitForSignal(&wg,sigs)
/*
We are spinning up 2 goroutine

earlier we had only 1 thread, so currently we have 2thread, 1 is for actual signal(vo mera asynctcpserver)
and we will wait till completion
*/
wg.Wait()
}
Loading
Loading