...

Source file src/github.com/godbus/dbus/v5/sequential_handler_test.go

Documentation: github.com/godbus/dbus/v5

     1  package dbus
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"testing"
     8  	"time"
     9  )
    10  
    11  // Verifies that no signals are dropped, even if there is not enough space
    12  // in the destination channel.
    13  func TestSequentialHandlerNoDrop(t *testing.T) {
    14  	t.Parallel()
    15  
    16  	handler := NewSequentialSignalHandler()
    17  
    18  	channel := make(chan *Signal, 2)
    19  	handler.(SignalRegistrar).AddSignal(channel)
    20  
    21  	writeSignals(handler, 1000)
    22  
    23  	if err := readSignals(t, channel, 1000); err != nil {
    24  		t.Error(err)
    25  	}
    26  }
    27  
    28  // Verifies that signals are written to the destination channel in the
    29  // order they are received, in a typical concurrent reader/writer scenario.
    30  func TestSequentialHandlerSequential(t *testing.T) {
    31  	t.Parallel()
    32  
    33  	handler := NewSequentialSignalHandler()
    34  
    35  	channel := make(chan *Signal, 10)
    36  	handler.(SignalRegistrar).AddSignal(channel)
    37  
    38  	done := make(chan struct{})
    39  
    40  	// Concurrently read and write signals
    41  	go func() {
    42  		if err := readSignals(t, channel, 1000); err != nil {
    43  			t.Error(err)
    44  		}
    45  		close(done)
    46  	}()
    47  	writeSignals(handler, 1000)
    48  	<-done
    49  }
    50  
    51  // Test that in the case of multiple destination channels, one channel
    52  // being blocked does not prevent the other channel receiving messages.
    53  func TestSequentialHandlerMultipleChannel(t *testing.T) {
    54  	t.Parallel()
    55  
    56  	handler := NewSequentialSignalHandler()
    57  
    58  	channelOne := make(chan *Signal)
    59  	handler.(SignalRegistrar).AddSignal(channelOne)
    60  
    61  	channelTwo := make(chan *Signal, 10)
    62  	handler.(SignalRegistrar).AddSignal(channelTwo)
    63  
    64  	writeSignals(handler, 1000)
    65  
    66  	if err := readSignals(t, channelTwo, 1000); err != nil {
    67  		t.Error(err)
    68  	}
    69  }
    70  
    71  // Test that removing one channel results in no more messages being
    72  // written to that channel.
    73  func TestSequentialHandler_RemoveOneChannelOfOne(t *testing.T) {
    74  	t.Parallel()
    75  	handler := NewSequentialSignalHandler()
    76  
    77  	channelOne := make(chan *Signal)
    78  	handler.(SignalRegistrar).AddSignal(channelOne)
    79  
    80  	writeSignals(handler, 1000)
    81  
    82  	handler.(SignalRegistrar).RemoveSignal(channelOne)
    83  
    84  	count, closed := countSignals(channelOne)
    85  	if count > 1 {
    86  		t.Error("handler continued writing to channel after removal")
    87  	}
    88  	if closed {
    89  		t.Error("handler closed channel on .RemoveChannel()")
    90  	}
    91  }
    92  
    93  // Test that removing one channel results in no more messages being
    94  // written to that channel, and the other channels are unaffected.
    95  func TestSequentialHandler_RemoveOneChannelOfMany(t *testing.T) {
    96  	t.Parallel()
    97  	handler := NewSequentialSignalHandler()
    98  
    99  	channelOne := make(chan *Signal)
   100  	handler.(SignalRegistrar).AddSignal(channelOne)
   101  
   102  	channelTwo := make(chan *Signal, 10)
   103  	handler.(SignalRegistrar).AddSignal(channelTwo)
   104  
   105  	channelThree := make(chan *Signal, 2)
   106  	handler.(SignalRegistrar).AddSignal(channelThree)
   107  
   108  	writeSignals(handler, 1000)
   109  
   110  	handler.(SignalRegistrar).RemoveSignal(channelTwo)
   111  	defer close(channelTwo)
   112  
   113  	count, closed := countSignals(channelTwo)
   114  	if count > 10 {
   115  		t.Error("handler continued writing to channel after removal")
   116  	}
   117  	if closed {
   118  		t.Error("handler closed channel on .RemoveChannel()")
   119  	}
   120  
   121  	// Check that closing channel two does not close channel one.
   122  	if err := readSignals(t, channelOne, 1000); err != nil {
   123  		t.Error(err)
   124  	}
   125  
   126  	// Check that closing channel two does not close channel three.
   127  	if err := readSignals(t, channelThree, 1000); err != nil {
   128  		t.Error(err)
   129  	}
   130  }
   131  
   132  // Test that Terminate() closes all channels that were attached at the time.
   133  func TestSequentialHandler_TerminateClosesAllChannels(t *testing.T) {
   134  	t.Parallel()
   135  	handler := NewSequentialSignalHandler()
   136  
   137  	channelOne := make(chan *Signal)
   138  	handler.(SignalRegistrar).AddSignal(channelOne)
   139  
   140  	channelTwo := make(chan *Signal, 10)
   141  	handler.(SignalRegistrar).AddSignal(channelTwo)
   142  
   143  	writeSignals(handler, 1000)
   144  
   145  	handler.(Terminator).Terminate()
   146  
   147  	count, closed := countSignals(channelOne)
   148  	if count > 1 {
   149  		t.Errorf("handler continued writing to channel after termination; read %v signals", count)
   150  	}
   151  	if !closed {
   152  		t.Error("handler failed to close channel on .Terminate()")
   153  	}
   154  
   155  	count, closed = countSignals(channelTwo)
   156  	if count > 10 {
   157  		t.Errorf("handler continued writing to channel after termination; read %v signals", count)
   158  	}
   159  	if !closed {
   160  		t.Error("handler failed to close channel on .Terminate()")
   161  	}
   162  }
   163  
   164  // Verifies that after termination, the handler does not process any further signals.
   165  func TestSequentialHandler_TerminateTerminates(t *testing.T) {
   166  	t.Parallel()
   167  	handler := NewSequentialSignalHandler()
   168  	handler.(Terminator).Terminate()
   169  
   170  	channelOne := make(chan *Signal)
   171  	handler.(SignalRegistrar).AddSignal(channelOne)
   172  
   173  	writeSignals(handler, 10)
   174  
   175  	count, _ := countSignals(channelOne)
   176  	if count > 0 {
   177  		t.Errorf("handler continued operating after termination; read %v signals", count)
   178  	}
   179  }
   180  
   181  // Verifies calling .Terminate() more than once is equivalent to calling it just once.
   182  func TestSequentialHandler_TerminateIdemopotent(t *testing.T) {
   183  	t.Parallel()
   184  	handler := NewSequentialSignalHandler()
   185  	handler.(Terminator).Terminate()
   186  	handler.(Terminator).Terminate()
   187  
   188  	channelOne := make(chan *Signal)
   189  	handler.(SignalRegistrar).AddSignal(channelOne)
   190  	writeSignals(handler, 10)
   191  
   192  	count, _ := countSignals(channelOne)
   193  	if count > 0 {
   194  		t.Errorf("handler continued operating after termination; read %v signals", count)
   195  	}
   196  }
   197  
   198  // Verifies calling RemoveSignal after Terminate() does not cause any unusual
   199  // behaviour (panics, etc.).
   200  func TestSequentialHandler_RemoveAfterTerminate(t *testing.T) {
   201  	t.Parallel()
   202  	handler := NewSequentialSignalHandler()
   203  	handler.(Terminator).Terminate()
   204  	handler.(Terminator).Terminate()
   205  
   206  	channelOne := make(chan *Signal)
   207  	handler.(SignalRegistrar).AddSignal(channelOne)
   208  	handler.(SignalRegistrar).RemoveSignal(channelOne)
   209  	writeSignals(handler, 10)
   210  
   211  	count, _ := countSignals(channelOne)
   212  	if count > 0 {
   213  		t.Errorf("handler continued operating after termination; read %v signals", count)
   214  	}
   215  }
   216  
   217  func writeSignals(handler SignalHandler, count int) {
   218  	for i := 1; i <= count; i++ {
   219  		signal := &Signal{Sequence: Sequence(i)}
   220  		handler.DeliverSignal("iface", "name", signal)
   221  	}
   222  }
   223  
   224  func readSignals(t *testing.T, channel <-chan *Signal, count int) error {
   225  	// Overly generous timeout
   226  	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
   227  	defer cancel()
   228  	for i := 1; i <= count; i++ {
   229  		select {
   230  		case signal := <-channel:
   231  			if signal.Sequence != Sequence(i) {
   232  				return fmt.Errorf("Received signal out of order. Expected %v, got %v", i, signal.Sequence)
   233  			}
   234  		case <-ctx.Done():
   235  			return errors.New("Timeout occurred before all messages received")
   236  		}
   237  	}
   238  	return nil
   239  }
   240  
   241  func countSignals(channel <-chan *Signal) (count int, closed bool) {
   242  	count = 0
   243  	ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
   244  	defer cancel()
   245  	for {
   246  		select {
   247  		case _, ok := <-channel:
   248  			if ok {
   249  				count++
   250  			} else {
   251  				// Channel closed
   252  				return count, true
   253  			}
   254  		case <-ctx.Done():
   255  			return count, false
   256  		}
   257  	}
   258  }
   259  

View as plain text