// The kates package is a library for writing kubernetes extensions. The library provides a number
// of capabilities:
//
//   - Consistent bootstrap of multiple resources
//   - Graceful Load Shedding via Coalesced Events
//   - Read/write coherence
//   - Grouping
//   - Works well with typed (e.g. corev1.Pod) and untyped
//     (e.g. map[string]interface{}) representations of k8s resources.
//
// It does not provide codegen or admission controllers; for those, use kubebuilder.
//
// Comparison to other libraries:
//
//   - higher level, simpler, and more idiomatic than client-go
//   - lower level (and more flexible) than operator-sdk or kubebuilder
//
// # Constructing a Client
//
// The primary entrypoint for the kates library is the Client type. A Client is constructed by
// passing in the ClientConfig struct with a path to a kubeconfig file:
//
//	client, err := NewClient(ClientConfig{Kubeconfig: "path/to/kubeconfig"}) // or NewClient(ClientConfig{}) for defaults
//
// # Creating, Modifying, and Deleting Resources
//
// A client can be used to Create, Update, and/or Delete any kubernetes resource. Each of the "CRUD"
// methods will, upon success, store an updated copy of the resource into the object referenced by
// the last argument. This will typically be different from the value you supplied if e.g. the
// server defaults fields, updates the resource version, assigns UIDs, etc.
//
//	var result kates.Pod
//	err = client.Create(ctx, &kates.Pod{...}, &result)
//	err = client.Update(ctx, result, &result)
//	err = client.UpdateStatus(ctx, result, &result)
//	err = client.Delete(ctx, result, &result)
//
// You can pass both typed and untyped values into the APIs.
// The only requirement is that the values you pass will json.Marshal to and json.Unmarshal from
// something that looks like a kubernetes resource:
//
//	pod := kates.Pod{...}
//	err := client.Create(ctx, &pod, &pod)
//	// -or-
//	pod := map[string]interface{}{"kind": "Pod", ...}
//	err := client.Create(ctx, &pod, &pod)
//
// # Watching Resources
//
// The client can be used to watch sets of multiple related resources. This is accomplished via the
// Accumulator type. An Accumulator tracks events coming from the API server for the indicated
// resources, and merges those events with any locally initiated changes made via the client in
// order to maintain a snapshot that is coherent.
//
// You can construct an Accumulator by invoking the Client's Watch method:
//
//	accumulator := client.Watch(ctx,
//		Query{Name: "Services", Kind: "svc"},
//		Query{Name: "Deployments", Kind: "deploy"})
//
// The Accumulator will bootstrap a complete list of each supplied Query, and then provide
// continuing notifications if any of the resources change. Notifications that the initial bootstrap
// is complete as well as notifications of any subsequent changes are indicated by sending an empty
// struct down the Accumulator.Changed() channel:
//
//	<-accumulator.Changed() // Wait until all Queries have been initialized.
//
// The Accumulator provides access to the values it tracks via the Accumulator.Update(&snapshot)
// method. The Update() method expects a pointer to a snapshot that is defined by the caller. The
// caller must supply a pointer to a struct with fields that match the names of the Query structs
// used to create the Accumulator.
// The types of the snapshot fields are free to be anything that will json.Unmarshal from a slice
// of kubernetes resources:
//
//	// initialize an empty snapshot
//	snapshot := struct {
//		Services    []*kates.Service
//		Deployments []*kates.Deployment
//	}{}
//
//	accumulator.Update(&snapshot)
//
// The first call to Update will populate the snapshot with the bootstrapped values. At this point
// any startup logic can be performed with confidence that the snapshot represents a complete and
// recent view of cluster state:
//
//	// perform any startup logic
//	...
//
// The same APIs can then be used to watch for and reconcile subsequent changes:
//
//	// reconcile ongoing changes
//	for {
//		select {
//		case <-accumulator.Changed():
//			wasChanged := accumulator.Update(&snapshot)
//			if wasChanged {
//				reconcile(snapshot)
//			}
//		case <-ctx.Done():
//			// A bare break here would only exit the select, not the enclosing for loop.
//			return
//		}
//	}
//
// The Accumulator will provide read/write coherence for any changes made using the client from
// which the Accumulator was created. This means that any snapshot produced by the Accumulator is
// guaranteed to include all the Create, Update, UpdateStatus, and/or Delete operations that were
// performed using the client. (The underlying client-go CRUD and Watch operations do not provide
// this guarantee, so a straightforward reconcile operation will often end up creating duplicate
// objects and/or performing updates on stale versions.)
//
// # Event Coalescing for Graceful Load Shedding
//
// The Accumulator automatically coalesces events behind the scenes in order to facilitate graceful
// load shedding for situations where the reconcile operation takes a long time relative to the
// rate of incoming events.
// This allows the Accumulator.Update(&snapshot) method to provide a guarantee that when it returns
// the snapshot will contain the most recent view of cluster state regardless of how slowly and
// infrequently we read from the Accumulator.Changed() channel:
//
//	snapshot := Snapshot{}
//	for {
//		<-accumulator.Changed()
//		wasChanged := accumulator.Update(&snapshot) // Guaranteed to return the most recent view of cluster state.
//		if wasChanged {
//			slowReconcile(&snapshot)
//		}
//	}
package kates

// TODO:
// - Comment explaining what the different client-go pieces are, what the pieces here are, and how they fit together: What is an "informer", what is a "RESTMapper", what is an "accumulator"? How do they fit together?
// - FieldSelector is omitted.
// - LabelSelector is stringly typed.
// - Add tests to prove that Update followed by Get/List is actually synchronous and doesn't require patchWatch type functionality.

/** XXX: thoughts...
 *
 * Problems with the way we currently write controllers:
 *
 * - delayed write propagation
 * - typed vs untyped
 * - detecting no resources
 * - detecting when multiple watches are synced
 * - fetching references
 * - handling conflicts, fundamentally need to retry, but at what granularity?
 * - resilience to poison inputs
 * - garbage collection/ownership
 *
 * With a partition function, this could get a lot more efficient and resilient.
 * What would a partition function look like?
 * - index pattern? f(item) -> list of accumulators
 * - single kind is easy, right now f(item) -> constant, f(mapping)->prefix
 * - how does multiple work?
 *
 * Accumulator can probably be merged with client since we don't really need inner and outer in the same snapshot.
 *
 *   cli := ...
 *
 *   acc := cli.Watch(ctx, ...) // somehow include partition factory and index function?
 *
 *   cli.CRUD(ctx, ...)
 *
 *   partition := <-acc.Changed() // returns active partition?
 *
 *   acc.Update(partition)
 *
 * --
 *
 * project, revision, jobs, jobs-podses, mapping, service, deployments, deployments-podses
 *
 * simple: f(obj) -> partition-key(s)
 *
 * escape hatches:
 *   - f(obj) -> * (every partition gets them), f(obj) -> "" no partition gets them but you can query them
 *   - one partition
 * --
 *
 */