diff --git a/dgraph/cmd/zero/oracle.go b/dgraph/cmd/zero/oracle.go index 4fac71cbd4e..3ce5ab28345 100644 --- a/dgraph/cmd/zero/oracle.go +++ b/dgraph/cmd/zero/oracle.go @@ -225,26 +225,56 @@ func (o *Oracle) sendDeltasToSubscribers() { // Let's ensure that we have all the commits up until the max here. // Otherwise, we'll be sending commit timestamps out of order, which // would cause Alphas to drop some of them, during writes to Badger. + + newDelta := &pb.OracleDelta{} + useNewDelta := false if o.doneUntil.DoneUntil() < waitFor() { - continue // The for loop doing blocking reads from o.updates. - // We need at least one entry from the updates channel to pick up a missing update. - // Don't goto slurp_loop, because it would break from select immediately. + if len(delta.Txns) > 10 { + replacementTxn := []*pb.TxnStatus{} + + ts := o.doneUntil.DoneUntil() + for _, txn := range delta.Txns { + if txn.CommitTs > ts { + replacementTxn = append(replacementTxn, txn) + } else { + newDelta.Txns = append(newDelta.Txns, txn) + newDelta.MaxAssigned = x.Max(newDelta.MaxAssigned, txn.CommitTs) + } + } + + if len(newDelta.Txns) == 0 { + continue + } + + useNewDelta = true + delta.Txns = replacementTxn + } else { + continue // The for loop doing blocking reads from o.updates. + // We need at least one entry from the updates channel to pick up a missing update. + // Don't goto slurp_loop, because it would break from select immediately. + } } if glog.V(3) { glog.Infof("DoneUntil: %d. Sending delta: %+v\n", o.doneUntil.DoneUntil(), delta) } o.Lock() + k := *delta + if useNewDelta { + k = *newDelta + } for id, ch := range o.subscribers { select { - case ch <- *delta: + case ch <- k: default: close(ch) delete(o.subscribers, id) } } o.Unlock() - delta = &pb.OracleDelta{} + if useNewDelta { + delta = &pb.OracleDelta{} + } } } diff --git a/posting/oracle.go b/posting/oracle.go index 5b6c4510c22..06fbee3df4d 100644 --- a/posting/oracle.go +++ b/posting/oracle.go @@ -16,6 +16,7 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/golang/glog" ostats "go.opencensus.io/stats" + "go.opentelemetry.io/otel/trace" "github.com/hypermodeinc/dgraph/v25/protos/pb" "github.com/hypermodeinc/dgraph/v25/tok/index" @@ -54,6 +55,8 @@ type Txn struct { lastUpdate time.Time cache *LocalCache // This pointer does not get modified. + + Span trace.Span } // struct to implement Txn interface from vector-indexer diff --git a/worker/draft.go b/worker/draft.go index 97a7ea9d43e..6df9342e0dd 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -876,6 +876,12 @@ func (n *node) processApplyCh() { glog.V(3).Infof("handling element in applyCh with #entries %v", len(entries)) defer glog.V(3).Infof("done handling element in applyCh") + _, spanHandler := otel.Tracer("applyCh").Start(context.Background(), "Alpha.processApplyCh") + defer spanHandler.End() + + spanHandler.AddEvent("handling element in applyCh with #entries %v", trace.WithAttributes( + attribute.Int64("numEntries", int64(len(entries))))) + var totalSize int64 for _, entry := range entries { x.AssertTrue(len(entry.Data) > 0) @@ -909,7 +915,7 @@ func (n *node) processApplyCh() { p := &P{err: perr, size: psz, seen: time.Now()} previous[key] = p } - span := trace.SpanFromContext(n.ctx) + span := trace.SpanFromContext(n.Ctx(key)) if perr != nil { glog.Errorf("Applying proposal. Error: %v. Proposal: %q.", perr, &proposal) span.AddEvent(fmt.Sprintf("Applying proposal failed. Error: %v Proposal: %q", perr, &proposal)) @@ -919,11 +925,22 @@ func (n *node) processApplyCh() { attribute.Int64("key", int64(key)), attribute.Int64("index", int64(proposal.Index)), )) + spanHandler.AddEvent("Applied proposal with key: %d, index: %d. Err: %v", + trace.WithAttributes( + attribute.Int64("key", int64(key)), + attribute.Int64("index", int64(proposal.Index)), + )) var tags []tag.Mutator switch { case proposal.Mutations != nil: tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Mutations")) + span.SetAttributes(attribute.Int64("start_ts", int64(proposal.Mutations.StartTs))) + txn := posting.Oracle().GetTxn(proposal.Mutations.StartTs) + if txn != nil { + txn.Span = span + } + case proposal.Delta != nil: tags = append(tags, tag.Upsert(x.KeyMethod, "apply.Delta")) } @@ -966,10 +983,15 @@ func (n *node) processApplyCh() { // TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused. func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { + pctx, span := otel.Tracer("alpha.CommitLoop").Start(context.Background(), "alpha.commitOrAbort") + defer span.End() + x.PrintOracleDelta(delta) // First let's commit all mutations to disk. writer := posting.NewTxnWriter(pstore) toDisk := func(start, commit uint64) { + _, tspan := otel.Tracer("alpha.CommitLoop").Start(pctx, "alpha.toDisk") + defer tspan.End() txn := posting.Oracle().GetTxn(start) if txn == nil || commit == 0 { return @@ -995,6 +1017,12 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { start, commit, err) panic(err) } + + tspan.AddEvent("Committed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes( + attribute.Int64("start_ts", int64(start)), + attribute.Int64("commit_ts", int64(commit)), + )) + tspan.SetAttributes(attribute.Int64("start_ts", int64(start)), attribute.Int64("commit_ts", int64(commit))) } for _, status := range delta.Txns { @@ -1004,6 +1032,17 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error { return errors.Wrapf(err, "while flushing to disk") } + span.AddEvent("Flushed to disk") + for _, status := range delta.Txns { + txn := posting.Oracle().GetTxn(status.StartTs) + if txn != nil && txn.Span != nil { + txn.Span.AddEvent("Flushed txn with start_ts: %d, commit_ts: %d", trace.WithAttributes( + attribute.Int64("start_ts", int64(status.StartTs)), + attribute.Int64("commit_ts", int64(status.CommitTs)), + )) + } + } + if x.WorkerConfig.HardSync { if err := pstore.Sync(); err != nil { glog.Errorf("Error while calling Sync while commitOrAbort: %v", err) diff --git a/worker/proposal.go b/worker/proposal.go index 07d531b3a4a..6c00fadca43 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -221,6 +221,8 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr cctx, cancel := context.WithCancel(context.Background()) defer cancel() + cctx = trace.ContextWithSpan(cctx, span) + errCh := make(chan error, 1) pctx := &conn.ProposalCtx{ ErrCh: errCh,