From e9d41f46516422c284b76f496e8ad3692116e320 Mon Sep 17 00:00:00 2001 From: Rene Cannao Date: Tue, 24 Mar 2026 01:07:13 +0000 Subject: [PATCH] Add database provider abstraction layer Introduce a DatabaseProvider interface that abstracts database-specific operations (replication status, read-only control, start/stop replication) behind a common contract. This is the architectural foundation for multi-database support. - Define DatabaseProvider interface and ReplicationStatus type (provider.go) - Implement MySQLProvider that delegates to existing DAO functions (provider_mysql.go) - Add provider registry with MySQL as default provider (provider_registry.go) - Add unit tests for interface satisfaction and registry (provider_test.go) - Add documentation for the provider system (docs/database-providers.md) Closes #32 --- docs/database-providers.md | 125 +++++++++++++++++++++++++++++++++++ go/inst/provider.go | 78 ++++++++++++++++++++++ go/inst/provider_mysql.go | 116 ++++++++++++++++++++++++++++++++ go/inst/provider_registry.go | 45 +++++++++++++ go/inst/provider_test.go | 69 +++++++++++++++++++ 5 files changed, 433 insertions(+) create mode 100644 docs/database-providers.md create mode 100644 go/inst/provider.go create mode 100644 go/inst/provider_mysql.go create mode 100644 go/inst/provider_registry.go create mode 100644 go/inst/provider_test.go diff --git a/docs/database-providers.md b/docs/database-providers.md new file mode 100644 index 00000000..c6ea6b0f --- /dev/null +++ b/docs/database-providers.md @@ -0,0 +1,125 @@ +# Database Provider Abstraction + +## Overview + +Orchestrator supports a database provider abstraction layer that decouples core +orchestration logic from database-specific operations. This allows orchestrator +to manage different database engines through a common interface. + +MySQL is the default (and currently only) provider. The abstraction layer is +designed to support future providers such as PostgreSQL. + +## Architecture + +The provider system consists of three components: + +1. **`DatabaseProvider` interface** (`go/inst/provider.go`) -- defines the + contract that all database providers must implement. +2. **Provider implementations** (e.g., `go/inst/provider_mysql.go`) -- concrete + implementations for specific database engines. +3. **Provider registry** (`go/inst/provider_registry.go`) -- a global registry + that holds the active provider instance. + +## The DatabaseProvider Interface + +```go +type DatabaseProvider interface { + // Discovery + GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) + IsReplicaRunning(key InstanceKey) (bool, error) + + // Read-only control + SetReadOnly(key InstanceKey, readOnly bool) error + IsReadOnly(key InstanceKey) (bool, error) + + // Replication control + StartReplication(key InstanceKey) error + StopReplication(key InstanceKey) error + + // Provider metadata + ProviderName() string +} +``` + +### ReplicationStatus + +The `ReplicationStatus` struct provides a database-agnostic view of replication +state: + +| Field | Description | +|------------------|--------------------------------------------------------------| +| ReplicaRunning | Whether replication is fully operational | +| SQLThreadRunning | Whether the SQL/apply thread is running | +| IOThreadRunning | Whether the IO/receiver thread is running | +| Position | Opaque replication position (MySQL GTID, PG LSN, etc.) | +| Lag | Replication lag in seconds; -1 if unknown | + +## Using the Provider + +```go +import "github.com/proxysql/orchestrator/go/inst" + +// Get the current provider +provider := inst.GetProvider() + +// Check replication status +status, err := provider.GetReplicationStatus(instanceKey) + +// Control read-only mode +err = provider.SetReadOnly(instanceKey, true) + +// Control replication +err = provider.StopReplication(instanceKey) +err = provider.StartReplication(instanceKey) +``` + +## MySQL Provider + +The MySQL provider (`MySQLProvider`) is the default provider. It delegates to +orchestrator's existing MySQL DAO functions, so all current behavior is +preserved. + +The MySQL provider is automatically registered at init time. No configuration +is needed to use it. + +## Implementing a New Provider + +To add support for a new database engine: + +1. Create a new file `go/inst/provider_.go`. +2. Define a struct that implements all methods of `DatabaseProvider`. +3. Add a compile-time interface check: + ```go + var _ DatabaseProvider = (*MyNewProvider)(nil) + ``` +4. Register the provider during initialization or based on configuration: + ```go + inst.SetProvider(NewMyProvider()) + ``` + +### Guidelines + +- **Return errors, don't panic.** All provider methods return errors. +- **Map engine-specific state to ReplicationStatus.** The `ReplicationStatus` + struct is intentionally generic. Map your engine's replication details + into the common fields. +- **Position is opaque.** The `Position` field in `ReplicationStatus` is a + string that means different things for different engines. Consumers should + not parse it directly. +- **Lag of -1 means unknown.** If your engine cannot determine replication lag, + return -1. + +## Current Limitations + +This is the initial extraction. The provider interface currently covers: + +- Replication status discovery +- Read-only control +- Basic replication start/stop + +Future work will expand the interface to cover: + +- Topology changes (reparenting, detach/reattach) +- GTID operations +- Semi-sync configuration +- Instance discovery and metadata diff --git a/go/inst/provider.go b/go/inst/provider.go new file mode 100644 index 00000000..e4e7422e --- /dev/null +++ b/go/inst/provider.go @@ -0,0 +1,78 @@ +/* + Copyright 2024 Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package inst + +// ReplicationStatus represents database-agnostic replication state. +// Each database provider maps its engine-specific replication details +// into this common structure. +type ReplicationStatus struct { + // ReplicaRunning indicates whether replication is fully operational + // (both IO and SQL threads running for MySQL, WAL receiver active for PostgreSQL, etc.) + ReplicaRunning bool + + // SQLThreadRunning indicates whether the SQL/apply thread is running. + // For engines without separate threads, this mirrors ReplicaRunning. + SQLThreadRunning bool + + // IOThreadRunning indicates whether the IO/receiver thread is running. + // For engines without separate threads, this mirrors ReplicaRunning. + IOThreadRunning bool + + // Position is an opaque replication position string. + // For MySQL this is a GTID set or binlog file:pos; for PostgreSQL it would be an LSN. + Position string + + // Lag is the replication lag in seconds. -1 indicates lag is unknown. + Lag int64 +} + +// DatabaseProvider abstracts database-specific operations. +// Implementations exist per database engine (MySQL, future PostgreSQL, etc.). +// This interface covers the minimal set of operations needed for topology +// management in a database-agnostic way. +type DatabaseProvider interface { + // Discovery + + // GetReplicationStatus retrieves the current replication state + // for the given instance, returning a database-agnostic status. + GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) + + // IsReplicaRunning checks whether replication is actively running + // on the given instance. + IsReplicaRunning(key InstanceKey) (bool, error) + + // Read-only control + + // SetReadOnly sets or clears the read-only state on the given instance. + SetReadOnly(key InstanceKey, readOnly bool) error + + // IsReadOnly checks whether the given instance is in read-only mode. + IsReadOnly(key InstanceKey) (bool, error) + + // Replication control + + // StartReplication starts the replication process on the given instance. + StartReplication(key InstanceKey) error + + // StopReplication stops the replication process on the given instance. + StopReplication(key InstanceKey) error + + // Provider metadata + + // ProviderName returns a short identifier for this provider (e.g. "mysql"). + ProviderName() string +} diff --git a/go/inst/provider_mysql.go b/go/inst/provider_mysql.go new file mode 100644 index 00000000..791967fa --- /dev/null +++ b/go/inst/provider_mysql.go @@ -0,0 +1,116 @@ +/* + Copyright 2024 Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package inst + +import ( + "github.com/proxysql/golib/log" + "github.com/proxysql/golib/sqlutils" + "github.com/proxysql/orchestrator/go/db" +) + +// MySQLProvider implements DatabaseProvider for MySQL and MySQL-compatible +// databases (MariaDB, Percona Server, etc.). It delegates to the existing +// orchestrator DAO functions. +type MySQLProvider struct{} + +// NewMySQLProvider creates a new MySQL database provider. +func NewMySQLProvider() *MySQLProvider { + return &MySQLProvider{} +} + +// ProviderName returns "mysql". +func (p *MySQLProvider) ProviderName() string { + return "mysql" +} + +// GetReplicationStatus retrieves the replication state for a MySQL instance +// by reading the topology and mapping it to a database-agnostic ReplicationStatus. +func (p *MySQLProvider) GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) { + instance, err := ReadTopologyInstance(&key) + if err != nil { + return nil, log.Errore(err) + } + + lag := int64(-1) + if instance.SecondsBehindMaster.Valid { + lag = instance.SecondsBehindMaster.Int64 + } + + position := instance.ExecutedGtidSet + if position == "" { + position = instance.SelfBinlogCoordinates.DisplayString() + } + + return &ReplicationStatus{ + ReplicaRunning: instance.ReplicaRunning(), + SQLThreadRunning: instance.ReplicationSQLThreadRuning, + IOThreadRunning: instance.ReplicationIOThreadRuning, + Position: position, + Lag: lag, + }, nil +} + +// IsReplicaRunning checks whether replication is actively running on the +// given MySQL instance. +func (p *MySQLProvider) IsReplicaRunning(key InstanceKey) (bool, error) { + instance, err := ReadTopologyInstance(&key) + if err != nil { + return false, log.Errore(err) + } + return instance.ReplicaRunning(), nil +} + +// SetReadOnly sets or clears the global read_only variable on a MySQL instance. +// This delegates to the existing SetReadOnly function. +func (p *MySQLProvider) SetReadOnly(key InstanceKey, readOnly bool) error { + _, err := SetReadOnly(&key, readOnly) + return err +} + +// IsReadOnly checks whether a MySQL instance has global read_only enabled. +func (p *MySQLProvider) IsReadOnly(key InstanceKey) (bool, error) { + sqlDB, err := db.OpenTopology(key.Hostname, key.Port) + if err != nil { + return false, log.Errore(err) + } + var readOnly bool + err = sqlutils.QueryRowsMap(sqlDB, "SELECT @@global.read_only AS read_only", func(m sqlutils.RowMap) error { + readOnly = m.GetBool("read_only") + return nil + }) + if err != nil { + return false, log.Errore(err) + } + return readOnly, nil +} + +// StartReplication starts the replication threads on the given MySQL instance. +// This delegates to the existing StartReplication function. +func (p *MySQLProvider) StartReplication(key InstanceKey) error { + _, err := StartReplication(&key) + return err +} + +// StopReplication stops the replication threads on the given MySQL instance. +// This delegates to the existing StopReplication function. +func (p *MySQLProvider) StopReplication(key InstanceKey) error { + _, err := StopReplication(&key) + return err +} + +// Compile-time check that MySQLProvider implements DatabaseProvider. +var _ DatabaseProvider = (*MySQLProvider)(nil) diff --git a/go/inst/provider_registry.go b/go/inst/provider_registry.go new file mode 100644 index 00000000..ec9f93f7 --- /dev/null +++ b/go/inst/provider_registry.go @@ -0,0 +1,45 @@ +/* + Copyright 2024 Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package inst + +import "sync" + +var ( + providerMu sync.RWMutex + providerInst DatabaseProvider +) + +// SetProvider sets the global database provider. This should be called +// during initialization to configure which database engine orchestrator +// manages. The default provider is MySQL. +func SetProvider(p DatabaseProvider) { + providerMu.Lock() + defer providerMu.Unlock() + providerInst = p +} + +// GetProvider returns the currently configured database provider. +func GetProvider() DatabaseProvider { + providerMu.RLock() + defer providerMu.RUnlock() + return providerInst +} + +func init() { + // MySQL is the default provider, preserving backward compatibility. + SetProvider(NewMySQLProvider()) +} diff --git a/go/inst/provider_test.go b/go/inst/provider_test.go new file mode 100644 index 00000000..5ec47bae --- /dev/null +++ b/go/inst/provider_test.go @@ -0,0 +1,69 @@ +/* + Copyright 2024 Orchestrator Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package inst + +import ( + "testing" +) + +func TestMySQLProviderName(t *testing.T) { + p := NewMySQLProvider() + if p.ProviderName() != "mysql" { + t.Errorf("expected 'mysql', got %q", p.ProviderName()) + } +} + +func TestDefaultProviderIsMySQL(t *testing.T) { + p := GetProvider() + if p == nil { + t.Fatal("expected non-nil default provider") + } + if p.ProviderName() != "mysql" { + t.Errorf("expected default provider 'mysql', got %q", p.ProviderName()) + } +} + +func TestMySQLProviderImplementsInterface(t *testing.T) { + // Compile-time interface satisfaction check + var _ DatabaseProvider = (*MySQLProvider)(nil) +} + +func TestSetAndGetProvider(t *testing.T) { + original := GetProvider() + defer SetProvider(original) + + custom := NewMySQLProvider() + SetProvider(custom) + + got := GetProvider() + if got != custom { + t.Error("GetProvider did not return the provider set by SetProvider") + } +} + +func TestReplicationStatusDefaults(t *testing.T) { + status := ReplicationStatus{} + if status.ReplicaRunning { + t.Error("expected ReplicaRunning to be false by default") + } + if status.Lag != 0 { + t.Error("expected Lag to be 0 by default") + } + if status.Position != "" { + t.Error("expected Position to be empty by default") + } +}