...

Source file src/go.etcd.io/etcd/client/v3/experimental/recipes/key.go

Documentation: go.etcd.io/etcd/client/v3/experimental/recipes

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package recipe
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"strings"
    21  	"time"
    22  
    23  	v3 "go.etcd.io/etcd/client/v3"
    24  	"go.etcd.io/etcd/client/v3/concurrency"
    25  )
    26  
    27  // RemoteKV is a handle to a key that this client created in etcd; it keeps the key, the value written and the revision of the creating write.
    28  type RemoteKV struct {
    29  	kv  v3.KV // client used by Delete and Put; Delete clears it
    30  	key string // full name of the key in etcd
    31  	rev int64 // revision taken from the response header of the creating transaction
    32  	val string // value written when the key was created; Put does not refresh it
    33  }
    34  
    35  func newKey(kv v3.KV, key string, leaseID v3.LeaseID) (*RemoteKV, error) {
    36  	return newKV(kv, key, "", leaseID)
    37  }
    38  
    39  func newKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (*RemoteKV, error) {
    40  	rev, err := putNewKV(kv, key, val, leaseID)
    41  	if err != nil {
    42  		return nil, err
    43  	}
    44  	return &RemoteKV{kv, key, rev, val}, nil
    45  }
    46  
    47  func newUniqueKV(kv v3.KV, prefix string, val string) (*RemoteKV, error) {
    48  	for {
    49  		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
    50  		rev, err := putNewKV(kv, newKey, val, v3.NoLease)
    51  		if err == nil {
    52  			return &RemoteKV{kv, newKey, rev, val}, nil
    53  		}
    54  		if err != ErrKeyExists {
    55  			return nil, err
    56  		}
    57  	}
    58  }
    59  
    60  // putNewKV attempts to create the given key, only succeeding if the key did
    61  // not yet exist.
    62  func putNewKV(kv v3.KV, key, val string, leaseID v3.LeaseID) (int64, error) {
    63  	cmp := v3.Compare(v3.Version(key), "=", 0)
    64  	req := v3.OpPut(key, val, v3.WithLease(leaseID))
    65  	txnresp, err := kv.Txn(context.TODO()).If(cmp).Then(req).Commit()
    66  	if err != nil {
    67  		return 0, err
    68  	}
    69  	if !txnresp.Succeeded {
    70  		return 0, ErrKeyExists
    71  	}
    72  	return txnresp.Header.Revision, nil
    73  }
    74  
    75  // newSequentialKV allocates a new sequential key <prefix>/nnnnn with a given
    76  // prefix and value. Note: a bookkeeping node __<prefix> is also allocated.
    77  func newSequentialKV(kv v3.KV, prefix, val string) (*RemoteKV, error) {
    78  	resp, err := kv.Get(context.TODO(), prefix, v3.WithLastKey()...)
    79  	if err != nil {
    80  		return nil, err
    81  	}
    82  
    83  	// add 1 to last key, if any
    84  	newSeqNum := 0
    85  	if len(resp.Kvs) != 0 {
    86  		fields := strings.Split(string(resp.Kvs[0].Key), "/")
    87  		_, serr := fmt.Sscanf(fields[len(fields)-1], "%d", &newSeqNum)
    88  		if serr != nil {
    89  			return nil, serr
    90  		}
    91  		newSeqNum++
    92  	}
    93  	newKey := fmt.Sprintf("%s/%016d", prefix, newSeqNum)
    94  
    95  	// base prefix key must be current (i.e., <=) with the server update;
    96  	// the base key is important to avoid the following:
    97  	// N1: LastKey() == 1, start txn.
    98  	// N2: new Key 2, new Key 3, Delete Key 2
    99  	// N1: txn succeeds allocating key 2 when it shouldn't
   100  	baseKey := "__" + prefix
   101  
   102  	// current revision might contain modification so +1
   103  	cmp := v3.Compare(v3.ModRevision(baseKey), "<", resp.Header.Revision+1)
   104  	reqPrefix := v3.OpPut(baseKey, "")
   105  	reqnewKey := v3.OpPut(newKey, val)
   106  
   107  	txn := kv.Txn(context.TODO())
   108  	txnresp, err := txn.If(cmp).Then(reqPrefix, reqnewKey).Commit()
   109  	if err != nil {
   110  		return nil, err
   111  	}
   112  	if !txnresp.Succeeded {
   113  		return newSequentialKV(kv, prefix, val)
   114  	}
   115  	return &RemoteKV{kv, newKey, txnresp.Header.Revision, val}, nil
   116  }
   117  
   118  func (rk *RemoteKV) Key() string     { return rk.key }
   119  func (rk *RemoteKV) Revision() int64 { return rk.rev }
   120  func (rk *RemoteKV) Value() string   { return rk.val }
   121  
   122  func (rk *RemoteKV) Delete() error {
   123  	if rk.kv == nil {
   124  		return nil
   125  	}
   126  	_, err := rk.kv.Delete(context.TODO(), rk.key)
   127  	rk.kv = nil
   128  	return err
   129  }
   130  
   131  func (rk *RemoteKV) Put(val string) error {
   132  	_, err := rk.kv.Put(context.TODO(), rk.key, val)
   133  	return err
   134  }
   135  
   136  // EphemeralKV is a new key associated with a session lease
   137  type EphemeralKV struct{ RemoteKV }
   138  
   139  // newEphemeralKV creates a new key/value pair associated with a session lease
   140  func newEphemeralKV(s *concurrency.Session, key, val string) (*EphemeralKV, error) {
   141  	k, err := newKV(s.Client(), key, val, s.Lease())
   142  	if err != nil {
   143  		return nil, err
   144  	}
   145  	return &EphemeralKV{*k}, nil
   146  }
   147  
   148  // newUniqueEphemeralKey creates a new unique valueless key associated with a session lease
   149  func newUniqueEphemeralKey(s *concurrency.Session, prefix string) (*EphemeralKV, error) {
   150  	return newUniqueEphemeralKV(s, prefix, "")
   151  }
   152  
   153  // newUniqueEphemeralKV creates a new unique key/value pair associated with a session lease
   154  func newUniqueEphemeralKV(s *concurrency.Session, prefix, val string) (ek *EphemeralKV, err error) {
   155  	for {
   156  		newKey := fmt.Sprintf("%s/%v", prefix, time.Now().UnixNano())
   157  		ek, err = newEphemeralKV(s, newKey, val)
   158  		if err == nil || err != ErrKeyExists {
   159  			break
   160  		}
   161  	}
   162  	return ek, err
   163  }
   164  

View as plain text