1 package dbus
2
3 import (
4 "context"
5 "errors"
6 "io"
7 "os"
8 "strings"
9 "sync"
10 )
11
12 var (
13 systemBus *Conn
14 systemBusLck sync.Mutex
15 sessionBus *Conn
16 sessionBusLck sync.Mutex
17 )
18
19
// ErrClosed is the error reported to callers whose message could not be sent
// because the connection had already been closed.
var ErrClosed error = errors.New("dbus: connection closed by user")
21
22
23
24
25
26
27
28
29
30 type Conn struct {
31 transport
32
33 ctx context.Context
34 cancelCtx context.CancelFunc
35
36 closeOnce sync.Once
37 closeErr error
38
39 busObj BusObject
40 unixFD bool
41 uuid string
42
43 handler Handler
44 signalHandler SignalHandler
45 serialGen SerialGenerator
46 inInt Interceptor
47 outInt Interceptor
48 auth []Auth
49
50 names *nameTracker
51 calls *callTracker
52 outHandler *outputHandler
53
54 eavesdropped chan<- *Message
55 eavesdroppedLck sync.Mutex
56 }
57
58
59
60 func SessionBus() (conn *Conn, err error) {
61 sessionBusLck.Lock()
62 defer sessionBusLck.Unlock()
63 if sessionBus != nil &&
64 sessionBus.Connected() {
65 return sessionBus, nil
66 }
67 defer func() {
68 if conn != nil {
69 sessionBus = conn
70 }
71 }()
72 conn, err = ConnectSessionBus()
73 return
74 }
75
76 func getSessionBusAddress(autolaunch bool) (string, error) {
77 if address := os.Getenv("DBUS_SESSION_BUS_ADDRESS"); address != "" && address != "autolaunch:" {
78 return address, nil
79
80 } else if address := tryDiscoverDbusSessionBusAddress(); address != "" {
81 os.Setenv("DBUS_SESSION_BUS_ADDRESS", address)
82 return address, nil
83 }
84 if !autolaunch {
85 return "", errors.New("dbus: couldn't determine address of session bus")
86 }
87 return getSessionBusPlatformAddress()
88 }
89
90
91 func SessionBusPrivate(opts ...ConnOption) (*Conn, error) {
92 address, err := getSessionBusAddress(true)
93 if err != nil {
94 return nil, err
95 }
96
97 return Dial(address, opts...)
98 }
99
100
101
102 func SessionBusPrivateNoAutoStartup(opts ...ConnOption) (*Conn, error) {
103 address, err := getSessionBusAddress(false)
104 if err != nil {
105 return nil, err
106 }
107
108 return Dial(address, opts...)
109 }
110
111
112
113
114 func SessionBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) {
115 return SessionBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler))
116 }
117
118
119
120 func SystemBus() (conn *Conn, err error) {
121 systemBusLck.Lock()
122 defer systemBusLck.Unlock()
123 if systemBus != nil &&
124 systemBus.Connected() {
125 return systemBus, nil
126 }
127 defer func() {
128 if conn != nil {
129 systemBus = conn
130 }
131 }()
132 conn, err = ConnectSystemBus()
133 return
134 }
135
136
137 func ConnectSessionBus(opts ...ConnOption) (*Conn, error) {
138 address, err := getSessionBusAddress(true)
139 if err != nil {
140 return nil, err
141 }
142 return Connect(address, opts...)
143 }
144
145
146 func ConnectSystemBus(opts ...ConnOption) (*Conn, error) {
147 return Connect(getSystemBusPlatformAddress(), opts...)
148 }
149
150
151
152
153
154 func Connect(address string, opts ...ConnOption) (*Conn, error) {
155 conn, err := Dial(address, opts...)
156 if err != nil {
157 return nil, err
158 }
159 if err = conn.Auth(conn.auth); err != nil {
160 _ = conn.Close()
161 return nil, err
162 }
163 if err = conn.Hello(); err != nil {
164 _ = conn.Close()
165 return nil, err
166 }
167 return conn, nil
168 }
169
170
171
172
173 func SystemBusPrivate(opts ...ConnOption) (*Conn, error) {
174 return Dial(getSystemBusPlatformAddress(), opts...)
175 }
176
177
178
179
180 func SystemBusPrivateHandler(handler Handler, signalHandler SignalHandler) (*Conn, error) {
181 return SystemBusPrivate(WithHandler(handler), WithSignalHandler(signalHandler))
182 }
183
184
185 func Dial(address string, opts ...ConnOption) (*Conn, error) {
186 tr, err := getTransport(address)
187 if err != nil {
188 return nil, err
189 }
190 return newConn(tr, opts...)
191 }
192
193
194
195
196 func DialHandler(address string, handler Handler, signalHandler SignalHandler) (*Conn, error) {
197 return Dial(address, WithHandler(handler), WithSignalHandler(signalHandler))
198 }
199
200
201 type ConnOption func(conn *Conn) error
202
203
204 func WithHandler(handler Handler) ConnOption {
205 return func(conn *Conn) error {
206 conn.handler = handler
207 return nil
208 }
209 }
210
211
212 func WithSignalHandler(handler SignalHandler) ConnOption {
213 return func(conn *Conn) error {
214 conn.signalHandler = handler
215 return nil
216 }
217 }
218
219
220 func WithSerialGenerator(gen SerialGenerator) ConnOption {
221 return func(conn *Conn) error {
222 conn.serialGen = gen
223 return nil
224 }
225 }
226
227
228 func WithAuth(methods ...Auth) ConnOption {
229 return func(conn *Conn) error {
230 conn.auth = methods
231 return nil
232 }
233 }
234
235
236 type Interceptor func(msg *Message)
237
238
239 func WithIncomingInterceptor(interceptor Interceptor) ConnOption {
240 return func(conn *Conn) error {
241 conn.inInt = interceptor
242 return nil
243 }
244 }
245
246
247 func WithOutgoingInterceptor(interceptor Interceptor) ConnOption {
248 return func(conn *Conn) error {
249 conn.outInt = interceptor
250 return nil
251 }
252 }
253
254
255 func WithContext(ctx context.Context) ConnOption {
256 return func(conn *Conn) error {
257 conn.ctx = ctx
258 return nil
259 }
260 }
261
262
263 func NewConn(conn io.ReadWriteCloser, opts ...ConnOption) (*Conn, error) {
264 return newConn(genericTransport{conn}, opts...)
265 }
266
267
268
269
270 func NewConnHandler(conn io.ReadWriteCloser, handler Handler, signalHandler SignalHandler) (*Conn, error) {
271 return NewConn(genericTransport{conn}, WithHandler(handler), WithSignalHandler(signalHandler))
272 }
273
274
275 func newConn(tr transport, opts ...ConnOption) (*Conn, error) {
276 conn := new(Conn)
277 conn.transport = tr
278 for _, opt := range opts {
279 if err := opt(conn); err != nil {
280 return nil, err
281 }
282 }
283 if conn.ctx == nil {
284 conn.ctx = context.Background()
285 }
286 conn.ctx, conn.cancelCtx = context.WithCancel(conn.ctx)
287
288 conn.calls = newCallTracker()
289 if conn.handler == nil {
290 conn.handler = NewDefaultHandler()
291 }
292 if conn.signalHandler == nil {
293 conn.signalHandler = NewDefaultSignalHandler()
294 }
295 if conn.serialGen == nil {
296 conn.serialGen = newSerialGenerator()
297 }
298 conn.outHandler = &outputHandler{conn: conn}
299 conn.names = newNameTracker()
300 conn.busObj = conn.Object("org.freedesktop.DBus", "/org/freedesktop/DBus")
301
302 go func() {
303 <-conn.ctx.Done()
304 conn.Close()
305 }()
306 return conn, nil
307 }
308
309
310
311 func (conn *Conn) BusObject() BusObject {
312 return conn.busObj
313 }
314
315
316
317
318 func (conn *Conn) Close() error {
319 conn.closeOnce.Do(func() {
320 conn.outHandler.close()
321 if term, ok := conn.signalHandler.(Terminator); ok {
322 term.Terminate()
323 }
324
325 if term, ok := conn.handler.(Terminator); ok {
326 term.Terminate()
327 }
328
329 conn.eavesdroppedLck.Lock()
330 if conn.eavesdropped != nil {
331 close(conn.eavesdropped)
332 }
333 conn.eavesdroppedLck.Unlock()
334
335 conn.cancelCtx()
336
337 conn.closeErr = conn.transport.Close()
338 })
339 return conn.closeErr
340 }
341
342
343
344 func (conn *Conn) Context() context.Context {
345 return conn.ctx
346 }
347
348
349 func (conn *Conn) Connected() bool {
350 return conn.ctx.Err() == nil
351 }
352
353
354
355
356
357
358
359
360
361 func (conn *Conn) Eavesdrop(ch chan<- *Message) {
362 conn.eavesdroppedLck.Lock()
363 conn.eavesdropped = ch
364 conn.eavesdroppedLck.Unlock()
365 }
366
367
368 func (conn *Conn) getSerial() uint32 {
369 return conn.serialGen.GetSerial()
370 }
371
372
373
374
375 func (conn *Conn) Hello() error {
376 var s string
377 err := conn.busObj.Call("org.freedesktop.DBus.Hello", 0).Store(&s)
378 if err != nil {
379 return err
380 }
381 conn.names.acquireUniqueConnectionName(s)
382 return nil
383 }
384
385
386
387 func (conn *Conn) inWorker() {
388 sequenceGen := newSequenceGenerator()
389 for {
390 msg, err := conn.ReadMessage()
391 if err != nil {
392 if _, ok := err.(InvalidMessageError); !ok {
393
394
395
396 conn.Close()
397 conn.calls.finalizeAllWithError(sequenceGen, err)
398 return
399 }
400
401 continue
402 }
403 conn.eavesdroppedLck.Lock()
404 if conn.eavesdropped != nil {
405 select {
406 case conn.eavesdropped <- msg:
407 default:
408 }
409 conn.eavesdroppedLck.Unlock()
410 continue
411 }
412 conn.eavesdroppedLck.Unlock()
413 dest, _ := msg.Headers[FieldDestination].value.(string)
414 found := dest == "" ||
415 !conn.names.uniqueNameIsKnown() ||
416 conn.names.isKnownName(dest)
417 if !found {
418
419
420 continue
421 }
422
423 if conn.inInt != nil {
424 conn.inInt(msg)
425 }
426 sequence := sequenceGen.next()
427 switch msg.Type {
428 case TypeError:
429 conn.serialGen.RetireSerial(conn.calls.handleDBusError(sequence, msg))
430 case TypeMethodReply:
431 conn.serialGen.RetireSerial(conn.calls.handleReply(sequence, msg))
432 case TypeSignal:
433 conn.handleSignal(sequence, msg)
434 case TypeMethodCall:
435 go conn.handleCall(msg)
436 }
437
438 }
439 }
440
441 func (conn *Conn) handleSignal(sequence Sequence, msg *Message) {
442 iface := msg.Headers[FieldInterface].value.(string)
443 member := msg.Headers[FieldMember].value.(string)
444
445
446 sender, _ := msg.Headers[FieldSender].value.(string)
447 if iface == "org.freedesktop.DBus" && sender == "org.freedesktop.DBus" {
448 if member == "NameLost" {
449
450
451 name, ok := msg.Body[0].(string)
452 if !ok {
453 panic("Unable to read the lost name")
454 }
455 conn.names.loseName(name)
456 } else if member == "NameAcquired" {
457
458
459 name, ok := msg.Body[0].(string)
460 if !ok {
461 panic("Unable to read the acquired name")
462 }
463 conn.names.acquireName(name)
464 }
465 }
466 signal := &Signal{
467 Sender: sender,
468 Path: msg.Headers[FieldPath].value.(ObjectPath),
469 Name: iface + "." + member,
470 Body: msg.Body,
471 Sequence: sequence,
472 }
473 conn.signalHandler.DeliverSignal(iface, member, signal)
474 }
475
476
477
478
479 func (conn *Conn) Names() []string {
480 return conn.names.listKnownNames()
481 }
482
483
484 func (conn *Conn) Object(dest string, path ObjectPath) BusObject {
485 return &Object{conn, dest, path}
486 }
487
488 func (conn *Conn) sendMessageAndIfClosed(msg *Message, ifClosed func()) {
489 if msg.serial == 0 {
490 msg.serial = conn.getSerial()
491 }
492 if conn.outInt != nil {
493 conn.outInt(msg)
494 }
495 err := conn.outHandler.sendAndIfClosed(msg, ifClosed)
496 if err != nil {
497 conn.handleSendError(msg, err)
498 } else if msg.Type != TypeMethodCall {
499 conn.serialGen.RetireSerial(msg.serial)
500 }
501 }
502
503 func (conn *Conn) handleSendError(msg *Message, err error) {
504 if msg.Type == TypeMethodCall {
505 conn.calls.handleSendError(msg, err)
506 } else if msg.Type == TypeMethodReply {
507 if _, ok := err.(FormatError); ok {
508 conn.sendError(err, msg.Headers[FieldDestination].value.(string), msg.Headers[FieldReplySerial].value.(uint32))
509 }
510 }
511 conn.serialGen.RetireSerial(msg.serial)
512 }
513
514
515
516
517
518
519
520 func (conn *Conn) Send(msg *Message, ch chan *Call) *Call {
521 return conn.send(context.Background(), msg, ch)
522 }
523
524
525 func (conn *Conn) SendWithContext(ctx context.Context, msg *Message, ch chan *Call) *Call {
526 return conn.send(ctx, msg, ch)
527 }
528
529 func (conn *Conn) send(ctx context.Context, msg *Message, ch chan *Call) *Call {
530 if ctx == nil {
531 panic("nil context")
532 }
533 if ch == nil {
534 ch = make(chan *Call, 1)
535 } else if cap(ch) == 0 {
536 panic("dbus: unbuffered channel passed to (*Conn).Send")
537 }
538
539 var call *Call
540 ctx, canceler := context.WithCancel(ctx)
541 msg.serial = conn.getSerial()
542 if msg.Type == TypeMethodCall && msg.Flags&FlagNoReplyExpected == 0 {
543 call = new(Call)
544 call.Destination, _ = msg.Headers[FieldDestination].value.(string)
545 call.Path, _ = msg.Headers[FieldPath].value.(ObjectPath)
546 iface, _ := msg.Headers[FieldInterface].value.(string)
547 member, _ := msg.Headers[FieldMember].value.(string)
548 call.Method = iface + "." + member
549 call.Args = msg.Body
550 call.Done = ch
551 call.ctx = ctx
552 call.ctxCanceler = canceler
553 conn.calls.track(msg.serial, call)
554 if ctx.Err() != nil {
555
556 conn.calls.handleSendError(msg, ctx.Err())
557 return call
558 }
559 go func() {
560 <-ctx.Done()
561 conn.calls.handleSendError(msg, ctx.Err())
562 }()
563 conn.sendMessageAndIfClosed(msg, func() {
564 conn.calls.handleSendError(msg, ErrClosed)
565 canceler()
566 })
567 } else {
568 canceler()
569 call = &Call{Err: nil, Done: ch}
570 ch <- call
571 conn.sendMessageAndIfClosed(msg, func() {
572 call = &Call{Err: ErrClosed}
573 })
574 }
575 return call
576 }
577
578
579
580 func (conn *Conn) sendError(err error, dest string, serial uint32) {
581 var e *Error
582 switch em := err.(type) {
583 case Error:
584 e = &em
585 case *Error:
586 e = em
587 case DBusError:
588 name, body := em.DBusError()
589 e = NewError(name, body)
590 default:
591 e = MakeFailedError(err)
592 }
593 msg := new(Message)
594 msg.Type = TypeError
595 msg.Headers = make(map[HeaderField]Variant)
596 if dest != "" {
597 msg.Headers[FieldDestination] = MakeVariant(dest)
598 }
599 msg.Headers[FieldErrorName] = MakeVariant(e.Name)
600 msg.Headers[FieldReplySerial] = MakeVariant(serial)
601 msg.Body = e.Body
602 if len(e.Body) > 0 {
603 msg.Headers[FieldSignature] = MakeVariant(SignatureOf(e.Body...))
604 }
605 conn.sendMessageAndIfClosed(msg, nil)
606 }
607
608
609
610 func (conn *Conn) sendReply(dest string, serial uint32, values ...interface{}) {
611 msg := new(Message)
612 msg.Type = TypeMethodReply
613 msg.Headers = make(map[HeaderField]Variant)
614 if dest != "" {
615 msg.Headers[FieldDestination] = MakeVariant(dest)
616 }
617 msg.Headers[FieldReplySerial] = MakeVariant(serial)
618 msg.Body = values
619 if len(values) > 0 {
620 msg.Headers[FieldSignature] = MakeVariant(SignatureOf(values...))
621 }
622 conn.sendMessageAndIfClosed(msg, nil)
623 }
624
625
626
627 func (conn *Conn) AddMatchSignal(options ...MatchOption) error {
628 return conn.AddMatchSignalContext(context.Background(), options...)
629 }
630
631
632 func (conn *Conn) AddMatchSignalContext(ctx context.Context, options ...MatchOption) error {
633 options = append([]MatchOption{withMatchType("signal")}, options...)
634 return conn.busObj.CallWithContext(
635 ctx,
636 "org.freedesktop.DBus.AddMatch", 0,
637 formatMatchOptions(options),
638 ).Store()
639 }
640
641
642 func (conn *Conn) RemoveMatchSignal(options ...MatchOption) error {
643 return conn.RemoveMatchSignalContext(context.Background(), options...)
644 }
645
646
647 func (conn *Conn) RemoveMatchSignalContext(ctx context.Context, options ...MatchOption) error {
648 options = append([]MatchOption{withMatchType("signal")}, options...)
649 return conn.busObj.CallWithContext(
650 ctx,
651 "org.freedesktop.DBus.RemoveMatch", 0,
652 formatMatchOptions(options),
653 ).Store()
654 }
655
656
657
658
659
660
661
662
663
664
665
666
667 func (conn *Conn) Signal(ch chan<- *Signal) {
668 handler, ok := conn.signalHandler.(SignalRegistrar)
669 if !ok {
670 panic("cannot use this method with a non SignalRegistrar handler")
671 }
672 handler.AddSignal(ch)
673 }
674
675
676
677
678 func (conn *Conn) RemoveSignal(ch chan<- *Signal) {
679 handler, ok := conn.signalHandler.(SignalRegistrar)
680 if !ok {
681 panic("cannot use this method with a non SignalRegistrar handler")
682 }
683 handler.RemoveSignal(ch)
684 }
685
686
687
688
689
690 func (conn *Conn) SupportsUnixFDs() bool {
691 return conn.unixFD
692 }
693
694
// Error is a D-Bus error: an error name plus an optional body of values.
type Error struct {
	Name string
	Body []interface{}
}

