...

Source file src/go.etcd.io/etcd/pkg/v3/wait/wait.go

Documentation: go.etcd.io/etcd/pkg/v3/wait

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
// Package wait provides utility functions for polling and listening using Go
// channels.
    17  package wait
    18  
    19  import (
    20  	"log"
    21  	"sync"
    22  )
    23  
    24  const (
    25  	// To avoid lock contention we use an array of list struct (rw mutex & map)
    26  	// for the id argument, we apply mod operation and uses its remainder to
    27  	// index into the array and find the corresponding element.
    28  	defaultListElementLength = 64
    29  )
    30  
    31  // Wait is an interface that provides the ability to wait and trigger events that
    32  // are associated with IDs.
    33  type Wait interface {
    34  	// Register waits returns a chan that waits on the given ID.
    35  	// The chan will be triggered when Trigger is called with
    36  	// the same ID.
    37  	Register(id uint64) <-chan interface{}
    38  	// Trigger triggers the waiting chans with the given ID.
    39  	Trigger(id uint64, x interface{})
    40  	IsRegistered(id uint64) bool
    41  }
    42  
    43  type list struct {
    44  	e []listElement
    45  }
    46  
    47  type listElement struct {
    48  	l sync.RWMutex
    49  	m map[uint64]chan interface{}
    50  }
    51  
    52  // New creates a Wait.
    53  func New() Wait {
    54  	res := list{
    55  		e: make([]listElement, defaultListElementLength),
    56  	}
    57  	for i := 0; i < len(res.e); i++ {
    58  		res.e[i].m = make(map[uint64]chan interface{})
    59  	}
    60  	return &res
    61  }
    62  
    63  func (w *list) Register(id uint64) <-chan interface{} {
    64  	idx := id % defaultListElementLength
    65  	newCh := make(chan interface{}, 1)
    66  	w.e[idx].l.Lock()
    67  	defer w.e[idx].l.Unlock()
    68  	if _, ok := w.e[idx].m[id]; !ok {
    69  		w.e[idx].m[id] = newCh
    70  	} else {
    71  		log.Panicf("dup id %x", id)
    72  	}
    73  	return newCh
    74  }
    75  
    76  func (w *list) Trigger(id uint64, x interface{}) {
    77  	idx := id % defaultListElementLength
    78  	w.e[idx].l.Lock()
    79  	ch := w.e[idx].m[id]
    80  	delete(w.e[idx].m, id)
    81  	w.e[idx].l.Unlock()
    82  	if ch != nil {
    83  		ch <- x
    84  		close(ch)
    85  	}
    86  }
    87  
    88  func (w *list) IsRegistered(id uint64) bool {
    89  	idx := id % defaultListElementLength
    90  	w.e[idx].l.RLock()
    91  	defer w.e[idx].l.RUnlock()
    92  	_, ok := w.e[idx].m[id]
    93  	return ok
    94  }
    95  
    96  type waitWithResponse struct {
    97  	ch <-chan interface{}
    98  }
    99  
   100  func NewWithResponse(ch <-chan interface{}) Wait {
   101  	return &waitWithResponse{ch: ch}
   102  }
   103  
   104  func (w *waitWithResponse) Register(id uint64) <-chan interface{} {
   105  	return w.ch
   106  }
   107  func (w *waitWithResponse) Trigger(id uint64, x interface{}) {}
   108  func (w *waitWithResponse) IsRegistered(id uint64) bool {
   109  	panic("waitWithResponse.IsRegistered() shouldn't be called")
   110  }
   111  

View as plain text