...

Source file src/go.etcd.io/etcd/client/v3/op.go

Documentation: go.etcd.io/etcd/client/v3

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package clientv3
    16  
    17  import pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    18  
    19  type opType int
    20  
    21  const (
    22  	// A default Op has opType 0, which is invalid.
    23  	tRange opType = iota + 1
    24  	tPut
    25  	tDeleteRange
    26  	tTxn
    27  )
    28  
    29  var noPrefixEnd = []byte{0}
    30  
    31  // Op represents an Operation that kv can execute.
    32  type Op struct {
    33  	t   opType
    34  	key []byte
    35  	end []byte
    36  
    37  	// for range
    38  	limit        int64
    39  	sort         *SortOption
    40  	serializable bool
    41  	keysOnly     bool
    42  	countOnly    bool
    43  	minModRev    int64
    44  	maxModRev    int64
    45  	minCreateRev int64
    46  	maxCreateRev int64
    47  
    48  	// for range, watch
    49  	rev int64
    50  
    51  	// for watch, put, delete
    52  	prevKV bool
    53  
    54  	// for watch
    55  	// fragmentation should be disabled by default
    56  	// if true, split watch events when total exceeds
    57  	// "--max-request-bytes" flag value + 512-byte
    58  	fragment bool
    59  
    60  	// for put
    61  	ignoreValue bool
    62  	ignoreLease bool
    63  
    64  	// progressNotify is for progress updates.
    65  	progressNotify bool
    66  	// createdNotify is for created event
    67  	createdNotify bool
    68  	// filters for watchers
    69  	filterPut    bool
    70  	filterDelete bool
    71  
    72  	// for put
    73  	val     []byte
    74  	leaseID LeaseID
    75  
    76  	// txn
    77  	cmps    []Cmp
    78  	thenOps []Op
    79  	elseOps []Op
    80  
    81  	isOptsWithFromKey bool
    82  	isOptsWithPrefix  bool
    83  }
    84  
    85  // accessors / mutators
    86  
    87  // IsTxn returns true if the "Op" type is transaction.
    88  func (op Op) IsTxn() bool {
    89  	return op.t == tTxn
    90  }
    91  
    92  // Txn returns the comparison(if) operations, "then" operations, and "else" operations.
    93  func (op Op) Txn() ([]Cmp, []Op, []Op) {
    94  	return op.cmps, op.thenOps, op.elseOps
    95  }
    96  
    97  // KeyBytes returns the byte slice holding the Op's key.
    98  func (op Op) KeyBytes() []byte { return op.key }
    99  
   100  // WithKeyBytes sets the byte slice for the Op's key.
   101  func (op *Op) WithKeyBytes(key []byte) { op.key = key }
   102  
   103  // RangeBytes returns the byte slice holding with the Op's range end, if any.
   104  func (op Op) RangeBytes() []byte { return op.end }
   105  
   106  // Rev returns the requested revision, if any.
   107  func (op Op) Rev() int64 { return op.rev }
   108  
   109  // IsPut returns true iff the operation is a Put.
   110  func (op Op) IsPut() bool { return op.t == tPut }
   111  
   112  // IsGet returns true iff the operation is a Get.
   113  func (op Op) IsGet() bool { return op.t == tRange }
   114  
   115  // IsDelete returns true iff the operation is a Delete.
   116  func (op Op) IsDelete() bool { return op.t == tDeleteRange }
   117  
   118  // IsSerializable returns true if the serializable field is true.
   119  func (op Op) IsSerializable() bool { return op.serializable }
   120  
   121  // IsKeysOnly returns whether keysOnly is set.
   122  func (op Op) IsKeysOnly() bool { return op.keysOnly }
   123  
   124  // IsCountOnly returns whether countOnly is set.
   125  func (op Op) IsCountOnly() bool { return op.countOnly }
   126  
   127  // MinModRev returns the operation's minimum modify revision.
   128  func (op Op) MinModRev() int64 { return op.minModRev }
   129  
   130  // MaxModRev returns the operation's maximum modify revision.
   131  func (op Op) MaxModRev() int64 { return op.maxModRev }
   132  
   133  // MinCreateRev returns the operation's minimum create revision.
   134  func (op Op) MinCreateRev() int64 { return op.minCreateRev }
   135  
   136  // MaxCreateRev returns the operation's maximum create revision.
   137  func (op Op) MaxCreateRev() int64 { return op.maxCreateRev }
   138  
   139  // WithRangeBytes sets the byte slice for the Op's range end.
   140  func (op *Op) WithRangeBytes(end []byte) { op.end = end }
   141  
   142  // ValueBytes returns the byte slice holding the Op's value, if any.
   143  func (op Op) ValueBytes() []byte { return op.val }
   144  
   145  // WithValueBytes sets the byte slice for the Op's value.
   146  func (op *Op) WithValueBytes(v []byte) { op.val = v }
   147  
   148  func (op Op) toRangeRequest() *pb.RangeRequest {
   149  	if op.t != tRange {
   150  		panic("op.t != tRange")
   151  	}
   152  	r := &pb.RangeRequest{
   153  		Key:               op.key,
   154  		RangeEnd:          op.end,
   155  		Limit:             op.limit,
   156  		Revision:          op.rev,
   157  		Serializable:      op.serializable,
   158  		KeysOnly:          op.keysOnly,
   159  		CountOnly:         op.countOnly,
   160  		MinModRevision:    op.minModRev,
   161  		MaxModRevision:    op.maxModRev,
   162  		MinCreateRevision: op.minCreateRev,
   163  		MaxCreateRevision: op.maxCreateRev,
   164  	}
   165  	if op.sort != nil {
   166  		r.SortOrder = pb.RangeRequest_SortOrder(op.sort.Order)
   167  		r.SortTarget = pb.RangeRequest_SortTarget(op.sort.Target)
   168  	}
   169  	return r
   170  }
   171  
   172  func (op Op) toTxnRequest() *pb.TxnRequest {
   173  	thenOps := make([]*pb.RequestOp, len(op.thenOps))
   174  	for i, tOp := range op.thenOps {
   175  		thenOps[i] = tOp.toRequestOp()
   176  	}
   177  	elseOps := make([]*pb.RequestOp, len(op.elseOps))
   178  	for i, eOp := range op.elseOps {
   179  		elseOps[i] = eOp.toRequestOp()
   180  	}
   181  	cmps := make([]*pb.Compare, len(op.cmps))
   182  	for i := range op.cmps {
   183  		cmps[i] = (*pb.Compare)(&op.cmps[i])
   184  	}
   185  	return &pb.TxnRequest{Compare: cmps, Success: thenOps, Failure: elseOps}
   186  }
   187  
   188  func (op Op) toRequestOp() *pb.RequestOp {
   189  	switch op.t {
   190  	case tRange:
   191  		return &pb.RequestOp{Request: &pb.RequestOp_RequestRange{RequestRange: op.toRangeRequest()}}
   192  	case tPut:
   193  		r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
   194  		return &pb.RequestOp{Request: &pb.RequestOp_RequestPut{RequestPut: r}}
   195  	case tDeleteRange:
   196  		r := &pb.DeleteRangeRequest{Key: op.key, RangeEnd: op.end, PrevKv: op.prevKV}
   197  		return &pb.RequestOp{Request: &pb.RequestOp_RequestDeleteRange{RequestDeleteRange: r}}
   198  	case tTxn:
   199  		return &pb.RequestOp{Request: &pb.RequestOp_RequestTxn{RequestTxn: op.toTxnRequest()}}
   200  	default:
   201  		panic("Unknown Op")
   202  	}
   203  }
   204  
   205  func (op Op) isWrite() bool {
   206  	if op.t == tTxn {
   207  		for _, tOp := range op.thenOps {
   208  			if tOp.isWrite() {
   209  				return true
   210  			}
   211  		}
   212  		for _, tOp := range op.elseOps {
   213  			if tOp.isWrite() {
   214  				return true
   215  			}
   216  		}
   217  		return false
   218  	}
   219  	return op.t != tRange
   220  }
   221  
   222  func NewOp() *Op {
   223  	return &Op{key: []byte("")}
   224  }
   225  
   226  // OpGet returns "get" operation based on given key and operation options.
   227  func OpGet(key string, opts ...OpOption) Op {
   228  	// WithPrefix and WithFromKey are not supported together
   229  	if IsOptsWithPrefix(opts) && IsOptsWithFromKey(opts) {
   230  		panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
   231  	}
   232  	ret := Op{t: tRange, key: []byte(key)}
   233  	ret.applyOpts(opts)
   234  	return ret
   235  }
   236  
   237  // OpDelete returns "delete" operation based on given key and operation options.
   238  func OpDelete(key string, opts ...OpOption) Op {
   239  	// WithPrefix and WithFromKey are not supported together
   240  	if IsOptsWithPrefix(opts) && IsOptsWithFromKey(opts) {
   241  		panic("`WithPrefix` and `WithFromKey` cannot be set at the same time, choose one")
   242  	}
   243  	ret := Op{t: tDeleteRange, key: []byte(key)}
   244  	ret.applyOpts(opts)
   245  	switch {
   246  	case ret.leaseID != 0:
   247  		panic("unexpected lease in delete")
   248  	case ret.limit != 0:
   249  		panic("unexpected limit in delete")
   250  	case ret.rev != 0:
   251  		panic("unexpected revision in delete")
   252  	case ret.sort != nil:
   253  		panic("unexpected sort in delete")
   254  	case ret.serializable:
   255  		panic("unexpected serializable in delete")
   256  	case ret.countOnly:
   257  		panic("unexpected countOnly in delete")
   258  	case ret.minModRev != 0, ret.maxModRev != 0:
   259  		panic("unexpected mod revision filter in delete")
   260  	case ret.minCreateRev != 0, ret.maxCreateRev != 0:
   261  		panic("unexpected create revision filter in delete")
   262  	case ret.filterDelete, ret.filterPut:
   263  		panic("unexpected filter in delete")
   264  	case ret.createdNotify:
   265  		panic("unexpected createdNotify in delete")
   266  	}
   267  	return ret
   268  }
   269  
   270  // OpPut returns "put" operation based on given key-value and operation options.
   271  func OpPut(key, val string, opts ...OpOption) Op {
   272  	ret := Op{t: tPut, key: []byte(key), val: []byte(val)}
   273  	ret.applyOpts(opts)
   274  	switch {
   275  	case ret.end != nil:
   276  		panic("unexpected range in put")
   277  	case ret.limit != 0:
   278  		panic("unexpected limit in put")
   279  	case ret.rev != 0:
   280  		panic("unexpected revision in put")
   281  	case ret.sort != nil:
   282  		panic("unexpected sort in put")
   283  	case ret.serializable:
   284  		panic("unexpected serializable in put")
   285  	case ret.countOnly:
   286  		panic("unexpected countOnly in put")
   287  	case ret.minModRev != 0, ret.maxModRev != 0:
   288  		panic("unexpected mod revision filter in put")
   289  	case ret.minCreateRev != 0, ret.maxCreateRev != 0:
   290  		panic("unexpected create revision filter in put")
   291  	case ret.filterDelete, ret.filterPut:
   292  		panic("unexpected filter in put")
   293  	case ret.createdNotify:
   294  		panic("unexpected createdNotify in put")
   295  	}
   296  	return ret
   297  }
   298  
   299  // OpTxn returns "txn" operation based on given transaction conditions.
   300  func OpTxn(cmps []Cmp, thenOps []Op, elseOps []Op) Op {
   301  	return Op{t: tTxn, cmps: cmps, thenOps: thenOps, elseOps: elseOps}
   302  }
   303  
   304  func opWatch(key string, opts ...OpOption) Op {
   305  	ret := Op{t: tRange, key: []byte(key)}
   306  	ret.applyOpts(opts)
   307  	switch {
   308  	case ret.leaseID != 0:
   309  		panic("unexpected lease in watch")
   310  	case ret.limit != 0:
   311  		panic("unexpected limit in watch")
   312  	case ret.sort != nil:
   313  		panic("unexpected sort in watch")
   314  	case ret.serializable:
   315  		panic("unexpected serializable in watch")
   316  	case ret.countOnly:
   317  		panic("unexpected countOnly in watch")
   318  	case ret.minModRev != 0, ret.maxModRev != 0:
   319  		panic("unexpected mod revision filter in watch")
   320  	case ret.minCreateRev != 0, ret.maxCreateRev != 0:
   321  		panic("unexpected create revision filter in watch")
   322  	}
   323  	return ret
   324  }
   325  
   326  func (op *Op) applyOpts(opts []OpOption) {
   327  	for _, opt := range opts {
   328  		opt(op)
   329  	}
   330  }
   331  
   332  // OpOption configures Operations like Get, Put, Delete.
   333  type OpOption func(*Op)
   334  
   335  // WithLease attaches a lease ID to a key in 'Put' request.
   336  func WithLease(leaseID LeaseID) OpOption {
   337  	return func(op *Op) { op.leaseID = leaseID }
   338  }
   339  
   340  // WithLimit limits the number of results to return from 'Get' request.
   341  // If WithLimit is given a 0 limit, it is treated as no limit.
   342  func WithLimit(n int64) OpOption { return func(op *Op) { op.limit = n } }
   343  
   344  // WithRev specifies the store revision for 'Get' request.
   345  // Or the start revision of 'Watch' request.
   346  func WithRev(rev int64) OpOption { return func(op *Op) { op.rev = rev } }
   347  
   348  // WithSort specifies the ordering in 'Get' request. It requires
   349  // 'WithRange' and/or 'WithPrefix' to be specified too.
   350  // 'target' specifies the target to sort by: key, version, revisions, value.
   351  // 'order' can be either 'SortNone', 'SortAscend', 'SortDescend'.
   352  func WithSort(target SortTarget, order SortOrder) OpOption {
   353  	return func(op *Op) {
   354  		if target == SortByKey && order == SortAscend {
   355  			// If order != SortNone, server fetches the entire key-space,
   356  			// and then applies the sort and limit, if provided.
   357  			// Since by default the server returns results sorted by keys
   358  			// in lexicographically ascending order, the client should ignore
   359  			// SortOrder if the target is SortByKey.
   360  			order = SortNone
   361  		}
   362  		op.sort = &SortOption{target, order}
   363  	}
   364  }
   365  
   366  // GetPrefixRangeEnd gets the range end of the prefix.
   367  // 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
   368  func GetPrefixRangeEnd(prefix string) string {
   369  	return string(getPrefix([]byte(prefix)))
   370  }
   371  
   372  func getPrefix(key []byte) []byte {
   373  	end := make([]byte, len(key))
   374  	copy(end, key)
   375  	for i := len(end) - 1; i >= 0; i-- {
   376  		if end[i] < 0xff {
   377  			end[i] = end[i] + 1
   378  			end = end[:i+1]
   379  			return end
   380  		}
   381  	}
   382  	// next prefix does not exist (e.g., 0xffff);
   383  	// default to WithFromKey policy
   384  	return noPrefixEnd
   385  }
   386  
   387  // WithPrefix enables 'Get', 'Delete', or 'Watch' requests to operate
   388  // on the keys with matching prefix. For example, 'Get(foo, WithPrefix())'
   389  // can return 'foo1', 'foo2', and so on.
   390  func WithPrefix() OpOption {
   391  	return func(op *Op) {
   392  		op.isOptsWithPrefix = true
   393  		if len(op.key) == 0 {
   394  			op.key, op.end = []byte{0}, []byte{0}
   395  			return
   396  		}
   397  		op.end = getPrefix(op.key)
   398  	}
   399  }
   400  
   401  // WithRange specifies the range of 'Get', 'Delete', 'Watch' requests.
   402  // For example, 'Get' requests with 'WithRange(end)' returns
   403  // the keys in the range [key, end).
   404  // endKey must be lexicographically greater than start key.
   405  func WithRange(endKey string) OpOption {
   406  	return func(op *Op) { op.end = []byte(endKey) }
   407  }
   408  
   409  // WithFromKey specifies the range of 'Get', 'Delete', 'Watch' requests
   410  // to be equal or greater than the key in the argument.
   411  func WithFromKey() OpOption {
   412  	return func(op *Op) {
   413  		if len(op.key) == 0 {
   414  			op.key = []byte{0}
   415  		}
   416  		op.end = []byte("\x00")
   417  		op.isOptsWithFromKey = true
   418  	}
   419  }
   420  
   421  // WithSerializable makes 'Get' request serializable. By default,
   422  // it's linearizable. Serializable requests are better for lower latency
   423  // requirement.
   424  func WithSerializable() OpOption {
   425  	return func(op *Op) { op.serializable = true }
   426  }
   427  
   428  // WithKeysOnly makes the 'Get' request return only the keys and the corresponding
   429  // values will be omitted.
   430  func WithKeysOnly() OpOption {
   431  	return func(op *Op) { op.keysOnly = true }
   432  }
   433  
   434  // WithCountOnly makes the 'Get' request return only the count of keys.
   435  func WithCountOnly() OpOption {
   436  	return func(op *Op) { op.countOnly = true }
   437  }
   438  
   439  // WithMinModRev filters out keys for Get with modification revisions less than the given revision.
   440  func WithMinModRev(rev int64) OpOption { return func(op *Op) { op.minModRev = rev } }
   441  
   442  // WithMaxModRev filters out keys for Get with modification revisions greater than the given revision.
   443  func WithMaxModRev(rev int64) OpOption { return func(op *Op) { op.maxModRev = rev } }
   444  
   445  // WithMinCreateRev filters out keys for Get with creation revisions less than the given revision.
   446  func WithMinCreateRev(rev int64) OpOption { return func(op *Op) { op.minCreateRev = rev } }
   447  
   448  // WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
   449  func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
   450  
   451  // WithFirstCreate gets the key with the oldest creation revision in the request range.
   452  func WithFirstCreate() []OpOption { return withTop(SortByCreateRevision, SortAscend) }
   453  
   454  // WithLastCreate gets the key with the latest creation revision in the request range.
   455  func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
   456  
   457  // WithFirstKey gets the lexically first key in the request range.
   458  func WithFirstKey() []OpOption { return withTop(SortByKey, SortAscend) }
   459  
   460  // WithLastKey gets the lexically last key in the request range.
   461  func WithLastKey() []OpOption { return withTop(SortByKey, SortDescend) }
   462  
   463  // WithFirstRev gets the key with the oldest modification revision in the request range.
   464  func WithFirstRev() []OpOption { return withTop(SortByModRevision, SortAscend) }
   465  
   466  // WithLastRev gets the key with the latest modification revision in the request range.
   467  func WithLastRev() []OpOption { return withTop(SortByModRevision, SortDescend) }
   468  
   469  // withTop gets the first key over the get's prefix given a sort order
   470  func withTop(target SortTarget, order SortOrder) []OpOption {
   471  	return []OpOption{WithPrefix(), WithSort(target, order), WithLimit(1)}
   472  }
   473  
   474  // WithProgressNotify makes watch server send periodic progress updates
   475  // every 10 minutes when there is no incoming events.
   476  // Progress updates have zero events in WatchResponse.
   477  func WithProgressNotify() OpOption {
   478  	return func(op *Op) {
   479  		op.progressNotify = true
   480  	}
   481  }
   482  
   483  // WithCreatedNotify makes watch server sends the created event.
   484  func WithCreatedNotify() OpOption {
   485  	return func(op *Op) {
   486  		op.createdNotify = true
   487  	}
   488  }
   489  
   490  // WithFilterPut discards PUT events from the watcher.
   491  func WithFilterPut() OpOption {
   492  	return func(op *Op) { op.filterPut = true }
   493  }
   494  
   495  // WithFilterDelete discards DELETE events from the watcher.
   496  func WithFilterDelete() OpOption {
   497  	return func(op *Op) { op.filterDelete = true }
   498  }
   499  
   500  // WithPrevKV gets the previous key-value pair before the event happens. If the previous KV is already compacted,
   501  // nothing will be returned.
   502  func WithPrevKV() OpOption {
   503  	return func(op *Op) {
   504  		op.prevKV = true
   505  	}
   506  }
   507  
   508  // WithFragment to receive raw watch response with fragmentation.
   509  // Fragmentation is disabled by default. If fragmentation is enabled,
   510  // etcd watch server will split watch response before sending to clients
   511  // when the total size of watch events exceed server-side request limit.
   512  // The default server-side request limit is 1.5 MiB, which can be configured
   513  // as "--max-request-bytes" flag value + gRPC-overhead 512 bytes.
   514  // See "etcdserver/api/v3rpc/watch.go" for more details.
   515  func WithFragment() OpOption {
   516  	return func(op *Op) { op.fragment = true }
   517  }
   518  
   519  // WithIgnoreValue updates the key using its current value.
   520  // This option can not be combined with non-empty values.
   521  // Returns an error if the key does not exist.
   522  func WithIgnoreValue() OpOption {
   523  	return func(op *Op) {
   524  		op.ignoreValue = true
   525  	}
   526  }
   527  
   528  // WithIgnoreLease updates the key using its current lease.
   529  // This option can not be combined with WithLease.
   530  // Returns an error if the key does not exist.
   531  func WithIgnoreLease() OpOption {
   532  	return func(op *Op) {
   533  		op.ignoreLease = true
   534  	}
   535  }
   536  
   537  // LeaseOp represents an Operation that lease can execute.
   538  type LeaseOp struct {
   539  	id LeaseID
   540  
   541  	// for TimeToLive
   542  	attachedKeys bool
   543  }
   544  
   545  // LeaseOption configures lease operations.
   546  type LeaseOption func(*LeaseOp)
   547  
   548  func (op *LeaseOp) applyOpts(opts []LeaseOption) {
   549  	for _, opt := range opts {
   550  		opt(op)
   551  	}
   552  }
   553  
   554  // WithAttachedKeys makes TimeToLive list the keys attached to the given lease ID.
   555  func WithAttachedKeys() LeaseOption {
   556  	return func(op *LeaseOp) { op.attachedKeys = true }
   557  }
   558  
   559  func toLeaseTimeToLiveRequest(id LeaseID, opts ...LeaseOption) *pb.LeaseTimeToLiveRequest {
   560  	ret := &LeaseOp{id: id}
   561  	ret.applyOpts(opts)
   562  	return &pb.LeaseTimeToLiveRequest{ID: int64(id), Keys: ret.attachedKeys}
   563  }
   564  
   565  // IsOptsWithPrefix returns true if WithPrefix option is called in the given opts.
   566  func IsOptsWithPrefix(opts []OpOption) bool {
   567  	ret := NewOp()
   568  	for _, opt := range opts {
   569  		opt(ret)
   570  	}
   571  
   572  	return ret.isOptsWithPrefix
   573  }
   574  
   575  // IsOptsWithFromKey returns true if WithFromKey option is called in the given opts.
   576  func IsOptsWithFromKey(opts []OpOption) bool {
   577  	ret := NewOp()
   578  	for _, opt := range opts {
   579  		opt(ret)
   580  	}
   581  
   582  	return ret.isOptsWithFromKey
   583  }
   584  

View as plain text