Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/root/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/apache/apisix-ingress-controller/internal/controller/config"
"github.com/apache/apisix-ingress-controller/internal/manager"
"github.com/apache/apisix-ingress-controller/internal/version"
"github.com/api7/gopkg/pkg/log"
)

type GatewayConfigsFlag struct {
Expand Down Expand Up @@ -115,6 +114,7 @@ func newAPISIXIngressController() *cobra.Command {
return err
}

<<<<<<< HEAD
// dashboard sdk log
l, err := log.NewLogger(
log.WithOutputFile("stderr"),
Expand All @@ -126,6 +126,8 @@ func newAPISIXIngressController() *cobra.Command {
}
log.DefaultLogger = l

=======
>>>>>>> d9550d88 (chore: unify the logging component (#2584))
// controllers log
core := zapcore.NewCore(
zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()),
Expand Down
27 changes: 14 additions & 13 deletions internal/adc/cache/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@ import (
"fmt"
"sync"

"github.com/api7/gopkg/pkg/log"
"github.com/go-logr/logr"
"github.com/google/uuid"
"go.uber.org/zap"

adctypes "github.com/apache/apisix-ingress-controller/api/adc"
"github.com/apache/apisix-ingress-controller/internal/controller/label"
Expand All @@ -34,12 +33,14 @@ type Store struct {
pluginMetadataMap map[string]adctypes.PluginMetadata

sync.Mutex
log logr.Logger
}

func NewStore() *Store {
func NewStore(log logr.Logger) *Store {
return &Store{
cacheMap: make(map[string]Cache),
pluginMetadataMap: make(map[string]adctypes.PluginMetadata),
log: log,
}
}

Expand All @@ -55,7 +56,7 @@ func (s *Store) Insert(name string, resourceTypes []string, resources *adctypes.
s.cacheMap[name] = db
targetCache = s.cacheMap[name]
}
log.Debugw("Inserting resources into cache for", zap.String("name", name))
s.log.V(1).Info("Inserting resources into cache", "name", name, "resourceTypes", resourceTypes, "Labels", Labels)
selector := &KindLabelSelector{
Kind: Labels[label.LabelKind],
Name: Labels[label.LabelName],
Expand Down Expand Up @@ -162,41 +163,41 @@ func (s *Store) Delete(name string, resourceTypes []string, Labels map[string]st
case adctypes.TypeService:
services, err := targetCache.ListServices(selector)
if err != nil {
log.Errorw("failed to list services", zap.Error(err))
s.log.Error(err, "failed to list services")
}
for _, service := range services {
if err := targetCache.DeleteService(service); err != nil {
log.Errorw("failed to delete service", zap.Error(err), zap.String("service", service.ID))
s.log.Error(err, "failed to delete service", "service", service.ID)
}
}
case adctypes.TypeSSL:
ssls, err := targetCache.ListSSL(selector)
if err != nil {
log.Errorw("failed to list ssl", zap.Error(err))
s.log.Error(err, "failed to list ssl")
}
for _, ssl := range ssls {
if err := targetCache.DeleteSSL(ssl); err != nil {
log.Errorw("failed to delete ssl", zap.Error(err), zap.String("ssl", ssl.ID))
s.log.Error(err, "failed to delete ssl", "ssl", ssl.ID)
}
}
case adctypes.TypeConsumer:
consumers, err := targetCache.ListConsumers(selector)
if err != nil {
log.Errorw("failed to list consumers", zap.Error(err))
s.log.Error(err, "failed to list consumers")
}
for _, consumer := range consumers {
if err := targetCache.DeleteConsumer(consumer); err != nil {
log.Errorw("failed to delete consumer", zap.Error(err), zap.String("consumer", consumer.Username))
s.log.Error(err, "failed to delete consumer", "consumer", consumer.Username)
}
}
case adctypes.TypeGlobalRule:
globalRules, err := targetCache.ListGlobalRules(selector)
if err != nil {
log.Errorw("failed to list global rules", zap.Error(err))
s.log.Error(err, "failed to list global rules")
}
for _, globalRule := range globalRules {
if err := targetCache.DeleteGlobalRule(globalRule); err != nil {
log.Errorw("failed to delete global rule", zap.Error(err), zap.String("global rule", globalRule.ID))
s.log.Error(err, "failed to delete global rule", "global rule", globalRule.ID)
}
}
case adctypes.TypePluginMetadata:
Expand Down Expand Up @@ -229,7 +230,7 @@ func (s *Store) GetResources(name string) (*adctypes.Resources, error) {
}
globalrule = adctypes.GlobalRule(merged)
}
log.Debugw("get resources global rule items", zap.Any("globalRuleItems", globalRuleItems))
s.log.V(1).Info("GetResources fetched global rule items", "items", globalRuleItems, "gobalrule", globalrule)
if meta, ok := s.pluginMetadataMap[name]; ok {
metadata = meta.DeepCopy()
}
Expand Down
90 changes: 49 additions & 41 deletions internal/adc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,8 @@ import (
"sync"
"time"

"github.com/api7/gopkg/pkg/log"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"go.uber.org/zap"

adctypes "github.com/apache/apisix-ingress-controller/api/adc"
"github.com/apache/apisix-ingress-controller/internal/adc/cache"
Expand All @@ -48,23 +47,33 @@ type Client struct {

ConfigManager *common.ConfigManager[types.NamespacedNameKind, adctypes.Config]
ADCDebugProvider *common.ADCDebugProvider

log logr.Logger
}

func New(mode string, timeout time.Duration) (*Client, error) {
func New(log logr.Logger, mode string, timeout time.Duration) (*Client, error) {
serverURL := os.Getenv("ADC_SERVER_URL")
if serverURL == "" {
serverURL = defaultHTTPADCExecutorAddr
}
<<<<<<< HEAD

store := cache.NewStore()
=======
store := cache.NewStore(log)
>>>>>>> d9550d88 (chore: unify the logging component (#2584))
configManager := common.NewConfigManager[types.NamespacedNameKind, adctypes.Config]()
log.Infow("using HTTP ADC Executor", zap.String("server_url", serverURL))

logger := log.WithName("client")
logger.Info("ADC client initialized", "mode", mode)

return &Client{
Store: store,
executor: NewHTTPADCExecutor(serverURL, timeout),
executor: NewHTTPADCExecutor(log, serverURL, timeout),
BackendMode: mode,
ConfigManager: configManager,
ADCDebugProvider: common.NewADCDebugProvider(store, configManager),
log: logger,
}, nil
}

Expand All @@ -82,55 +91,55 @@ type StoreDelta struct {
Applied map[types.NamespacedNameKind]adctypes.Config
}

func (d *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) {
d.mu.Lock()
defer d.mu.Unlock()
func (c *Client) applyStoreChanges(args Task, isDelete bool) (StoreDelta, error) {
c.mu.Lock()
defer c.mu.Unlock()

var delta StoreDelta

if isDelete {
delta.Deleted = d.ConfigManager.Get(args.Key)
d.ConfigManager.Delete(args.Key)
delta.Deleted = c.ConfigManager.Get(args.Key)
c.ConfigManager.Delete(args.Key)
} else {
deleted := d.ConfigManager.Update(args.Key, args.Configs)
deleted := c.ConfigManager.Update(args.Key, args.Configs)
delta.Deleted = deleted
delta.Applied = args.Configs
}

for _, cfg := range delta.Deleted {
if err := d.Store.Delete(cfg.Name, args.ResourceTypes, args.Labels); err != nil {
log.Errorw("store delete failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args))
if err := c.Store.Delete(cfg.Name, args.ResourceTypes, args.Labels); err != nil {
c.log.Error(err, "store delete failed", "cfg", cfg, "args", args)
return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store delete failed for config %s", cfg.Name))
}
}

for _, cfg := range delta.Applied {
if err := d.Insert(cfg.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil {
log.Errorw("store insert failed", zap.Error(err), zap.Any("cfg", cfg), zap.Any("args", args))
if err := c.Insert(cfg.Name, args.ResourceTypes, args.Resources, args.Labels); err != nil {
c.log.Error(err, "store insert failed", "cfg", cfg, "args", args)
return StoreDelta{}, errors.Wrap(err, fmt.Sprintf("store insert failed for config %s", cfg.Name))
}
}

return delta, nil
}

func (d *Client) applySync(ctx context.Context, args Task, delta StoreDelta) error {
d.syncMu.RLock()
defer d.syncMu.RUnlock()
func (c *Client) applySync(ctx context.Context, args Task, delta StoreDelta) error {
c.syncMu.RLock()
defer c.syncMu.RUnlock()

if len(delta.Deleted) > 0 {
if err := d.sync(ctx, Task{
if err := c.sync(ctx, Task{
Name: args.Name,
Labels: args.Labels,
ResourceTypes: args.ResourceTypes,
Configs: delta.Deleted,
}); err != nil {
log.Warnw("failed to sync deleted configs", zap.Error(err))
c.log.Error(err, "failed to sync deleted configs", "args", args, "delta", delta)
}
}

if len(delta.Applied) > 0 {
return d.sync(ctx, Task{
return c.sync(ctx, Task{
Name: args.Name,
Labels: args.Labels,
ResourceTypes: args.ResourceTypes,
Expand All @@ -141,53 +150,53 @@ func (d *Client) applySync(ctx context.Context, args Task, delta StoreDelta) err
return nil
}

func (d *Client) Update(ctx context.Context, args Task) error {
delta, err := d.applyStoreChanges(args, false)
func (c *Client) Update(ctx context.Context, args Task) error {
delta, err := c.applyStoreChanges(args, false)
if err != nil {
return err
}
return d.applySync(ctx, args, delta)
return c.applySync(ctx, args, delta)
}

func (d *Client) UpdateConfig(ctx context.Context, args Task) error {
_, err := d.applyStoreChanges(args, false)
func (c *Client) UpdateConfig(ctx context.Context, args Task) error {
_, err := c.applyStoreChanges(args, false)
return err
}

func (d *Client) Delete(ctx context.Context, args Task) error {
delta, err := d.applyStoreChanges(args, true)
func (c *Client) Delete(ctx context.Context, args Task) error {
delta, err := c.applyStoreChanges(args, true)
if err != nil {
return err
}
return d.applySync(ctx, args, delta)
return c.applySync(ctx, args, delta)
}

func (d *Client) DeleteConfig(ctx context.Context, args Task) error {
_, err := d.applyStoreChanges(args, true)
func (c *Client) DeleteConfig(ctx context.Context, args Task) error {
_, err := c.applyStoreChanges(args, true)
return err
}

func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors, error) {
c.syncMu.Lock()
defer c.syncMu.Unlock()
log.Debug("syncing all resources")
c.log.Info("syncing all resources")

configs := c.ConfigManager.List()

if len(configs) == 0 {
log.Warn("no GatewayProxy configs provided")
c.log.Info("no GatewayProxy configs provided")
return nil, nil
}

log.Debugw("syncing resources with multiple configs", zap.Any("configs", configs))
c.log.V(1).Info("syncing resources with multiple configs", "configs", configs)

failedMap := map[string]types.ADCExecutionErrors{}
var failedConfigs []string
for _, config := range configs {
name := config.Name
resources, err := c.GetResources(name)
if err != nil {
log.Errorw("failed to get resources from store", zap.String("name", name), zap.Error(err))
c.log.Error(err, "failed to get resources from store", "name", name)
failedConfigs = append(failedConfigs, name)
continue
}
Expand All @@ -202,7 +211,7 @@ func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors,
},
Resources: resources,
}); err != nil {
log.Errorw("failed to sync resources", zap.String("name", name), zap.Error(err))
c.log.Error(err, "failed to sync resources", "name", name)
failedConfigs = append(failedConfigs, name)
var execErrs types.ADCExecutionErrors
if errors.As(err, &execErrs) {
Expand All @@ -221,10 +230,10 @@ func (c *Client) Sync(ctx context.Context) (map[string]types.ADCExecutionErrors,
}

func (c *Client) sync(ctx context.Context, task Task) error {
log.Debugw("syncing resources", zap.Any("task", task))
c.log.V(1).Info("syncing resources", "task", task)

if len(task.Configs) == 0 {
log.Warnw("no adc configs provided", zap.Any("task", task))
c.log.Info("no adc configs provided")
return nil
}

Expand Down Expand Up @@ -309,6 +318,7 @@ func (c *Client) sync(ctx context.Context, task Task) error {
}
pkgmetrics.RecordFileIODuration("prepare_sync_file", adctypes.StatusSuccess, time.Since(fileIOStart).Seconds())
defer cleanup()
c.log.V(1).Info("prepared sync file", "path", syncFilePath)

args := BuildADCExecuteArgs(syncFilePath, task.Labels, task.ResourceTypes)

Expand All @@ -326,7 +336,7 @@ func (c *Client) sync(ctx context.Context, task Task) error {
status := adctypes.StatusSuccess
if err != nil {
status = "failure"
log.Errorw("failed to execute adc command", zap.Error(err), zap.Any("config", config))
c.log.Error(err, "failed to execute adc command", "config", config)

var execErr types.ADCExecutionError
if errors.As(err, &execErr) {
Expand Down Expand Up @@ -366,7 +376,5 @@ func prepareSyncFile(resources any) (string, func(), error) {
return "", nil, err
}

log.Debugw("generated adc file", zap.String("filename", tmpFile.Name()), zap.String("json", string(data)))

return tmpFile.Name(), cleanup, nil
}
Loading