...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v2v3/store.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v2v3

     1  // Copyright 2017 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v2v3
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"path"
    21  	"sort"
    22  	"strings"
    23  	"time"
    24  
    25  	"go.etcd.io/etcd/api/v3/mvccpb"
    26  	"go.etcd.io/etcd/client/v3"
    27  	"go.etcd.io/etcd/client/v3/concurrency"
    28  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
    29  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    30  )
    31  
    32  // store implements the Store interface for V2 using
    33  // a v3 client.
    34  type v2v3Store struct {
    35  	c *clientv3.Client
    36  	// pfx is the v3 prefix where keys should be stored.
    37  	pfx string
    38  	ctx context.Context
    39  }
    40  
    41  const maxPathDepth = 63
    42  
    43  var errUnsupported = fmt.Errorf("TTLs are unsupported")
    44  
    45  func NewStore(c *clientv3.Client, pfx string) v2store.Store { return newStore(c, pfx) }
    46  
    47  func newStore(c *clientv3.Client, pfx string) *v2v3Store { return &v2v3Store{c, pfx, c.Ctx()} }
    48  
    49  func (s *v2v3Store) Index() uint64 { panic("STUB") }
    50  
    51  func (s *v2v3Store) Get(nodePath string, recursive, sorted bool) (*v2store.Event, error) {
    52  	key := s.mkPath(nodePath)
    53  	resp, err := s.c.Txn(s.ctx).Then(
    54  		clientv3.OpGet(key+"/"),
    55  		clientv3.OpGet(key),
    56  	).Commit()
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  
    61  	if kvs := resp.Responses[0].GetResponseRange().Kvs; len(kvs) != 0 || isRoot(nodePath) {
    62  		nodes, err := s.getDir(nodePath, recursive, sorted, resp.Header.Revision)
    63  		if err != nil {
    64  			return nil, err
    65  		}
    66  		cidx, midx := uint64(0), uint64(0)
    67  		if len(kvs) > 0 {
    68  			cidx, midx = mkV2Rev(kvs[0].CreateRevision), mkV2Rev(kvs[0].ModRevision)
    69  		}
    70  		return &v2store.Event{
    71  			Action: v2store.Get,
    72  			Node: &v2store.NodeExtern{
    73  				Key:           nodePath,
    74  				Dir:           true,
    75  				Nodes:         nodes,
    76  				CreatedIndex:  cidx,
    77  				ModifiedIndex: midx,
    78  			},
    79  			EtcdIndex: mkV2Rev(resp.Header.Revision),
    80  		}, nil
    81  	}
    82  
    83  	kvs := resp.Responses[1].GetResponseRange().Kvs
    84  	if len(kvs) == 0 {
    85  		return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
    86  	}
    87  
    88  	return &v2store.Event{
    89  		Action:    v2store.Get,
    90  		Node:      s.mkV2Node(kvs[0]),
    91  		EtcdIndex: mkV2Rev(resp.Header.Revision),
    92  	}, nil
    93  }
    94  
    95  func (s *v2v3Store) getDir(nodePath string, recursive, sorted bool, rev int64) ([]*v2store.NodeExtern, error) {
    96  	rootNodes, err := s.getDirDepth(nodePath, 1, rev)
    97  	if err != nil || !recursive {
    98  		if sorted {
    99  			sort.Sort(v2store.NodeExterns(rootNodes))
   100  		}
   101  		return rootNodes, err
   102  	}
   103  	nextNodes := rootNodes
   104  	nodes := make(map[string]*v2store.NodeExtern)
   105  	// Breadth walk the subdirectories
   106  	for i := 2; len(nextNodes) > 0; i++ {
   107  		for _, n := range nextNodes {
   108  			nodes[n.Key] = n
   109  			if parent := nodes[path.Dir(n.Key)]; parent != nil {
   110  				parent.Nodes = append(parent.Nodes, n)
   111  			}
   112  		}
   113  		if nextNodes, err = s.getDirDepth(nodePath, i, rev); err != nil {
   114  			return nil, err
   115  		}
   116  	}
   117  
   118  	if sorted {
   119  		sort.Sort(v2store.NodeExterns(rootNodes))
   120  	}
   121  	return rootNodes, nil
   122  }
   123  
   124  func (s *v2v3Store) getDirDepth(nodePath string, depth int, rev int64) ([]*v2store.NodeExtern, error) {
   125  	pd := s.mkPathDepth(nodePath, depth)
   126  	resp, err := s.c.Get(s.ctx, pd, clientv3.WithPrefix(), clientv3.WithRev(rev))
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  
   131  	nodes := make([]*v2store.NodeExtern, len(resp.Kvs))
   132  	for i, kv := range resp.Kvs {
   133  		nodes[i] = s.mkV2Node(kv)
   134  	}
   135  	return nodes, nil
   136  }
   137  
   138  func (s *v2v3Store) Set(
   139  	nodePath string,
   140  	dir bool,
   141  	value string,
   142  	expireOpts v2store.TTLOptionSet,
   143  ) (*v2store.Event, error) {
   144  	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
   145  		return nil, errUnsupported
   146  	}
   147  
   148  	if isRoot(nodePath) {
   149  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   150  	}
   151  
   152  	ecode := 0
   153  	applyf := func(stm concurrency.STM) error {
   154  		// build path if any directories in path do not exist
   155  		dirs := []string{}
   156  		for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
   157  			pp := s.mkPath(p)
   158  			if stm.Rev(pp) > 0 {
   159  				ecode = v2error.EcodeNotDir
   160  				return nil
   161  			}
   162  			if stm.Rev(pp+"/") == 0 {
   163  				dirs = append(dirs, pp+"/")
   164  			}
   165  		}
   166  		for _, d := range dirs {
   167  			stm.Put(d, "")
   168  		}
   169  
   170  		key := s.mkPath(nodePath)
   171  		if dir {
   172  			if stm.Rev(key) != 0 {
   173  				// exists as non-dir
   174  				ecode = v2error.EcodeNotDir
   175  				return nil
   176  			}
   177  			key = key + "/"
   178  		} else if stm.Rev(key+"/") != 0 {
   179  			ecode = v2error.EcodeNotFile
   180  			return nil
   181  		}
   182  		stm.Put(key, value, clientv3.WithPrevKV())
   183  		stm.Put(s.mkActionKey(), v2store.Set)
   184  		return nil
   185  	}
   186  
   187  	resp, err := s.newSTM(applyf)
   188  	if err != nil {
   189  		return nil, err
   190  	}
   191  	if ecode != 0 {
   192  		return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
   193  	}
   194  
   195  	createRev := resp.Header.Revision
   196  	var pn *v2store.NodeExtern
   197  	if pkv := prevKeyFromPuts(resp); pkv != nil {
   198  		pn = s.mkV2Node(pkv)
   199  		createRev = pkv.CreateRevision
   200  	}
   201  
   202  	vp := &value
   203  	if dir {
   204  		vp = nil
   205  	}
   206  	return &v2store.Event{
   207  		Action: v2store.Set,
   208  		Node: &v2store.NodeExtern{
   209  			Key:           nodePath,
   210  			Value:         vp,
   211  			Dir:           dir,
   212  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   213  			CreatedIndex:  mkV2Rev(createRev),
   214  		},
   215  		PrevNode:  pn,
   216  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   217  	}, nil
   218  }
   219  
   220  func (s *v2v3Store) Update(nodePath, newValue string, expireOpts v2store.TTLOptionSet) (*v2store.Event, error) {
   221  	if isRoot(nodePath) {
   222  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   223  	}
   224  
   225  	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
   226  		return nil, errUnsupported
   227  	}
   228  
   229  	key := s.mkPath(nodePath)
   230  	ecode := 0
   231  	applyf := func(stm concurrency.STM) error {
   232  		if rev := stm.Rev(key + "/"); rev != 0 {
   233  			ecode = v2error.EcodeNotFile
   234  			return nil
   235  		}
   236  		if rev := stm.Rev(key); rev == 0 {
   237  			ecode = v2error.EcodeKeyNotFound
   238  			return nil
   239  		}
   240  		stm.Put(key, newValue, clientv3.WithPrevKV())
   241  		stm.Put(s.mkActionKey(), v2store.Update)
   242  		return nil
   243  	}
   244  
   245  	resp, err := s.newSTM(applyf)
   246  	if err != nil {
   247  		return nil, err
   248  	}
   249  	if ecode != 0 {
   250  		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
   251  	}
   252  
   253  	pkv := prevKeyFromPuts(resp)
   254  	return &v2store.Event{
   255  		Action: v2store.Update,
   256  		Node: &v2store.NodeExtern{
   257  			Key:           nodePath,
   258  			Value:         &newValue,
   259  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   260  			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
   261  		},
   262  		PrevNode:  s.mkV2Node(pkv),
   263  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   264  	}, nil
   265  }
   266  
   267  func (s *v2v3Store) Create(
   268  	nodePath string,
   269  	dir bool,
   270  	value string,
   271  	unique bool,
   272  	expireOpts v2store.TTLOptionSet,
   273  ) (*v2store.Event, error) {
   274  	if isRoot(nodePath) {
   275  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   276  	}
   277  	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
   278  		return nil, errUnsupported
   279  	}
   280  	ecode := 0
   281  	applyf := func(stm concurrency.STM) error {
   282  		ecode = 0
   283  		key := s.mkPath(nodePath)
   284  		if unique {
   285  			// append unique item under the node path
   286  			for {
   287  				key = nodePath + "/" + fmt.Sprintf("%020s", time.Now())
   288  				key = path.Clean(path.Join("/", key))
   289  				key = s.mkPath(key)
   290  				if stm.Rev(key) == 0 {
   291  					break
   292  				}
   293  			}
   294  		}
   295  		if stm.Rev(key) > 0 || stm.Rev(key+"/") > 0 {
   296  			ecode = v2error.EcodeNodeExist
   297  			return nil
   298  		}
   299  		// build path if any directories in path do not exist
   300  		dirs := []string{}
   301  		for p := path.Dir(nodePath); !isRoot(p); p = path.Dir(p) {
   302  			pp := s.mkPath(p)
   303  			if stm.Rev(pp) > 0 {
   304  				ecode = v2error.EcodeNotDir
   305  				return nil
   306  			}
   307  			if stm.Rev(pp+"/") == 0 {
   308  				dirs = append(dirs, pp+"/")
   309  			}
   310  		}
   311  		for _, d := range dirs {
   312  			stm.Put(d, "")
   313  		}
   314  
   315  		if dir {
   316  			// directories marked with extra slash in key name
   317  			key += "/"
   318  		}
   319  		stm.Put(key, value)
   320  		stm.Put(s.mkActionKey(), v2store.Create)
   321  		return nil
   322  	}
   323  
   324  	resp, err := s.newSTM(applyf)
   325  	if err != nil {
   326  		return nil, err
   327  	}
   328  	if ecode != 0 {
   329  		return nil, v2error.NewError(ecode, nodePath, mkV2Rev(resp.Header.Revision))
   330  	}
   331  
   332  	var v *string
   333  	if !dir {
   334  		v = &value
   335  	}
   336  
   337  	return &v2store.Event{
   338  		Action: v2store.Create,
   339  		Node: &v2store.NodeExtern{
   340  			Key:           nodePath,
   341  			Value:         v,
   342  			Dir:           dir,
   343  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   344  			CreatedIndex:  mkV2Rev(resp.Header.Revision),
   345  		},
   346  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   347  	}, nil
   348  }
   349  
   350  func (s *v2v3Store) CompareAndSwap(
   351  	nodePath string,
   352  	prevValue string,
   353  	prevIndex uint64,
   354  	value string,
   355  	expireOpts v2store.TTLOptionSet,
   356  ) (*v2store.Event, error) {
   357  	if isRoot(nodePath) {
   358  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   359  	}
   360  	if expireOpts.Refresh || !expireOpts.ExpireTime.IsZero() {
   361  		return nil, errUnsupported
   362  	}
   363  
   364  	key := s.mkPath(nodePath)
   365  	resp, err := s.c.Txn(s.ctx).If(
   366  		s.mkCompare(nodePath, prevValue, prevIndex)...,
   367  	).Then(
   368  		clientv3.OpPut(key, value, clientv3.WithPrevKV()),
   369  		clientv3.OpPut(s.mkActionKey(), v2store.CompareAndSwap),
   370  	).Else(
   371  		clientv3.OpGet(key),
   372  		clientv3.OpGet(key+"/"),
   373  	).Commit()
   374  
   375  	if err != nil {
   376  		return nil, err
   377  	}
   378  	if !resp.Succeeded {
   379  		return nil, compareFail(nodePath, prevValue, prevIndex, resp)
   380  	}
   381  
   382  	pkv := resp.Responses[0].GetResponsePut().PrevKv
   383  	return &v2store.Event{
   384  		Action: v2store.CompareAndSwap,
   385  		Node: &v2store.NodeExtern{
   386  			Key:           nodePath,
   387  			Value:         &value,
   388  			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
   389  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   390  		},
   391  		PrevNode:  s.mkV2Node(pkv),
   392  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   393  	}, nil
   394  }
   395  
   396  func (s *v2v3Store) Delete(nodePath string, dir, recursive bool) (*v2store.Event, error) {
   397  	if isRoot(nodePath) {
   398  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   399  	}
   400  	if !dir && !recursive {
   401  		return s.deleteNode(nodePath)
   402  	}
   403  	if !recursive {
   404  		return s.deleteEmptyDir(nodePath)
   405  	}
   406  
   407  	dels := make([]clientv3.Op, maxPathDepth+1)
   408  	dels[0] = clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV())
   409  	for i := 1; i < maxPathDepth; i++ {
   410  		dels[i] = clientv3.OpDelete(s.mkPathDepth(nodePath, i), clientv3.WithPrefix())
   411  	}
   412  	dels[maxPathDepth] = clientv3.OpPut(s.mkActionKey(), v2store.Delete)
   413  
   414  	resp, err := s.c.Txn(s.ctx).If(
   415  		clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), ">", 0),
   416  		clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, maxPathDepth)+"/"), "=", 0),
   417  	).Then(
   418  		dels...,
   419  	).Commit()
   420  	if err != nil {
   421  		return nil, err
   422  	}
   423  	if !resp.Succeeded {
   424  		return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
   425  	}
   426  	dresp := resp.Responses[0].GetResponseDeleteRange()
   427  	return &v2store.Event{
   428  		Action:    v2store.Delete,
   429  		PrevNode:  s.mkV2Node(dresp.PrevKvs[0]),
   430  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   431  	}, nil
   432  }
   433  
   434  func (s *v2v3Store) deleteEmptyDir(nodePath string) (*v2store.Event, error) {
   435  	resp, err := s.c.Txn(s.ctx).If(
   436  		clientv3.Compare(clientv3.Version(s.mkPathDepth(nodePath, 1)), "=", 0).WithPrefix(),
   437  	).Then(
   438  		clientv3.OpDelete(s.mkPath(nodePath)+"/", clientv3.WithPrevKV()),
   439  		clientv3.OpPut(s.mkActionKey(), v2store.Delete),
   440  	).Commit()
   441  	if err != nil {
   442  		return nil, err
   443  	}
   444  	if !resp.Succeeded {
   445  		return nil, v2error.NewError(v2error.EcodeDirNotEmpty, nodePath, mkV2Rev(resp.Header.Revision))
   446  	}
   447  	dresp := resp.Responses[0].GetResponseDeleteRange()
   448  	if len(dresp.PrevKvs) == 0 {
   449  		return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, mkV2Rev(resp.Header.Revision))
   450  	}
   451  	return &v2store.Event{
   452  		Action:    v2store.Delete,
   453  		PrevNode:  s.mkV2Node(dresp.PrevKvs[0]),
   454  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   455  	}, nil
   456  }
   457  
   458  func (s *v2v3Store) deleteNode(nodePath string) (*v2store.Event, error) {
   459  	resp, err := s.c.Txn(s.ctx).If(
   460  		clientv3.Compare(clientv3.Version(s.mkPath(nodePath)+"/"), "=", 0),
   461  	).Then(
   462  		clientv3.OpDelete(s.mkPath(nodePath), clientv3.WithPrevKV()),
   463  		clientv3.OpPut(s.mkActionKey(), v2store.Delete),
   464  	).Commit()
   465  	if err != nil {
   466  		return nil, err
   467  	}
   468  	if !resp.Succeeded {
   469  		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
   470  	}
   471  	pkvs := resp.Responses[0].GetResponseDeleteRange().PrevKvs
   472  	if len(pkvs) == 0 {
   473  		return nil, v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
   474  	}
   475  	pkv := pkvs[0]
   476  	return &v2store.Event{
   477  		Action: v2store.Delete,
   478  		Node: &v2store.NodeExtern{
   479  			Key:           nodePath,
   480  			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
   481  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   482  		},
   483  		PrevNode:  s.mkV2Node(pkv),
   484  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   485  	}, nil
   486  }
   487  
   488  func (s *v2v3Store) CompareAndDelete(nodePath, prevValue string, prevIndex uint64) (*v2store.Event, error) {
   489  	if isRoot(nodePath) {
   490  		return nil, v2error.NewError(v2error.EcodeRootROnly, nodePath, 0)
   491  	}
   492  
   493  	key := s.mkPath(nodePath)
   494  	resp, err := s.c.Txn(s.ctx).If(
   495  		s.mkCompare(nodePath, prevValue, prevIndex)...,
   496  	).Then(
   497  		clientv3.OpDelete(key, clientv3.WithPrevKV()),
   498  		clientv3.OpPut(s.mkActionKey(), v2store.CompareAndDelete),
   499  	).Else(
   500  		clientv3.OpGet(key),
   501  		clientv3.OpGet(key+"/"),
   502  	).Commit()
   503  
   504  	if err != nil {
   505  		return nil, err
   506  	}
   507  	if !resp.Succeeded {
   508  		return nil, compareFail(nodePath, prevValue, prevIndex, resp)
   509  	}
   510  
   511  	// len(pkvs) > 1 since txn only succeeds when key exists
   512  	pkv := resp.Responses[0].GetResponseDeleteRange().PrevKvs[0]
   513  	return &v2store.Event{
   514  		Action: v2store.CompareAndDelete,
   515  		Node: &v2store.NodeExtern{
   516  			Key:           nodePath,
   517  			CreatedIndex:  mkV2Rev(pkv.CreateRevision),
   518  			ModifiedIndex: mkV2Rev(resp.Header.Revision),
   519  		},
   520  		PrevNode:  s.mkV2Node(pkv),
   521  		EtcdIndex: mkV2Rev(resp.Header.Revision),
   522  	}, nil
   523  }
   524  
   525  func compareFail(nodePath, prevValue string, prevIndex uint64, resp *clientv3.TxnResponse) error {
   526  	if dkvs := resp.Responses[1].GetResponseRange().Kvs; len(dkvs) > 0 {
   527  		return v2error.NewError(v2error.EcodeNotFile, nodePath, mkV2Rev(resp.Header.Revision))
   528  	}
   529  	kvs := resp.Responses[0].GetResponseRange().Kvs
   530  	if len(kvs) == 0 {
   531  		return v2error.NewError(v2error.EcodeKeyNotFound, nodePath, mkV2Rev(resp.Header.Revision))
   532  	}
   533  	kv := kvs[0]
   534  	indexMatch := prevIndex == 0 || kv.ModRevision == int64(prevIndex)
   535  	valueMatch := prevValue == "" || string(kv.Value) == prevValue
   536  	var cause string
   537  	switch {
   538  	case indexMatch && !valueMatch:
   539  		cause = fmt.Sprintf("[%v != %v]", prevValue, string(kv.Value))
   540  	case valueMatch && !indexMatch:
   541  		cause = fmt.Sprintf("[%v != %v]", prevIndex, kv.ModRevision)
   542  	default:
   543  		cause = fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, string(kv.Value), prevIndex, kv.ModRevision)
   544  	}
   545  	return v2error.NewError(v2error.EcodeTestFailed, cause, mkV2Rev(resp.Header.Revision))
   546  }
   547  
   548  func (s *v2v3Store) mkCompare(nodePath, prevValue string, prevIndex uint64) []clientv3.Cmp {
   549  	key := s.mkPath(nodePath)
   550  	cmps := []clientv3.Cmp{clientv3.Compare(clientv3.Version(key), ">", 0)}
   551  	if prevIndex != 0 {
   552  		cmps = append(cmps, clientv3.Compare(clientv3.ModRevision(key), "=", mkV3Rev(prevIndex)))
   553  	}
   554  	if prevValue != "" {
   555  		cmps = append(cmps, clientv3.Compare(clientv3.Value(key), "=", prevValue))
   556  	}
   557  	return cmps
   558  }
   559  
   560  func (s *v2v3Store) JsonStats() []byte                  { panic("STUB") }
   561  func (s *v2v3Store) DeleteExpiredKeys(cutoff time.Time) { panic("STUB") }
   562  
   563  func (s *v2v3Store) Version() int { return 2 }
   564  
   565  // TODO: move this out of the Store interface?
   566  
   567  func (s *v2v3Store) Save() ([]byte, error)       { panic("STUB") }
   568  func (s *v2v3Store) Recovery(state []byte) error { panic("STUB") }
   569  func (s *v2v3Store) Clone() v2store.Store        { panic("STUB") }
   570  func (s *v2v3Store) SaveNoCopy() ([]byte, error) { panic("STUB") }
   571  func (s *v2v3Store) HasTTLKeys() bool            { panic("STUB") }
   572  
   573  func (s *v2v3Store) mkPath(nodePath string) string { return s.mkPathDepth(nodePath, 0) }
   574  
   575  func (s *v2v3Store) mkNodePath(p string) string {
   576  	return path.Clean(p[len(s.pfx)+len("/k/000/"):])
   577  }
   578  
   579  // mkPathDepth makes a path to a key that encodes its directory depth
   580  // for fast directory listing. If a depth is provided, it is added
   581  // to the computed depth.
   582  func (s *v2v3Store) mkPathDepth(nodePath string, depth int) string {
   583  	normalForm := path.Clean(path.Join("/", nodePath))
   584  	n := strings.Count(normalForm, "/") + depth
   585  	return fmt.Sprintf("%s/%03d/k/%s", s.pfx, n, normalForm)
   586  }
   587  
   588  func (s *v2v3Store) mkActionKey() string { return s.pfx + "/act" }
   589  
   590  func isRoot(s string) bool { return len(s) == 0 || s == "/" || s == "/0" || s == "/1" }
   591  
   592  func mkV2Rev(v3Rev int64) uint64 {
   593  	if v3Rev == 0 {
   594  		return 0
   595  	}
   596  	return uint64(v3Rev - 1)
   597  }
   598  
   599  func mkV3Rev(v2Rev uint64) int64 {
   600  	if v2Rev == 0 {
   601  		return 0
   602  	}
   603  	return int64(v2Rev + 1)
   604  }
   605  
   606  // mkV2Node creates a V2 NodeExtern from a V3 KeyValue
   607  func (s *v2v3Store) mkV2Node(kv *mvccpb.KeyValue) *v2store.NodeExtern {
   608  	if kv == nil {
   609  		return nil
   610  	}
   611  	n := &v2store.NodeExtern{
   612  		Key:           s.mkNodePath(string(kv.Key)),
   613  		Dir:           kv.Key[len(kv.Key)-1] == '/',
   614  		CreatedIndex:  mkV2Rev(kv.CreateRevision),
   615  		ModifiedIndex: mkV2Rev(kv.ModRevision),
   616  	}
   617  	if !n.Dir {
   618  		v := string(kv.Value)
   619  		n.Value = &v
   620  	}
   621  	return n
   622  }
   623  
   624  // prevKeyFromPuts gets the prev key that is being put; ignores
   625  // the put action response.
   626  func prevKeyFromPuts(resp *clientv3.TxnResponse) *mvccpb.KeyValue {
   627  	for _, r := range resp.Responses {
   628  		pkv := r.GetResponsePut().PrevKv
   629  		if pkv != nil && pkv.CreateRevision > 0 {
   630  			return pkv
   631  		}
   632  	}
   633  	return nil
   634  }
   635  
   636  func (s *v2v3Store) newSTM(applyf func(concurrency.STM) error) (*clientv3.TxnResponse, error) {
   637  	return concurrency.NewSTM(s.c, applyf, concurrency.WithIsolation(concurrency.Serializable))
   638  }
   639  

View as plain text