Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/pipeline_items.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (rc *SnapClient) replaceTables(
return 0, errors.Trace(err)
}

if err := notifyUpdateAllUsersPrivilege(renamedTables, rc.dom.NotifyUpdatePrivilege); err != nil {
if err := notifyUpdateAllUsersPrivilege(renamedTables, rc.dom.NotifyUpdateAllUsersPrivilege); err != nil {
return 0, errors.Trace(err)
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ func (rc *SnapClient) afterSystemTablesReplaced(ctx context.Context, db string,
var err error
for _, table := range tables {
if table == "user" {
if serr := rc.dom.NotifyUpdatePrivilege(); serr != nil {
if serr := rc.dom.NotifyUpdateAllUsersPrivilege(); serr != nil {
log.Warn("failed to flush privileges, please manually execute `FLUSH PRIVILEGES`")
err = multierr.Append(err, berrors.ErrUnknown.Wrap(serr).GenWithStack("failed to flush privileges"))
} else {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/systable_restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ func TestCheckPrivilegeTableRowsCollateCompatibility(t *testing.T) {
//
// The above variables are in the file br/pkg/restore/systable_restore.go
func TestMonitorTheSystemTableIncremental(t *testing.T) {
require.Equal(t, int64(223), session.CurrentBootstrapVersion)
require.Equal(t, int64(224), session.CurrentBootstrapVersion)
}

func TestIsStatsTemporaryTable(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6267,7 +6267,7 @@ func (e *executor) DropResourceGroup(ctx sessionctx.Context, stmt *ast.DropResou
if checker == nil {
return errors.New("miss privilege checker")
}
user, matched := checker.MatchUserResourceGroupName(groupName.L)
user, matched := checker.MatchUserResourceGroupName(ctx.GetRestrictedSQLExecutor(), groupName.L)
if matched {
err = errors.Errorf("user [%s] depends on the resource group to drop", user)
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ go_library(
"//pkg/util/printer",
"//pkg/util/replayer",
"//pkg/util/servermemorylimit",
"//pkg/util/size",
"//pkg/util/sqlexec",
"//pkg/util/sqlkiller",
"//pkg/util/syncutil",
Expand Down
112 changes: 91 additions & 21 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package domain

import (
"context"
"encoding/json"
"fmt"
"math"
"math/rand"
Expand Down Expand Up @@ -98,6 +99,7 @@ import (
"github.com/pingcap/tidb/pkg/util/memoryusagealarm"
"github.com/pingcap/tidb/pkg/util/replayer"
"github.com/pingcap/tidb/pkg/util/servermemorylimit"
"github.com/pingcap/tidb/pkg/util/size"
"github.com/pingcap/tidb/pkg/util/sqlkiller"
"github.com/pingcap/tidb/pkg/util/syncutil"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -1885,6 +1887,36 @@ func (do *Domain) GetPDHTTPClient() pdhttp.Client {
return nil
}

func decodePrivilegeEvent(resp clientv3.WatchResponse) PrivilegeEvent {
var msg PrivilegeEvent
for _, event := range resp.Events {
if event.Kv != nil {
val := event.Kv.Value
if len(val) > 0 {
var tmp PrivilegeEvent
err := json.Unmarshal(val, &tmp)
if err != nil {
logutil.BgLogger().Warn("decodePrivilegeEvent unmarshal fail", zap.Error(err))
break
}
if tmp.All {
msg.All = true
break
}
// duplicated users in list is ok.
msg.UserList = append(msg.UserList, tmp.UserList...)
}
}
}

// In case old version triggers the event, the event value is empty,
// Then we fall back to the old way: reload all the users.
if len(msg.UserList) == 0 {
msg.All = true
}
return msg
}

// LoadPrivilegeLoop create a goroutine loads privilege tables in a loop, it
// should be called only once in BootstrapSession.
func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
Expand All @@ -1894,15 +1926,12 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
if err != nil {
return err
}
do.privHandle = privileges.NewHandle(sctx.GetRestrictedSQLExecutor())
if err := do.privHandle.Update(); err != nil {
return errors.Trace(err)
}
do.privHandle = privileges.NewHandle(do.SysSessionPool(), sctx.GetSessionVars().GlobalVarsAccessor)

var watchCh clientv3.WatchChan
duration := 5 * time.Minute
if do.etcdClient != nil {
watchCh = do.etcdClient.Watch(context.Background(), privilegeKey)
watchCh = do.etcdClient.Watch(do.ctx, privilegeKey)
duration = 10 * time.Minute
}

Expand All @@ -1914,25 +1943,30 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {

var count int
for {
ok := true
var event PrivilegeEvent
select {
case <-do.exit:
return
case _, ok = <-watchCh:
case <-time.After(duration):
}
if !ok {
logutil.BgLogger().Warn("load privilege loop watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), privilegeKey)
count++
if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
case resp, ok := <-watchCh:
if ok {
count = 0
event = decodePrivilegeEvent(resp)
} else {
if do.ctx.Err() == nil {
logutil.BgLogger().Error("load privilege loop watch channel closed")
watchCh = do.etcdClient.Watch(do.ctx, privilegeKey)
count++
if count > 10 {
time.Sleep(time.Duration(count) * time.Second)
}
continue
}
}
continue
case <-time.After(duration):
event.All = true
}

count = 0
err := do.privHandle.Update()
err := privReloadEvent(do.privHandle, &event)
metrics.LoadPrivilegeCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
if err != nil {
logutil.BgLogger().Warn("load privilege failed", zap.Error(err))
Expand All @@ -1942,6 +1976,18 @@ func (do *Domain) LoadPrivilegeLoop(sctx sessionctx.Context) error {
return nil
}

func privReloadEvent(h *privileges.Handle, event *PrivilegeEvent) (err error) {
switch {
case !variable.AccelerateUserCreationUpdate.Load():
err = h.UpdateAll()
case event.All:
err = h.UpdateAllActive()
default:
err = h.Update(event.UserList)
}
return
}

// LoadSysVarCacheLoop create a goroutine loads sysvar cache in a loop,
// it should be called only once in BootstrapSession.
func (do *Domain) LoadSysVarCacheLoop(ctx sessionctx.Context) error {
Expand Down Expand Up @@ -2923,15 +2969,39 @@ const (
tiflashComputeNodeKey = "/tiflash/new_tiflash_compute_nodes"
)

// PrivilegeEvent is the message definition for NotifyUpdatePrivilege(), encoded in json.
// TiDB old version do not use no such message.
type PrivilegeEvent struct {
All bool
UserList []string
}

// NotifyUpdateAllUsersPrivilege updates privilege key in etcd, TiDB client that watches
// the key will get notification.
func (do *Domain) NotifyUpdateAllUsersPrivilege() error {
return do.notifyUpdatePrivilege(PrivilegeEvent{All: true})
}

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
// the key will get notification.
func (do *Domain) NotifyUpdatePrivilege() error {
func (do *Domain) NotifyUpdatePrivilege(userList []string) error {
return do.notifyUpdatePrivilege(PrivilegeEvent{UserList: userList})
}

func (do *Domain) notifyUpdatePrivilege(event PrivilegeEvent) error {
// No matter skip-grant-table is configured or not, sending an etcd message is required.
// Because we need to tell other TiDB instances to update privilege data, say, we're changing the
// password using a special TiDB instance and want the new password to take effect.
if do.etcdClient != nil {
data, err := json.Marshal(event)
if err != nil {
return errors.Trace(err)
}
if uint64(len(data)) > size.MB {
logutil.BgLogger().Warn("notify update privilege message too large", zap.ByteString("value", data))
}
row := do.etcdClient.KV
_, err := row.Put(context.Background(), privilegeKey, "")
_, err = row.Put(do.ctx, privilegeKey, string(data))
if err != nil {
logutil.BgLogger().Warn("notify update privilege failed", zap.Error(err))
}
Expand All @@ -2944,7 +3014,7 @@ func (do *Domain) NotifyUpdatePrivilege() error {
return nil
}

return do.PrivilegeHandle().Update()
return privReloadEvent(do.PrivilegeHandle(), &event)
}

// NotifyUpdateSysVarCache updates the sysvar cache key in etcd, which other TiDB
Expand Down
3 changes: 2 additions & 1 deletion pkg/executor/grant.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,8 @@ func (e *GrantExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}
isCommit = true
return domain.GetDomain(e.Ctx()).NotifyUpdatePrivilege()
users := userSpecToUserList(e.Users)
return domain.GetDomain(e.Ctx()).NotifyUpdatePrivilege(users)
}

func containsNonDynamicPriv(privList []*ast.PrivElem) bool {
Expand Down
2 changes: 0 additions & 2 deletions pkg/executor/infoschema_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,8 +711,6 @@ func TestIndexUsageTable(t *testing.T) {
where TABLE_SCHEMA = 'test' and TABLE_NAME = 'idt2' and INDEX_NAME = 'idx_4';`).Check(
testkit.RowsWithSep("|",
"test|idt2|idx_4"))
tk.MustQuery(`select count(*) from information_schema.tidb_index_usage;`).Check(
testkit.RowsWithSep("|", "81"))

tk.MustQuery(`select TABLE_SCHEMA, TABLE_NAME, INDEX_NAME from information_schema.tidb_index_usage
where TABLE_SCHEMA = 'test1';`).Check(testkit.Rows())
Expand Down
11 changes: 10 additions & 1 deletion pkg/executor/revoke.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,16 @@ func (e *RevokeExec) Next(ctx context.Context, _ *chunk.Chunk) error {
return err
}
isCommit = true
return domain.GetDomain(e.Ctx()).NotifyUpdatePrivilege()
users := userSpecToUserList(e.Users)
return domain.GetDomain(e.Ctx()).NotifyUpdatePrivilege(users)
}

func userSpecToUserList(specs []*ast.UserSpec) []string {
users := make([]string, 0, len(specs))
for _, user := range specs {
users = append(users, user.User.Username)
}
return users
}

// Checks that dynamic privileges are only of global scope.
Expand Down
8 changes: 4 additions & 4 deletions pkg/executor/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (e *ShowExec) fetchAll(ctx context.Context) error {
case ast.ShowEngines:
return e.fetchShowEngines(ctx)
case ast.ShowGrants:
return e.fetchShowGrants()
return e.fetchShowGrants(ctx)
case ast.ShowIndex:
return e.fetchShowIndex()
case ast.ShowProcedureStatus:
Expand Down Expand Up @@ -1869,7 +1869,7 @@ func (e *ShowExec) fetchShowCreateUser(ctx context.Context) error {
return nil
}

func (e *ShowExec) fetchShowGrants() error {
func (e *ShowExec) fetchShowGrants(ctx context.Context) error {
vars := e.Ctx().GetSessionVars()
checker := privilege.GetPrivilegeManager(e.Ctx())
if checker == nil {
Expand Down Expand Up @@ -1898,11 +1898,11 @@ func (e *ShowExec) fetchShowGrants() error {
if r.Hostname == "" {
r.Hostname = "%"
}
if !checker.FindEdge(e.Ctx(), r, e.User) {
if !checker.FindEdge(ctx, e.Ctx(), r, e.User) {
return exeerrors.ErrRoleNotGranted.GenWithStackByArgs(r.String(), e.User.String())
}
}
gs, err := checker.ShowGrants(e.Ctx(), e.User, e.Roles)
gs, err := checker.ShowGrants(ctx, e.Ctx(), e.User, e.Roles)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading