1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package clientv3
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "sync"
22 "time"
23
24 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
25 "go.etcd.io/etcd/api/v3/mvccpb"
26 v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
27
28 "go.uber.org/zap"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/status"
33 )
34
const (
	// EventTypeDelete and EventTypePut re-export the mvccpb event type values
	// so callers can compare Event.Type against them.
	EventTypeDelete = mvccpb.DELETE
	EventTypePut    = mvccpb.PUT

	// closeSendErrTimeout bounds how long sendCloseSubstream waits for a
	// receiver to take the final error response before closing the channel.
	closeSendErrTimeout = 250 * time.Millisecond

	// AutoWatchID is the zero watch ID.
	// NOTE(review): presumably means "let the server choose the ID"; it is not
	// referenced in this file — confirm against the watch API documentation.
	AutoWatchID = 0

	// InvalidWatchID is the ID carried by a substream that has not been
	// registered with the server yet. addSubstream also treats a response
	// carrying this ID as a failed watch creation.
	InvalidWatchID = -1
)
48
// Event is the client-facing view of an mvccpb.Event; it adds the IsCreate
// and IsModify helpers.
type Event mvccpb.Event

// WatchChan is the receive-only channel on which a watch delivers its
// WatchResponse values.
type WatchChan <-chan WatchResponse
52
// Watcher is the watch API exposed to clients. The notes below describe the
// behavior of the implementation in this file (type watcher).
type Watcher interface {
	// Watch registers a watch on key, configured by opts, and returns the
	// channel its responses are delivered on.
	//
	// Watches whose contexts carry the same outgoing gRPC metadata share one
	// underlying stream. The returned channel is already closed when the
	// Watcher has been closed. It is also closed, without a watch being
	// established, when ctx is done before the watch is registered. If the
	// stream halts with an error first, the channel carries a single canceled
	// response holding that error and is then closed.
	Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

	// RequestProgress queues a progress request on the stream associated with
	// ctx's outgoing metadata, creating that stream if needed. It returns
	// ctx.Err() if ctx is done before the request is queued, the stream's
	// close error if the stream halted with one, and an error if the Watcher
	// has already been closed.
	RequestProgress(ctx context.Context) error

	// Close cancels every open stream and waits for each one to shut down.
	// A context.Canceled result is reported as nil.
	Close() error
}
90
// WatchResponse is one message delivered on a WatchChan.
type WatchResponse struct {
	// Header is a copy of the header of the server response this value was
	// built from.
	Header pb.ResponseHeader
	// Events holds the events carried by the response.
	Events []*Event

	// CompactRevision is copied from the server response; a non-zero value
	// makes Err report v3rpc.ErrCompacted.
	CompactRevision int64

	// Canceled is set when the server reports the watch as canceled, and on
	// the final response sent when the stream halts with an error.
	Canceled bool

	// Created is copied from the server response and marks the
	// acknowledgement of a watch creation.
	Created bool

	// closeErr is the error the underlying stream halted with, if any.
	closeErr error

	// cancelReason is the server-provided reason for a cancellation; Err
	// surfaces it as a FailedPrecondition status error.
	cancelReason string
}
111
112
113 func (e *Event) IsCreate() bool {
114 return e.Type == EventTypePut && e.Kv.CreateRevision == e.Kv.ModRevision
115 }
116
117
118 func (e *Event) IsModify() bool {
119 return e.Type == EventTypePut && e.Kv.CreateRevision != e.Kv.ModRevision
120 }
121
122
123 func (wr *WatchResponse) Err() error {
124 switch {
125 case wr.closeErr != nil:
126 return v3rpc.Error(wr.closeErr)
127 case wr.CompactRevision != 0:
128 return v3rpc.ErrCompacted
129 case wr.Canceled:
130 if len(wr.cancelReason) != 0 {
131 return v3rpc.Error(status.Error(codes.FailedPrecondition, wr.cancelReason))
132 }
133 return v3rpc.ErrFutureRev
134 }
135 return nil
136 }
137
138
139 func (wr *WatchResponse) IsProgressNotify() bool {
140 return len(wr.Events) == 0 && !wr.Canceled && !wr.Created && wr.CompactRevision == 0 && wr.Header.Revision != 0
141 }
142
143
// watcher implements the Watcher interface on top of a pb.WatchClient.
type watcher struct {
	// remote is the gRPC watch client used to open streams.
	remote pb.WatchClient
	// callOpts are passed to every remote.Watch call.
	callOpts []grpc.CallOption

	// mu guards streams.
	mu sync.Mutex

	// streams maps a context key (see streamKeyFromCtx) to the gRPC stream
	// serving watches created with that key. It is set to nil by Close.
	streams map[string]*watchGrpcStream
	// lg is the logger handed to every stream.
	lg *zap.Logger
}
155
156
// watchGrpcStream tracks all watch resources attached to a single gRPC
// stream. Its state is driven by the run goroutine.
type watchGrpcStream struct {
	// owner is the watcher that created the stream and that is notified
	// (closeStream) when the stream shuts down.
	owner *watcher
	// remote and callOpts are copied from owner and used to open the
	// underlying gRPC watch client.
	remote   pb.WatchClient
	callOpts []grpc.CallOption

	// ctx controls the lifetime of the stream's goroutines; it inherits the
	// values, but not the cancellation, of the context that created the stream.
	ctx context.Context
	// ctxKey is the key under which the stream is stored in owner.streams.
	ctxKey string
	// cancel cancels ctx.
	cancel context.CancelFunc

	// substreams holds the watchers registered on the stream, keyed by the
	// watch ID assigned in the server's creation response.
	substreams map[int64]*watcherStream
	// resuming holds the substreams waiting to be (re)registered with the
	// server; entries are set to nil once registered or closed.
	resuming []*watcherStream

	// reqc carries requests from Watch and RequestProgress to run.
	reqc chan watchStreamRequest
	// respc carries server responses from serveWatchClient to run.
	respc chan *pb.WatchResponse
	// donec is closed by owner.closeStream once the stream is torn down.
	donec chan struct{}
	// errc carries receive errors from serveWatchClient to run.
	errc chan error
	// closingc carries substreams whose serving goroutine has finished and
	// that must be closed by run.
	closingc chan *watcherStream
	// wg tracks the substream goroutines.
	wg sync.WaitGroup

	// resumec is closed by newWatchClient to tell all substream goroutines to
	// exit so they can be restarted on the new client.
	resumec chan struct{}
	// closeErr is the error the stream shut down with, if any.
	closeErr error

	// lg is the logger used for send failures.
	lg *zap.Logger
}
193
194
// watchStreamRequest is the common type of the requests accepted on
// watchGrpcStream.reqc (watchRequest and progressRequest).
type watchStreamRequest interface {
	// toPB converts the request to its protobuf wire form.
	toPB() *pb.WatchRequest
}
198
199
// watchRequest is issued by Watch to the run goroutine to create a new watch.
type watchRequest struct {
	// ctx is the caller's context; the watch is torn down when it is done.
	ctx context.Context
	// key and end are sent as the watched key and range end.
	key string
	end string
	// rev is the start revision; serveSubstream advances it as responses
	// arrive so that a resumed watch continues where it left off.
	rev int64

	// createdNotify requests that the creation response be forwarded to the
	// caller's channel.
	createdNotify bool
	// progressNotify is forwarded to the server's ProgressNotify option.
	progressNotify bool

	// fragment is forwarded to the server's Fragment option.
	fragment bool

	// filters lists the event types the server is asked to filter out.
	filters []pb.WatchCreateRequest_FilterType
	// prevKV is forwarded to the server's PrevKv option.
	prevKV bool
	// retc receives the watch's output channel once the watch is created
	// (or closed before creation).
	retc chan chan WatchResponse
}
222
223
// progressRequest is issued by RequestProgress to the run goroutine; it
// carries no data and is sent to the server as a WatchProgressRequest.
type progressRequest struct {
}
226
227
// watcherStream represents one registered watch on a watchGrpcStream.
type watcherStream struct {
	// initReq is the request that created the watch; its rev field is kept
	// up to date so the watch can be re-created after a reconnect.
	initReq watchRequest

	// outc publishes responses to the caller of Watch.
	outc chan WatchResponse
	// recvc delivers responses from the run goroutine to serveSubstream.
	recvc chan *WatchResponse
	// donec is closed when the serveSubstream goroutine for this watch exits.
	donec chan struct{}
	// closing is set once the watch is shutting down and must not be resumed.
	closing bool
	// id is the watch ID assigned by the server, or InvalidWatchID while the
	// watch is not registered.
	id int64

	// buf queues responses received on recvc that have not yet been sent on
	// outc.
	buf []*WatchResponse
}
246
247 func NewWatcher(c *Client) Watcher {
248 return NewWatchFromWatchClient(pb.NewWatchClient(c.conn), c)
249 }
250
251 func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher {
252 w := &watcher{
253 remote: wc,
254 streams: make(map[string]*watchGrpcStream),
255 }
256 if c != nil {
257 w.callOpts = c.callOpts
258 w.lg = c.lg
259 }
260 return w
261 }
262
263
var (
	// valCtxCh is returned by valCtx.Done; nothing in this block closes it,
	// so a valCtx never signals completion.
	valCtxCh = make(chan struct{})
	// zeroTime is the placeholder deadline reported by valCtx.Deadline.
	zeroTime = time.Unix(0, 0)
)

