Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type GossipStore interface {

// Remove a node from the database
RemoveNode(types.NodeId) error

// RegisterKey registers a key and an empty object which it can
// use to decode data received on wire
RegisterKey(types.StoreKey, interface{}) error
}

type Gossiper interface {
Expand Down
104 changes: 74 additions & 30 deletions proto/gossip_delegates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proto

import (
"bytes"
"encoding/gob"
"encoding/json"
"fmt"
"sync"
Expand Down Expand Up @@ -31,6 +30,8 @@ type GossipDelegate struct {
quorumTimeout time.Duration
timeoutVersion uint64
timeoutVersionLock sync.Mutex
// userNewHeaderFormat only used for testing backward compatibility
useNewHeaderFormat bool
}

func (gd *GossipDelegate) InitGossipDelegate(
Expand All @@ -42,6 +43,12 @@ func (gd *GossipDelegate) InitGossipDelegate(
) {
gd.GenNumber = genNumber
gd.nodeId = string(selfNodeId)
// ** Used only for testing backward compatibility
// TODO: Remove!
if gossipVersion == types.GOSSIP_TEST_VERSION {
gossipVersion = types.DEFAULT_GOSSIP_VERSION
gd.useNewHeaderFormat = true
}
gd.stateEvent = make(chan types.StateEvent)
// We start with a NOT_IN_QUORUM status
gd.InitStore(
Expand All @@ -68,24 +75,20 @@ func (gd *GossipDelegate) updateGossipTs() {
gd.lastGossipTs = time.Now()
}

func (gd *GossipDelegate) convertToBytes(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(obj)
if err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
func (gd *GossipDelegate) decodeGossipPacket(buf []byte, msg interface{}) error {
// Strip off the header
payload := buf[types.GOSSIP_HEADER_LENGTH:]
return gd.convertFromBytes(payload, msg)
}

func (gd *GossipDelegate) convertFromBytes(buf []byte, msg interface{}) error {
msgBuffer := bytes.NewBuffer(buf)
dec := gob.NewDecoder(msgBuffer)
err := dec.Decode(msg)
func (gd *GossipDelegate) encodeGossipPacket(obj interface{}) ([]byte, error) {
payload, err := gd.convertToBytes(obj)
if err != nil {
return err
return nil, err
}
return nil
header := make([]byte, types.GOSSIP_HEADER_LENGTH)
header[types.GH_VERSION_POS] = byte(types.GH_VERSION_1)
return append(header, payload...), nil
}

func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error {
Expand Down Expand Up @@ -121,7 +124,6 @@ func (gd *GossipDelegate) gossipChecks(node *memberlist.Node) error {
return err
}


// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
Expand Down Expand Up @@ -163,21 +165,42 @@ func (gd *GossipDelegate) GetBroadcasts(overhead, limit int) [][]byte {
// data can be sent here. See MergeRemoteState as well. The `join`
// boolean indicates this is for a join instead of a push/pull.
func (gd *GossipDelegate) LocalState(join bool) []byte {
var (
err error
byteLocalState []byte
)

gd.updateSelfTs()

// We don't know which node we are talking to.
gs := NewGossipSessionInfo("", types.GD_ME_TO_PEER)
gs.Op = types.LocalPush

// We send our local state of nodeMap
// The receiver will decide which nodes to merge and which to ignore
localState := gd.GetLocalState()
byteLocalState, err := gd.convertToBytes(&localState)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error())
logrus.Infof(gs.Err)
byteLocalState = []byte{}
// This flag added only for testing backward compatibility
// FUTURE USE: We will always use the new header pattern
// Sending and Receiving NodeInfoMap over the wire will be deprecated
if gd.useNewHeaderFormat {
// We send our local state of nodeMap
// The receiver will decide which nodes to merge and which to ignore
localState := gd.GetGossipData()
byteLocalState, err = gd.encodeGossipPacket(&localState)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error())
logrus.Infof(gs.Err)
byteLocalState = []byte{}
}
} else {
// We send our local state of nodeMap
// The receiver will decide which nodes to merge and which to ignore
localState := gd.GetLocalState()
byteLocalState, err = gd.convertToBytes(&localState)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in LocalState. Unable to unmarshal: %v", err.Error())
logrus.Infof(gs.Err)
byteLocalState = []byte{}
}
}

gs.Err = ""
gd.updateGossipTs()
gd.history.AddLatest(gs)
Expand All @@ -190,20 +213,31 @@ func (gd *GossipDelegate) LocalState(join bool) []byte {
// boolean indicates this is for a join instead of a push/pull.
func (gd *GossipDelegate) MergeRemoteState(buf []byte, join bool) {
var remoteState types.NodeInfoMap
var remoteGossipData types.GossipDataMap
if join == true {
// NotifyJoin will take care of this info
return
}
gd.updateSelfTs()

gs := NewGossipSessionInfo("", types.GD_PEER_TO_ME)
err := gd.convertFromBytes(buf, &remoteState)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error())
logrus.Infof(gs.Err)
version := gd.handleGossipHeader(buf)
// This flag added only for testing backward compatibility
// FUTURE USE: We will always use the new header pattern
// Sending and Receiving NodeInfoMap over the wire will be deprecated
if version == types.GH_VERSION_BASE {
err := gd.convertFromBytes(buf, &remoteState)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error())
}
gd.Update(remoteState)
} else {
err := gd.decodeGossipPacket(buf, &remoteGossipData)
if err != nil {
gs.Err = fmt.Sprintf("gossip: Error in unmarshalling peer's local data. Error : %v", err.Error())
}
gd.UpdateGossipData(remoteGossipData)
}

gd.Update(remoteState)
gs.Op = types.MergeRemote
gs.Err = ""
gd.updateGossipTs()
Expand Down Expand Up @@ -361,3 +395,13 @@ func (gd *GossipDelegate) handleStateEvents() {
gd.UpdateSelfStatus(gd.currentState.NodeStatus())
}
}

func (gd *GossipDelegate) handleGossipHeader(packet []byte) types.GossipHeaderVersion {
headerV1 := make([]byte, types.GOSSIP_HEADER_LENGTH)
headerV1[types.GH_VERSION_POS] = byte(types.GH_VERSION_1)
if bytes.HasPrefix(packet, headerV1) {
return types.GH_VERSION_1
} else {
return types.GH_VERSION_BASE
}
}
108 changes: 105 additions & 3 deletions proto/gossip_store.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package proto

import (
"bytes"
"encoding/gob"
"fmt"
"github.com/libopenstorage/gossip/types"
"sync"
"time"

"github.com/libopenstorage/gossip/types"
)

const (
Expand All @@ -28,6 +29,9 @@ type GossipStoreImpl struct {
clusterSize int
// Ts at which we lost quorum
lostQuorumTs time.Time
// Map of registered keys and their repective types
// We only decode those keys which are registered with us.
typeMap types.TypeMap
}

func NewGossipStore(id types.NodeId, version, clusterId string) *GossipStoreImpl {
Expand Down Expand Up @@ -59,6 +63,7 @@ func (s *GossipStoreImpl) InitStore(
clusterId string,
) {
s.nodeMap = make(types.NodeInfoMap)
s.typeMap = make(types.TypeMap)
s.id = id
s.selfCorrect = true
s.GossipVersion = version
Expand Down Expand Up @@ -86,6 +91,7 @@ func (s *GossipStoreImpl) UpdateSelf(key types.StoreKey, val interface{}) {
s.Lock()
defer s.Unlock()

s.RegisterKey(key, val)
nodeInfo, _ := s.nodeMap[s.id]
nodeInfo.Value[key] = val
nodeInfo.LastUpdateTs = time.Now()
Expand Down Expand Up @@ -223,7 +229,7 @@ func (s *GossipStoreImpl) MetaInfo() types.NodeMetaInfo {
LastUpdateTs: selfNodeInfo.LastUpdateTs,
GenNumber: selfNodeInfo.GenNumber,
GossipVersion: s.GossipVersion,
ClusterId: s.ClusterId,
ClusterId: s.ClusterId,
}
return nodeMetaInfo
}
Expand All @@ -238,6 +244,28 @@ func (s *GossipStoreImpl) GetLocalState() types.NodeInfoMap {
return localCopy
}

func (s *GossipStoreImpl) GetGossipData() types.GossipDataMap {
s.Lock()
defer s.Unlock()
localCopy := make(types.GossipDataMap)
for id, nodeInfo := range s.nodeMap {
gossipData := types.GossipData{
Id: nodeInfo.Id,
GenNumber: nodeInfo.GenNumber,
LastUpdateTs: nodeInfo.LastUpdateTs,
WaitForGenUpdateTs: nodeInfo.WaitForGenUpdateTs,
Status: nodeInfo.Status,
}
gossipData.Value = make(map[string][]byte)
for key, val := range nodeInfo.Value {
bt, _ := s.convertToBytes(&val)
gossipData.Value[string(key)] = bt
}
localCopy[id] = gossipData
}
return localCopy
}

func (s *GossipStoreImpl) GetLocalNodeInfo(id types.NodeId) (types.NodeInfo, error) {
s.Lock()
defer s.Unlock()
Expand Down Expand Up @@ -274,6 +302,60 @@ func (s *GossipStoreImpl) Update(diff types.NodeInfoMap) {
}
}

func (s *GossipStoreImpl) UpdateGossipData(diff types.GossipDataMap) {
s.Lock()
defer s.Unlock()

for id, nodeGossipData := range diff {
if id == s.id {
continue
}
selfValue, ok := s.nodeMap[id]
if !ok {
// We got an update for a node which we do not have in our map
// Lets add it with an offline state
selfValue.Status = types.NODE_STATUS_DOWN
}
if !ok || !statusValid(selfValue.Status) ||
selfValue.LastUpdateTs.Before(nodeGossipData.LastUpdateTs) {
// Our view of Status of a Node, should only be determined by memberlist.
// We should not update the Status field in our nodeInfo based on what other node's
// value is.
nodeGossipData.Status = selfValue.Status
nodeInfo := types.NodeInfo{
Id: nodeGossipData.Id,
GenNumber: nodeGossipData.GenNumber,
LastUpdateTs: nodeGossipData.LastUpdateTs,
WaitForGenUpdateTs: nodeGossipData.WaitForGenUpdateTs,
Status: nodeGossipData.Status,
}
storeMap := make(types.StoreMap)
for key, value := range nodeGossipData.Value {
typeVal, ok := s.typeMap[types.StoreKey(key)]
if !ok {
// Unknown store key ignoring..
continue
}
intf := typeVal
s.convertFromBytes(value, &intf)
storeMap[types.StoreKey(key)] = intf
}
nodeInfo.Value = storeMap
s.nodeMap[id] = nodeInfo
}
}
}

func (s *GossipStoreImpl) RegisterKey(key types.StoreKey, value interface{}) error {
if _, ok := s.typeMap[key]; ok {
// Key already exists
return fmt.Errorf("Key already exists")
}
gob.Register(value)
s.typeMap[key] = value
return nil
}

func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) {
removeNodeIds := []types.NodeId{}
addNodeIds := []types.NodeId{}
Expand Down Expand Up @@ -320,3 +402,23 @@ func (s *GossipStoreImpl) updateCluster(peers map[types.NodeId]string) {
func (s *GossipStoreImpl) getClusterSize() int {
return s.clusterSize
}

func (s *GossipStoreImpl) convertToBytes(obj interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(obj)
if err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}

func (s *GossipStoreImpl) convertFromBytes(buf []byte, msg interface{}) error {
msgBuffer := bytes.NewBuffer(buf)
dec := gob.NewDecoder(msgBuffer)
err := dec.Decode(msg)
if err != nil {
return err
}
return nil
}
Loading