// NewError returns a pointer to an Error with the given name and body.
func NewError(name string, body []interface{}) *Error {
	return &Error{Name: name, Body: body}
}

// Error implements the error interface. The message is the first body element
// if it is a string; otherwise the error name is returned.
func (e Error) Error() string {
	if len(e.Body) == 0 {
		return e.Name
	}
	if text, isString := e.Body[0].(string); isString {
		return text
	}
	return e.Name
}
713
714
715
716 type Signal struct {
717 Sender string
718 Path ObjectPath
719 Name string
720 Body []interface{}
721 Sequence Sequence
722 }
723
724
725 type transport interface {
726
727 io.ReadWriteCloser
728
729
730 SendNullByte() error
731
732
733 SupportsUnixFDs() bool
734
735
736 EnableUnixFDs()
737
738
739 ReadMessage() (*Message, error)
740 SendMessage(*Message) error
741 }
742
743 var (
744 transports = make(map[string]func(string) (transport, error))
745 )
746
747 func getTransport(address string) (transport, error) {
748 var err error
749 var t transport
750
751 addresses := strings.Split(address, ";")
752 for _, v := range addresses {
753 i := strings.IndexRune(v, ':')
754 if i == -1 {
755 err = errors.New("dbus: invalid bus address (no transport)")
756 continue
757 }
758 f := transports[v[:i]]
759 if f == nil {
760 err = errors.New("dbus: invalid bus address (invalid or unsupported transport)")
761 continue
762 }
763 t, err = f(v[i+1:])
764 if err == nil {
765 return t, nil
766 }
767 }
768 return nil, err
769 }
770
771
772 func getKey(s, key string) string {
773 for _, keyEqualsValue := range strings.Split(s, ",") {
774 keyValue := strings.SplitN(keyEqualsValue, "=", 2)
775 if len(keyValue) == 2 && keyValue[0] == key {
776 val, err := UnescapeBusAddressValue(keyValue[1])
777 if err != nil {
778
779 return ""
780 }
781 return val
782 }
783 }
784 return ""
785 }
786
787 type outputHandler struct {
788 conn *Conn
789 sendLck sync.Mutex
790 closed struct {
791 isClosed bool
792 lck sync.RWMutex
793 }
794 }
795
796 func (h *outputHandler) sendAndIfClosed(msg *Message, ifClosed func()) error {
797 h.closed.lck.RLock()
798 defer h.closed.lck.RUnlock()
799 if h.closed.isClosed {
800 if ifClosed != nil {
801 ifClosed()
802 }
803 return nil
804 }
805 h.sendLck.Lock()
806 defer h.sendLck.Unlock()
807 return h.conn.SendMessage(msg)
808 }
809
810 func (h *outputHandler) close() {
811 h.closed.lck.Lock()
812 defer h.closed.lck.Unlock()
813 h.closed.isClosed = true
814 }
815
// serialGenerator hands out message serials and tracks which ones are in use.
// All methods are guarded by lck.
type serialGenerator struct {
	lck        sync.Mutex
	nextSerial uint32          // first candidate for the next GetSerial call
	serialUsed map[uint32]bool // serials currently in use
}

// newSerialGenerator returns a generator whose first serial is 1. Serial 0 is
// permanently marked as used, so it is never handed out.
func newSerialGenerator() *serialGenerator {
	gen := new(serialGenerator)
	gen.nextSerial = 1
	gen.serialUsed = map[uint32]bool{0: true}
	return gen
}

// GetSerial returns the first serial at or after nextSerial that is not in
// use, marks it as used and advances nextSerial past it. The counter wraps
// around on uint32 overflow.
func (gen *serialGenerator) GetSerial() uint32 {
	gen.lck.Lock()
	defer gen.lck.Unlock()

	serial := gen.nextSerial
	for {
		if !gen.serialUsed[serial] {
			break
		}
		serial++
	}
	gen.serialUsed[serial] = true
	gen.nextSerial = serial + 1
	return serial
}

// RetireSerial marks serial as no longer in use, so it can be handed out
// again.
func (gen *serialGenerator) RetireSerial(serial uint32) {
	gen.lck.Lock()
	delete(gen.serialUsed, serial)
	gen.lck.Unlock()
}
846
// nameTracker records the names of a connection: its unique name and the set
// of additional names acquired on the bus. All methods are guarded by lck.
type nameTracker struct {
	lck    sync.RWMutex
	unique string
	names  map[string]struct{}
}

// newNameTracker returns a tracker with no unique name and no other names.
func newNameTracker() *nameTracker {
	return &nameTracker{names: make(map[string]struct{})}
}

// acquireUniqueConnectionName records name as the connection's unique name.
func (tracker *nameTracker) acquireUniqueConnectionName(name string) {
	tracker.lck.Lock()
	tracker.unique = name
	tracker.lck.Unlock()
}

// acquireName adds name to the set of tracked names.
func (tracker *nameTracker) acquireName(name string) {
	tracker.lck.Lock()
	tracker.names[name] = struct{}{}
	tracker.lck.Unlock()
}

// loseName removes name from the set of tracked names.
func (tracker *nameTracker) loseName(name string) {
	tracker.lck.Lock()
	delete(tracker.names, name)
	tracker.lck.Unlock()
}

// uniqueNameIsKnown reports whether a non-empty unique name has been
// recorded.
func (tracker *nameTracker) uniqueNameIsKnown() bool {
	tracker.lck.RLock()
	known := tracker.unique != ""
	tracker.lck.RUnlock()
	return known
}

// isKnownName reports whether name is one of the tracked names or equals the
// unique name.
func (tracker *nameTracker) isKnownName(name string) bool {
	tracker.lck.RLock()
	defer tracker.lck.RUnlock()
	if _, tracked := tracker.names[name]; tracked {
		return true
	}
	return name == tracker.unique
}

