Skip to content

Commit afbc014

Browse files
authored
Merge pull request #20192 from Jille/lgMu
client: Use an atomic.Pointer instead of a mutex to guard the logger
2 parents 7d7f951 + 24d2d31 commit afbc014

File tree

7 files changed

+32
-36
lines changed

7 files changed

+32
-36
lines changed

client/v3/client.go

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"fmt"
2121
"strings"
2222
"sync"
23+
"sync/atomic"
2324
"time"
2425

2526
"github.com/coreos/go-semver/semver"
@@ -78,8 +79,7 @@ type Client struct {
7879

7980
callOpts []grpc.CallOption
8081

81-
lgMu *sync.RWMutex
82-
lg *zap.Logger
82+
lg atomic.Pointer[zap.Logger]
8383
}
8484

8585
// New creates a new etcdv3 client from a given configuration.
@@ -96,12 +96,12 @@ func New(cfg Config) (*Client, error) {
9696
// service interface implementations and do not need connection management.
9797
func NewCtxClient(ctx context.Context, opts ...Option) *Client {
9898
cctx, cancel := context.WithCancel(ctx)
99-
c := &Client{ctx: cctx, cancel: cancel, lgMu: new(sync.RWMutex), epMu: new(sync.RWMutex)}
99+
c := &Client{ctx: cctx, cancel: cancel, epMu: new(sync.RWMutex)}
100100
for _, opt := range opts {
101101
opt(c)
102102
}
103-
if c.lg == nil {
104-
c.lg = zap.NewNop()
103+
if c.lg.Load() == nil {
104+
c.lg.Store(zap.NewNop())
105105
}
106106
return c
107107
}
@@ -122,7 +122,7 @@ func NewFromURLs(urls []string) (*Client, error) {
122122
// WithZapLogger is a NewCtxClient option that overrides the logger
123123
func WithZapLogger(lg *zap.Logger) Option {
124124
return func(c *Client) {
125-
c.lg = lg
125+
c.lg.Store(lg)
126126
}
127127
}
128128

@@ -133,19 +133,14 @@ func WithZapLogger(lg *zap.Logger) Option {
133133
// Does not changes grpcLogger, that can be explicitly configured
134134
// using grpc_zap.ReplaceGrpcLoggerV2(..) method.
135135
func (c *Client) WithLogger(lg *zap.Logger) *Client {
136-
c.lgMu.Lock()
137-
c.lg = lg
138-
c.lgMu.Unlock()
136+
c.lg.Store(lg)
139137
return c
140138
}
141139

142140
// GetLogger gets the logger.
143141
// NOTE: This method is for internal use of etcd-client library and should not be used as general-purpose logger.
144142
func (c *Client) GetLogger() *zap.Logger {
145-
c.lgMu.RLock()
146-
l := c.lg
147-
c.lgMu.RUnlock()
148-
return l
143+
return c.lg.Load()
149144
}
150145

151146
// Close shuts down the client's etcd connections.
@@ -205,7 +200,7 @@ func (c *Client) Sync(ctx context.Context) error {
205200
return len(eps) > 0, nil
206201
})
207202
c.SetEndpoints(eps...)
208-
c.lg.Debug("set etcd endpoints by autoSync", zap.Strings("endpoints", eps))
203+
c.GetLogger().Debug("set etcd endpoints by autoSync", zap.Strings("endpoints", eps))
209204
return nil
210205
}
211206

@@ -223,7 +218,7 @@ func (c *Client) autoSync() {
223218
err := c.Sync(ctx)
224219
cancel()
225220
if err != nil && !errors.Is(err, c.ctx.Err()) {
226-
c.lg.Info("Auto sync endpoints failed.", zap.Error(err))
221+
c.GetLogger().Info("Auto sync endpoints failed.", zap.Error(err))
227222
}
228223
}
229224
}
@@ -402,23 +397,24 @@ func newClient(cfg *Config) (*Client, error) {
402397
cancel: cancel,
403398
epMu: new(sync.RWMutex),
404399
callOpts: defaultCallOpts,
405-
lgMu: new(sync.RWMutex),
406400
}
407401

408402
var err error
403+
var lg *zap.Logger
409404
if cfg.Logger != nil {
410-
client.lg = cfg.Logger
405+
lg = cfg.Logger
411406
} else if cfg.LogConfig != nil {
412-
client.lg, err = cfg.LogConfig.Build()
407+
lg, err = cfg.LogConfig.Build()
413408
} else {
414-
client.lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
415-
if client.lg != nil {
416-
client.lg = client.lg.Named("etcd-client")
409+
lg, err = logutil.CreateDefaultZapLogger(etcdClientDebugLevel())
410+
if lg != nil {
411+
lg = lg.Named("etcd-client")
417412
}
418413
}
419414
if err != nil {
420415
return nil, err
421416
}
417+
client.lg.Store(lg)
422418

423419
if cfg.Username != "" && cfg.Password != "" {
424420
client.Username = cfg.Username
@@ -508,10 +504,10 @@ func (c *Client) roundRobinQuorumBackoff(waitBetween time.Duration, jitterFracti
508504
n := uint(len(c.Endpoints()))
509505
quorum := (n/2 + 1)
510506
if attempt%quorum == 0 {
511-
c.lg.Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
507+
c.GetLogger().Debug("backoff", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum), zap.Duration("waitBetween", waitBetween), zap.Float64("jitterFraction", jitterFraction))
512508
return jitterUp(waitBetween, jitterFraction)
513509
}
514-
c.lg.Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
510+
c.GetLogger().Debug("backoff skipped", zap.Uint("attempt", attempt), zap.Uint("quorum", quorum))
515511
return 0
516512
}
517513
}

client/v3/client_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -257,12 +257,12 @@ func TestCloseCtxClient(t *testing.T) {
257257
func TestWithLogger(t *testing.T) {
258258
ctx := t.Context()
259259
c := NewCtxClient(ctx)
260-
if c.lg == nil {
260+
if c.lg.Load() == nil {
261261
t.Errorf("unexpected nil in *zap.Logger")
262262
}
263263

264264
c.WithLogger(nil)
265-
if c.lg != nil {
265+
if c.GetLogger() != nil {
266266
t.Errorf("WithLogger should modify *zap.Logger")
267267
}
268268
}
@@ -272,7 +272,7 @@ func TestZapWithLogger(t *testing.T) {
272272
lg := zap.NewNop()
273273
c := NewCtxClient(ctx, WithZapLogger(lg))
274274

275-
if c.lg != lg {
275+
if c.GetLogger() != lg {
276276
t.Errorf("WithZapLogger should modify *zap.Logger")
277277
}
278278
}

client/v3/lease.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout
204204
l.firstKeepAliveTimeout = defaultTTL
205205
}
206206
if c != nil {
207-
l.lg = c.lg
207+
l.lg = c.GetLogger()
208208
l.callOpts = c.callOpts
209209
}
210210
reqLeaderCtx := WithRequireLeader(context.Background())

client/v3/maintenance.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ type maintenance struct {
113113

114114
func NewMaintenance(c *Client) Maintenance {
115115
api := &maintenance{
116-
lg: c.lg,
116+
lg: c.GetLogger(),
117117
dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
118118
conn, err := c.Dial(endpoint)
119119
if err != nil {
@@ -140,7 +140,7 @@ func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client)
140140
}
141141
if c != nil {
142142
api.callOpts = c.callOpts
143-
api.lg = c.lg
143+
api.lg = c.GetLogger()
144144
}
145145
return api
146146
}

client/v3/retry_interceptor.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -240,12 +240,12 @@ func (s *serverStreamingRetryingStream) RecvMsg(m any) error {
240240
}
241241
newStream, err := s.reestablishStreamAndResendBuffer(s.ctx)
242242
if err != nil {
243-
s.client.lg.Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
243+
s.client.GetLogger().Error("failed reestablishStreamAndResendBuffer", zap.Error(err))
244244
return err // TODO(mwitkow): Maybe dial and transport errors should be retriable?
245245
}
246246
s.setStream(newStream)
247247

248-
s.client.lg.Warn("retrying RecvMsg", zap.Error(lastErr))
248+
s.client.GetLogger().Warn("retrying RecvMsg", zap.Error(lastErr))
249249
attemptRetry, lastErr = s.receiveMsgAndIndicateRetry(m)
250250
if !attemptRetry {
251251
return lastErr
@@ -278,7 +278,7 @@ func (s *serverStreamingRetryingStream) receiveMsgAndIndicateRetry(m any) (bool,
278278
if s.client.shouldRefreshToken(err, s.callOpts) {
279279
gtErr := s.client.refreshToken(s.ctx)
280280
if gtErr != nil {
281-
s.client.lg.Warn("retry failed to fetch new auth token", zap.Error(gtErr))
281+
s.client.GetLogger().Warn("retry failed to fetch new auth token", zap.Error(gtErr))
282282
return false, err // return the original error for simplicity
283283
}
284284
return true, err
@@ -344,7 +344,7 @@ func isSafeRetry(c *Client, err error, callOpts *options) bool {
344344
case nonRepeatable:
345345
return isSafeRetryMutableRPC(err)
346346
default:
347-
c.lg.Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
347+
c.GetLogger().Warn("unrecognized retry policy", zap.String("retryPolicy", callOpts.retryPolicy.String()))
348348
return false
349349
}
350350
}

client/v3/watch.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
254254
}
255255
if c != nil {
256256
w.callOpts = c.callOpts
257-
w.lg = c.lg
257+
w.lg = c.GetLogger()
258258
}
259259
return w
260260
}

tests/robustness/client/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
// allow for concurrent requests to conform to model.AppendableHistory requirements.
3636
type RecordingClient struct {
3737
ID int
38-
client clientv3.Client
38+
client *clientv3.Client
3939
// using baseTime time-measuring operation to get monotonic clock reading
4040
// see https://github.com/golang/go/blob/master/src/time/time.go#L17
4141
baseTime time.Time
@@ -66,7 +66,7 @@ func NewRecordingClient(endpoints []string, ids identity.Provider, baseTime time
6666
}
6767
return &RecordingClient{
6868
ID: ids.NewClientID(),
69-
client: *cc,
69+
client: cc,
7070
kvOperations: model.NewAppendableHistory(ids),
7171
baseTime: baseTime,
7272
}, nil

0 commit comments

Comments
 (0)