Skip to content

[sql-50] session migration duplicate ID fix #1133

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 70 additions & 22 deletions session/sql_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,35 +40,16 @@ func MigrateSessionStoreToSQL(ctx context.Context, kvStore *bbolt.DB,
return err
}

// If sessions are linked to a group, we must insert the initial session
// of each group before the other sessions in that group. This ensures
// we can retrieve the SQL group ID when inserting the remaining
// sessions. Therefore, we first insert all initial group sessions,
// allowing us to fetch the group IDs and insert the rest of the
// sessions afterward.
// We therefore filter out the initial sessions first, and then migrate
// them prior to the rest of the sessions.
var (
initialGroupSessions []*Session
linkedSessions []*Session
)

for _, kvSession := range kvSessions {
if kvSession.GroupID == kvSession.ID {
initialGroupSessions = append(
initialGroupSessions, kvSession,
)
} else {
linkedSessions = append(linkedSessions, kvSession)
}
}
initialGroupSessions, linkedSessions := filterSessions(kvSessions)

// Migrate the non-linked sessions first.
err = migrateSessionsToSQLAndValidate(ctx, tx, initialGroupSessions)
if err != nil {
return fmt.Errorf("migration of non-linked session failed: %w",
err)
}

// Then migrate the linked sessions.
err = migrateSessionsToSQLAndValidate(ctx, tx, linkedSessions)
if err != nil {
return fmt.Errorf("migration of linked session failed: %w", err)
Expand All @@ -81,6 +62,73 @@ func MigrateSessionStoreToSQL(ctx context.Context, kvStore *bbolt.DB,
return nil
}

// filterSessions categorizes the sessions into two groups: initial group
// sessions and linked sessions. The initial group sessions are the first
// sessions in a session group, while the linked sessions are those that have a
// linked parent session. These are separated to ensure that we can insert the
// initial group sessions first, which allows us to fetch the SQL group ID when
// inserting the rest of the linked sessions afterward.
//
// Additionally, it checks for duplicate session IDs and drops all but
// one session with the same ID, keeping the one with the latest CreatedAt
// timestamp. Note that users with duplicate session IDs should be extremely
// rare, as it could only occur if colliding session IDs were created prior to
// the introduction of the session linking functionality.
func filterSessions(kvSessions []*Session) ([]*Session, []*Session) {
// First map sessions by their ID.
sessionsByID := make(map[ID][]*Session)
for _, s := range kvSessions {
sessionsByID[s.ID] = append(sessionsByID[s.ID], s)
}

var (
initialGroupSessions []*Session
linkedSessions []*Session
)

// Process the mapped sessions. If there are duplicate sessions with the
// same ID, we will only iterate the session with the latest CreatedAt
// timestamp, and drop the other sessions. This is to ensure that we can
// keep a UNIQUE constraint for the session ID (alias) in the SQL db.
for id, sessions := range sessionsByID {
sessionToKeep := sessions[0]
if len(sessions) > 1 {
log.Warnf("Found %d sessions with duplicate ID %x, "+
"keeping only the latest one", len(sessions),
id)

// Find the session with the latest timestamp.
latestSession := sessions[0]
for _, s := range sessions[1:] {
if s.CreatedAt.After(latestSession.CreatedAt) {
latestSession = s
}
}
sessionToKeep = latestSession

// Log the sessions that will be dropped.
for _, s := range sessions {
if s == sessionToKeep {
continue
}
log.Warnf("Dropping duplicate session with ID "+
"%x created at %v", id, s.CreatedAt)
}
}

// Categorize the session that we are keeping.
if sessionToKeep.GroupID == sessionToKeep.ID {
initialGroupSessions = append(
initialGroupSessions, sessionToKeep,
)
} else {
linkedSessions = append(linkedSessions, sessionToKeep)
}
}

return initialGroupSessions, linkedSessions
}

// getBBoltSessions is a helper function that fetches all sessions from the
// Bbolt store, by iterating directly over the buckets, without needing to
// use any public functions of the BoltStore struct.
Expand Down
Loading
Loading