Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 125 additions & 0 deletions api/labels/labels.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
// Copyright (c) 2022 Wireleap

package labels

type Contract struct {
Contract string `label:"contract"`
Role string `label:"role"`
}

func (ct Contract) SetContract(contract string) Contract {
ct.Contract = contract
return ct
}

func (ct Contract) SetRole(role string) Contract {
ct.Role = role
return ct
}

func (ct Contract) GetConnection() Connection {
return Connection{
Contract: ct.Contract,
Role: ct.Role,
}
}

func (ct Contract) WithErr(error string) ContractErr {
return ContractErr{
Contract: ct.Contract,
Role: ct.Role,
Error: error,
}
}

func (ct Contract) WithCapMode(mode string) ContractNetCap {
return ContractNetCap{
Contract: ct.Contract,
Role: ct.Role,
Mode: mode,
}
}

type ContractErr struct {
Contract string `label:"contract"`
Role string `label:"role"`
Error string `label:"error"`
}

func (cl ContractErr) SetContract(contract string) ContractErr {
cl.Contract = contract
return cl
}

func (cl ContractErr) SetRole(role string) ContractErr {
cl.Role = role
return cl
}

func (cl ContractErr) GetContract() Contract {
return Contract{
Contract: cl.Contract,
Role: cl.Role,
}
}

func (cl ContractErr) SetError(error string) ContractErr {
cl.Error = error
return cl
}

type ContractNetCap struct {
Contract string `label:"contract"`
Role string `label:"role"`
Mode string `label:"mode"`
}

func (cnc ContractNetCap) SetContract(contract string) ContractNetCap {
cnc.Contract = contract
return cnc
}

func (cnc ContractNetCap) SetRole(role string) ContractNetCap {
cnc.Role = role
return cnc
}

func (cnc ContractNetCap) GetContract() Contract {
return Contract{
Contract: cnc.Contract,
Role: cnc.Role,
}
}

func (cnc ContractNetCap) SetCapMode(mode string) ContractNetCap {
cnc.Mode = mode
return cnc
}

type Connection struct {
Contract string `label:"contract"`
Role string `label:"role"`
Origin string `label:"origin"` // "client" or "target"
}

func (cl Connection) SetContract(contract string) Connection {
cl.Contract = contract
return cl
}

func (cl Connection) SetRole(role string) Connection {
cl.Role = role
return cl
}

func (cl Connection) GetContract() Contract {
return Contract{
Contract: cl.Contract,
Role: cl.Role,
}
}

func (cl Connection) SetOrigin(origin string) Connection {
cl.Origin = origin
return cl
}
35 changes: 0 additions & 35 deletions api/meteredrwc/mrwclabels/contractlabels.go

This file was deleted.

34 changes: 21 additions & 13 deletions api/meteredrwc/rwc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,35 +6,42 @@ import (
"io"
"sync/atomic"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/wireleap/relay/api/labels"
"github.com/wireleap/relay/telemetry"
)

/**
Updates:
- synced *uint64 + internal int on read
- prometheus telemetry (duration + bytes) on close [ToDo]
- prometheus telemetry (duration + bytes) on close
**/
type MRWC struct {
rwc io.ReadWriteCloser
bytes int
//promCounter prometheus.Counter
//promHist TimeHistogram
startAt time.Time
syncBytes *uint64
rwc io.ReadWriteCloser
bytes int
promCounter prometheus.Counter
promTProc telemetry.TimeHistogram
startAt time.Time
syncBytes *uint64
}

func New(rwc io.ReadWriteCloser, syncBytes *uint64) io.ReadWriteCloser {
// ToDo: Add promCounter and promHist
func New(rwc io.ReadWriteCloser, syncBytes *uint64, connLabs labels.Connection) io.ReadWriteCloser {
return &MRWC{
rwc: rwc,
startAt: time.Now(),
syncBytes: syncBytes,
rwc: rwc,
promCounter: telemetry.Metrics.Net.TotalBytes(connLabs),
promTProc: telemetry.Metrics.Conn.Lifetime(),
startAt: time.Now(),
syncBytes: syncBytes,
}
}

func (mRWC *MRWC) update(i int) {
if mRWC.syncBytes != nil {
atomic.AddUint64(mRWC.syncBytes, uint64(i))
}

mRWC.promCounter.Add(float64(i))
mRWC.bytes = mRWC.bytes + i
}

Expand All @@ -49,6 +56,7 @@ func (mRWC *MRWC) Write(p []byte) (n int, err error) {
}

func (mRWC *MRWC) Close() error {
//duration := time.Since(mRWC.startAt)
mRWC.promTProc.Since(mRWC.startAt)

return mRWC.rwc.Close()
}
4 changes: 3 additions & 1 deletion api/meteredrwc/rwc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"net"
"testing"
"time"

"github.com/wireleap/relay/api/labels"
)