// valCtx wraps a context so that only its values are inherited: Deadline,
// Done and Err are overridden to describe a context that never expires and
// is never canceled, while Value is served by the embedded context.
type valCtx struct{ context.Context }

// Deadline reports that no deadline is set.
func (*valCtx) Deadline() (deadline time.Time, ok bool) {
	deadline = zeroTime
	return deadline, false
}

// Done returns a channel that is never closed.
func (*valCtx) Done() <-chan struct{} {
	return valCtxCh
}

// Err always returns nil because a valCtx is never canceled.
func (*valCtx) Err() error {
	return nil
}
273
274 func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
275 ctx, cancel := context.WithCancel(&valCtx{inctx})
276 wgs := &watchGrpcStream{
277 owner: w,
278 remote: w.remote,
279 callOpts: w.callOpts,
280 ctx: ctx,
281 ctxKey: streamKeyFromCtx(inctx),
282 cancel: cancel,
283 substreams: make(map[int64]*watcherStream),
284 respc: make(chan *pb.WatchResponse),
285 reqc: make(chan watchStreamRequest),
286 donec: make(chan struct{}),
287 errc: make(chan error, 1),
288 closingc: make(chan *watcherStream),
289 resumec: make(chan struct{}),
290 lg: w.lg,
291 }
292 go wgs.run()
293 return wgs
294 }
295
296
// Watch posts a watch request to the stream associated with ctx's outgoing
// metadata (creating the stream if needed) and returns the channel on which
// the watch's responses are delivered.
//
// If the watcher has been closed, an already-closed channel is returned. If
// ctx is done before the watch is registered, a closed channel is returned.
// If the stream shuts down with an error first, the returned channel carries
// one canceled response holding that error and is then closed. If the stream
// shuts down without an error, the request is retried on a fresh stream.
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
	ow := opWatch(key, opts...)

	// Translate the "filter" options into the server-side filter list.
	var filters []pb.WatchCreateRequest_FilterType
	if ow.filterPut {
		filters = append(filters, pb.WatchCreateRequest_NOPUT)
	}
	if ow.filterDelete {
		filters = append(filters, pb.WatchCreateRequest_NODELETE)
	}

	wr := &watchRequest{
		ctx:            ctx,
		createdNotify:  ow.createdNotify,
		key:            string(ow.key),
		end:            string(ow.end),
		rev:            ow.rev,
		progressNotify: ow.progressNotify,
		fragment:       ow.fragment,
		filters:        filters,
		prevKV:         ow.prevKV,
		// buffered so the stream can hand over the output channel without
		// waiting for this goroutine
		retc: make(chan chan WatchResponse, 1),
	}

	ok := false
	ctxKey := streamKeyFromCtx(ctx)

	// closeCh is returned when no watch could be established; it carries at
	// most one error response.
	var closeCh chan WatchResponse
	for {
		// Find or allocate the stream for this context key.
		w.mu.Lock()
		if w.streams == nil {
			// The watcher was closed: hand back an already-closed channel.
			w.mu.Unlock()
			ch := make(chan WatchResponse)
			close(ch)
			return ch
		}
		wgs := w.streams[ctxKey]
		if wgs == nil {
			wgs = w.newWatcherGrpcStream(ctx)
			w.streams[ctxKey] = wgs
		}
		donec := wgs.donec
		reqc := wgs.reqc
		w.mu.Unlock()

		// Allocate the fallback channel once; the buffer of one lets the
		// error response below be queued without a receiver.
		if closeCh == nil {
			closeCh = make(chan WatchResponse, 1)
		}

		// Submit the request to the stream's run loop.
		select {
		case reqc <- wr:
			ok = true
		case <-wr.ctx.Done():
			ok = false
		case <-donec:
			ok = false
			if wgs.closeErr != nil {
				closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
				// leaves the select only; the loop is exited further down
				break
			}
			// The stream ended without an error: retry on a new stream.
			continue
		}

		// Wait for the stream to hand back the watch's output channel.
		if ok {
			select {
			case ret := <-wr.retc:
				return ret
			case <-ctx.Done():
			case <-donec:
				if wgs.closeErr != nil {
					closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
					// leaves the select only; the loop is exited just below
					break
				}
				// The stream ended without an error: retry on a new stream.
				continue
			}
		}
		break
	}

	close(closeCh)
	return closeCh
}
386
387 func (w *watcher) Close() (err error) {
388 w.mu.Lock()
389 streams := w.streams
390 w.streams = nil
391 w.mu.Unlock()
392 for _, wgs := range streams {
393 if werr := wgs.close(); werr != nil {
394 err = werr
395 }
396 }
397
398 if err == context.Canceled {
399 err = nil
400 }
401 return err
402 }
403
404
405 func (w *watcher) RequestProgress(ctx context.Context) (err error) {
406 ctxKey := streamKeyFromCtx(ctx)
407
408 w.mu.Lock()
409 if w.streams == nil {
410 w.mu.Unlock()
411 return fmt.Errorf("no stream found for context")
412 }
413 wgs := w.streams[ctxKey]
414 if wgs == nil {
415 wgs = w.newWatcherGrpcStream(ctx)
416 w.streams[ctxKey] = wgs
417 }
418 donec := wgs.donec
419 reqc := wgs.reqc
420 w.mu.Unlock()
421
422 pr := &progressRequest{}
423
424 select {
425 case reqc <- pr:
426 return nil
427 case <-ctx.Done():
428 return ctx.Err()
429 case <-donec:
430 if wgs.closeErr != nil {
431 return wgs.closeErr
432 }
433
434 return w.RequestProgress(ctx)
435 }
436 }
437
438 func (w *watchGrpcStream) close() (err error) {
439 w.cancel()
440 <-w.donec
441 select {
442 case err = <-w.errc:
443 default:
444 }
445 return toErr(w.ctx, err)
446 }
447
448 func (w *watcher) closeStream(wgs *watchGrpcStream) {
449 w.mu.Lock()
450 close(wgs.donec)
451 wgs.cancel()
452 if w.streams != nil {
453 delete(w.streams, wgs.ctxKey)
454 }
455 w.mu.Unlock()
456 }
457
458 func (w *watchGrpcStream) addSubstream(resp *pb.WatchResponse, ws *watcherStream) {
459
460 if resp.WatchId == InvalidWatchID || (resp.Canceled && resp.CancelReason != "") {
461 w.closeErr = v3rpc.Error(errors.New(resp.CancelReason))
462
463 close(ws.recvc)
464 return
465 }
466 ws.id = resp.WatchId
467 w.substreams[ws.id] = ws
468 }
469
470 func (w *watchGrpcStream) sendCloseSubstream(ws *watcherStream, resp *WatchResponse) {
471 select {
472 case ws.outc <- *resp:
473 case <-ws.initReq.ctx.Done():
474 case <-time.After(closeSendErrTimeout):
475 }
476 close(ws.outc)
477 }
478
479 func (w *watchGrpcStream) closeSubstream(ws *watcherStream) {
480
481 select {
482 case ws.initReq.retc <- ws.outc:
483 default:
484 }
485
486 if closeErr := w.closeErr; closeErr != nil && ws.initReq.ctx.Err() == nil {
487 go w.sendCloseSubstream(ws, &WatchResponse{Canceled: true, closeErr: w.closeErr})
488 } else if ws.outc != nil {
489 close(ws.outc)
490 }
491 if ws.id != InvalidWatchID {
492 delete(w.substreams, ws.id)
493 return
494 }
495 for i := range w.resuming {
496 if w.resuming[i] == ws {
497 w.resuming[i] = nil
498 return
499 }
500 }
501 }
502
503
// run is the stream's main goroutine. It owns the gRPC watch client and the
// substream bookkeeping, and multiplexes four event sources: new requests
// (reqc), server responses (respc), receive errors (errc) and substreams that
// finished (closingc). It returns when the stream's context is done, when a
// halting error occurs, or when the last substream has been closed.
func (w *watchGrpcStream) run() {
	var wc pb.Watch_WatchClient
	var closeErr error

	// closing tracks substreams whose recvc has been closed here and whose
	// goroutine is therefore expected to report back on closingc.
	closing := make(map[*watcherStream]struct{})

	defer func() {
		w.closeErr = closeErr
		// Shut down every substream that is not already closing, both
		// registered ones and those still waiting to be resumed.
		for _, ws := range w.substreams {
			if _, ok := closing[ws]; !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		for _, ws := range w.resuming {
			if _, ok := closing[ws]; ws != nil && !ok {
				close(ws.recvc)
				closing[ws] = struct{}{}
			}
		}
		w.joinSubstreams()
		// Collect one closing notification per tracked substream.
		for range closing {
			w.closeSubstream(<-w.closingc)
		}
		w.wg.Wait()
		w.owner.closeStream(w)
	}()

	// Open the initial watch client.
	if wc, closeErr = w.newWatchClient(); closeErr != nil {
		return
	}

	// cancelSet records watch IDs for which a cancel request has already been
	// sent on the current client, so it is not sent twice.
	cancelSet := make(map[int64]struct{})

	// cur accumulates the fragments of a fragmented response.
	var cur *pb.WatchResponse
	backoff := time.Millisecond
	for {
		select {
		// A new request from Watch or RequestProgress.
		case req := <-w.reqc:
			switch wreq := req.(type) {
			case *watchRequest:
				outc := make(chan WatchResponse, 1)
				ws := &watcherStream{
					initReq: *wreq,
					id:      InvalidWatchID,
					outc:    outc,
					// unbuffered so that responses are only consumed when the
					// substream goroutine is ready
					recvc: make(chan *WatchResponse),
				}

				ws.donec = make(chan struct{})
				w.wg.Add(1)
				go w.serveSubstream(ws, w.resumec)

				// Queue the watch for registration. Creation requests are
				// sent one at a time, so only send now if the queue was empty.
				w.resuming = append(w.resuming, ws)
				if len(w.resuming) == 1 {
					// head of the queue: can register right away
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}
			case *progressRequest:
				if err := wc.Send(wreq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}

		// A response from the server.
		case pbresp := <-w.respc:
			if cur == nil || pbresp.Created || pbresp.Canceled {
				cur = pbresp
			} else if cur != nil && cur.WatchId == pbresp.WatchId {
				// Another fragment of the same response: merge its events.
				cur.Events = append(cur.Events, pbresp.Events...)
				// Carry over the flag so the last fragment ends the merge.
				cur.Fragment = pbresp.Fragment
			}

			switch {
			case pbresp.Created:
				// The response acknowledges the watch at the head of the
				// resume queue.
				if len(w.resuming) != 0 {
					if ws := w.resuming[0]; ws != nil {
						w.addSubstream(pbresp, ws)
						w.dispatchEvent(pbresp)
						w.resuming[0] = nil
					}
				}

				// Register the next queued watch, if any.
				if ws := w.nextResume(); ws != nil {
					if err := wc.Send(ws.initReq.toPB()); err != nil {
						w.lg.Debug("error when sending request", zap.Error(err))
					}
				}

				// Start a fresh accumulation for the next response.
				cur = nil

			case pbresp.Canceled && pbresp.CompactRevision == 0:
				delete(cancelSet, pbresp.WatchId)
				if ws, ok := w.substreams[pbresp.WatchId]; ok {
					// Closing recvc makes the substream goroutine exit.
					close(ws.recvc)
					closing[ws] = struct{}{}
				}

				// Start a fresh accumulation for the next response.
				cur = nil

			case cur.Fragment:
				// More fragments are expected: keep accumulating and go back
				// to the top of the loop.
				continue

			default:
				// A complete response: deliver it.
				ok := w.dispatchEvent(cur)

				// Start a fresh accumulation for the next response.
				cur = nil

				if ok {
					break
				}

				// Delivery failed (unknown or finished watch). Ask the server
				// to cancel the watch, unless that was already requested.
				if _, ok := cancelSet[pbresp.WatchId]; ok {
					break
				}

				cancelSet[pbresp.WatchId] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: pbresp.WatchId,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for failed dispatch", zap.Int64("watch-id", pbresp.WatchId))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", pbresp.WatchId), zap.Error(err))
				}
			}

		// The watch client failed to receive.
		case err := <-w.errc:
			// Halting errors and a missing leader end the stream.
			if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
				closeErr = err
				return
			}
			// Otherwise back off if needed, reconnect, and start
			// re-registering the watches.
			backoff = w.backoffIfUnavailable(backoff, err)
			if wc, closeErr = w.newWatchClient(); closeErr != nil {
				return
			}
			if ws := w.nextResume(); ws != nil {
				if err := wc.Send(ws.initReq.toPB()); err != nil {
					w.lg.Debug("error when sending request", zap.Error(err))
				}
			}
			// Cancel requests sent on the old client no longer apply.
			cancelSet = make(map[int64]struct{})

		case <-w.ctx.Done():
			return

		// A substream goroutine finished and asks to be closed.
		case ws := <-w.closingc:
			w.closeSubstream(ws)
			delete(closing, ws)
			// Nothing left to serve: shut the whole stream down.
			if len(w.substreams)+len(w.resuming) == 0 {
				return
			}
			if ws.id != InvalidWatchID {
				// The watch was registered with the server: cancel it there
				// as well, and remember that the cancel was sent.
				cancelSet[ws.id] = struct{}{}
				cr := &pb.WatchRequest_CancelRequest{
					CancelRequest: &pb.WatchCancelRequest{
						WatchId: ws.id,
					},
				}
				req := &pb.WatchRequest{RequestUnion: cr}
				w.lg.Debug("sending watch cancel request for closed watcher", zap.Int64("watch-id", ws.id))
				if err := wc.Send(req); err != nil {
					w.lg.Debug("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err))
				}
			}
		}
	}
}
699
700
701
702 func (w *watchGrpcStream) nextResume() *watcherStream {
703 for len(w.resuming) != 0 {
704 if w.resuming[0] != nil {
705 return w.resuming[0]
706 }
707 w.resuming = w.resuming[1:len(w.resuming)]
708 }
709 return nil
710 }
711
712
713 func (w *watchGrpcStream) dispatchEvent(pbresp *pb.WatchResponse) bool {
714 events := make([]*Event, len(pbresp.Events))
715 for i, ev := range pbresp.Events {
716 events[i] = (*Event)(ev)
717 }
718
719 wr := &WatchResponse{
720 Header: *pbresp.Header,
721 Events: events,
722 CompactRevision: pbresp.CompactRevision,
723 Created: pbresp.Created,
724 Canceled: pbresp.Canceled,
725 cancelReason: pbresp.CancelReason,
726 }
727
728
729
730 if wr.IsProgressNotify() && pbresp.WatchId == InvalidWatchID {
731 return w.broadcastResponse(wr)
732 }
733
734 return w.unicastResponse(wr, pbresp.WatchId)
735
736 }
737
738
739 func (w *watchGrpcStream) broadcastResponse(wr *WatchResponse) bool {
740 for _, ws := range w.substreams {
741 select {
742 case ws.recvc <- wr:
743 case <-ws.donec:
744 }
745 }
746 return true
747 }
748
749
750 func (w *watchGrpcStream) unicastResponse(wr *WatchResponse, watchId int64) bool {
751 ws, ok := w.substreams[watchId]
752 if !ok {
753 return false
754 }
755 select {
756 case ws.recvc <- wr:
757 case <-ws.donec:
758 return false
759 }
760 return true
761 }
762
763
764 func (w *watchGrpcStream) serveWatchClient(wc pb.Watch_WatchClient) {
765 for {
766 resp, err := wc.Recv()
767 if err != nil {
768 select {
769 case w.errc <- err:
770 case <-w.donec:
771 }
772 return
773 }
774 select {
775 case w.respc <- resp:
776 case <-w.donec:
777 return
778 }
779 }
780 }
781
782
// serveSubstream is the per-watch goroutine. It queues the responses the run
// goroutine delivers on ws.recvc and forwards them, in order, to the caller's
// channel ws.outc, while tracking the revision the watch must restart from if
// it has to be resumed.
//
// The goroutine exits when recvc is closed, when an error response has been
// forwarded, when the stream's or the watch's context is done, or when
// resumec is closed. Only in the last case is the watch kept alive; in all
// other cases it is marked as closing and handed to run via closingc.
func (w *watchGrpcStream) serveSubstream(ws *watcherStream, resumec chan struct{}) {
	if ws.closing {
		panic("created substream goroutine but substream is closing")
	}

	// nextRev is the revision the watch would have to restart from.
	nextRev := ws.initReq.rev
	resuming := false
	defer func() {
		if !resuming {
			ws.closing = true
		}
		close(ws.donec)
		if !resuming {
			// Ask run to close the watch.
			w.closingc <- ws
		}
		w.wg.Done()
	}()

	emptyWr := &WatchResponse{}
	for {
		curWr := emptyWr
		outc := ws.outc

		// Only enable the send case when something is buffered: sending on a
		// nil channel never proceeds.
		if len(ws.buf) > 0 {
			curWr = ws.buf[0]
		} else {
			outc = nil
		}
		select {
		case outc <- *curWr:
			// An error response is the last thing the caller sees.
			if ws.buf[0].Err() != nil {
				return
			}
			ws.buf[0] = nil
			ws.buf = ws.buf[1:]
		case wr, ok := <-ws.recvc:
			if !ok {
				// recvc was closed: shut the watch down.
				return
			}

			if wr.Created {
				// retc is non-nil only for the first creation; on a resume
				// the caller already holds the output channel.
				if ws.initReq.retc != nil {
					ws.initReq.retc <- ws.outc
					// Prevent the channel from being handed out again after a
					// resume.
					ws.initReq.retc = nil

					// Forward the creation event only when requested.
					if ws.initReq.createdNotify {
						ws.outc <- *wr
					}
					// A watch created without an explicit start revision
					// restarts from the revision reported at creation time.
					if ws.initReq.rev == 0 {
						nextRev = wr.Header.Revision
					}
				}
			} else {
				// Everything up to the header revision has been seen.
				nextRev = wr.Header.Revision + 1
			}

			if len(wr.Events) > 0 {
				nextRev = wr.Events[len(wr.Events)-1].Kv.ModRevision + 1
			}
			ws.initReq.rev = nextRev

			// Creation events are not buffered; they were either forwarded
			// above or are not wanted.
			if wr.Created {
				continue
			}

			ws.buf = append(ws.buf, wr)
		case <-w.ctx.Done():
			return
		case <-ws.initReq.ctx.Done():
			return
		case <-resumec:
			// The stream is reconnecting: exit without closing the watch so
			// it can be restarted on the new client.
			resuming = true
			return
		}
	}
}
878
// newWatchClient (re)connects the stream. It stops all substream goroutines,
// moves every registered watch back into the resume queue, opens a new watch
// client, restarts a goroutine for every watch that is not closing, and
// starts the receive pump for the new client. On failure it returns the
// connection error converted by v3rpc.Error; the substream goroutines are
// restarted in that case as well.
func (w *watchGrpcStream) newWatchClient() (pb.Watch_WatchClient, error) {
	// Tell all substream goroutines to exit for a resume, and wait for them.
	close(w.resumec)
	w.resumec = make(chan struct{})
	w.joinSubstreams()
	// Registered watches lose their server-side ID and must be re-created.
	for _, ws := range w.substreams {
		ws.id = InvalidWatchID
		w.resuming = append(w.resuming, ws)
	}
	// Compact the resume queue by dropping cleared slots.
	var resuming []*watcherStream
	for _, ws := range w.resuming {
		if ws != nil {
			resuming = append(resuming, ws)
		}
	}
	w.resuming = resuming
	w.substreams = make(map[int64]*watcherStream)

	// Connect, while watching for callers that give up in the meantime.
	stopc := make(chan struct{})
	donec := w.waitCancelSubstreams(stopc)
	wc, err := w.openWatchClient()
	close(stopc)
	<-donec

	// Restart the goroutines of the watches that are still alive. This runs
	// even when the connection failed.
	for _, ws := range w.resuming {
		if ws.closing {
			continue
		}
		ws.donec = make(chan struct{})
		w.wg.Add(1)
		go w.serveSubstream(ws, w.resumec)
	}

	if err != nil {
		return nil, v3rpc.Error(err)
	}

	// Start receiving from the new client.
	go w.serveWatchClient(wc)
	return wc, nil
}
924
// waitCancelSubstreams watches the contexts of all queued (resuming) watches
// until stopc is closed. A watch whose context ends in the meantime is marked
// as closing, has its output channel closed, and is handed to the run
// goroutine via closingc. The returned channel is closed once all of the
// per-watch goroutines started here have finished.
func (w *watchGrpcStream) waitCancelSubstreams(stopc <-chan struct{}) <-chan struct{} {
	var wg sync.WaitGroup
	wg.Add(len(w.resuming))
	donec := make(chan struct{})
	for i := range w.resuming {
		go func(ws *watcherStream) {
			defer wg.Done()
			if ws.closing {
				// Already shutting down: just make sure a caller that has
				// gone away gets its channel closed.
				if ws.initReq.ctx.Err() != nil && ws.outc != nil {
					close(ws.outc)
					ws.outc = nil
				}
				return
			}
			select {
			case <-ws.initReq.ctx.Done():
				// The caller gave up while the stream was reconnecting.
				ws.closing = true
				close(ws.outc)
				ws.outc = nil
				w.wg.Add(1)
				// Hand the watch over asynchronously; closingc is only
				// drained by the run goroutine.
				go func() {
					defer w.wg.Done()
					w.closingc <- ws
				}()
			case <-stopc:
			}
		}(w.resuming[i])
	}
	go func() {
		defer close(donec)
		wg.Wait()
	}()
	return donec
}
960
961
962 func (w *watchGrpcStream) joinSubstreams() {
963 for _, ws := range w.substreams {
964 <-ws.donec
965 }
966 for _, ws := range w.resuming {
967 if ws != nil {
968 <-ws.donec
969 }
970 }
971 }
972
973 var maxBackoff = 100 * time.Millisecond
974
975 func (w *watchGrpcStream) backoffIfUnavailable(backoff time.Duration, err error) time.Duration {
976 if isUnavailableErr(w.ctx, err) {
977
978 if backoff < maxBackoff {
979
980 backoff = backoff + backoff/4
981 if backoff > maxBackoff {
982 backoff = maxBackoff
983 }
984 }
985 time.Sleep(backoff)
986 }
987 return backoff
988 }
989
990
991
992
993 func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) {
994 backoff := time.Millisecond
995 for {
996 select {
997 case <-w.ctx.Done():
998 if err == nil {
999 return nil, w.ctx.Err()
1000 }
1001 return nil, err
1002 default:
1003 }
1004 if ws, err = w.remote.Watch(w.ctx, w.callOpts...); ws != nil && err == nil {
1005 break
1006 }
1007 if isHaltErr(w.ctx, err) {
1008 return nil, v3rpc.Error(err)
1009 }
1010 backoff = w.backoffIfUnavailable(backoff, err)
1011 }
1012 return ws, nil
1013 }
1014
1015
1016 func (wr *watchRequest) toPB() *pb.WatchRequest {
1017 req := &pb.WatchCreateRequest{
1018 StartRevision: wr.rev,
1019 Key: []byte(wr.key),
1020 RangeEnd: []byte(wr.end),
1021 ProgressNotify: wr.progressNotify,
1022 Filters: wr.filters,
1023 PrevKv: wr.prevKV,
1024 Fragment: wr.fragment,
1025 }
1026 cr := &pb.WatchRequest_CreateRequest{CreateRequest: req}
1027 return &pb.WatchRequest{RequestUnion: cr}
1028 }
1029
1030
1031 func (pr *progressRequest) toPB() *pb.WatchRequest {
1032 req := &pb.WatchProgressRequest{}
1033 cr := &pb.WatchRequest_ProgressRequest{ProgressRequest: req}
1034 return &pb.WatchRequest{RequestUnion: cr}
1035 }
1036
1037 func streamKeyFromCtx(ctx context.Context) string {
1038 if md, ok := metadata.FromOutgoingContext(ctx); ok {
1039 return fmt.Sprintf("%+v", md)
1040 }
1041 return ""
1042 }
1043
View as plain text