Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions example/api/extended.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

// This is an example extended endpoint reachable at /1.0/extended/simple.
Expand All @@ -40,7 +39,7 @@ var extendedWebsocketCmd = rest.Endpoint{

// This is the POST handler for the /1.0/extended/simple endpoint.
// This example shows how to forward a request to other cluster members.
func cmdSimple(state state.State, r *http.Request) response.Response {
func cmdSimple(state types.State, r *http.Request) response.Response {
// Check the user agent header to check if we are the notifying cluster member.
if !types.IsNotification(r) {
// Get a collection of clients every other cluster member, with the notification user-agent set.
Expand Down Expand Up @@ -101,7 +100,7 @@ func cmdSimple(state state.State, r *http.Request) response.Response {

// This is the GET handler for the /1.0/extended/websocket endpoint.
// This example shows how to use websockets.
func cmdWebsocket(state state.State, r *http.Request) response.Response {
func cmdWebsocket(state types.State, r *http.Request) response.Response {
if r.Header.Get("Upgrade") == "websocket" {
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
Expand Down
23 changes: 11 additions & 12 deletions example/cmd/microd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/canonical/microcluster/v3/example/version"
"github.com/canonical/microcluster/v3/microcluster"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

type cmdGlobal struct {
Expand Down Expand Up @@ -89,9 +88,9 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
}

// exampleHooks are some example post-action hooks that can be run by MicroCluster.
dargs.Hooks = &state.Hooks{
dargs.Hooks = &types.Hooks{
// PostBootstrap is run after the daemon is initialized and bootstrapped.
PostBootstrap: func(ctx context.Context, s state.State, initConfig map[string]string) error {
PostBootstrap: func(ctx context.Context, s types.State, initConfig map[string]string) error {
// We can derive the logger using our custom handler from the app.
logger := m.LoggerFromContext(ctx)

Expand All @@ -115,7 +114,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
return nil
},

PreInit: func(ctx context.Context, s state.State, bootstrap bool, initConfig map[string]string) error {
PreInit: func(ctx context.Context, s types.State, bootstrap bool, initConfig map[string]string) error {
logger := m.LoggerFromContext(ctx)

logger.Info("This is a hook that runs before the daemon is initialized")
Expand All @@ -125,7 +124,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// OnStart is run after the daemon is started.
OnStart: func(ctx context.Context, s state.State) error {
OnStart: func(ctx context.Context, s types.State) error {
logger := m.LoggerFromContext(ctx)

logger.Info("This is a hook that runs after the daemon first starts")
Expand All @@ -134,7 +133,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// PostJoin is run after the daemon is initialized and joins a cluster.
PostJoin: func(ctx context.Context, s state.State, initConfig map[string]string) error {
PostJoin: func(ctx context.Context, s types.State, initConfig map[string]string) error {
logger := m.LoggerFromContext(ctx)

logger.Info("This is a hook that runs after the daemon is initialized and joins an existing cluster, after OnNewMember runs on all peers")
Expand All @@ -144,7 +143,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// PreJoin is run after the daemon is initialized and joins a cluster.
PreJoin: func(ctx context.Context, s state.State, initConfig map[string]string) error {
PreJoin: func(ctx context.Context, s types.State, initConfig map[string]string) error {
logger := m.LoggerFromContext(ctx)

logger.Info("This is a hook that runs after the daemon is initialized and joins an existing cluster, before OnNewMember runs on all peers")
Expand All @@ -154,7 +153,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// PostRemove is run after the daemon is removed from a cluster.
PostRemove: func(ctx context.Context, s state.State, force bool) error {
PostRemove: func(ctx context.Context, s types.State, force bool) error {
logger := m.LoggerFromContext(ctx)

logger.Info(fmt.Sprintf("This is a hook that is run on peer %q after a cluster member is removed, with the force flag set to %v", s.Name(), force))
Expand All @@ -163,7 +162,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// PreRemove is run before the daemon is removed from the cluster.
PreRemove: func(ctx context.Context, s state.State, force bool) error {
PreRemove: func(ctx context.Context, s types.State, force bool) error {
logger := m.LoggerFromContext(ctx)

logger.Info(fmt.Sprintf("This is a hook that is run on peer %q just before it is removed, with the force flag set to %v", s.Name(), force))
Expand All @@ -172,7 +171,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// OnHeartbeat is run after a successful heartbeat round.
OnHeartbeat: func(ctx context.Context, s state.State, roleStatus map[string]types.RoleStatus) error {
OnHeartbeat: func(ctx context.Context, s types.State, roleStatus map[string]types.RoleStatus) error {
logger := m.LoggerFromContext(ctx)

logger.Info("This is a hook that is run on the dqlite leader after a successful heartbeat; role information for cluster members is available")
Expand All @@ -190,7 +189,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// OnNewMember is run after a new member has joined.
OnNewMember: func(ctx context.Context, s state.State, newMember types.ClusterMemberLocal) error {
OnNewMember: func(ctx context.Context, s types.State, newMember types.ClusterMemberLocal) error {
logger := m.LoggerFromContext(ctx)

logger.Info(fmt.Sprintf("This is a hook that is run on peer %q when the new cluster member %q has joined", s.Name(), newMember.Name))
Expand All @@ -199,7 +198,7 @@ func (c *cmdDaemon) run(cmd *cobra.Command, args []string) error {
},

// OnDaemonConfigUpdate is run after the local daemon config of a cluster member got modified.
OnDaemonConfigUpdate: func(ctx context.Context, s state.State, config types.DaemonConfig) error {
OnDaemonConfigUpdate: func(ctx context.Context, s types.State, config types.DaemonConfig) error {
logger := m.LoggerFromContext(ctx)
logger.Info(fmt.Sprintf("Running OnDaemonConfigUpdate triggered by %q", config.Name))

Expand Down
27 changes: 13 additions & 14 deletions internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

// Args are the data needed to start a MicroCluster daemon.
Expand All @@ -64,7 +63,7 @@ type Args struct {
APIExtensions []string

// Functions that trigger at various lifecycle events
Hooks *state.Hooks
Hooks *types.Hooks

// Each rest.Server will be initialized and managed by microcluster.
ExtensionServers map[string]rest.Server
Expand Down Expand Up @@ -97,7 +96,7 @@ type Daemon struct {
fsWatcher *sys.Watcher
trustStore *trust.Store

hooks state.Hooks // Hooks to be called upon various daemon actions.
hooks types.Hooks // Hooks to be called upon various daemon actions.

ReadyChan chan struct{} // Closed when the daemon is fully ready.
shutdownCtx context.Context // Cancelled when shutdown starts.
Expand Down Expand Up @@ -250,7 +249,7 @@ func (d *Daemon) Run(ctx context.Context, stateDir string, args Args) error {
}
}

func (d *Daemon) init(listenAddress string, socketGroup string, heartbeatInterval time.Duration, schemaExtensions []clusterDB.Update, apiExtensions []string, hooks *state.Hooks) error {
func (d *Daemon) init(listenAddress string, socketGroup string, heartbeatInterval time.Duration, schemaExtensions []clusterDB.Update, apiExtensions []string, hooks *types.Hooks) error {
d.applyHooks(hooks)

// Register smart error mappings.
Expand Down Expand Up @@ -374,21 +373,21 @@ func (d *Daemon) init(listenAddress string, socketGroup string, heartbeatInterva
return nil
}

func (d *Daemon) applyHooks(hooks *state.Hooks) {
func (d *Daemon) applyHooks(hooks *types.Hooks) {
// Apply a no-op hooks for any missing hooks.
noOpHook := func(ctx context.Context, s state.State) error { return nil }
noOpRemoveHook := func(ctx context.Context, s state.State, force bool) error { return nil }
noOpInitHook := func(ctx context.Context, s state.State, initConfig map[string]string) error { return nil }
noOpGenericInitHook := func(ctx context.Context, s state.State, bootstrap bool, initConfig map[string]string) error {
noOpHook := func(ctx context.Context, s types.State) error { return nil }
noOpRemoveHook := func(ctx context.Context, s types.State, force bool) error { return nil }
noOpInitHook := func(ctx context.Context, s types.State, initConfig map[string]string) error { return nil }
noOpGenericInitHook := func(ctx context.Context, s types.State, bootstrap bool, initConfig map[string]string) error {
return nil
}

noOpConfigHook := func(ctx context.Context, s state.State, config types.DaemonConfig) error { return nil }
noOpNewMemberHook := func(ctx context.Context, s state.State, newMember types.ClusterMemberLocal) error { return nil }
noOpHeartbeatHook := func(ctx context.Context, s state.State, roleStatus map[string]types.RoleStatus) error { return nil }
noOpConfigHook := func(ctx context.Context, s types.State, config types.DaemonConfig) error { return nil }
noOpNewMemberHook := func(ctx context.Context, s types.State, newMember types.ClusterMemberLocal) error { return nil }
noOpHeartbeatHook := func(ctx context.Context, s types.State, roleStatus map[string]types.RoleStatus) error { return nil }

if hooks == nil {
d.hooks = state.Hooks{}
d.hooks = types.Hooks{}
} else {
d.hooks = *hooks
}
Expand Down Expand Up @@ -1109,7 +1108,7 @@ func (d *Daemon) FileSystem() types.OS {
}

// State creates a State instance with the daemon's stateful components.
func (d *Daemon) State() state.State {
func (d *Daemon) State() types.State {
state := &internalState.InternalState{
Hooks: &d.hooks,
Context: d.shutdownCtx,
Expand Down
4 changes: 2 additions & 2 deletions internal/rest/access/authentication.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (e ErrInvalidHost) Unwrap() error {

// AllowAuthenticated checks if the request is trusted by extracting access.TrustedRequest from the request context.
// This handler is used as an access handler by default if AllowUntrusted is false on a rest.EndpointAction.
func AllowAuthenticated(state state.State, r *http.Request) (bool, response.Response) {
func AllowAuthenticated(state types.State, r *http.Request) (bool, response.Response) {
trusted := r.Context().Value(client.CtxAccess)
if trusted == nil {
return false, response.Forbidden(nil)
Expand Down Expand Up @@ -97,7 +97,7 @@ func checkMutualTLS(ctx context.Context, cert x509.Certificate, trustedCerts map
// Authenticate ensures the request certificates are trusted against the given set of trusted certificates.
// - Requests over the unix socket are always allowed.
// - HTTP requests require the TLS Peer certificate to match an entry in the supplied map of certificates.
func Authenticate(s state.State, r *http.Request, hostAddress string, trustedCerts map[string]x509.Certificate) (bool, error) {
func Authenticate(s types.State, r *http.Request, hostAddress string, trustedCerts map[string]x509.Certificate) (bool, error) {
if r.RemoteAddr == "@" {
return true, nil
}
Expand Down
3 changes: 1 addition & 2 deletions internal/rest/resources/api_1.0.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var api10Cmd = rest.Endpoint{
Expand All @@ -16,7 +15,7 @@ var api10Cmd = rest.Endpoint{
Get: rest.EndpointAction{Handler: api10Get, AllowUntrusted: true},
}

func api10Get(s state.State, r *http.Request) response.Response {
func api10Get(s types.State, r *http.Request) response.Response {
addrPort, err := types.ParseAddrPort(s.Address().Host)
if err != nil {
return response.SmartError(err)
Expand Down
3 changes: 1 addition & 2 deletions internal/rest/resources/certificates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var clusterCertificatesCmd = rest.Endpoint{
Expand All @@ -31,7 +30,7 @@ var clusterCertificatesCmd = rest.Endpoint{
Put: rest.EndpointAction{Handler: clusterCertificatesPut, AccessHandler: access.AllowAuthenticated},
}

func clusterCertificatesPut(s state.State, r *http.Request) response.Response {
func clusterCertificatesPut(s types.State, r *http.Request) response.Response {
certificateName, err := url.PathUnescape(mux.Vars(r)["name"])
if err != nil {
return response.SmartError(err)
Expand Down
11 changes: 5 additions & 6 deletions internal/rest/resources/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var clusterCmd = rest.Endpoint{
Expand Down Expand Up @@ -61,7 +60,7 @@ var clusterMemberInternalCmd = rest.Endpoint{
Put: rest.EndpointAction{Handler: clusterMemberPut, AccessHandler: access.AllowAuthenticated},
}

func clusterPost(s state.State, r *http.Request) response.Response {
func clusterPost(s types.State, r *http.Request) response.Response {
err := s.Database().IsOpen(r.Context())
if err != nil {
return response.SmartError(err)
Expand Down Expand Up @@ -243,7 +242,7 @@ func clusterPost(s state.State, r *http.Request) response.Response {
return response.SyncResponse(true, tokenResponse)
}

func clusterGet(s state.State, r *http.Request) response.Response {
func clusterGet(s types.State, r *http.Request) response.Response {
status := s.Database().Status()

// If the database is not in a ready or waiting state, we can't be sure it's available for use.
Expand Down Expand Up @@ -328,7 +327,7 @@ func clusterGet(s state.State, r *http.Request) response.Response {
// from the cluster when not the leader.
var clusterDisableMu sync.Mutex

func clusterMemberPut(s state.State, r *http.Request) response.Response {
func clusterMemberPut(s types.State, r *http.Request) response.Response {
force := r.URL.Query().Get("force") == "1"
reExec, err := resetClusterMember(r.Context(), s, force)
if err != nil {
Expand Down Expand Up @@ -356,7 +355,7 @@ func clusterMemberPut(s state.State, r *http.Request) response.Response {

// resetClusterMember clears the daemon state, closing the database and stopping all listeners.
// Returns a function that can be used to re-exec the daemon, forcibly reloading its state.
func resetClusterMember(ctx context.Context, s state.State, force bool) (reExec func(), err error) {
func resetClusterMember(ctx context.Context, s types.State, force bool) (reExec func(), err error) {
intState, err := internalState.ToInternal(s)
if err != nil {
return nil, err
Expand Down Expand Up @@ -415,7 +414,7 @@ func resetClusterMember(ctx context.Context, s state.State, force bool) (reExec
}

// clusterMemberDelete Removes a cluster member from dqlite and re-execs its daemon.
func clusterMemberDelete(s state.State, r *http.Request) response.Response {
func clusterMemberDelete(s types.State, r *http.Request) response.Response {
force := r.URL.Query().Get("force") == "1"
addr := r.URL.Query().Get("address")
name, err := url.PathUnescape(mux.Vars(r)["name"])
Expand Down
7 changes: 3 additions & 4 deletions internal/rest/resources/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var controlCmd = rest.Endpoint{
Expand All @@ -31,7 +30,7 @@ var controlCmd = rest.Endpoint{
Post: rest.EndpointAction{Handler: controlPost, AccessHandler: access.AllowAuthenticated},
}

func controlPost(state state.State, r *http.Request) response.Response {
func controlPost(state types.State, r *http.Request) response.Response {
status := state.Database().Status()
if status != types.DatabaseNotReady {
return response.SmartError(fmt.Errorf("Unable to initialize cluster: %s", status))
Expand Down Expand Up @@ -186,7 +185,7 @@ func controlPost(state state.State, r *http.Request) response.Response {
return response.EmptySyncResponse
}

func joinWithToken(state state.State, r *http.Request, req *types.Control) (*types.TokenResponse, *types.Remote, error) {
func joinWithToken(state types.State, r *http.Request, req *types.Control) (*types.TokenResponse, *types.Remote, error) {
token, err := types.DecodeToken(req.JoinToken)
if err != nil {
return nil, nil, err
Expand Down Expand Up @@ -289,7 +288,7 @@ func writeCert(dir, prefix string, cert, key, ca []byte) error {
return nil
}

func setupLocalMember(state state.State, localClusterMember *types.Remote, joinInfo *types.TokenResponse) ([]string, error) {
func setupLocalMember(state types.State, localClusterMember *types.Remote, joinInfo *types.TokenResponse) ([]string, error) {
// Set up cluster certificate.
err := writeCert(state.FileSystem().StateDir(), string(types.ClusterCertificateName), []byte(joinInfo.ClusterCert.String()), []byte(joinInfo.ClusterKey), nil)
if err != nil {
Expand Down
5 changes: 2 additions & 3 deletions internal/rest/resources/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
"github.com/canonical/microcluster/v3/state"
)

var daemonCmd = rest.Endpoint{
Expand All @@ -23,7 +22,7 @@ var daemonCmd = rest.Endpoint{
Put: rest.EndpointAction{Handler: daemonServersPut, AccessHandler: access.AllowAuthenticated},
}

func daemonServersGet(s state.State, r *http.Request) response.Response {
func daemonServersGet(s types.State, r *http.Request) response.Response {
intState, err := internalState.ToInternal(s)
if err != nil {
return response.SmartError(err)
Expand All @@ -32,7 +31,7 @@ func daemonServersGet(s state.State, r *http.Request) response.Response {
return response.SyncResponse(true, intState.LocalConfig().GetServers())
}

func daemonServersPut(s state.State, r *http.Request) response.Response {
func daemonServersPut(s types.State, r *http.Request) response.Response {
req := make(map[string]types.ServerConfig)

// Parse the request.
Expand Down
5 changes: 3 additions & 2 deletions internal/rest/resources/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/canonical/microcluster/v3/internal/state"
"github.com/canonical/microcluster/v3/microcluster/rest"
"github.com/canonical/microcluster/v3/microcluster/rest/response"
"github.com/canonical/microcluster/v3/microcluster/types"
)

var databaseCmd = rest.Endpoint{
Expand All @@ -18,7 +19,7 @@ var databaseCmd = rest.Endpoint{
Patch: rest.EndpointAction{Handler: databasePatch},
}

func databasePost(state state.State, r *http.Request) response.Response {
func databasePost(state types.State, r *http.Request) response.Response {
// Compare the dqlite version of the connecting client with our own.
versionHeader := r.Header.Get("X-Dqlite-Version")
if versionHeader == "" {
Expand All @@ -39,7 +40,7 @@ func databasePost(state state.State, r *http.Request) response.Response {
return response.EmptySyncResponse
}

func databasePatch(s state.State, r *http.Request) response.Response {
func databasePatch(s types.State, r *http.Request) response.Response {
// Compare the dqlite version of the connecting client with our own.
versionHeader := r.Header.Get("X-Dqlite-Version")
if versionHeader == "" {
Expand Down
Loading