// listKnownNames returns the unique name (possibly empty) followed by all
// tracked names in unspecified order.
func (tracker *nameTracker) listKnownNames() []string {
	tracker.lck.RLock()
	defer tracker.lck.RUnlock()

	known := make([]string, 1, len(tracker.names)+1)
	known[0] = tracker.unique
	for name := range tracker.names {
		known = append(known, name)
	}
	return known
}
893
894 type callTracker struct {
895 calls map[uint32]*Call
896 lck sync.RWMutex
897 }
898
899 func newCallTracker() *callTracker {
900 return &callTracker{calls: map[uint32]*Call{}}
901 }
902
903 func (tracker *callTracker) track(sn uint32, call *Call) {
904 tracker.lck.Lock()
905 tracker.calls[sn] = call
906 tracker.lck.Unlock()
907 }
908
909 func (tracker *callTracker) handleReply(sequence Sequence, msg *Message) uint32 {
910 serial := msg.Headers[FieldReplySerial].value.(uint32)
911 tracker.lck.RLock()
912 _, ok := tracker.calls[serial]
913 tracker.lck.RUnlock()
914 if ok {
915 tracker.finalizeWithBody(serial, sequence, msg.Body)
916 }
917 return serial
918 }
919
920 func (tracker *callTracker) handleDBusError(sequence Sequence, msg *Message) uint32 {
921 serial := msg.Headers[FieldReplySerial].value.(uint32)
922 tracker.lck.RLock()
923 _, ok := tracker.calls[serial]
924 tracker.lck.RUnlock()
925 if ok {
926 name, _ := msg.Headers[FieldErrorName].value.(string)
927 tracker.finalizeWithError(serial, sequence, Error{name, msg.Body})
928 }
929 return serial
930 }
931
932 func (tracker *callTracker) handleSendError(msg *Message, err error) {
933 if err == nil {
934 return
935 }
936 tracker.lck.RLock()
937 _, ok := tracker.calls[msg.serial]
938 tracker.lck.RUnlock()
939 if ok {
940 tracker.finalizeWithError(msg.serial, NoSequence, err)
941 }
942 }
943
944
945 func (tracker *callTracker) finalize(sn uint32) {
946 tracker.lck.Lock()
947 defer tracker.lck.Unlock()
948 c, ok := tracker.calls[sn]
949 if ok {
950 delete(tracker.calls, sn)
951 c.ContextCancel()
952 }
953 }
954
955 func (tracker *callTracker) finalizeWithBody(sn uint32, sequence Sequence, body []interface{}) {
956 tracker.lck.Lock()
957 c, ok := tracker.calls[sn]
958 if ok {
959 delete(tracker.calls, sn)
960 }
961 tracker.lck.Unlock()
962 if ok {
963 c.Body = body
964 c.ResponseSequence = sequence
965 c.done()
966 }
967 }
968
969 func (tracker *callTracker) finalizeWithError(sn uint32, sequence Sequence, err error) {
970 tracker.lck.Lock()
971 c, ok := tracker.calls[sn]
972 if ok {
973 delete(tracker.calls, sn)
974 }
975 tracker.lck.Unlock()
976 if ok {
977 c.Err = err
978 c.ResponseSequence = sequence
979 c.done()
980 }
981 }
982
983 func (tracker *callTracker) finalizeAllWithError(sequenceGen *sequenceGenerator, err error) {
984 tracker.lck.Lock()
985 closedCalls := make([]*Call, 0, len(tracker.calls))
986 for sn := range tracker.calls {
987 closedCalls = append(closedCalls, tracker.calls[sn])
988 }
989 tracker.calls = map[uint32]*Call{}
990 tracker.lck.Unlock()
991 for _, call := range closedCalls {
992 call.Err = err
993 call.ResponseSequence = sequenceGen.next()
994 call.done()
995 }
996 }
997
View as plain text