...

Source file src/k8s.io/client-go/tools/watch/informerwatcher.go

Documentation: k8s.io/client-go/tools/watch

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package watch
    18  
    19  import (
    20  	"sync"
    21  
    22  	"k8s.io/apimachinery/pkg/runtime"
    23  	"k8s.io/apimachinery/pkg/watch"
    24  	"k8s.io/client-go/tools/cache"
    25  )
    26  
    27  func newEventProcessor(out chan<- watch.Event) *eventProcessor {
    28  	return &eventProcessor{
    29  		out:  out,
    30  		cond: sync.NewCond(&sync.Mutex{}),
    31  		done: make(chan struct{}),
    32  	}
    33  }
    34  
    35  // eventProcessor buffers events and writes them to an out chan when a reader
    36  // is waiting. Because of the requirement to buffer events, it synchronizes
    37  // input with a condition, and synchronizes output with a channels. It needs to
    38  // be able to yield while both waiting on an input condition and while blocked
    39  // on writing to the output channel.
    40  type eventProcessor struct {
    41  	out chan<- watch.Event
    42  
    43  	cond *sync.Cond
    44  	buff []watch.Event
    45  
    46  	done chan struct{}
    47  }
    48  
    49  func (e *eventProcessor) run() {
    50  	for {
    51  		batch := e.takeBatch()
    52  		e.writeBatch(batch)
    53  		if e.stopped() {
    54  			return
    55  		}
    56  	}
    57  }
    58  
    59  func (e *eventProcessor) takeBatch() []watch.Event {
    60  	e.cond.L.Lock()
    61  	defer e.cond.L.Unlock()
    62  
    63  	for len(e.buff) == 0 && !e.stopped() {
    64  		e.cond.Wait()
    65  	}
    66  
    67  	batch := e.buff
    68  	e.buff = nil
    69  	return batch
    70  }
    71  
    72  func (e *eventProcessor) writeBatch(events []watch.Event) {
    73  	for _, event := range events {
    74  		select {
    75  		case e.out <- event:
    76  		case <-e.done:
    77  			return
    78  		}
    79  	}
    80  }
    81  
    82  func (e *eventProcessor) push(event watch.Event) {
    83  	e.cond.L.Lock()
    84  	defer e.cond.L.Unlock()
    85  	defer e.cond.Signal()
    86  	e.buff = append(e.buff, event)
    87  }
    88  
    89  func (e *eventProcessor) stopped() bool {
    90  	select {
    91  	case <-e.done:
    92  		return true
    93  	default:
    94  		return false
    95  	}
    96  }
    97  
    98  func (e *eventProcessor) stop() {
    99  	close(e.done)
   100  	e.cond.Signal()
   101  }
   102  
   103  // NewIndexerInformerWatcher will create an IndexerInformer and wrap it into watch.Interface
   104  // so you can use it anywhere where you'd have used a regular Watcher returned from Watch method.
   105  // it also returns a channel you can use to wait for the informers to fully shutdown.
   106  func NewIndexerInformerWatcher(lw cache.ListerWatcher, objType runtime.Object) (cache.Indexer, cache.Controller, watch.Interface, <-chan struct{}) {
   107  	ch := make(chan watch.Event)
   108  	w := watch.NewProxyWatcher(ch)
   109  	e := newEventProcessor(ch)
   110  
   111  	indexer, informer := cache.NewIndexerInformer(lw, objType, 0, cache.ResourceEventHandlerFuncs{
   112  		AddFunc: func(obj interface{}) {
   113  			e.push(watch.Event{
   114  				Type:   watch.Added,
   115  				Object: obj.(runtime.Object),
   116  			})
   117  		},
   118  		UpdateFunc: func(old, new interface{}) {
   119  			e.push(watch.Event{
   120  				Type:   watch.Modified,
   121  				Object: new.(runtime.Object),
   122  			})
   123  		},
   124  		DeleteFunc: func(obj interface{}) {
   125  			staleObj, stale := obj.(cache.DeletedFinalStateUnknown)
   126  			if stale {
   127  				// We have no means of passing the additional information down using
   128  				// watch API based on watch.Event but the caller can filter such
   129  				// objects by checking if metadata.deletionTimestamp is set
   130  				obj = staleObj.Obj
   131  			}
   132  
   133  			e.push(watch.Event{
   134  				Type:   watch.Deleted,
   135  				Object: obj.(runtime.Object),
   136  			})
   137  		},
   138  	}, cache.Indexers{})
   139  
   140  	go e.run()
   141  
   142  	doneCh := make(chan struct{})
   143  	go func() {
   144  		defer close(doneCh)
   145  		defer e.stop()
   146  		informer.Run(w.StopChan())
   147  	}()
   148  
   149  	return indexer, informer, w, doneCh
   150  }
   151  

View as plain text