From e657df70eddd7bd4f067388f9540b327f700ab89 Mon Sep 17 00:00:00 2001
From: Kevin Gillette
Date: Sat, 9 Dec 2023 12:01:11 -0700
Subject: [PATCH 1/4] Support --delete option for reset-offsets command

Also refactor some methods to have fewer parameters.
---
 cmd/topicctl/subcmd/reset.go | 209 ++++++++++++++++++++++++-----------
 pkg/cli/cli.go               |  53 +++++----
 pkg/groups/groups.go         | 124 ++++++++++++-------
 pkg/groups/groups_test.go    |  60 ++++++----
 4 files changed, 296 insertions(+), 150 deletions(-)

diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go
index 831e049d..4fa4981e 100644
--- a/cmd/topicctl/subcmd/reset.go
+++ b/cmd/topicctl/subcmd/reset.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	"strconv"
 
+	"github.com/segmentio/topicctl/pkg/admin"
 	"github.com/segmentio/topicctl/pkg/apply"
 	"github.com/segmentio/topicctl/pkg/cli"
 	"github.com/segmentio/topicctl/pkg/groups"
@@ -14,9 +15,9 @@ import (
 )
 
 var resetOffsetsCmd = &cobra.Command{
-	Use:     "reset-offsets [topic name] [group name]",
+	Use:     "reset-offsets <topic> <group>",
 	Short:   "reset consumer group offsets",
-	Args:    cobra.MinimumNArgs(2),
+	Args:    cobra.ExactArgs(2),
 	PreRunE: resetOffsetsPreRun,
 	RunE:    resetOffsetsRun,
 }
@@ -27,6 +28,7 @@ type resetOffsetsCmdConfig struct {
 	partitionOffsetMap map[string]int64
 	toEarliest         bool
 	toLatest           bool
+	delete             bool
 
 	shared sharedOptions
 }
@@ -62,40 +64,58 @@
 		"to-latest",
 		false,
 		"Resets offsets of consumer group members to latest offsets of partitions")
+	resetOffsetsCmd.Flags().BoolVar(
+		&resetOffsetsConfig.delete,
+		"delete",
+		false,
+		"Deletes offsets for the given consumer group")
 
 	addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared)
 	RootCmd.AddCommand(resetOffsetsCmd)
 }
 
 func resetOffsetsPreRun(cmd *cobra.Command, args []string) error {
-	resetOffsetSpecification := "You must choose only one of the following reset-offset specifications: --to-earliest, --to-latest, --offset."
-	offsetMapSpecification := "--partition-offset-map option cannot be coupled with any of the following options: --partitions, --to-earliest, --to-latest, --offset."
+	resetOffsetSpec := "You must choose only one of the following " +
+		"reset-offset specifications: --delete, --to-earliest, --to-latest, " +
+		"--offset, or --partition-offset-map."
+	offsetMapSpec := "--partition-offset-map option cannot be used with --partitions."
- if len(resetOffsetsConfig.partitionOffsetMap) > 0 && (cmd.Flags().Changed("offset") || - len(resetOffsetsConfig.partitions) > 0 || - resetOffsetsConfig.toEarliest || - resetOffsetsConfig.toLatest) { - return errors.New(offsetMapSpecification) + cfg := resetOffsetsConfig - } else if resetOffsetsConfig.toEarliest && resetOffsetsConfig.toLatest { - return errors.New(resetOffsetSpecification) + numOffsetSpecs := numTrue( + cfg.toEarliest, + cfg.toLatest, + cfg.delete, + cmd.Flags().Changed("offset"), + len(cfg.partitionOffsetMap) > 0, + ) - } else if cmd.Flags().Changed("offset") && (resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest) { - return errors.New(resetOffsetSpecification) + if numOffsetSpecs > 1 { + return errors.New(resetOffsetSpec) } - return resetOffsetsConfig.shared.validate() + + if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 { + return errors.New(offsetMapSpec) + } + + return cfg.shared.validate() } func resetOffsetsRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - adminClient, err := resetOffsetsConfig.shared.getAdminClient(ctx, nil, true) + cfg := resetOffsetsConfig + + adminClient, err := cfg.shared.getAdminClient(ctx, nil, true) if err != nil { return err } + defer adminClient.Close() + connector := adminClient.GetConnector() + topic := args[0] group := args[1] @@ -103,69 +123,73 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { if err != nil { return err } - partitionIDsMap := map[int]struct{}{} + + partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions)) for _, partitionInfo := range topicInfo.Partitions { partitionIDsMap[partitionInfo.ID] = struct{}{} } - var resetOffsetsStrategy string - if resetOffsetsConfig.toLatest { - resetOffsetsStrategy = groups.LatestResetOffsetsStrategy - } else if resetOffsetsConfig.toEarliest { - resetOffsetsStrategy = groups.EarliestResetOffsetsStrategy + + var strategy string + + switch { + case resetOffsetsConfig.toLatest: + strategy = groups.LatestResetOffsetsStrategy + case resetOffsetsConfig.toEarliest: + strategy = groups.EarliestResetOffsetsStrategy } - partitionOffsets := map[int]int64{} - if len(resetOffsetsConfig.partitionOffsetMap) > 0 { - for partition, offset := range resetOffsetsConfig.partitionOffsetMap { - var partitionID int - if partitionID, err = strconv.Atoi(partition); err != nil { - return fmt.Errorf("Partition value %s must be a number", partition) - } - if _, ok := partitionIDsMap[partitionID]; !ok { - return fmt.Errorf("Partition %d not found in topic %s", partitionID, topic) - } + // If explicit per-partition offsets were specified, set them now. + partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap) + if err != nil { + return err + } - partitionOffsets[partitionID] = offset + // Set explicit partitions (without offsets) if specified, + // otherwise operate on fetched partition info; + // these will only take effect of per-partition offsets were not specified. 
+ partitions := cfg.partitions + if len(partitions) == 0 && len(partitionOffsets) == 0 { + convert := func(info admin.PartitionInfo) int { return info.ID } + partitions = convertSlice(topicInfo.Partitions, convert) + } + for _, partition := range partitions { + _, ok := partitionIDsMap[partition] + if !ok { + format := "Partition %d not found in topic %s" + return fmt.Errorf(format, partition, topic) } - } else if len(resetOffsetsConfig.partitions) > 0 { - for _, partition := range resetOffsetsConfig.partitions { - if _, ok := partitionIDsMap[partition]; !ok { - return fmt.Errorf("Partition %d not found in topic %s", partition, topic) - } - if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest { - partitionOffsets[partition], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partition) - if err != nil { - return err - } - } else { - partitionOffsets[partition] = resetOffsetsConfig.offset - } + if strategy == "" { + partitionOffsets[partition] = cfg.offset + return nil + } + input := groups.GetEarliestOrLatestOffsetInput{ + Connector: connector, + Strategy: strategy, + Topic: topic, + Partition: partition, } - } else { - for _, partitionInfo := range topicInfo.Partitions { - if resetOffsetsConfig.toEarliest || resetOffsetsConfig.toLatest { - partitionOffsets[partitionInfo.ID], err = groups.GetEarliestOrLatestOffset(ctx, adminClient.GetConnector(), topic, resetOffsetsStrategy, partitionInfo.ID) - if err != nil { - return err - } - } else { - partitionOffsets[partitionInfo.ID] = resetOffsetsConfig.offset - } + + offset, err := groups.GetEarliestOrLatestOffset(ctx, &input) + if err != nil { + return err } + + partitionOffsets[partition] = offset } log.Infof( - "This will reset the offsets for the following partitions in topic %s for group %s:\n%s", + "This will reset the offsets for the following partitions "+ + "in topic %s for group %s:\n%s", topic, group, groups.FormatPartitionOffsets(partitionOffsets), ) - log.Info( - "Please ensure that all other consumers are stopped, otherwise the reset might be overridden.", - ) + + log.Info("Please ensure that all other consumers are stopped, " + + "otherwise the reset might be overridden.") ok, _ := apply.Confirm("OK to continue?", false) if !ok { @@ -173,10 +197,65 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { } cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) - return cliRunner.ResetOffsets( - ctx, - topic, - group, - partitionOffsets, - ) + + if resetOffsetsConfig.delete { + input := groups.DeleteOffsetsInput{ + GroupID: group, + Topic: topic, + Partitions: partitions, + } + + return cliRunner.DeleteOffsets(ctx, &input) + } + + input := groups.ResetOffsetsInput{ + GroupID: group, + Topic: topic, + PartitionOffsets: partitionOffsets, + } + + return cliRunner.ResetOffsets(ctx, &input) +} + +func numTrue(bools ...bool) int { + var n int + for _, b := range bools { + if b { + n++ + } + } + + return n +} + +func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { + out := make([]T2, len(input)) + + for i, v := range input { + out[i] = fn(v) + } + + return out +} + +func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) { + out := make(map[int]int64, len(input)) + + for partition, offset := range input { + partitionID, err := strconv.Atoi(partition) + if err != nil { + format := "Partition value %s must be an integer" + return nil, fmt.Errorf(format, partition) + } + + _, ok := 
partitionIDsMap[partitionID] + if !ok { + format := "Partition %d not found" + return nil, fmt.Errorf(format, partitionID) + } + + out[partitionID] = offset + } + + return out, nil } diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 71abde33..bd3134bb 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -534,29 +534,14 @@ func (c *CLIRunner) GetUsers(ctx context.Context, names []string) error { return nil } -// ResetOffsets resets the offsets for a single consumer group / topic combination. -func (c *CLIRunner) ResetOffsets( - ctx context.Context, - topic string, - groupID string, - partitionOffsets map[int]int64, -) error { - c.startSpinner() - err := groups.ResetOffsets( - ctx, - c.adminClient.GetConnector(), - topic, - groupID, - partitionOffsets, - ) - c.stopSpinner() - if err != nil { - return err - } - - c.printer("Success") +// DeleteOffsets removes offsets for a single consumer group / topic combination. +func (c *CLIRunner) DeleteOffsets(ctx context.Context, input *groups.DeleteOffsetsInput) error { + return invoke(ctx, c, input, groups.DeleteOffsets) +} - return nil +// ResetOffsets resets the offsets for a single consumer group / topic combination. +func (c *CLIRunner) ResetOffsets(ctx context.Context, input *groups.ResetOffsetsInput) error { + return invoke(ctx, c, input, groups.ResetOffsets) } // Tail prints out a stream of the latest messages in a topic. @@ -589,6 +574,7 @@ func (c *CLIRunner) Tail( 10e3, 10e6, ) + stats, err := tailer.LogMessages(ctx, maxMessages, filterRegexp, raw, headers) filtered := filterRegexp != "" @@ -600,10 +586,7 @@ func (c *CLIRunner) Tail( } // GetACLs fetches the details of each acl in the cluster and prints out a summary. -func (c *CLIRunner) GetACLs( - ctx context.Context, - filter kafka.ACLFilter, -) error { +func (c *CLIRunner) GetACLs(ctx context.Context, filter kafka.ACLFilter) error { c.startSpinner() acls, err := c.adminClient.GetACLs(ctx, filter) @@ -629,8 +612,24 @@ func (c *CLIRunner) stopSpinner() { } } +type invokeFunc[T any] func(context.Context, *admin.Connector, T) error + +func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) error { + c.startSpinner() + + err := fn(ctx, c.adminClient.GetConnector(), v) + c.stopSpinner() + if err != nil { + return err + } + + c.printer("Success") + + return nil +} + func stringsToInts(strs []string) ([]int, error) { - ints := []int{} + ints := make([]int, 0, len(strs)) for _, str := range strs { nextInt, err := strconv.ParseInt(str, 10, 32) diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index b3b78439..fe7ae7fc 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -191,22 +191,57 @@ func GetMemberLags( return partitionLags, nil } +// DeleteOffsetsInput configures a call to [DeleteOffsets]. +type DeleteOffsetsInput struct { + GroupID string + Topic string + Partitions []int +} + +// DeleteOffsets removes a consumer group's offsets +// on the given topic-partition combinations. +func DeleteOffsets(ctx context.Context, connector *admin.Connector, input *DeleteOffsetsInput) error { + req := kafka.OffsetDeleteRequest{ + Addr: connector.KafkaClient.Addr, + GroupID: input.GroupID, + Topics: map[string][]int{input.Topic: input.Partitions}, + } + + resp, err := connector.KafkaClient.OffsetDelete(ctx, &req) + if err != nil { + return err + } + + var errs []error + + for _, results := range resp.Topics { + for _, result := range results { + if result.Error != nil { + errs = append(errs, result.Error) + } + } + } + + return errors.Join(errs...) 
+} + +// ResetOffsetsInput configures a call to [ResetOffsets]. +type ResetOffsetsInput struct { + GroupID string + Topic string + PartitionOffsets map[int]int64 +} + // ResetOffsets updates the offsets for a given topic / group combination. -func ResetOffsets( - ctx context.Context, - connector *admin.Connector, - topic string, - groupID string, - partitionOffsets map[int]int64, -) error { - consumerGroup, err := kafka.NewConsumerGroup( - kafka.ConsumerGroupConfig{ - ID: groupID, - Brokers: []string{connector.Config.BrokerAddr}, - Topics: []string{topic}, - Dialer: connector.Dialer, - }, - ) +func ResetOffsets(ctx context.Context, connector *admin.Connector, input *ResetOffsetsInput) error { + cfg := kafka.ConsumerGroupConfig{ + ID: input.GroupID, + Brokers: []string{connector.Config.BrokerAddr}, + Topics: []string{input.Topic}, + Dialer: connector.Dialer, + } + + consumerGroup, err := kafka.NewConsumerGroup(cfg) if err != nil { return err } @@ -216,33 +251,44 @@ func ResetOffsets( return err } - return generation.CommitOffsets( - map[string]map[int]int64{ - topic: partitionOffsets, - }, - ) + offsets := map[string]map[int]int64{ + input.Topic: input.PartitionOffsets, + } + + return generation.CommitOffsets(offsets) } -// GetEarliestorLatestOffset gets earliest/latest offset for a given topic partition for resetting offsets of consumer group -func GetEarliestOrLatestOffset( - ctx context.Context, - connector *admin.Connector, - topic string, - strategy string, - partition int, -) (int64, error) { - if strategy == EarliestResetOffsetsStrategy { - partitionBound, err := messages.GetPartitionBounds(ctx, connector, topic, partition, 0) - if err != nil { - return 0, err - } +// GetEarliestOrLatestOffsetInput configures a call to [GetEarliestOrLatestOffset]. +type GetEarliestOrLatestOffsetInput struct { + Connector *admin.Connector + Strategy string + Topic string + Partition int +} + +// GetEarliestorLatestOffset gets earliest/latest offset +// for a given topic partition for resetting offsets of consumer group. 
+func GetEarliestOrLatestOffset(ctx context.Context, input *GetEarliestOrLatestOffsetInput) (int64, error) { + if !isValidOffsetStrategy(input.Strategy) { + return 0, errors.New("Invalid reset offset strategy provided.") + } + + partitionBound, err := messages.GetPartitionBounds(ctx, input.Connector, input.Topic, input.Partition, 0) + if err != nil { + return 0, err + } + + switch input.Strategy { + case EarliestResetOffsetsStrategy: return partitionBound.FirstOffset, nil - } else if strategy == LatestResetOffsetsStrategy { - partitionBound, err := messages.GetPartitionBounds(ctx, connector, topic, partition, 0) - if err != nil { - return 0, err - } + case LatestResetOffsetsStrategy: return partitionBound.LastOffset, nil } - return 0, errors.New("Invalid reset offset strategy provided.") + + panic("impossible") +} + +func isValidOffsetStrategy(strategy string) bool { + return strategy == EarliestResetOffsetsStrategy || + strategy == LatestResetOffsetsStrategy } diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index a9198009..6dab4c00 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -331,11 +331,19 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { groupPartitions := groupDetails.Members[0].TopicPartitions[topicName] for _, partition := range groupPartitions { - offset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, partition) + input := GetEarliestOrLatestOffsetInput{ + Connector: connector, + Topic: topicName, + Partition: partition, + } + + input.Strategy = LatestResetOffsetsStrategy + offset, err := GetEarliestOrLatestOffset(ctx, &input) require.NoError(t, err) assert.Equal(t, int64(4), offset) - offset, err = GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, partition) + input.Strategy = EarliestResetOffsetsStrategy + offset, err = GetEarliestOrLatestOffset(ctx, &input) require.NoError(t, err) assert.Equal(t, int64(0), offset) } @@ -370,17 +378,16 @@ func TestResetOffsets(t *testing.T) { require.NoError(t, err) } - require.NoError(t, err) - err = ResetOffsets( - ctx, - connector, - topicName, - groupID, - map[int]int64{ + input := ResetOffsetsInput{ + Topic: topicName, + GroupID: groupID, + PartitionOffsets: map[int]int64{ 0: 2, 1: 1, }, - ) + } + + err = ResetOffsets(ctx, connector, &input) require.NoError(t, err) lags, err := GetMemberLags(ctx, connector, topicName, groupID) @@ -391,22 +398,37 @@ func TestResetOffsets(t *testing.T) { assert.Equal(t, int64(1), lags[1].MemberOffset) // latest offset of partition 0 - latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, LatestResetOffsetsStrategy, 0) + getInput := GetEarliestOrLatestOffsetInput{ + Connector: connector, + Topic: topicName, + Strategy: LatestResetOffsetsStrategy, + Partition: 0, + } + + latestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) require.NoError(t, err) + // earliest offset of partition 1 - earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, topicName, EarliestResetOffsetsStrategy, 1) + getInput = GetEarliestOrLatestOffsetInput{ + Connector: connector, + Topic: topicName, + Strategy: EarliestResetOffsetsStrategy, + Partition: 1, + } + + earliestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) require.NoError(t, err) - err = ResetOffsets( - ctx, - connector, - topicName, - groupID, - map[int]int64{ + resetInput := ResetOffsetsInput{ + Topic: topicName, + GroupID: groupID, + PartitionOffsets: map[int]int64{ 0: latestOffset, 1: earliestOffset, }, - 
) + } + + err = ResetOffsets(ctx, connector, &resetInput) require.NoError(t, err) lags, err = GetMemberLags(ctx, connector, topicName, groupID) From 8c55bb4149109c603d1e8f49f54721a81b89f380 Mon Sep 17 00:00:00 2001 From: Kevin Gillette Date: Sat, 9 Dec 2023 20:25:47 -0700 Subject: [PATCH 2/4] Simplifications and lint fixes --- Dockerfile | 2 +- cmd/topicctl/subcmd/get.go | 9 +- cmd/topicctl/subcmd/rebalance.go | 9 +- cmd/topicctl/subcmd/tail.go | 15 --- go.mod | 2 +- pkg/admin/format.go | 98 +++++++----------- pkg/admin/types.go | 4 +- pkg/admin/zkclient.go | 116 +++++----------------- pkg/apply/apply.go | 18 ++-- pkg/apply/apply_test.go | 5 - pkg/apply/assigners/balancer_leader.go | 18 ++-- pkg/apply/assigners/cross_rack.go | 17 ++-- pkg/apply/assigners/cross_rack_test.go | 2 +- pkg/apply/assigners/single_rack.go | 14 +-- pkg/apply/assigners/static_single_rack.go | 14 +-- pkg/apply/extenders/balanced.go | 13 +-- pkg/apply/rebalancers/frequency.go | 28 +++--- pkg/check/check.go | 17 +--- pkg/cli/cli.go | 15 --- pkg/config/load_test.go | 64 ++++++------ pkg/util/durations.go | 19 ++-- pkg/util/progress.go | 3 +- pkg/util/terminal.go | 4 +- 23 files changed, 189 insertions(+), 317 deletions(-) diff --git a/Dockerfile b/Dockerfile index c1f66d92..c0490cfa 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM --platform=$BUILDPLATFORM golang:1.19 as builder +FROM --platform=$BUILDPLATFORM golang:1.21 as builder ENV SRC github.com/segmentio/topicctl ENV CGO_ENABLED=0 diff --git a/cmd/topicctl/subcmd/get.go b/cmd/topicctl/subcmd/get.go index 60e458a5..2c3ee11d 100644 --- a/cmd/topicctl/subcmd/get.go +++ b/cmd/topicctl/subcmd/get.go @@ -229,7 +229,7 @@ func partitionsCmd() *cobra.Command { Use: "partitions [optional: topics]", Short: "Get all partitions information for topics", Args: cobra.MinimumNArgs(0), - RunE: func(cmd *cobra.Command, args []string) error { + RunE: func(cmd *cobra.Command, topics []string) error { ctx := context.Background() sess := session.Must(session.NewSession()) @@ -239,11 +239,6 @@ func partitionsCmd() *cobra.Command { } defer adminClient.Close() - topics := []string{} - for _, arg := range args { - topics = append(topics, arg) - } - cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) return cliRunner.GetPartitions( ctx, @@ -264,7 +259,7 @@ func partitionsCmd() *cobra.Command { &partitionsConfig.summary, "summary", false, - fmt.Sprintf("Display summary of partitions"), + "Display summary of partitions", ) return partitionsCommand diff --git a/cmd/topicctl/subcmd/rebalance.go b/cmd/topicctl/subcmd/rebalance.go index 7da0096f..0158bb29 100644 --- a/cmd/topicctl/subcmd/rebalance.go +++ b/cmd/topicctl/subcmd/rebalance.go @@ -3,7 +3,6 @@ package subcmd import ( "context" "fmt" - "github.com/spf13/cobra" "os" "os/signal" "path/filepath" @@ -11,6 +10,8 @@ import ( "syscall" "time" + "github.com/spf13/cobra" + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/cli" @@ -102,6 +103,7 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { if err != nil { return err } + ctx = context.WithValue(ctx, "progress", rebalanceCtxStruct) ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -147,11 +149,10 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { } // iterate through each topic config and initiate rebalance - topicConfigs := []config.TopicConfig{} topicErrorDict := make(map[string]error) for _, topicFile := range topicFiles { // do not consider invalid topic yaml 
files for rebalance - topicConfigs, err = config.LoadTopicsFile(topicFile) + topicConfigs, err := config.LoadTopicsFile(topicFile) if err != nil { log.Errorf("Invalid topic yaml file: %s", topicFile) continue @@ -205,8 +206,8 @@ func rebalanceRun(cmd *cobra.Command, args []string) error { errorTopics += 1 log.Errorf("topic: %s rebalance failed with error: %v", thisTopicName, thisTopicError) } else { - log.Infof("topic: %s rebalance is successful", thisTopicName) successTopics += 1 + log.Infof("topic: %s rebalance is successful", thisTopicName) } } diff --git a/cmd/topicctl/subcmd/tail.go b/cmd/topicctl/subcmd/tail.go index 555727ba..67522fdd 100644 --- a/cmd/topicctl/subcmd/tail.go +++ b/cmd/topicctl/subcmd/tail.go @@ -4,7 +4,6 @@ import ( "context" "os" "os/signal" - "strconv" "syscall" "github.com/segmentio/kafka-go" @@ -99,17 +98,3 @@ func tailRun(cmd *cobra.Command, args []string) error { tailConfig.headers, ) } - -func stringsToInts(strs []string) ([]int, error) { - ints := []int{} - - for _, str := range strs { - nextInt, err := strconv.ParseInt(str, 10, 32) - if err != nil { - return nil, err - } - ints = append(ints, int(nextInt)) - } - - return ints, nil -} diff --git a/go.mod b/go.mod index 083f473e..9f24353c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/segmentio/topicctl -go 1.18 +go 1.21 require ( github.com/aws/aws-sdk-go v1.44.208 diff --git a/pkg/admin/format.go b/pkg/admin/format.go index 03921237..f0a1316f 100644 --- a/pkg/admin/format.go +++ b/pkg/admin/format.go @@ -2,9 +2,11 @@ package admin import ( "bytes" + "cmp" "fmt" "math" "reflect" + "slices" "sort" "strings" "time" @@ -537,27 +539,25 @@ func FormatTopicsPartitionsSummary( }, ) - topicNames := []string{} - tableData := make(map[string][][]string) + topicNames := make([]string, 0, len(topicsPartitionsStatusSummary)) + tableData := make(map[string][][]string, len(topicsPartitionsStatusSummary)) + for topicName, partitionsStatusSummary := range topicsPartitionsStatusSummary { - topicTableRows := [][]string{} + topicTableRows := make([][]string, 0, len(partitionsStatusSummary)) for partitionStatus, partitionStatusIDs := range partitionsStatusSummary { topicTableRows = append(topicTableRows, []string{ - fmt.Sprintf("%s", topicName), - fmt.Sprintf("%s", partitionStatus), - fmt.Sprintf("%d", len(partitionStatusIDs)), + topicName, + string(partitionStatus), + fmt.Sprint(len(partitionStatusIDs)), fmt.Sprintf("%+v", partitionStatusIDs), }) } - // sort the topicTableRows by partitionStatus - statusSort := func(i, j int) bool { - // second element in the row is of type PartitionStatus - return string(topicTableRows[i][1]) < string(topicTableRows[j][1]) - } - - sort.Slice(topicTableRows, statusSort) + // sort the topicTableRows by partitionStatus: + // second element in the row is PartitionStatus + statusSort := func(x, y []string) int { return cmp.Compare(x[1], y[1]) } + slices.SortFunc(topicTableRows, statusSort) tableData[topicName] = topicTableRows topicNames = append(topicNames, topicName) @@ -619,27 +619,29 @@ func FormatTopicsPartitions( }, ) - topicNames := []string{} + inTerminal := util.InTerminal() + topicNames := make([]string, 0, len(topicsPartitionsStatusInfo)) brokerRacks := BrokerRacks(brokers) tableData := make(map[string][][]string) + for topicName, partitionsStatusInfo := range topicsPartitionsStatusInfo { topicTableRows := [][]string{} for _, partitionStatusInfo := range partitionsStatusInfo { racks := partitionStatusInfo.Racks(brokerRacks) - distinctRacks := make(map[string]int) + 
distinctRacks := make(map[string]int, len(racks)) for _, rack := range racks { distinctRacks[rack] += 1 } - partitionIsrs := []int{} - for _, partitionStatusIsr := range partitionStatusInfo.Partition.Isr { - partitionIsrs = append(partitionIsrs, partitionStatusIsr.ID) + partitionISRs := make([]int, len(partitionStatusInfo.Partition.Isr)) + for i, isr := range partitionStatusInfo.Partition.Isr { + partitionISRs[i] = isr.ID } - partitionReplicas := []int{} - for _, partitionReplica := range partitionStatusInfo.Partition.Replicas { - partitionReplicas = append(partitionReplicas, partitionReplica.ID) + partitionReplicas := make([]int, len(partitionStatusInfo.Partition.Replicas)) + for i, replica := range partitionStatusInfo.Partition.Replicas { + partitionReplicas[i] = replica.ID } inSync := true @@ -652,36 +654,30 @@ func FormatTopicsPartitions( correctLeader = false } - var statusPrinter func(f string, a ...interface{}) string - if !util.InTerminal() || inSync { - statusPrinter = fmt.Sprintf - } else if !inSync { - statusPrinter = color.New(color.FgRed).SprintfFunc() + statusPrinter := fmt.Sprint + if inTerminal && !inSync { + statusPrinter = color.New(color.FgRed).Sprint } - var statePrinter func(f string, a ...interface{}) string - if !util.InTerminal() || correctLeader { - statePrinter = fmt.Sprintf - } else if !correctLeader { - statePrinter = color.New(color.FgCyan).SprintfFunc() + statePrinter := fmt.Sprintf + if inTerminal && !correctLeader { + statePrinter = color.New(color.FgCyan).Sprintf } - leaderStateString := fmt.Sprintf("%d", partitionStatusInfo.Partition.Leader.ID) + leaderStateString := fmt.Sprint(partitionStatusInfo.Partition.Leader.ID) if !correctLeader { - leaderStateString = fmt.Sprintf("%d %+v", partitionStatusInfo.Partition.Leader.ID, - statePrinter("(%s)", string(partitionStatusInfo.LeaderState)), - ) + leaderStateString += " " + statePrinter("(%s)", partitionStatusInfo.LeaderState) } topicTableRows = append(topicTableRows, []string{ - fmt.Sprintf("%s", topicName), - fmt.Sprintf("%d", partitionStatusInfo.Partition.ID), + topicName, + fmt.Sprint(partitionStatusInfo.Partition.ID), leaderStateString, - fmt.Sprintf("%+v", partitionIsrs), + fmt.Sprintf("%+v", partitionISRs), fmt.Sprintf("%+v", partitionReplicas), - fmt.Sprintf("%d", len(distinctRacks)), + fmt.Sprint(len(distinctRacks)), fmt.Sprintf("%+v", racks), - fmt.Sprintf("%v", statusPrinter("%s", string(partitionStatusInfo.Status))), + statusPrinter(partitionStatusInfo.Status), }) } @@ -907,11 +903,12 @@ func FormatBrokerMaxPartitions( maxPartitionsPerBroker := MaxPartitionsPerBroker(curr, desired) finalPartitionsPerBroker := MaxPartitionsPerBroker(desired) - maxCount := maxInts( + maxCount := max( maxMapValues(startPartitionsPerBroker), maxMapValues(maxPartitionsPerBroker), maxMapValues(finalPartitionsPerBroker), ) + maxCountWidth := maxValueToMaxWidth(maxCount) for _, broker := range brokers { @@ -1152,25 +1149,6 @@ func maxValueToMaxWidth(maxValue int) int { return int(math.Log10(float64(maxValue))) + 1 } -func maxInt(a, b int) int { - if a > b { - return a - } - return b -} - -func maxInts(values ...int) int { - var maxValue int - - for v, value := range values { - if v == 0 || value > maxValue { - maxValue = value - } - } - - return maxValue -} - func maxMapValues(inputMap map[int]int) int { var maxValue int currIndex := 0 diff --git a/pkg/admin/types.go b/pkg/admin/types.go index d55a0e85..60b6c8ad 100644 --- a/pkg/admin/types.go +++ b/pkg/admin/types.go @@ -1166,13 +1166,13 @@ func 
GetTopicsPartitionsStatusInfo( partition.Leader.ID = -1 } - for i, _ := range partition.Isr { + for i := range partition.Isr { if partition.Isr[i].Host == "" && partition.Isr[i].Port == 0 { partition.Isr[i].ID = -1 } } - for i, _ := range partition.Replicas { + for i := range partition.Replicas { if partition.Replicas[i].Host == "" && partition.Replicas[i].Port == 0 { partition.Replicas[i].ID = -1 } diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 8f7f4d9d..996b594b 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -65,10 +65,7 @@ type ZKAdminClientConfig struct { } // NewZKAdminClient creates and returns a new Client instance. -func NewZKAdminClient( - ctx context.Context, - config ZKAdminClientConfig, -) (*ZKAdminClient, error) { +func NewZKAdminClient(ctx context.Context, config ZKAdminClientConfig) (*ZKAdminClient, error) { zkClient, err := zk.NewPooledClient( config.ZKAddrs, time.Minute, @@ -87,9 +84,8 @@ func NewZKAdminClient( if !strings.HasPrefix(zkPrefix, "/") { zkPrefix = fmt.Sprintf("/%s", zkPrefix) } - if strings.HasSuffix(zkPrefix, "/") { - zkPrefix = zkPrefix[:len(zkPrefix)-1] - } + + zkPrefix = strings.TrimSuffix(zkPrefix, "/") } client := &ZKAdminClient{ @@ -133,20 +129,17 @@ func NewZKAdminClient( } client.bootstrapAddrs = bootstrapAddrs - client.Connector, err = NewConnector( - ConnectorConfig{ - BrokerAddr: bootstrapAddrs[0], - }, - ) + client.Connector, err = NewConnector(ConnectorConfig{BrokerAddr: bootstrapAddrs[0]}) + if err != nil { + return nil, err + } return client, nil } // GetClusterID gets the cluster ID from zookeeper. This ID is generated when the cluster is // created and should be stable over the life of the cluster. -func (c *ZKAdminClient) GetClusterID( - ctx context.Context, -) (string, error) { +func (c *ZKAdminClient) GetClusterID(ctx context.Context) (string, error) { zkClusterIDPath := c.zNode(clusterIDPath) zkClusterIDObj := zkClusterID{} @@ -160,10 +153,7 @@ func (c *ZKAdminClient) GetClusterID( // GetBrokers gets information on one or more cluster brokers from zookeeper. // If the argument ids is unset, then it fetches all brokers. -func (c *ZKAdminClient) GetBrokers( - ctx context.Context, - ids []int, -) ([]BrokerInfo, error) { +func (c *ZKAdminClient) GetBrokers(ctx context.Context, ids []int) ([]BrokerInfo, error) { // TODO (maybe): Use Kafka API instead of ZK to get broker info var brokerIDs []int var err error @@ -302,11 +292,7 @@ func (c *ZKAdminClient) GetConnector() *Connector { // If the argument names is unset, then it fetches all topics. The detailed // parameter determines whether the ISRs and leaders are fetched for each // partition. 
-func (c *ZKAdminClient) GetTopics( - ctx context.Context, - names []string, - detailed bool, -) ([]TopicInfo, error) { +func (c *ZKAdminClient) GetTopics(ctx context.Context, names []string, detailed bool) ([]TopicInfo, error) { var topicNames []string var err error @@ -349,7 +335,7 @@ func (c *ZKAdminClient) GetTopics( } } - poolSize := minInt(len(topicNames), maxPoolSize) + poolSize := min(len(topicNames), maxPoolSize) for i := 0; i < poolSize; i++ { go func() { @@ -422,17 +408,11 @@ func (c *ZKAdminClient) GetTopic( return topics[0], nil } -func (c *ZKAdminClient) GetACLs( - ctx context.Context, - filter kafka.ACLFilter, -) ([]ACLInfo, error) { +func (c *ZKAdminClient) GetACLs(ctx context.Context, filter kafka.ACLFilter) ([]ACLInfo, error) { return nil, errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } -func (c *ZKAdminClient) CreateACLs( - ctx context.Context, - acls []kafka.ACLEntry, -) error { +func (c *ZKAdminClient) CreateACLs(ctx context.Context, acls []kafka.ACLEntry) error { return errors.New("ACLs not yet supported with zk access mode; omit zk addresses to fix.") } @@ -443,10 +423,7 @@ func (c *ZKAdminClient) GetUsers( return nil, errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.") } -func (c *ZKAdminClient) UpsertUser( - ctx context.Context, - user kafka.UserScramCredentialsUpsertion, -) error { +func (c *ZKAdminClient) UpsertUser(ctx context.Context, user kafka.UserScramCredentialsUpsertion) error { return errors.New("Users not yet supported with zk access mode; omit zk addresses to fix.") } @@ -589,10 +566,7 @@ func (c *ZKAdminClient) UpdateBrokerConfig( // CreateTopic creates a new topic with the argument config. It uses // the topic creation API exposed on the controller broker. -func (c *ZKAdminClient) CreateTopic( - ctx context.Context, - config kafka.TopicConfig, -) error { +func (c *ZKAdminClient) CreateTopic(ctx context.Context, config kafka.TopicConfig) error { if c.readOnly { return errors.New("Cannot create topic in read-only mode") } @@ -616,11 +590,7 @@ func (c *ZKAdminClient) CreateTopic( // AssignPartitions notifies the cluster to begin a partition reassignment. // This should only be used for existing partitions; to create new partitions, // use the AddPartitions method. -func (c *ZKAdminClient) AssignPartitions( - ctx context.Context, - topic string, - assignments []PartitionAssignment, -) error { +func (c *ZKAdminClient) AssignPartitions(ctx context.Context, topic string, assignments []PartitionAssignment) error { if c.readOnly { return errors.New("Cannot assign partitions in read-only mode") } @@ -658,11 +628,7 @@ func (c *ZKAdminClient) AssignPartitions( // AddPartitions adds one or more partitions to an existing topic. Unlike // AssignPartitions, this directly updates the topic's partition config in // zookeeper. -func (c *ZKAdminClient) AddPartitions( - ctx context.Context, - topic string, - newAssignments []PartitionAssignment, -) error { +func (c *ZKAdminClient) AddPartitions(ctx context.Context, topic string, newAssignments []PartitionAssignment) error { if c.readOnly { return errors.New("Cannot add partitions in read-only mode") } @@ -714,11 +680,7 @@ func (c *ZKAdminClient) AddPartitions( // RunLeaderElection triggers a leader election for the argument // topic and partitions. 
-func (c *ZKAdminClient) RunLeaderElection( - ctx context.Context, - topic string, - partitions []int, -) error { +func (c *ZKAdminClient) RunLeaderElection(ctx context.Context, topic string, partitions []int) error { if c.readOnly { return errors.New("Cannot run leader election in read-only mode") } @@ -755,18 +717,12 @@ func (c *ZKAdminClient) RunLeaderElection( // AcquireLock acquires and returns a lock from the underlying zookeeper client. // The Unlock method should be called on the lock when it's safe to release. -func (c *ZKAdminClient) AcquireLock( - ctx context.Context, - path string, -) (zk.Lock, error) { +func (c *ZKAdminClient) AcquireLock(ctx context.Context, path string) (zk.Lock, error) { return c.zkClient.AcquireLock(ctx, path) } // LockHeld determines whether the lock with the provided path is held (i.e., has children). -func (c *ZKAdminClient) LockHeld( - ctx context.Context, - path string, -) (bool, error) { +func (c *ZKAdminClient) LockHeld(ctx context.Context, path string) (bool, error) { exists, _, err := c.zkClient.Exists(ctx, path) if err != nil { return false, err @@ -803,9 +759,7 @@ func (c *ZKAdminClient) Close() error { // getControllerAddr gets the address of the cluster controller. This is needed // for creating new topics and other operations. -func (c *ZKAdminClient) getControllerAddr( - ctx context.Context, -) (string, error) { +func (c *ZKAdminClient) getControllerAddr(ctx context.Context) (string, error) { conn, err := kafka.DefaultDialer.DialContext(ctx, "tcp", c.bootstrapAddrs[0]) if err != nil { return "", err @@ -820,11 +774,7 @@ func (c *ZKAdminClient) getControllerAddr( return fmt.Sprintf("%s:%d", broker.Host, broker.Port), nil } -func (c *ZKAdminClient) getTopic( - ctx context.Context, - name string, - detailed bool, -) (TopicInfo, error) { +func (c *ZKAdminClient) getTopic(ctx context.Context, name string, detailed bool) (TopicInfo, error) { log.Debugf("Getting info for topic %s", name) topicInfo := TopicInfo{ @@ -884,7 +834,7 @@ func (c *ZKAdminClient) getTopic( } } - poolSize := minInt(len(zkTopicInfo.Partitions), maxPoolSize) + poolSize := min(len(zkTopicInfo.Partitions), maxPoolSize) for i := 0; i < poolSize; i++ { go func() { @@ -969,9 +919,7 @@ func (c *ZKAdminClient) getPartition( } // assignmentInProgress returns whether the zk assignment node exists. 
-func (c *ZKAdminClient) assignmentInProgress( - ctx context.Context, -) (bool, error) { +func (c *ZKAdminClient) assignmentInProgress(ctx context.Context) (bool, error) { exists, _, err := c.zkClient.Exists( ctx, c.zNode(assignmentPath), @@ -995,10 +943,7 @@ func (c *ZKAdminClient) zNode(elements ...string) string { return filepath.Join("/", c.zkPrefix, joinedElements) } -func (c *ZKAdminClient) getInstances( - ctx context.Context, - ips []string, -) (map[string]ec2.Instance, error) { +func (c *ZKAdminClient) getInstances(ctx context.Context, ips []string) (map[string]ec2.Instance, error) { instancesMap := map[string]ec2.Instance{} if c.sess == nil { @@ -1045,13 +990,6 @@ func (c *ZKAdminClient) getInstances( return instancesMap, nil } -func minInt(a, b int) int { - if a < b { - return a - } - return b -} - func updateConfig( configMap map[string]interface{}, configEntries []kafka.ConfigEntry, @@ -1108,9 +1046,7 @@ func updateConfig( return updatedKeys, nil } -func (c *ZKAdminClient) GetAllTopicsMetadata( - ctx context.Context, -) (*kafka.MetadataResponse, error) { +func (c *ZKAdminClient) GetAllTopicsMetadata(ctx context.Context) (*kafka.MetadataResponse, error) { client := c.GetConnector().KafkaClient req := kafka.MetadataRequest{ Topics: nil, diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index 6d55048a..54e63091 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -863,14 +863,9 @@ func (t *TopicApplier) updatePlacementRunner( numRounds := (len(assignmentsToUpdate) + batchSize - 1) / batchSize // Ceil() with integer math highlighter := color.New(color.FgYellow, color.Bold).SprintfFunc() for i, round := 0, 1; i < len(assignmentsToUpdate); i, round = i+batchSize, round+1 { - end := i + batchSize + end := min(i+batchSize, len(assignmentsToUpdate)) + roundLabel := highlighter("%d of %d", round, numRounds) - if end > len(assignmentsToUpdate) { - end = len(assignmentsToUpdate) - } - - var roundLabel string // "x of y" used to mark progress in balancing rounds - roundLabel = highlighter("%d of %d", round, numRounds) log.Infof( "Balancing round %s", roundLabel, @@ -994,6 +989,9 @@ func (t *TopicApplier) updatePartitionsIteration( outerLoop: for { select { + case <-ctx.Done(): + return ctx.Err() + case <-checkTimer.C: log.Infof("Checking if all partitions in topic %s are properly replicated...", highlighter(t.topicName)) @@ -1035,7 +1033,7 @@ outerLoop: } if len(notReady) == 0 { - elapsed := time.Now().Sub(roundStartTime) + elapsed := time.Since(roundStartTime) log.Infof("Partition(s) %+v looks good, continuing (last round duration: %s)", idsToUpdate, highlighter("%.1fs", float64(elapsed)/1000000000), // time.Duration is int64 nanoseconds @@ -1053,13 +1051,11 @@ outerLoop: var roundString string // convert to " (round x of y)" if roundLabel is present if roundLabel != "" { - roundString = fmt.Sprintf(" (current round %s, %+v elapsed)", roundLabel, time.Now().Sub(roundStartTime)) + roundString = fmt.Sprintf(" (current round %s, %+v elapsed)", roundLabel, time.Since(roundStartTime)) } else { roundString = roundLabel } log.Infof("Sleeping for %s%s", t.config.SleepLoopDuration.String(), roundString) - case <-ctx.Done(): - return ctx.Err() } } diff --git a/pkg/apply/apply_test.go b/pkg/apply/apply_test.go index 5f8e0b88..fae17912 100644 --- a/pkg/apply/apply_test.go +++ b/pkg/apply/apply_test.go @@ -2,7 +2,6 @@ package apply import ( "context" - "fmt" "testing" "time" @@ -900,10 +899,6 @@ func TestApplyOverrides(t *testing.T) { assert.Equal(t, applier.maxBatchSize, 8) } -func 
testTopicName(name string) string { - return util.RandomString(fmt.Sprintf("topic-%s-", name), 6) -} - func testApplier( ctx context.Context, t *testing.T, diff --git a/pkg/apply/assigners/balancer_leader.go b/pkg/apply/assigners/balancer_leader.go index e026a6e7..58cfd6e0 100644 --- a/pkg/apply/assigners/balancer_leader.go +++ b/pkg/apply/assigners/balancer_leader.go @@ -18,15 +18,15 @@ import ( // // The algorithm currently used is as follows: // -// while not balanced: -// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented) -// improve balance by doing a single leader replacement: -// use the picker to rank the partitions that have an overrepresented leader -// for each leader: -// for each partition with the leader: -// swap the leader with one of its followers if possible, then stop -// otherwise, use the picker to replace the leader in the top-ranked partition with -// a new broker from the target rack +// while not balanced: +// find racks with fewest and most leaders (i.e., the overrepresented and underrepresented) +// improve balance by doing a single leader replacement: +// use the picker to rank the partitions that have an overrepresented leader +// for each leader: +// for each partition with the leader: +// swap the leader with one of its followers if possible, then stop +// otherwise, use the picker to replace the leader in the top-ranked partition with +// a new broker from the target rack type BalancedLeaderAssigner struct { brokers []admin.BrokerInfo racks []string diff --git a/pkg/apply/assigners/cross_rack.go b/pkg/apply/assigners/cross_rack.go index 73066350..5dbe8b9a 100644 --- a/pkg/apply/assigners/cross_rack.go +++ b/pkg/apply/assigners/cross_rack.go @@ -2,9 +2,10 @@ package assigners import ( "fmt" + "sort" + "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply/pickers" - "sort" ) // CrossRackAssigner is an assigner that ensures that the replicas of each @@ -13,16 +14,18 @@ import ( // https://segment.atlassian.net/browse/DRES-922?focusedCommentId=237288 // // for each partition: -// for each non-leader replica: -// if replica is in same rack as leader: -// change replica to a placeholder (-1) +// +// for each non-leader replica: +// if replica is in same rack as leader: +// change replica to a placeholder (-1) // // then: // // for each partition: -// for each non-leader replica: -// if replica is set to placeholder: -// use picker to replace it with a broker in a different rack than the leader and any other replicas +// +// for each non-leader replica: +// if replica is set to placeholder: +// use picker to replace it with a broker in a different rack than the leader and any other replicas // // Note that this assigner doesn't make any leader changes. Thus, the assignments // need to already be leader balanced before we make the changes with this assigner. 
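The two-pass scheme described in the `CrossRackAssigner` comment above (mark same-rack followers with a placeholder, then re-pick them) can be illustrated with a small standalone sketch. This is not part of the patch and does not use topicctl's real `admin` or `pickers` types; the rack map and the first-unused-rack pick below are simplifying assumptions:

```
package main

import "fmt"

// placeholder marks a replica slot that must be re-picked.
const placeholder = -1

// spreadAcrossRacks is a simplified, hypothetical version of the two-pass
// idea: first clear any follower that shares a rack with its leader, then
// fill each cleared slot with a broker from a rack the partition does not
// use yet. It only illustrates the comment above, not the real assigner.
func spreadAcrossRacks(assignments [][]int, rackByBroker map[int]string) {
	for _, replicas := range assignments {
		leaderRack := rackByBroker[replicas[0]]

		// Pass 1: mark followers that sit in the leader's rack.
		for i := 1; i < len(replicas); i++ {
			if rackByBroker[replicas[i]] == leaderRack {
				replicas[i] = placeholder
			}
		}

		// Pass 2: re-pick each placeholder from a rack not yet used
		// by this partition (the real code delegates this to a picker).
		for i := 1; i < len(replicas); i++ {
			if replicas[i] != placeholder {
				continue
			}

			used := map[string]bool{}
			for _, id := range replicas {
				if id != placeholder {
					used[rackByBroker[id]] = true
				}
			}

			for id, rack := range rackByBroker {
				if !used[rack] {
					replicas[i] = id
					break
				}
			}
		}
	}
}

func main() {
	rackByBroker := map[int]string{1: "a", 2: "a", 3: "b", 4: "c"}
	assignments := [][]int{{1, 2, 3}} // leader 1 and follower 2 share rack "a"

	spreadAcrossRacks(assignments, rackByBroker)
	fmt.Println(assignments) // [[1 4 3]]: follower 2 replaced by the rack-c broker
}
```

In the actual assigner the second pass goes through a configurable picker, which is what keeps tie-breaking consistent with the other placement strategies.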
diff --git a/pkg/apply/assigners/cross_rack_test.go b/pkg/apply/assigners/cross_rack_test.go index f4c65865..f3eb6227 100644 --- a/pkg/apply/assigners/cross_rack_test.go +++ b/pkg/apply/assigners/cross_rack_test.go @@ -245,4 +245,4 @@ func TestCrossRackAssignerTwoReplicas(t *testing.T) { for _, testCase := range testCases { testCase.evaluate(t, assigner) } -} \ No newline at end of file +} diff --git a/pkg/apply/assigners/single_rack.go b/pkg/apply/assigners/single_rack.go index 917db950..1da6a775 100644 --- a/pkg/apply/assigners/single_rack.go +++ b/pkg/apply/assigners/single_rack.go @@ -11,16 +11,18 @@ import ( // partition are in the same rack as the leader. The algorithm is: // // for each partition: -// for each non-leader replica: -// if replica not in same rack as leader: -// change replica to a placeholder (-1) +// +// for each non-leader replica: +// if replica not in same rack as leader: +// change replica to a placeholder (-1) // // then: // // for each partition: -// for each non-leader replica: -// if replica is set to placeholder: -// use picker to replace it with a broker in the target rack +// +// for each non-leader replica: +// if replica is set to placeholder: +// use picker to replace it with a broker in the target rack // // Note that this assigner doesn't make any leader changes. Thus, the assignments // need to already be leader balanced before we make the changes with this assigner. diff --git a/pkg/apply/assigners/static_single_rack.go b/pkg/apply/assigners/static_single_rack.go index 1b1ddc99..d506dc22 100644 --- a/pkg/apply/assigners/static_single_rack.go +++ b/pkg/apply/assigners/static_single_rack.go @@ -14,16 +14,18 @@ import ( // The following algorithm is used: // // for each partition: -// for each replica: -// if replica not in the desired (static) rack: -// change the replica to a placeholder (-1) +// +// for each replica: +// if replica not in the desired (static) rack: +// change the replica to a placeholder (-1) // // then: // // for each partition: -// for each replica: -// if replica set to the placeholder: -// use picker to pick a broker from the set of all brokers in the target rack +// +// for each replica: +// if replica set to the placeholder: +// use picker to pick a broker from the set of all brokers in the target rack // // In the case of ties, the lowest indexed broker is picked (if randomize is false) or // a repeatably random choice (if randomize is true). 
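The single-rack assigners above mention breaking ties either with the lowest-indexed broker or with a "repeatably random" choice. A repeatable choice only needs a deterministic seed; one way to sketch it (an illustration, not necessarily the hashing scheme topicctl's pickers use) is to hash the topic name and partition ID:

```
package main

import (
	"fmt"
	"hash/fnv"
)

// pickRepeatable chooses one of the candidate broker IDs using a hash of the
// topic name and partition ID as the "random" source. The choice looks
// arbitrary but is stable across runs, which is the "repeatably random"
// property described in the assigner comments.
func pickRepeatable(topic string, partition int, candidates []int) int {
	h := fnv.New32a()
	fmt.Fprintf(h, "%s:%d", topic, partition)

	return candidates[int(h.Sum32())%len(candidates)]
}

func main() {
	candidates := []int{3, 5, 9}

	// Same inputs always yield the same broker; different partitions spread out.
	fmt.Println(pickRepeatable("topic-default", 0, candidates))
	fmt.Println(pickRepeatable("topic-default", 1, candidates))
	fmt.Println(pickRepeatable("topic-default", 0, candidates)) // matches the first call
}
```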
diff --git a/pkg/apply/extenders/balanced.go b/pkg/apply/extenders/balanced.go index e94641d6..6b4c5963 100644 --- a/pkg/apply/extenders/balanced.go +++ b/pkg/apply/extenders/balanced.go @@ -12,12 +12,13 @@ import ( // algorithm is: // // for each new partition: -// set the leader rack to the next rack in the cycle -// choose the leader using the picker -// for each follower: -// set the rack to either the same one as the leader (if inRack true) or the next one in the -// cycle (if inRack false) -// pick the follower using the picker +// +// set the leader rack to the next rack in the cycle +// choose the leader using the picker +// for each follower: +// set the rack to either the same one as the leader (if inRack true) or the next one in the +// cycle (if inRack false) +// pick the follower using the picker type BalancedExtender struct { brokers []admin.BrokerInfo inRack bool diff --git a/pkg/apply/rebalancers/frequency.go b/pkg/apply/rebalancers/frequency.go index 142e6501..3f9ff089 100644 --- a/pkg/apply/rebalancers/frequency.go +++ b/pkg/apply/rebalancers/frequency.go @@ -13,24 +13,24 @@ import ( // FrequencyRebalancer is a Rebalancer that rebalances to achieve in-topic balance among // all available brokers. The algorithm used is: // -// for each replica position index: -// while true: -// get counts for each broker in that position -// partition the set of brokers into two sets, a "lower" one and an "upper one", based on -// on the sorted frequency counts (with brokers to be removed treated as the highest -// frequencies) -// for each lower broker, upper broker combination: -// try to replace the upper broker with the lower one -// if replacement made, continue to next while loop iteration -// if no replacement made, break out of while loop, continue to next partition index +// for each replica position index: +// while true: +// get counts for each broker in that position +// partition the set of brokers into two sets, a "lower" one and an "upper one", based on +// on the sorted frequency counts (with brokers to be removed treated as the highest +// frequencies) +// for each lower broker, upper broker combination: +// try to replace the upper broker with the lower one +// if replacement made, continue to next while loop iteration +// if no replacement made, break out of while loop, continue to next partition index // // Replacements are made if: // -// 1. The replacement improves the broker balance for the index OR -// the replacement improves the broker balance for the topic as a whole +// 1. The replacement improves the broker balance for the index OR +// the replacement improves the broker balance for the topic as a whole // AND -// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced -// leaders, in-rack, etc.) +// 2. The replacement is consistent with the placement strategy for the topic (e.g., balanced +// leaders, in-rack, etc.) 
// // The picker passed in to the rebalancer is used to sort the partitions for each broker (if it // appears more than once for the current index) and also to break ties when sorting and diff --git a/pkg/check/check.go b/pkg/check/check.go index 7b7463c6..b48f392a 100644 --- a/pkg/check/check.go +++ b/pkg/check/check.go @@ -3,7 +3,7 @@ package check import ( "context" "fmt" - "sort" + "slices" "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/config" @@ -101,17 +101,10 @@ func CheckTopic(ctx context.Context, config CheckConfig) (TopicCheckResults, err if len(diffKeys) == 0 && len(missingKeys) == 0 { results.UpdateLastResult(true, "") } else { - combinedKeys := []string{} - for _, diffKey := range diffKeys { - combinedKeys = append(combinedKeys, diffKey) - } - for _, missingKey := range missingKeys { - combinedKeys = append(combinedKeys, missingKey) - } - - sort.Slice(combinedKeys, func(a, b int) bool { - return combinedKeys[a] < combinedKeys[b] - }) + combinedKeys := make([]string, 0, len(diffKeys)+len(missingKeys)) + combinedKeys = append(combinedKeys, diffKeys...) + combinedKeys = append(combinedKeys, missingKeys...) + slices.Sort(combinedKeys) results.UpdateLastResult( false, diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index bd3134bb..0684208e 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -7,7 +7,6 @@ import ( "path/filepath" "regexp" "sort" - "strconv" "strings" "time" @@ -627,17 +626,3 @@ func invoke[T any](ctx context.Context, c *CLIRunner, v T, fn invokeFunc[T]) err return nil } - -func stringsToInts(strs []string) ([]int, error) { - ints := make([]int, 0, len(strs)) - - for _, str := range strs { - nextInt, err := strconv.ParseInt(str, 10, 32) - if err != nil { - return nil, err - } - ints = append(ints, int(nextInt)) - } - - return ints, nil -} diff --git a/pkg/config/load_test.go b/pkg/config/load_test.go index f784e3f3..c0011c6f 100644 --- a/pkg/config/load_test.go +++ b/pkg/config/load_test.go @@ -57,49 +57,47 @@ func TestLoadTopicsFile(t *testing.T) { require.NoError(t, err) topicConfig.SetDefaults() - assert.Equal( - t, - TopicConfig{ - Meta: TopicMeta{ - Name: "topic-test", - Cluster: "test-cluster", - Region: "test-region", - Environment: "test-env", - Description: "Test topic\n", + want := TopicConfig{ + Meta: TopicMeta{ + Name: "topic-test", + Cluster: "test-cluster", + Region: "test-region", + Environment: "test-env", + Description: "Test topic\n", + }, + Spec: TopicSpec{ + Partitions: 9, + ReplicationFactor: 2, + RetentionMinutes: 100, + PlacementConfig: TopicPlacementConfig{ + Strategy: PlacementStrategyInRack, + Picker: PickerMethodRandomized, }, - Spec: TopicSpec{ - Partitions: 9, - ReplicationFactor: 2, - RetentionMinutes: 100, - PlacementConfig: TopicPlacementConfig{ - Strategy: PlacementStrategyInRack, - Picker: PickerMethodRandomized, - }, - MigrationConfig: &TopicMigrationConfig{ - PartitionBatchSize: 1, - }, - Settings: TopicSettings{ - "cleanup.policy": "compact", - "follower.replication.throttled.replicas": []interface{}{ - "1:3", - "4:5", - }, - "max.compaction.lag.ms": 12345.0, + MigrationConfig: &TopicMigrationConfig{ + PartitionBatchSize: 1, + }, + Settings: TopicSettings{ + "cleanup.policy": "compact", + "follower.replication.throttled.replicas": []interface{}{ + "1:3", + "4:5", }, + "max.compaction.lag.ms": 12345.0, }, }, - topicConfig, - ) + } + + assert.Equal(t, want, topicConfig) assert.NoError(t, topicConfig.Validate(3)) topicConfigs, err = LoadTopicsFile("testdata/test-cluster/topics/topic-test-invalid.yaml") - 
assert.Equal(t, 1, len(topicConfigs)) - topicConfig = topicConfigs[0] require.NoError(t, err) - assert.Error(t, topicConfig.Validate(3)) + require.Equal(t, 1, len(topicConfigs)) + assert.Error(t, topicConfigs[0].Validate(3)) topicConfigs, err = LoadTopicsFile("testdata/test-cluster/topics/topic-test-multi.yaml") - assert.Equal(t, 2, len(topicConfigs)) + require.NoError(t, err) + require.Equal(t, 2, len(topicConfigs)) assert.Equal(t, "topic-test1", topicConfigs[0].Meta.Name) assert.Equal(t, "topic-test2", topicConfigs[1].Meta.Name) } diff --git a/pkg/util/durations.go b/pkg/util/durations.go index 73447e6d..6257418b 100644 --- a/pkg/util/durations.go +++ b/pkg/util/durations.go @@ -5,8 +5,8 @@ import ( "time" ) -// PrettyDuration returns a human-formatted duration string given an golang -// duration value. +// PrettyDuration returns a human-formatted duration string +// given a Go time.Duration value. func PrettyDuration(duration time.Duration) string { seconds := duration.Seconds() @@ -33,17 +33,18 @@ func PrettyRate(count int64, duration time.Duration) string { ratePerMin := float64(count) / duration.Minutes() ratePerHour := float64(count) / duration.Hours() - if ratePerSec >= 10.0 { + switch { + case ratePerSec >= 10.0: return fmt.Sprintf("%d/sec", int(ratePerSec)) - } else if ratePerSec >= 1.0 { + case ratePerSec >= 1.0: return fmt.Sprintf("%0.1f/sec", ratePerSec) - } else if ratePerMin >= 10.0 { + case ratePerMin >= 10.0: return fmt.Sprintf("%d/min", int(ratePerMin)) - } else if ratePerMin >= 1.0 { + case ratePerMin >= 1.0: return fmt.Sprintf("%0.1f/min", ratePerMin) - } else if ratePerHour >= 0.1 { + case ratePerHour >= 0.1: return fmt.Sprintf("%0.1f/hour", ratePerHour) - } else { - return fmt.Sprintf("~0") } + + return "~0" } diff --git a/pkg/util/progress.go b/pkg/util/progress.go index ee559fc2..0c888a9b 100644 --- a/pkg/util/progress.go +++ b/pkg/util/progress.go @@ -2,8 +2,9 @@ package util import ( "context" - log "github.com/sirupsen/logrus" "time" + + log "github.com/sirupsen/logrus" ) // Rebalance topic progress Config diff --git a/pkg/util/terminal.go b/pkg/util/terminal.go index a0358eb0..da2b5638 100644 --- a/pkg/util/terminal.go +++ b/pkg/util/terminal.go @@ -3,12 +3,12 @@ package util import ( "os" - "golang.org/x/crypto/ssh/terminal" + "golang.org/x/term" ) // InTerminal determines whether we're running in a terminal or not. // // Implementation from https://rosettacode.org/wiki/Check_output_device_is_a_terminal#Go. func InTerminal() bool { - return terminal.IsTerminal(int(os.Stdout.Fd())) + return term.IsTerminal(int(os.Stdout.Fd())) } From ec7a9a9ebac28b46a1a31cf9f568fbd0931c30c2 Mon Sep 17 00:00:00 2001 From: Kevin Gillette Date: Tue, 12 Dec 2023 13:32:44 -0700 Subject: [PATCH 3/4] reset-offsets: support --before-earliest and --after-latest --- README.md | 65 +++++++++-- cmd/topicctl/subcmd/reset.go | 216 ++++++++++++++++++++++++----------- pkg/cli/cli.go | 13 +-- pkg/groups/format.go | 33 +++++- pkg/groups/groups.go | 78 +++++++------ pkg/groups/groups_test.go | 27 +++-- pkg/groups/types.go | 4 +- pkg/messages/bounds.go | 7 +- 8 files changed, 309 insertions(+), 134 deletions(-) diff --git a/README.md b/README.md index 7e77dcc6..4ea7a112 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,7 @@ docker-compose up -d 3. Apply the topic configs in [`examples/local-cluster/topics`](/examples/local-cluster/topics): ``` -topicctl apply --skip-confirm examples/local-cluster/topics/*yaml +topicctl apply --skip-confirm examples/local-cluster/topics/*.yaml ``` 4. 
Send some test messages to the `topic-default` topic:
@@ -205,13 +205,62 @@ subcommands interactively.
 topicctl reset-offsets [topic] [group] [flags]
 ```
 
-The `reset-offsets` subcommand allows resetting the offsets for a consumer group in a topic. There are 2 main approaches for setting the offsets:
-
-1. Use a combination of `--partitions`, `--offset`, `--to-earliest` and `--to-latest` flags. `--partitions` flag specifies a list of partitions to be reset e.g. `1,2,3 ...`. If not used, the command defaults to resetting consumer group offsets for ALL of the partitions. `--offset` flag indicates the specific value that all desired consumer group partitions will be set to. If not set, it will default to -2. Finally, `--to-earliest` flag resets offsets of consumer group members to earliest offsets of partitions while `--to-latest` resets offsets of consumer group members to latest offsets of partitions. However, only one of the `--to-earliest`, `--to-latest` and `--offset` flags can be used at a time. This approach is easy to use but lacks the ability for detailed offset configuration.
-
-2. Use `--partition-offset-map` flag to specify a detailed offset configuration for individual partitions. For example, `1=5,2=10,7=12,...` means that the consumer group offset for partition 1 must be set to 5, partition 2 to offset 10, partition 7 to offset 12 and so on. This approach provides greater flexibility and fine-grained control for this operation. Note that `--partition-offset-map` flag is standalone and cannot be coupled with any of the previous flags.
-
-
+The `reset-offsets` subcommand allows resetting the offsets
+for a consumer group in a topic.
+There are a few typical approaches for setting the offsets:
+
+1. Use `--delete` alongside `--before-earliest`:
+   This will unblock consumers that are stuck on offsets
+   which are no longer in range,
+   without affecting healthy consumers.
+   Typically this follows an outage or sustained slow consumption.
+2. Use one of the partition selectors:
+   `--before-earliest`, `--after-latest`, or `--partitions`,
+   and combine it with one of the offset operators:
+   `--delete`, `--offset`, `--to-earliest` or `--to-latest`.
+   Aside from `--to-latest`, this is a legacy approach that is largely
+   superseded by approach 1.
+3. Use `--partition-offset-map` to pass specific offsets per partition.
+   For example, `1=5,2=10` means that the consumer group offset
+   for partition 1 must be set to 5, and partition 2 to offset 10.
+   This is mainly used for replays of specific traffic,
+   such as when a deploy has mishandled or corrupted state,
+   and the prior release must be rerun
+   starting at a specific offset per partition.
+   This is the most flexible approach for offset setting.
+
+Note that the `--partition-offset-map` flag is standalone
+and cannot be coupled with other flags.
+
+##### Partition selection flags
+
+At most one of the following may be selected:
+
+* `--partitions` specifies a comma-separated list of partition IDs.
+* `--before-earliest` selects partitions whose group offset is older
+  than the oldest still-retained offset.
+* `--after-latest` selects partitions whose group offset is newer
+  than the newest offset that has been published to the topic.
+
+If none of these are specified,
+the command defaults to selecting ALL of the partitions.
+
+##### Offset selection flags
+
+At most one of the following may be selected:
+
+* `--delete` removes stored group offsets.
+ This will generally have the same effect as `--to-earliest` or `--to-latest`, + depending on the consumer group configuration. + However, `--delete` is more reliable and convenient, + since `--to-earliest` in particular involves a race with message retention + that may require numerous attempts. +* `--offset` indicates the specific value that all selected + consumer group partitions will be set to. +* `--to-earliest` resets group offsets to oldest still-retained per partition. +* `--to-latest` resets group offsets to newest per partitions. + +If none of these are specified, `--to-earliest` will be the default. #### tail diff --git a/cmd/topicctl/subcmd/reset.go b/cmd/topicctl/subcmd/reset.go index 4fa4981e..4b49f8c0 100644 --- a/cmd/topicctl/subcmd/reset.go +++ b/cmd/topicctl/subcmd/reset.go @@ -6,7 +6,6 @@ import ( "fmt" "strconv" - "github.com/segmentio/topicctl/pkg/admin" "github.com/segmentio/topicctl/pkg/apply" "github.com/segmentio/topicctl/pkg/cli" "github.com/segmentio/topicctl/pkg/groups" @@ -26,6 +25,8 @@ type resetOffsetsCmdConfig struct { offset int64 partitions []int partitionOffsetMap map[string]int64 + beforeEarliest bool + afterLatest bool toEarliest bool toLatest bool delete bool @@ -36,65 +37,99 @@ type resetOffsetsCmdConfig struct { var resetOffsetsConfig resetOffsetsCmdConfig func init() { - resetOffsetsCmd.Flags().Int64Var( - &resetOffsetsConfig.offset, + cfg := &resetOffsetsConfig + cmd := resetOffsetsCmd + flags := cmd.Flags() + + flags.Int64Var( + &cfg.offset, "offset", -2, "Desired offset for the target partitions", ) - resetOffsetsCmd.Flags().IntSliceVar( - &resetOffsetsConfig.partitions, + flags.IntSliceVar( + &cfg.partitions, "partitions", []int{}, "List of partitions to reset e.g. 1,2,3,.. (defaults to all)", ) - resetOffsetsCmd.Flags().StringToInt64Var( - &resetOffsetsConfig.partitionOffsetMap, + flags.StringToInt64Var( + &cfg.partitionOffsetMap, "partition-offset-map", map[string]int64{}, "Map of partition IDs to their corresponding desired offsets e.g. 1=5,2=10,3=12,...", ) - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.toEarliest, + flags.BoolVar( + &cfg.beforeEarliest, + "before-earliest", + false, + "Apply only to offsets below the partition minimum", + ) + flags.BoolVar( + &cfg.afterLatest, + "after-latest", + false, + "Apply only to offsets above the partition maximum", + ) + flags.BoolVar( + &cfg.toEarliest, "to-earliest", false, - "Resets offsets of consumer group members to earliest offsets of partitions") - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.toLatest, + "Resets offsets of consumer group members to earliest offsets of partitions", + ) + flags.BoolVar( + &cfg.toLatest, "to-latest", false, - "Resets offsets of consumer group members to latest offsets of partitions") - resetOffsetsCmd.Flags().BoolVar( - &resetOffsetsConfig.delete, + "Resets offsets of consumer group members to latest offsets of partitions", + ) + flags.BoolVar( + &cfg.delete, "delete", false, - "Deletes offsets for the given consumer group") + "Deletes offsets for the given consumer group", + ) - addSharedFlags(resetOffsetsCmd, &resetOffsetsConfig.shared) - RootCmd.AddCommand(resetOffsetsCmd) + addSharedFlags(cmd, &resetOffsetsConfig.shared) + RootCmd.AddCommand(cmd) } func resetOffsetsPreRun(cmd *cobra.Command, args []string) error { - resetOffsetSpec := "You must choose only one of the following " + - "reset-offset specifications: --delete, --to-earliest, --to-latest, " + - "--offset, or --partition-offset-map." 
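+	// At most one offset specification may be given, --before-earliest and
+	// --after-latest are mutually exclusive, and --partition-offset-map cannot
+	// be combined with --partitions, --before-earliest, or --after-latest.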
- offsetMapSpec := "--partition-offset-map option cannot be used with --partitions." + const ( + resetOffsetSpec = "You must choose only one of the following " + + "reset-offset specifications: --delete, --to-earliest, --to-latest, " + + "--offset, or --partition-offset-map" + offsetMapSpec = "--partition-offset-map option cannot be used with " + + "--partitions, --before-earliest, or --after-latest" + rangeSpec = "--before-earliest cannot be combined with --after-latest" + ) cfg := resetOffsetsConfig + hasMap := len(cfg.partitionOffsetMap) > 0 + hasSlice := len(cfg.partitions) > 0 + numOffsetSpecs := numTrue( cfg.toEarliest, cfg.toLatest, cfg.delete, cmd.Flags().Changed("offset"), - len(cfg.partitionOffsetMap) > 0, + hasMap, ) if numOffsetSpecs > 1 { return errors.New(resetOffsetSpec) } - if len(cfg.partitionOffsetMap) > 0 && len(cfg.partitions) > 0 { + if cfg.beforeEarliest && cfg.afterLatest { + return errors.New(rangeSpec) + } + + if hasMap && hasSlice { + return errors.New(offsetMapSpec) + } + + if numTrue(hasMap, cfg.beforeEarliest, cfg.afterLatest) > 1 { return errors.New(offsetMapSpec) } @@ -105,6 +140,9 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + topic := args[0] + group := args[1] + cfg := resetOffsetsConfig adminClient, err := cfg.shared.getAdminClient(ctx, nil, true) @@ -116,30 +154,24 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { connector := adminClient.GetConnector() - topic := args[0] - group := args[1] + getLagsInput := groups.GetMemberLagsInput{ + GroupID: group, + Topic: topic, - topicInfo, err := adminClient.GetTopic(ctx, topic, false) - if err != nil { - return err + // We need partition-accurate range bounds, + // but don't care about consumer-group message timings. + FullRange: true, } - partitionIDsMap := make(map[int]struct{}, len(topicInfo.Partitions)) - for _, partitionInfo := range topicInfo.Partitions { - partitionIDsMap[partitionInfo.ID] = struct{}{} + partitionLags, err := groups.GetMemberLags(ctx, connector, &getLagsInput) + if err != nil { + return err } - var strategy string - - switch { - case resetOffsetsConfig.toLatest: - strategy = groups.LatestResetOffsetsStrategy - case resetOffsetsConfig.toEarliest: - strategy = groups.EarliestResetOffsetsStrategy - } + infoByPartition := sliceToMapKeyFunc(partitionLags, func(v *groups.MemberPartitionLag) int { return v.Partition }) // If explicit per-partition offsets were specified, set them now. - partitionOffsets, err := parsePartitionOffsetMap(partitionIDsMap, cfg.partitionOffsetMap) + partitionOffsets, err := parsePartitionOffsetMap(infoByPartition, cfg.partitionOffsetMap) if err != nil { return err } @@ -149,47 +181,67 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { // these will only take effect of per-partition offsets were not specified. partitions := cfg.partitions if len(partitions) == 0 && len(partitionOffsets) == 0 { - convert := func(info admin.PartitionInfo) int { return info.ID } - partitions = convertSlice(topicInfo.Partitions, convert) + partitions = mapKeys(infoByPartition) } - for _, partition := range partitions { - _, ok := partitionIDsMap[partition] - if !ok { + // Re-append applicable partitions back over the same slice. 
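+	// Truncating to zero length reuses the backing array: partitions[:n] still
+	// reads the original elements, and append writes at or behind the index
+	// being read, whose value was already copied into the loop variable.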
+ n := len(partitions) + partitions = partitions[:0] + + for _, partition := range partitions[:n] { + info := infoByPartition[partition] + if info == nil { format := "Partition %d not found in topic %s" return fmt.Errorf(format, partition, topic) } - if strategy == "" { - partitionOffsets[partition] = cfg.offset - return nil + // Skip partitions with in-range group offsets. + switch { + case cfg.beforeEarliest && info.MemberOffset >= info.OldestOffset: + continue + case cfg.afterLatest && info.MemberOffset <= info.NewestOffset: + continue } - input := groups.GetEarliestOrLatestOffsetInput{ - Connector: connector, - Strategy: strategy, - Topic: topic, - Partition: partition, - } + partitions = append(partitions, partition) - offset, err := groups.GetEarliestOrLatestOffset(ctx, &input) - if err != nil { - return err + offset := cfg.offset + switch { + case cfg.delete: + continue // storing an offset is not applicable when deleting. + case cfg.toEarliest: + offset = info.OldestOffset + case cfg.toLatest: + offset = info.NewestOffset } partitionOffsets[partition] = offset } - log.Infof( - "This will reset the offsets for the following partitions "+ - "in topic %s for group %s:\n%s", - topic, - group, - groups.FormatPartitionOffsets(partitionOffsets), - ) + if cfg.delete { - log.Info("Please ensure that all other consumers are stopped, " + - "otherwise the reset might be overridden.") + } + + message := "This will reset the offsets for the following partitions " + + "in topic %s for group %s:\n%s" + formatTable := func() string { return groups.FormatPartitionOffsets(partitionOffsets) } + + if cfg.delete { + message = "This will delete the offsets for the following partitions " + + "in topic %s for group %s:\n%s" + formatTable = func() string { return groups.FormatPartitions(partitions) } + } + + log.Infof(message, topic, group, formatTable()) + + // Stopping consumers is typically only relevant to resets, + // since deleting offsets is usually just for unblocking stuck partitions: + // if the group offset for a partition is being actively updated, + // then it's not stuck. 
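+	// With no stored offset, a consumer falls back to its configured
+	// auto.offset.reset policy for the affected partitions.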
+ if !cfg.delete { + log.Info("Please ensure that all other consumers are stopped, " + + "otherwise the reset might be overridden.") + } ok, _ := apply.Confirm("OK to continue?", false) if !ok { @@ -198,7 +250,7 @@ func resetOffsetsRun(cmd *cobra.Command, args []string) error { cliRunner := cli.NewCLIRunner(adminClient, log.Infof, !noSpinner) - if resetOffsetsConfig.delete { + if cfg.delete { input := groups.DeleteOffsetsInput{ GroupID: group, Topic: topic, @@ -228,6 +280,32 @@ func numTrue(bools ...bool) int { return n } +func mapKeys[K comparable, V any](m map[K]V) []K { + s := make([]K, 0, len(m)) + + for k := range m { + s = append(s, k) + } + + return s +} + +func sliceToMapKeyFunc[K comparable, V any](s []V, fn func(*V) K) map[K]*V { + return sliceToMapFunc(s, func(v *V) (K, *V) { return fn(v), v }) +} + +func sliceToMapFunc[K comparable, V1, V2 any](s []V1, fn func(*V1) (K, V2)) map[K]V2 { + m := make(map[K]V2, len(s)) + + for i := range s { + v1 := &s[i] + k, v2 := fn(v1) + m[k] = v2 + } + + return m +} + func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { out := make([]T2, len(input)) @@ -238,7 +316,7 @@ func convertSlice[T1, T2 any](input []T1, fn func(T1) T2) []T2 { return out } -func parsePartitionOffsetMap(partitionIDsMap map[int]struct{}, input map[string]int64) (map[int]int64, error) { +func parsePartitionOffsetMap[T any](partitionIDsMap map[int]T, input map[string]int64) (map[int]int64, error) { out := make(map[int]int64, len(input)) for partition, offset := range input { diff --git a/pkg/cli/cli.go b/pkg/cli/cli.go index 0684208e..2135d878 100644 --- a/pkg/cli/cli.go +++ b/pkg/cli/cli.go @@ -372,14 +372,13 @@ func (c *CLIRunner) GetMemberLags( return fmt.Errorf("Error fetching topic info: %+v", err) } - memberLags, err := groups.GetMemberLags( - ctx, - c.adminClient.GetConnector(), - topic, - groupID, - ) - c.stopSpinner() + getLagsInput := groups.GetMemberLagsInput{ + GroupID: groupID, + Topic: topic, + } + memberLags, err := groups.GetMemberLags(ctx, c.adminClient.GetConnector(), &getLagsInput) + c.stopSpinner() if err != nil { return err } diff --git a/pkg/groups/format.go b/pkg/groups/format.go index c1ea801c..28780560 100644 --- a/pkg/groups/format.go +++ b/pkg/groups/format.go @@ -3,7 +3,9 @@ package groups import ( "bytes" "fmt" + "slices" "sort" + "strconv" "strings" "time" @@ -270,12 +272,39 @@ func FormatMemberLags(memberLags []MemberPartitionLag, full bool) string { return string(bytes.TrimRight(buf.Bytes(), "\n")) } +// FormatPartitions generates a pretty table that shows a list of partitions. +func FormatPartitions(partitions []int) string { + var buf bytes.Buffer + + table := tablewriter.NewWriter(&buf) + table.SetHeader([]string{"Partition"}) + table.SetAutoWrapText(false) + table.SetColumnAlignment([]int{tablewriter.ALIGN_RIGHT}) + + table.SetBorders(tablewriter.Border{ + Left: false, + Top: true, + Right: false, + Bottom: true, + }) + + slices.Sort(partitions) + + for _, partition := range partitions { + table.Append([]string{strconv.Itoa(partition)}) + } + + table.Render() + + return string(bytes.TrimRight(buf.Bytes(), "\n")) +} + // FormatPartitionOffsets generates a pretty table that shows the proposed offsets for each // partition in a reset. 
func FormatPartitionOffsets(partitionOffsets map[int]int64) string { - buf := &bytes.Buffer{} + var buf bytes.Buffer - table := tablewriter.NewWriter(buf) + table := tablewriter.NewWriter(&buf) table.SetHeader( []string{ "Partition", diff --git a/pkg/groups/groups.go b/pkg/groups/groups.go index fe7ae7fc..b05f19c7 100644 --- a/pkg/groups/groups.go +++ b/pkg/groups/groups.go @@ -74,11 +74,7 @@ func GetGroups( } // GetGroupDetails returns the details (membership, etc.) for a single consumer group. -func GetGroupDetails( - ctx context.Context, - connector *admin.Connector, - groupID string, -) (*GroupDetails, error) { +func GetGroupDetails(ctx context.Context, connector *admin.Connector, groupID string) (*GroupDetails, error) { req := kafka.DescribeGroupsRequest{ GroupIDs: []string{groupID}, } @@ -135,15 +131,21 @@ func GetGroupDetails( return &groupDetails, nil } -// GetMemberLags returns the lag for each partition being consumed by the argument group in the -// argument topic. -func GetMemberLags( - ctx context.Context, - connector *admin.Connector, - topic string, - groupID string, -) ([]MemberPartitionLag, error) { - groupDetails, err := GetGroupDetails(ctx, connector, groupID) +// GetMemberLagsInput configures a call to [GetMemberLags]. +type GetMemberLagsInput struct { + GroupID string + Topic string + + // FullRange will make fetched partition ranges accurate + // from the partition's perspective, ignoring consumer group: + // this has the downside of potentially making MemberTime inaccurate. + FullRange bool +} + +// GetMemberLags returns the lag for each partition on the given topic, +// being consumed by the given group in the argument topic. +func GetMemberLags(ctx context.Context, connector *admin.Connector, input *GetMemberLagsInput) ([]MemberPartitionLag, error) { + groupDetails, err := GetGroupDetails(ctx, connector, input.GroupID) if err != nil { return nil, err } @@ -152,40 +154,49 @@ func GetMemberLags( return nil, errors.New("Group state is dead; check that group ID is valid") } - partitionMembers := groupDetails.PartitionMembers(topic) + partitionMembers := groupDetails.PartitionMembers(input.Topic) - offsets, err := connector.KafkaClient.ConsumerOffsets( - ctx, kafka.TopicAndGroup{ - Topic: topic, - GroupId: groupID, - }, - ) + offsetInput := kafka.TopicAndGroup{ + GroupId: input.GroupID, + Topic: input.Topic, + } + + offsets, err := connector.KafkaClient.ConsumerOffsets(ctx, offsetInput) if err != nil { return nil, err } - bounds, err := messages.GetAllPartitionBounds(ctx, connector, topic, offsets) + boundsOffsetsInput := offsets + if input.FullRange { + boundsOffsetsInput = nil + } + + bounds, err := messages.GetAllPartitionBounds(ctx, connector, input.Topic, boundsOffsetsInput) if err != nil { return nil, err } - partitionLags := []MemberPartitionLag{} + partitionLags := make([]MemberPartitionLag, len(bounds)) - for _, bound := range bounds { - partitionLag := MemberPartitionLag{ - Topic: topic, + for i, bound := range bounds { + lag := &partitionLags[i] + *lag = MemberPartitionLag{ + Topic: input.Topic, Partition: bound.Partition, MemberID: partitionMembers[bound.Partition].MemberID, - MemberOffset: offsets[bound.Partition], + OldestOffset: bound.FirstOffset, NewestOffset: bound.LastOffset, + MemberOffset: offsets[bound.Partition], + OldestTime: bound.FirstTime, NewestTime: bound.LastTime, } - if bound.FirstOffset == offsets[bound.Partition] { - partitionLag.MemberTime = bound.FirstTime + switch lag.MemberOffset { + case bound.LastOffset: + lag.MemberTime = 
bound.LastTime + case bound.FirstOffset: + lag.MemberTime = bound.FirstTime } - - partitionLags = append(partitionLags, partitionLag) } return partitionLags, nil @@ -260,7 +271,6 @@ func ResetOffsets(ctx context.Context, connector *admin.Connector, input *ResetO // GetEarliestOrLatestOffsetInput configures a call to [GetEarliestOrLatestOffset]. type GetEarliestOrLatestOffsetInput struct { - Connector *admin.Connector Strategy string Topic string Partition int @@ -268,12 +278,12 @@ type GetEarliestOrLatestOffsetInput struct { // GetEarliestorLatestOffset gets earliest/latest offset // for a given topic partition for resetting offsets of consumer group. -func GetEarliestOrLatestOffset(ctx context.Context, input *GetEarliestOrLatestOffsetInput) (int64, error) { +func GetEarliestOrLatestOffset(ctx context.Context, connector *admin.Connector, input *GetEarliestOrLatestOffsetInput) (int64, error) { if !isValidOffsetStrategy(input.Strategy) { return 0, errors.New("Invalid reset offset strategy provided.") } - partitionBound, err := messages.GetPartitionBounds(ctx, input.Connector, input.Topic, input.Partition, 0) + partitionBound, err := messages.GetPartitionBounds(ctx, connector, input.Topic, input.Partition, 0) if err != nil { return 0, err } diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index 6dab4c00..4bca1277 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -285,7 +285,12 @@ func TestGetLags(t *testing.T) { require.NoError(t, err) } - lags, err := GetMemberLags(ctx, connector, topicName, groupID) + getLagsInput := GetMemberLagsInput{ + GroupID: groupID, + Topic: topicName, + } + + lags, err := GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) @@ -332,18 +337,17 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { for _, partition := range groupPartitions { input := GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Partition: partition, } input.Strategy = LatestResetOffsetsStrategy - offset, err := GetEarliestOrLatestOffset(ctx, &input) + offset, err := GetEarliestOrLatestOffset(ctx, connector, &input) require.NoError(t, err) assert.Equal(t, int64(4), offset) input.Strategy = EarliestResetOffsetsStrategy - offset, err = GetEarliestOrLatestOffset(ctx, &input) + offset, err = GetEarliestOrLatestOffset(ctx, connector, &input) require.NoError(t, err) assert.Equal(t, int64(0), offset) } @@ -390,7 +394,12 @@ func TestResetOffsets(t *testing.T) { err = ResetOffsets(ctx, connector, &input) require.NoError(t, err) - lags, err := GetMemberLags(ctx, connector, topicName, groupID) + getLagsInput := GetMemberLagsInput{ + GroupID: groupID, + Topic: topicName, + } + + lags, err := GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) @@ -399,24 +408,22 @@ func TestResetOffsets(t *testing.T) { // latest offset of partition 0 getInput := GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Strategy: LatestResetOffsetsStrategy, Partition: 0, } - latestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) + latestOffset, err := GetEarliestOrLatestOffset(ctx, connector, &getInput) require.NoError(t, err) // earliest offset of partition 1 getInput = GetEarliestOrLatestOffsetInput{ - Connector: connector, Topic: topicName, Strategy: EarliestResetOffsetsStrategy, Partition: 1, } - earliestOffset, err := GetEarliestOrLatestOffset(ctx, &getInput) + earliestOffset, err := GetEarliestOrLatestOffset(ctx, connector, &getInput) 
require.NoError(t, err) resetInput := ResetOffsetsInput{ @@ -431,7 +438,7 @@ func TestResetOffsets(t *testing.T) { err = ResetOffsets(ctx, connector, &resetInput) require.NoError(t, err) - lags, err = GetMemberLags(ctx, connector, topicName, groupID) + lags, err = GetMemberLags(ctx, connector, &getLagsInput) require.NoError(t, err) require.Equal(t, 2, len(lags)) diff --git a/pkg/groups/types.go b/pkg/groups/types.go index 989bddc9..8073be6d 100644 --- a/pkg/groups/types.go +++ b/pkg/groups/types.go @@ -83,9 +83,11 @@ type MemberPartitionLag struct { Topic string Partition int MemberID string + OldestOffset int64 NewestOffset int64 - NewestTime time.Time MemberOffset int64 + OldestTime time.Time + NewestTime time.Time MemberTime time.Time } diff --git a/pkg/messages/bounds.go b/pkg/messages/bounds.go index 8ad973f6..26396322 100644 --- a/pkg/messages/bounds.go +++ b/pkg/messages/bounds.go @@ -1,9 +1,10 @@ package messages import ( + "cmp" "context" "fmt" - "sort" + "slices" "time" "github.com/segmentio/kafka-go" @@ -119,8 +120,8 @@ func GetAllPartitionBounds( } } - sort.Slice(allBounds, func(a, b int) bool { - return allBounds[a].Partition < allBounds[b].Partition + slices.SortFunc(allBounds, func(a, b Bounds) int { + return cmp.Compare(a.Partition, b.Partition) }) return allBounds, nil From c8376f5411ddeb7186e95cd818188404c88a00d1 Mon Sep 17 00:00:00 2001 From: Kevin Gillette Date: Tue, 12 Dec 2023 23:13:23 -0700 Subject: [PATCH 4/4] update CI Go build version to 1.21 --- .github/workflows/ci.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 18183e0b..bc055149 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -28,7 +28,7 @@ jobs: test010: runs-on: ubuntu-latest container: - image: cimg/go:1.19 + image: cimg/go:1.21 env: GO111MODULE: "on" KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181 @@ -39,7 +39,7 @@ jobs: - name: Go setup uses: actions/setup-go@v3 with: - go-version: 1.19 + go-version: 1.21 - name: Display Go version run: go version - name: Run tests @@ -124,7 +124,7 @@ jobs: test271: runs-on: ubuntu-latest container: - image: cimg/go:1.19 + image: cimg/go:1.21 env: GO111MODULE: "on" KAFKA_TOPICS_TEST_ZK_ADDR: zookeeper:2181 @@ -135,7 +135,7 @@ jobs: - name: Go setup uses: actions/setup-go@v3 with: - go-version: 1.19 + go-version: 1.21 - name: Display Go version run: go version - name: Run tests