var test = []byte{'h', 'e', 'l', 'l', 'o', ' ', 'w', 'o', 'r', 'l', 'd'}
Expand All @@ -21,7 +23,7 @@ func TestRetransmit(t *testing.T) {
c1, c2 := net.Pipe()
go c2.Write(test) // Buffer alike

r := New(c1, &i)
r := New(c1, &i, labels.Connection{})

var w bytes.Buffer

Expand Down
71 changes: 63 additions & 8 deletions contractmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
"github.com/wireleap/common/cli/fsdir"
"github.com/wireleap/common/cli/upgrade"
"github.com/wireleap/relay/api/epoch"
"github.com/wireleap/relay/api/labels"
"github.com/wireleap/relay/api/synccounters"
"github.com/wireleap/relay/filenames"
"github.com/wireleap/relay/relaycfg"
"github.com/wireleap/relay/relaylib"
"github.com/wireleap/relay/relaystats"
"github.com/wireleap/relay/relaystats/nustore"
"github.com/wireleap/relay/telemetry"
"github.com/wireleap/relay/version"

"errors"
Expand Down Expand Up @@ -141,19 +143,34 @@ func (n netCapsCfg) HardCap() (map[string]uint64, uint64) {
}

// Retuns new map and global soft+hard cap calculations
func (n netCapsCfg) Caps() (resCaps map[string]cap, resGlobal cap) {
func (n netCapsCfg) Caps(controller *relaylib.Controller) (resCaps map[string]cap, resGlobal cap) {
resGlobal = cap{
soft: uint64(float64(n.globalCap) * netCapSoftLimit),
hard: uint64(float64(n.globalCap) * netCapHardLimit),
}

// Telemetry
ct := labels.Contract{Contract: "global"}
//telemetry.Metrics.Net.CapLimitsBytes(ct.WithCapMode("soft")).Set(float64(resGlobal.soft))
telemetry.Metrics.Net.CapLimitsBytes(ct.WithCapMode("hard")).Set(float64(resGlobal.hard))

contractCaps := n.contractCaps()

resCaps = make(map[string]cap, len(contractCaps))
for k, u := range contractCaps {
soft := uint64(float64(u) * netCapSoftLimit) // softFactor
hard := uint64(float64(u) * netCapHardLimit) // hardFactor
resCaps[k] = cap{soft, hard}

// Telemetry
role, err := controller.Role(k)
if err != nil {
log.Print(err)
}

ct = labels.Contract{Contract: k, Role: role}
telemetry.Metrics.Net.CapLimitsBytes(ct.WithCapMode("soft")).Set(float64(soft))
telemetry.Metrics.Net.CapLimitsBytes(ct.WithCapMode("hard")).Set(float64(hard))
}
return
}
Expand Down Expand Up @@ -283,7 +300,7 @@ func (m *Manager) setReachedCaps() {

// gather external data sources
contracts := m.Controller.Contracts()
caps, globalXCap := m.netCaps.Caps()
caps, globalXCap := m.netCaps.Caps(m.Controller)

// initialise global counter
sum := uint64(0)
Expand All @@ -310,20 +327,56 @@ func (m *Manager) setReachedCaps() {
sum = sum + i
}

if ct_cap, ok := caps[contract]; !ok {
// pass
} else if i > ct_cap.hard {
reachedCaps[contract] = hardCap
} else if i > ct_cap.soft {
reachedCaps[contract] = softCap
// telemetry labels
role, err := m.Controller.Role(contract)
if err != nil {
// This should never fail
return true
}

ct := labels.Contract{Contract: contract, Role: role}

if ct_cap, ok := caps[contract]; ok {
if i > ct_cap.hard {
reachedCaps[contract] = hardCap
telemetry.Metrics.Net.CapLimitStatus(ct).Set(hardCap) //telmetry
} else if i > ct_cap.soft {
reachedCaps[contract] = softCap
telemetry.Metrics.Net.CapLimitStatus(ct).Set(softCap) //telmetry
} else {
telemetry.Metrics.Net.CapLimitStatus(ct).Set(okCap) //telmetry
}

// telemetry
f := float64(i)
telemetry.Metrics.Net.RemainingCapBytes(ct.WithCapMode("soft")).Set(float64(ct_cap.soft) - f)
telemetry.Metrics.Net.RemainingCapBytes(ct.WithCapMode("hard")).Set(float64(ct_cap.hard) - f)
} else {
telemetry.Metrics.Net.CapLimitStatus(ct).Set(okCap - 1)
}

return true
}

m.NetStats.Active.ContractStats.Range(f)

// telemetry labels
ct := labels.Contract{Contract: "global"}

if m.netCaps.globalCap != 0 {
globalCap = sum >= globalXCap.hard

// telemetry
//telemetry.Metrics.Net.RemainingCapBytes(ct.WithCapMode("soft")).Set(float64(globalXCap.soft) - float64(sum))
telemetry.Metrics.Net.RemainingCapBytes(ct.WithCapMode("hard")).Set(float64(globalXCap.hard) - float64(sum))

if globalCap {
telemetry.Metrics.Net.CapLimitStatus(ct).Set(hardCap)
} else {
telemetry.Metrics.Net.CapLimitStatus(ct).Set(okCap)
}
} else {
telemetry.Metrics.Net.CapLimitStatus(ct).Set(okCap - 1)
}

return
Expand All @@ -341,6 +394,8 @@ func (m *Manager) setReachedCapsMock() {
reachedCaps = make(map[string]int, len(contracts))
for _, ct := range contracts {
reachedCaps[ct] = okCap

// skipping telemetry
}

return
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.16
require (
github.com/blang/semver v3.5.1+incompatible
github.com/c2h5oh/datasize v0.0.0-20200825124411-48ed595a09d2
github.com/cabify/gotoprom v1.1.0
github.com/prometheus/client_golang v1.12.2
github.com/wireleap/common v0.3.7
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 // indirect
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e // indirect
)
Loading