...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/kates/accumulator.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/kates

     1  package kates
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"reflect"
     8  	"sync"
     9  	"time"
    10  
    11  	"k8s.io/apimachinery/pkg/api/meta"
    12  )
    13  
// The Accumulator struct is used to efficiently maintain an in-memory copy of kubernetes resources
// present in a cluster in a form that is easy for business logic to process. It functions as a
// bridge between delta-based kubernetes watches on individual Kinds and the complete/consistent set
// of objects on which business logic needs to operate. In that sense it accumulates both multiple
// kinds of kubernetes resources into a single snapshot, as well as accumulating deltas on
// individual objects into relevant sets of objects.
//
// A consumer waits for a signal on Changed() and then calls Update, UpdateWithDeltas or
// FilteredUpdate to copy the accumulated state into its snapshot struct.
//
// The Goals/Requirements below are based heavily on the needs of Ambassador as they have evolved
// over the years. A lot of this comes down to the fact that unlike the exemplary
// deployment/replicaset controller examples which typically operate on a single resource and render
// it into another (deployment -> N replicasets, replicaset -> N pods), Ambassador's controller
// logic has some additional requirements:
//
//  1. Complete knowledge of resources in a cluster. Because many thousands of Mappings are
//     ultimately assembled into a single envoy configuration responsible for ingress into the
//     cluster, the consequences of producing an envoy configuration when you e.g. know about only
//     half of those Mappings is catastrophic (you are black-holing half your traffic).
//
//  2. Complete knowledge of multiple resources. Instead of having one self contained input like a
//     deployment or a replicaset, Ambassador's business logic has many inputs, and the consequence
//     of producing an envoy without knowledge of *all* of those inputs is equally catastrophic,
//     e.g. it's no use knowing about all the Mappings if you don't know about any of the Hosts yet.
//
// Goals/Requirements:
//
//  1. Bootstrap of a single Kind: the Accumulator will ensure that all pre-existing resources of
//     that Kind have been loaded into memory prior to triggering any notifications. This guarantees
//     we will never trigger business logic on an egregiously incomplete view of the cluster
//     (e.g. when 500 out of 1000 Mappings have been loaded) and makes it safe for the business
//     logic to assume complete knowledge of the cluster.
//
//  2. When multiple Kinds are needed by a controller, the Accumulator will not notify the
//     controller until all the Kinds have been fully bootstrapped.
//
//  3. Graceful load shedding: When the rate of change of resources is very fast, the API and
//     implementation are structured so that individual object deltas get coalesced into a single
//     snapshot update. This prevents excessively triggering business logic to process an entire
//     snapshot for each individual object change that occurs.
type Accumulator struct {
	// client is used to patch the watches before each update (see updateField).
	client *Client
	// fields holds the accumulated state of each query, keyed by the query's Name.
	fields map[string]*field
	// excluded is keyed by unKey(*Unstructured). It records the resources that a FilteredUpdate
	// predicate rejected; they stay out of snapshots until a later delta re-admits them.
	excluded map[string]bool
	// synced counts the fields whose watch has reported its initial sync. No notification is
	// sent until it reaches len(fields).
	synced int
	// changed carries the (data-less) notifications sent by Listen and exposed by Changed.
	changed chan struct{}
	// mutex is held by storeUpdate (called from Listen) and by FilteredUpdate, which serializes
	// their access to fields, excluded and synced.
	mutex sync.Mutex
}
    61  
