Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions server/db/driver/pg/internal/markets.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ const (
// tables.
CreateMarketsTable = `CREATE TABLE IF NOT EXISTS %s (
name TEXT PRIMARY KEY,
base INT2,
quote INT2,
base INT8,
quote INT8,
lot_size INT8
)`

Expand Down
11 changes: 10 additions & 1 deletion server/db/driver/pg/markets.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package pg
import (
"database/sql"
"fmt"
"strings"

"decred.org/dcrdex/dex"
"decred.org/dcrdex/server/db/driver/pg/internal"
Expand Down Expand Up @@ -59,7 +60,8 @@ func newMarket(db *sql.DB, marketsTableName string, mkt *dex.MarketInfo) error {
return nil
}

func createMarketTables(db *sql.DB, marketUID string) error {
func createMarketTables(db *sql.DB, marketName string) error {
marketUID := marketSchema(marketName)
newMarket, err := createSchema(db, marketUID)
if err != nil {
return err
Expand All @@ -83,3 +85,10 @@ func createMarketTables(db *sql.DB, marketUID string) error {

return nil
}

// marketSchema replaces the special token symbol character '.' with the allowed
// PostgreSQL character '$'.
func marketSchema(marketName string) string {
// '$' separator might only work with PostgreSQL.
return strings.ReplaceAll(marketName, ".", "TKN")
}
16 changes: 8 additions & 8 deletions server/db/driver/pg/matches.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ func (a *Archiver) matchTableName(match *order.Match) (string, error) {
// can actually be forgiven (inactive, not already forgiven, and not in
// MatchComplete status).
func (a *Archiver) ForgiveMatchFail(mid order.MatchID) (bool, error) {
for m := range a.markets {
stmt := fmt.Sprintf(internal.ForgiveMatchFail, fullMatchesTableName(a.dbName, m))
for schema := range a.markets {
stmt := fmt.Sprintf(internal.ForgiveMatchFail, fullMatchesTableName(a.dbName, schema))
N, err := sqlExec(a.db, stmt, mid)
if err != nil { // not just no rows updated
return false, err
Expand All @@ -49,8 +49,8 @@ func (a *Archiver) ForgiveMatchFail(mid order.MatchID) (bool, error) {
func (a *Archiver) ActiveSwaps() ([]*db.SwapDataFull, error) {
var sd []*db.SwapDataFull

for m, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, m)
for schema, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, schema)
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
matches, swapData, err := activeSwaps(ctx, a.db, matchesTableName)
cancel()
Expand Down Expand Up @@ -140,8 +140,8 @@ func activeSwaps(ctx context.Context, dbe *sql.DB, tableName string) (matches []
func (a *Archiver) CompletedAndAtFaultMatchStats(aid account.AccountID, lastN int) ([]*db.MatchOutcome, error) {
var outcomes []*db.MatchOutcome

for m, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, m)
for schema, mkt := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, schema)
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
matchOutcomes, err := completedAndAtFaultMatches(ctx, a.db, matchesTableName, aid, lastN, mkt.Base, mkt.Quote)
cancel()
Expand Down Expand Up @@ -410,8 +410,8 @@ func (a *Archiver) AllActiveUserMatches(aid account.AccountID) ([]*db.MatchData,
defer cancel()

var matches []*db.MatchData
for m := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, m)
for schema := range a.markets {
matchesTableName := fullMatchesTableName(a.dbName, schema)
mdM, err := userMatches(ctx, a.db, matchesTableName, aid, false)
if err != nil {
return nil, err
Expand Down
14 changes: 7 additions & 7 deletions server/db/driver/pg/orders.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,8 +762,8 @@ type orderCompStamped struct {
func (a *Archiver) CompletedUserOrders(aid account.AccountID, N int) (oids []order.OrderID, compTimes []int64, err error) {
var ords []orderCompStamped

for m := range a.markets {
tableName := fullOrderTableName(a.dbName, m, false) // NOT active table
for schema := range a.markets {
tableName := fullOrderTableName(a.dbName, schema, false) // NOT active table
ctx, cancel := context.WithTimeout(a.ctx, a.queryTimeout)
mktOids, err := completedUserOrders(ctx, a.db, tableName, aid, N)
cancel()
Expand Down Expand Up @@ -850,15 +850,15 @@ func (a *Archiver) PreimageStats(user account.AccountID, lastN int) ([]*db.Preim
return rows.Err()
}

for m := range a.markets {
for schema := range a.markets {
// archived trade orders
stmt := fmt.Sprintf(internal.PreimageResultsLastN, fullOrderTableName(a.dbName, m, false))
stmt := fmt.Sprintf(internal.PreimageResultsLastN, fullOrderTableName(a.dbName, schema, false))
if err := queryOutcomes(stmt); err != nil {
return nil, err
}

// archived cancel orders
stmt = fmt.Sprintf(internal.CancelPreimageResultsLastN, fullCancelOrderTableName(a.dbName, m, false))
stmt = fmt.Sprintf(internal.CancelPreimageResultsLastN, fullCancelOrderTableName(a.dbName, schema, false))
if err := queryOutcomes(stmt); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1151,8 +1151,8 @@ func (a *Archiver) UserOrderStatuses(aid account.AccountID, base, quote uint32,
// active orders for a user across all markets.
func (a *Archiver) ActiveUserOrderStatuses(aid account.AccountID) ([]*db.OrderStatus, error) {
var orders []*db.OrderStatus
for m := range a.markets {
tableName := fullOrderTableName(a.dbName, m, true) // active table
for schema := range a.markets {
tableName := fullOrderTableName(a.dbName, schema, true) // active table
mktOrders, err := a.userOrderStatusesFromTable(tableName, aid, nil)
if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions server/db/driver/pg/pg.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func NewArchiverForRead(ctx context.Context, cfg *Config) (*Archiver, error) {

mktMap := make(map[string]*dex.MarketInfo, len(cfg.MarketCfg))
for _, mkt := range cfg.MarketCfg {
mktMap[mkt.Name] = mkt
mktMap[marketSchema(mkt.Name)] = mkt
}

return &Archiver{
Expand Down Expand Up @@ -197,16 +197,17 @@ func (a *Archiver) Close() error {
}

func (a *Archiver) marketSchema(base, quote uint32) (string, error) {
marketSchema, err := dex.MarketName(base, quote)
marketName, err := dex.MarketName(base, quote)
if err != nil {
return "", err
}
_, found := a.markets[marketSchema]
schema := marketSchema(marketName)
_, found := a.markets[schema]
if !found {
return "", db.ArchiveError{
Code: db.ErrUnsupportedMarket,
Detail: fmt.Sprintf(`archiver does not support the market "%s"`, marketSchema),
Detail: fmt.Sprintf(`archiver does not support the market "%s"`, schema),
}
}
return marketSchema, nil
return schema, nil
}
14 changes: 13 additions & 1 deletion server/db/driver/pg/upgrades.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"decred.org/dcrdex/server/db/driver/pg/internal"
)

const dbVersion = 3
const dbVersion = 4

// The number of upgrades defined MUST be equal to dbVersion.
var upgrades = []func(db *sql.Tx) error{
Expand All @@ -35,6 +35,10 @@ var upgrades = []func(db *sql.Tx) error{

// v3 upgrade adds the fee_asset column to the accounts table.
v3Upgrade,

// v4 upgrade updates the markets tables to use a integer type that can
// accommodate a 32-bit unsigned integer.
v4Upgrade,
}

// v1Upgrade adds the schema_version column and removes the state_hash column
Expand Down Expand Up @@ -279,6 +283,14 @@ func v3Upgrade(tx *sql.Tx) error {
return err
}

func v4Upgrade(tx *sql.Tx) (err error) {
if _, err = tx.Exec("ALTER TABLE markets ALTER COLUMN base TYPE INT8;"); err != nil {
return
}
_, err = tx.Exec("ALTER TABLE markets ALTER COLUMN quote TYPE INT8;")
return err
}

// DBVersion retrieves the database version from the meta table.
func DBVersion(db *sql.DB) (ver uint32, err error) {
err = db.QueryRow(internal.SelectDBVersion).Scan(&ver)
Expand Down