-
Notifications
You must be signed in to change notification settings - Fork 0
Add database provider abstraction layer #41
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| # Database Provider Abstraction | ||
|
|
||
| ## Overview | ||
|
|
||
| Orchestrator supports a database provider abstraction layer that decouples core | ||
| orchestration logic from database-specific operations. This allows orchestrator | ||
| to manage different database engines through a common interface. | ||
|
|
||
| MySQL is the default (and currently only) provider. The abstraction layer is | ||
| designed to support future providers such as PostgreSQL. | ||
|
|
||
| ## Architecture | ||
|
|
||
| The provider system consists of three components: | ||
|
|
||
| 1. **`DatabaseProvider` interface** (`go/inst/provider.go`) -- defines the | ||
| contract that all database providers must implement. | ||
| 2. **Provider implementations** (e.g., `go/inst/provider_mysql.go`) -- concrete | ||
| implementations for specific database engines. | ||
| 3. **Provider registry** (`go/inst/provider_registry.go`) -- a global registry | ||
| that holds the active provider instance. | ||
|
|
||
| ## The DatabaseProvider Interface | ||
|
|
||
| ```go | ||
| type DatabaseProvider interface { | ||
| // Discovery | ||
| GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) | ||
| IsReplicaRunning(key InstanceKey) (bool, error) | ||
|
|
||
| // Read-only control | ||
| SetReadOnly(key InstanceKey, readOnly bool) error | ||
| IsReadOnly(key InstanceKey) (bool, error) | ||
|
|
||
| // Replication control | ||
| StartReplication(key InstanceKey) error | ||
| StopReplication(key InstanceKey) error | ||
|
|
||
| // Provider metadata | ||
| ProviderName() string | ||
| } | ||
| ``` | ||
|
|
||
| ### ReplicationStatus | ||
|
|
||
| The `ReplicationStatus` struct provides a database-agnostic view of replication | ||
| state: | ||
|
|
||
| | Field | Description | | ||
| |------------------|--------------------------------------------------------------| | ||
| | ReplicaRunning | Whether replication is fully operational | | ||
| | SQLThreadRunning | Whether the SQL/apply thread is running | | ||
| | IOThreadRunning | Whether the IO/receiver thread is running | | ||
| | Position | Opaque replication position (MySQL GTID, PG LSN, etc.) | | ||
| | Lag | Replication lag in seconds; -1 if unknown | | ||
|
|
||
| ## Using the Provider | ||
|
|
||
| ```go | ||
| import "github.com/proxysql/orchestrator/go/inst" | ||
|
|
||
| // Get the current provider | ||
| provider := inst.GetProvider() | ||
|
|
||
| // Check replication status | ||
| status, err := provider.GetReplicationStatus(instanceKey) | ||
|
|
||
| // Control read-only mode | ||
| err = provider.SetReadOnly(instanceKey, true) | ||
|
|
||
| // Control replication | ||
| err = provider.StopReplication(instanceKey) | ||
| err = provider.StartReplication(instanceKey) | ||
| ``` | ||
|
|
||
| ## MySQL Provider | ||
|
|
||
| The MySQL provider (`MySQLProvider`) is the default provider. It delegates to | ||
| orchestrator's existing MySQL DAO functions, so all current behavior is | ||
| preserved. | ||
|
|
||
| The MySQL provider is automatically registered at init time. No configuration | ||
| is needed to use it. | ||
|
|
||
| ## Implementing a New Provider | ||
|
|
||
| To add support for a new database engine: | ||
|
|
||
| 1. Create a new file `go/inst/provider_<engine>.go`. | ||
| 2. Define a struct that implements all methods of `DatabaseProvider`. | ||
| 3. Add a compile-time interface check: | ||
| ```go | ||
| var _ DatabaseProvider = (*MyNewProvider)(nil) | ||
| ``` | ||
| 4. Register the provider during initialization or based on configuration: | ||
| ```go | ||
| inst.SetProvider(NewMyProvider()) | ||
| ``` | ||
|
|
||
| ### Guidelines | ||
|
|
||
| - **Return errors, don't panic.** All provider methods return errors. | ||
| - **Map engine-specific state to ReplicationStatus.** The `ReplicationStatus` | ||
| struct is intentionally generic. Map your engine's replication details | ||
| into the common fields. | ||
| - **Position is opaque.** The `Position` field in `ReplicationStatus` is a | ||
| string that means different things for different engines. Consumers should | ||
| not parse it directly. | ||
| - **Lag of -1 means unknown.** If your engine cannot determine replication lag, | ||
| return -1. | ||
|
|
||
| ## Current Limitations | ||
|
|
||
| This is the initial extraction. The provider interface currently covers: | ||
|
|
||
| - Replication status discovery | ||
| - Read-only control | ||
| - Basic replication start/stop | ||
|
|
||
| Future work will expand the interface to cover: | ||
|
|
||
| - Topology changes (reparenting, detach/reattach) | ||
| - GTID operations | ||
| - Semi-sync configuration | ||
| - Instance discovery and metadata |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,78 @@ | ||
| /* | ||
| Copyright 2024 Orchestrator Authors | ||
|
|
||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||
| you may not use this file except in compliance with the License. | ||
| You may obtain a copy of the License at | ||
|
|
||
| http://www.apache.org/licenses/LICENSE-2.0 | ||
|
|
||
| Unless required by applicable law or agreed to in writing, software | ||
| distributed under the License is distributed on an "AS IS" BASIS, | ||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| See the License for the specific language governing permissions and | ||
| limitations under the License. | ||
| */ | ||
|
|
||
| package inst | ||
|
|
||
| // ReplicationStatus represents database-agnostic replication state. | ||
| // Each database provider maps its engine-specific replication details | ||
| // into this common structure. | ||
| type ReplicationStatus struct { | ||
| // ReplicaRunning indicates whether replication is fully operational | ||
| // (both IO and SQL threads running for MySQL, WAL receiver active for PostgreSQL, etc.) | ||
| ReplicaRunning bool | ||
|
|
||
| // SQLThreadRunning indicates whether the SQL/apply thread is running. | ||
| // For engines without separate threads, this mirrors ReplicaRunning. | ||
| SQLThreadRunning bool | ||
|
|
||
| // IOThreadRunning indicates whether the IO/receiver thread is running. | ||
| // For engines without separate threads, this mirrors ReplicaRunning. | ||
| IOThreadRunning bool | ||
|
|
||
| // Position is an opaque replication position string. | ||
| // For MySQL this is a GTID set or binlog file:pos; for PostgreSQL it would be an LSN. | ||
| Position string | ||
|
|
||
| // Lag is the replication lag in seconds. -1 indicates lag is unknown. | ||
| Lag int64 | ||
| } | ||
|
|
||
| // DatabaseProvider abstracts database-specific operations. | ||
| // Implementations exist per database engine (MySQL, future PostgreSQL, etc.). | ||
| // This interface covers the minimal set of operations needed for topology | ||
| // management in a database-agnostic way. | ||
| type DatabaseProvider interface { | ||
| // Discovery | ||
|
|
||
| // GetReplicationStatus retrieves the current replication state | ||
| // for the given instance, returning a database-agnostic status. | ||
| GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) | ||
|
|
||
| // IsReplicaRunning checks whether replication is actively running | ||
| // on the given instance. | ||
| IsReplicaRunning(key InstanceKey) (bool, error) | ||
|
|
||
| // Read-only control | ||
|
|
||
| // SetReadOnly sets or clears the read-only state on the given instance. | ||
| SetReadOnly(key InstanceKey, readOnly bool) error | ||
|
|
||
| // IsReadOnly checks whether the given instance is in read-only mode. | ||
| IsReadOnly(key InstanceKey) (bool, error) | ||
|
|
||
| // Replication control | ||
|
|
||
| // StartReplication starts the replication process on the given instance. | ||
| StartReplication(key InstanceKey) error | ||
|
|
||
| // StopReplication stops the replication process on the given instance. | ||
| StopReplication(key InstanceKey) error | ||
|
|
||
| // Provider metadata | ||
|
|
||
| // ProviderName returns a short identifier for this provider (e.g. "mysql"). | ||
| ProviderName() string | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,116 @@ | ||||||||||||||||||||||||||||||
| /* | ||||||||||||||||||||||||||||||
| Copyright 2024 Orchestrator Authors | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||||||||||||||||
| you may not use this file except in compliance with the License. | ||||||||||||||||||||||||||||||
| You may obtain a copy of the License at | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Unless required by applicable law or agreed to in writing, software | ||||||||||||||||||||||||||||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||||||||||||||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||||||||||||||||
| See the License for the specific language governing permissions and | ||||||||||||||||||||||||||||||
| limitations under the License. | ||||||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| package inst | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||||
| "github.com/proxysql/golib/log" | ||||||||||||||||||||||||||||||
| "github.com/proxysql/golib/sqlutils" | ||||||||||||||||||||||||||||||
| "github.com/proxysql/orchestrator/go/db" | ||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // MySQLProvider implements DatabaseProvider for MySQL and MySQL-compatible | ||||||||||||||||||||||||||||||
| // databases (MariaDB, Percona Server, etc.). It delegates to the existing | ||||||||||||||||||||||||||||||
| // orchestrator DAO functions. | ||||||||||||||||||||||||||||||
| type MySQLProvider struct{} | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // NewMySQLProvider creates a new MySQL database provider. | ||||||||||||||||||||||||||||||
| func NewMySQLProvider() *MySQLProvider { | ||||||||||||||||||||||||||||||
| return &MySQLProvider{} | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // ProviderName returns "mysql". | ||||||||||||||||||||||||||||||
| func (p *MySQLProvider) ProviderName() string { | ||||||||||||||||||||||||||||||
| return "mysql" | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // GetReplicationStatus retrieves the replication state for a MySQL instance | ||||||||||||||||||||||||||||||
| // by reading the topology and mapping it to a database-agnostic ReplicationStatus. | ||||||||||||||||||||||||||||||
| func (p *MySQLProvider) GetReplicationStatus(key InstanceKey) (*ReplicationStatus, error) { | ||||||||||||||||||||||||||||||
| instance, err := ReadTopologyInstance(&key) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return nil, log.Errore(err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| lag := int64(-1) | ||||||||||||||||||||||||||||||
| if instance.SecondsBehindMaster.Valid { | ||||||||||||||||||||||||||||||
| lag = instance.SecondsBehindMaster.Int64 | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| position := instance.ExecutedGtidSet | ||||||||||||||||||||||||||||||
| if position == "" { | ||||||||||||||||||||||||||||||
| position = instance.SelfBinlogCoordinates.DisplayString() | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| return &ReplicationStatus{ | ||||||||||||||||||||||||||||||
| ReplicaRunning: instance.ReplicaRunning(), | ||||||||||||||||||||||||||||||
| SQLThreadRunning: instance.ReplicationSQLThreadRuning, | ||||||||||||||||||||||||||||||
| IOThreadRunning: instance.ReplicationIOThreadRuning, | ||||||||||||||||||||||||||||||
| Position: position, | ||||||||||||||||||||||||||||||
| Lag: lag, | ||||||||||||||||||||||||||||||
| }, nil | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // IsReplicaRunning checks whether replication is actively running on the | ||||||||||||||||||||||||||||||
| // given MySQL instance. | ||||||||||||||||||||||||||||||
| func (p *MySQLProvider) IsReplicaRunning(key InstanceKey) (bool, error) { | ||||||||||||||||||||||||||||||
| instance, err := ReadTopologyInstance(&key) | ||||||||||||||||||||||||||||||
| if err != nil { | ||||||||||||||||||||||||||||||
| return false, log.Errore(err) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| return instance.ReplicaRunning(), nil | ||||||||||||||||||||||||||||||
|
Comment on lines
+70
to
+74
|
||||||||||||||||||||||||||||||
| instance, err := ReadTopologyInstance(&key) | |
| if err != nil { | |
| return false, log.Errore(err) | |
| } | |
| return instance.ReplicaRunning(), nil | |
| status, err := p.GetReplicationStatus(key) | |
| if err != nil { | |
| return false, err | |
| } | |
| return status.ReplicaRunning, nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The IsReplicaRunning method duplicates the logic for fetching the instance topology from GetReplicationStatus. To improve maintainability and reduce code duplication, consider implementing IsReplicaRunning by calling GetReplicationStatus and returning the ReplicaRunning field from the resulting status object. This makes it clear that IsReplicaRunning is a convenience helper for GetReplicationStatus.
| func (p *MySQLProvider) IsReplicaRunning(key InstanceKey) (bool, error) { | |
| instance, err := ReadTopologyInstance(&key) | |
| if err != nil { | |
| return false, log.Errore(err) | |
| } | |
| return instance.ReplicaRunning(), nil | |
| } | |
| func (p *MySQLProvider) IsReplicaRunning(key InstanceKey) (bool, error) { | |
| status, err := p.GetReplicationStatus(key) | |
| if err != nil { | |
| return false, err | |
| } | |
| return status.ReplicaRunning, nil | |
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,45 @@ | ||||||||||||||||
| /* | ||||||||||||||||
| Copyright 2024 Orchestrator Authors | ||||||||||||||||
|
|
||||||||||||||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||||||||||||||
| you may not use this file except in compliance with the License. | ||||||||||||||||
| You may obtain a copy of the License at | ||||||||||||||||
|
|
||||||||||||||||
| http://www.apache.org/licenses/LICENSE-2.0 | ||||||||||||||||
|
|
||||||||||||||||
| Unless required by applicable law or agreed to in writing, software | ||||||||||||||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||||||||||||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||||||||||||
| See the License for the specific language governing permissions and | ||||||||||||||||
| limitations under the License. | ||||||||||||||||
| */ | ||||||||||||||||
|
|
||||||||||||||||
| package inst | ||||||||||||||||
|
|
||||||||||||||||
| import "sync" | ||||||||||||||||
|
|
||||||||||||||||
| var ( | ||||||||||||||||
| providerMu sync.RWMutex | ||||||||||||||||
| providerInst DatabaseProvider | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| // SetProvider sets the global database provider. This should be called | ||||||||||||||||
| // during initialization to configure which database engine orchestrator | ||||||||||||||||
| // manages. The default provider is MySQL. | ||||||||||||||||
| func SetProvider(p DatabaseProvider) { | ||||||||||||||||
| providerMu.Lock() | ||||||||||||||||
| defer providerMu.Unlock() | ||||||||||||||||
|
||||||||||||||||
| defer providerMu.Unlock() | |
| defer providerMu.Unlock() | |
| if p == nil { | |
| // Ignore attempts to set a nil provider to avoid panics from GetProvider(). | |
| // The existing provider (including the default set in init) is preserved. | |
| return | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the instance has neither ExecutedGtidSet nor a valid binlog coordinate (SelfBinlogCoordinates.LogFile is empty), DisplayString() will produce the sentinel ":0", which looks like a real position but is not meaningful. Consider returning an empty Position (or another explicit "unknown" sentinel) when coordinates are empty, to avoid consumers misinterpreting ":0" as a valid replication position.