// field holds the accumulated state of a single query: the objects currently known for it and
// the changes that have not yet been delivered by updateField.
type field struct {
	query    Query
	selector Selector
	// mapping is passed to client.cliFor when the query's watch is started (see newAccumulator).
	mapping *meta.RESTMapping

	// The values and deltas maps are keyed by unKey(*Unstructured).
	// values holds the current version of every object stored for this query.
	values map[string]*Unstructured
	// deltas holds the most recent undelivered change (add, update or delete) for each object.
	// A newer change to the same object replaces the older one, and updateField removes entries
	// as it delivers them.
	deltas map[string]*Delta

	// synced is set once the query's watch reports its initial sync (see storeUpdate).
	// firstUpdate is set once updateField has populated a target from this field; until then
	// updateField does not skip the field, even when there are no pending deltas.
	synced      bool
	firstUpdate bool
}
    75  
    76  type DeltaType int
    77  
    78  const (
    79  	ObjectAdd DeltaType = iota
    80  	ObjectUpdate
    81  	ObjectDelete
    82  )
    83  
    84  type changeStatus int
    85  
    86  const (
    87  	awaitingDispatch changeStatus = iota
    88  	dispatched
    89  )
    90  
    91  func (dt DeltaType) MarshalJSON() ([]byte, error) {
    92  	switch dt {
    93  	case ObjectAdd:
    94  		return []byte(`"add"`), nil
    95  	case ObjectUpdate:
    96  		return []byte(`"update"`), nil
    97  	case ObjectDelete:
    98  		return []byte(`"delete"`), nil
    99  	default:
   100  		return nil, fmt.Errorf("invalid DeltaType enum: %d", dt)
   101  	}
   102  }
   103  
   104  func (dt *DeltaType) UnmarshalJSON(b []byte) error {
   105  	var str string
   106  	err := json.Unmarshal(b, &str)
   107  	if err != nil {
   108  		return err
   109  	}
   110  
   111  	switch str {
   112  	case "add":
   113  		*dt = ObjectAdd
   114  	case "update":
   115  		*dt = ObjectUpdate
   116  	case "delete":
   117  		*dt = ObjectDelete
   118  	default:
   119  		return fmt.Errorf("unrecognized delta type: %s", str)
   120  	}
   121  
   122  	return nil
   123  }
   124  
   125  type Delta struct {
   126  	TypeMeta   `json:""`
   127  	ObjectMeta `json:"metadata,omitempty"`
   128  	DeltaType  DeltaType `json:"deltaType"`
   129  }
   130  
   131  func NewDelta(deltaType DeltaType, obj *Unstructured) *Delta {
   132  	return newDelta(deltaType, obj)
   133  }
   134  
   135  func NewDeltaFromObject(deltaType DeltaType, obj Object) (*Delta, error) {
   136  	var un *Unstructured
   137  	err := convert(obj, &un)
   138  	if err != nil {
   139  		return nil, err
   140  	}
   141  	return NewDelta(deltaType, un), nil
   142  }
   143  
   144  func newDelta(deltaType DeltaType, obj *Unstructured) *Delta {
   145  	// We don't want all of the object, just a subset.
   146  	return &Delta{
   147  		TypeMeta: TypeMeta{
   148  			APIVersion: obj.GetAPIVersion(),
   149  			Kind:       obj.GetKind(),
   150  		},
   151  		ObjectMeta: ObjectMeta{
   152  			Name:      obj.GetName(),
   153  			Namespace: obj.GetNamespace(),
   154  			// Not sure we need this, but it marshals as null if we don't provide it.
   155  			CreationTimestamp: obj.GetCreationTimestamp(),
   156  		},
   157  		DeltaType: deltaType,
   158  	}
   159  }
   160  
   161  func newAccumulator(ctx context.Context, client *Client, queries ...Query) (*Accumulator, error) {
   162  	changed := make(chan struct{})
   163  
   164  	fields := make(map[string]*field)
   165  	rawUpdateCh := make(chan rawUpdate)
   166  
   167  	for _, q := range queries {
   168  		field, err := client.newField(q)
   169  		if err != nil {
   170  			return nil, err
   171  		}
   172  		fields[q.Name] = field
   173  		client.watchRaw(ctx, q, rawUpdateCh, client.cliFor(field.mapping, q.Namespace))
   174  	}
   175  
   176  	acc := &Accumulator{
   177  		client:   client,
   178  		fields:   fields,
   179  		excluded: map[string]bool{},
   180  		synced:   0,
   181  		changed:  changed,
   182  		mutex:    sync.Mutex{},
   183  	}
   184  
   185  	go acc.Listen(ctx, rawUpdateCh, client.maxAccumulatorInterval)
   186  
   187  	return acc, nil
   188  }
   189  
   190  // Listen for updates from rawUpdateCh and sends notifications, coalescing reads as neccessary.
   191  // This loop along with the logic in storeField isused to satisfy the 3 Goals/Requirements listed in
   192  // the documentation for the Accumulator struct, i.e. Ensuring all Kinds are bootstrapped before any
   193  // notification occurs, as well as ensuring that we continue to coalesce updates in the background while
   194  // business logic is executing in order to ensure graceful load shedding.
   195  func (a *Accumulator) Listen(ctx context.Context, rawUpdateCh <-chan rawUpdate, interval time.Duration) {
   196  	ticker := time.NewTicker(interval)
   197  	defer ticker.Stop()
   198  	var changeStatus changeStatus
   199  	var lastChangeSent time.Time
   200  	var synced bool
   201  
   202  	sendUpdate := func() {
   203  		a.changed <- struct{}{}
   204  		changeStatus = dispatched
   205  		lastChangeSent = time.Now()
   206  	}
   207  
   208  	for {
   209  		select {
   210  		// We have two paths here:
   211  		// 1. If we get new data and it has been past our set interval since we last updated anything,
   212  		//    we go ahead and immediately send that.
   213  		//
   214  		// 2. If we get new data but we just recently sent a change within our interval, we'll
   215  		//    wait until we get our next Tick before sending a change.
   216  		case rawUp := <-rawUpdateCh:
   217  			synced = a.storeUpdate(rawUp)
   218  			since := rawUp.ts.Sub(lastChangeSent)
   219  			if synced && since >= interval {
   220  				sendUpdate()
   221  			} else {
   222  				changeStatus = awaitingDispatch
   223  			}
   224  		case <-ticker.C:
   225  			if synced && changeStatus == awaitingDispatch {
   226  				sendUpdate()
   227  			}
   228  		case <-ctx.Done():
   229  			return
   230  		}
   231  	}
   232  }
   233  
   234  func (a *Accumulator) Changed() <-chan struct{} {
   235  	return a.changed
   236  }
   237  
   238  func (a *Accumulator) Update(ctx context.Context, target interface{}) (bool, error) {
   239  	return a.UpdateWithDeltas(ctx, target, nil)
   240  }
   241  
   242  func (a *Accumulator) UpdateWithDeltas(ctx context.Context, target interface{}, deltas *[]*Delta) (bool, error) {
   243  	return a.FilteredUpdate(ctx, target, deltas, nil)
   244  }
   245  
   246  // The FilteredUpdate method updates the target snapshot with only those resources for which
   247  // "predicate" returns true. The predicate is only called when objects are added/updated, it is not
   248  // repeatedly called on objects that have not changed. The predicate must not modify its argument.
   249  func (a *Accumulator) FilteredUpdate(ctx context.Context, target interface{}, deltas *[]*Delta, predicate func(*Unstructured) bool) (bool, error) {
   250  	a.mutex.Lock()
   251  	defer a.mutex.Unlock()
   252  	return a.update(ctx, reflect.ValueOf(target), deltas, predicate)
   253  }
   254  
   255  func (a *Accumulator) storeUpdate(update rawUpdate) bool {
   256  	a.mutex.Lock()
   257  	defer a.mutex.Unlock()
   258  	field := a.fields[update.name]
   259  	if update.new != nil {
   260  		key := unKey(update.new)
   261  		oldValue, oldExists := field.values[key]
   262  		field.values[key] = update.new
   263  
   264  		if oldExists && oldValue.GetResourceVersion() == update.new.GetResourceVersion() {
   265  			// no delta in this case, we have already delivered the new value and the delta via a
   266  			// patch
   267  		} else {
   268  			if update.old == nil {
   269  				field.deltas[key] = newDelta(ObjectAdd, update.new)
   270  			} else {
   271  				field.deltas[key] = newDelta(ObjectUpdate, update.new)
   272  			}
   273  		}
   274  	} else if update.old != nil {
   275  		key := unKey(update.old)
   276  		_, oldExists := field.values[key]
   277  		delete(field.values, key)
   278  
   279  		if !oldExists {
   280  			// no delta in this case, we have already delivered the deletion and the delta via a
   281  			// patch
   282  		} else {
   283  			field.deltas[key] = newDelta(ObjectDelete, update.old)
   284  		}
   285  	}
   286  	if update.synced && !field.synced {
   287  		field.synced = true
   288  		a.synced += 1
   289  	}
   290  	return a.synced >= len(a.fields)
   291  }
   292  
   293  func (a *Accumulator) updateField(
   294  	ctx context.Context,
   295  	target reflect.Value,
   296  	name string,
   297  	field *field,
   298  	deltas *[]*Delta,
   299  	predicate func(*Unstructured) bool,
   300  ) (bool, error) {
   301  	if err := a.client.patchWatch(ctx, field); err != nil {
   302  		return false, err
   303  	}
   304  
   305  	if field.firstUpdate && len(field.deltas) == 0 {
   306  		return false, nil
   307  	}
   308  
   309  	field.firstUpdate = true
   310  	for key, delta := range field.deltas {
   311  		delete(field.deltas, key)
   312  		if deltas != nil {
   313  			*deltas = append(*deltas, delta)
   314  		}
   315  
   316  		if predicate != nil {
   317  			if delta.DeltaType == ObjectDelete {
   318  				delete(a.excluded, key)
   319  			} else {
   320  				un := field.values[key]
   321  				if predicate(un) {
   322  					delete(a.excluded, key)
   323  				} else {
   324  					a.excluded[key] = true
   325  				}
   326  			}
   327  		}
   328  	}
   329  
   330  	var items []*Unstructured
   331  	for key, un := range field.values {
   332  		if a.excluded[key] {
   333  			continue
   334  		}
   335  		items = append(items, un)
   336  	}
   337  
   338  	jsonBytes, err := json.Marshal(items)
   339  	if err != nil {
   340  		return false, err
   341  	}
   342  
   343  	fieldEntry, ok := target.Type().Elem().FieldByName(name)
   344  	if !ok {
   345  		return false, fmt.Errorf("no such field: %q", name)
   346  	}
   347  
   348  	var val reflect.Value
   349  	if fieldEntry.Type.Kind() == reflect.Slice {
   350  		val = reflect.New(fieldEntry.Type)
   351  		err := json.Unmarshal(jsonBytes, val.Interface())
   352  		if err != nil {
   353  			return false, err
   354  		}
   355  	} else if fieldEntry.Type.Kind() == reflect.Map {
   356  		val = reflect.MakeMap(fieldEntry.Type)
   357  		for _, item := range items {
   358  			innerVal := reflect.New(fieldEntry.Type.Elem())
   359  			err := convert(item, innerVal.Interface())
   360  			if err != nil {
   361  				return false, err
   362  			}
   363  			val.SetMapIndex(reflect.ValueOf(item.GetName()), reflect.Indirect(innerVal))
   364  		}
   365  	} else {
   366  		return false, fmt.Errorf("don't know how to unmarshal to: %v", fieldEntry.Type)
   367  	}
   368  
   369  	target.Elem().FieldByName(name).Set(reflect.Indirect(val))
   370  
   371  	return true, nil
   372  }
   373  
   374  func (a *Accumulator) update(ctx context.Context, target reflect.Value, deltas *[]*Delta, predicate func(*Unstructured) bool) (bool, error) {
   375  	if deltas != nil {
   376  		*deltas = nil
   377  	}
   378  
   379  	updated := false
   380  	for name, field := range a.fields {
   381  		_updated, err := a.updateField(ctx, target, name, field, deltas, predicate)
   382  		if _updated {
   383  			updated = true
   384  		}
   385  		if err != nil {
   386  			return updated, err
   387  		}
   388  	}
   389  
   390  	return updated, nil
   391  }
   392  

View as plain text