...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v2store/store.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v2store

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v2store
    16  
    17  import (
    18  	"encoding/json"
    19  	"fmt"
    20  	"path"
    21  	"strconv"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	"go.etcd.io/etcd/client/pkg/v3/types"
    27  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
    28  
    29  	"github.com/jonboulle/clockwork"
    30  )
    31  
    32  // The default version to set when the store is first initialized.
    33  const defaultVersion = 2
    34  
    35  var minExpireTime time.Time
    36  
    37  func init() {
    38  	minExpireTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
    39  }
    40  
    41  type Store interface {
    42  	Version() int
    43  	Index() uint64
    44  
    45  	Get(nodePath string, recursive, sorted bool) (*Event, error)
    46  	Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error)
    47  	Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error)
    48  	Create(nodePath string, dir bool, value string, unique bool,
    49  		expireOpts TTLOptionSet) (*Event, error)
    50  	CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
    51  		value string, expireOpts TTLOptionSet) (*Event, error)
    52  	Delete(nodePath string, dir, recursive bool) (*Event, error)
    53  	CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error)
    54  
    55  	Watch(prefix string, recursive, stream bool, sinceIndex uint64) (Watcher, error)
    56  
    57  	Save() ([]byte, error)
    58  	Recovery(state []byte) error
    59  
    60  	Clone() Store
    61  	SaveNoCopy() ([]byte, error)
    62  
    63  	JsonStats() []byte
    64  	DeleteExpiredKeys(cutoff time.Time)
    65  
    66  	HasTTLKeys() bool
    67  }
    68  
// TTLOptionSet carries the expiration options accepted by the write
// operations of Store (Set, Update, Create and CompareAndSwap).
type TTLOptionSet struct {
	ExpireTime time.Time // expiration time to apply to the node
	Refresh    bool      // keep the node's existing value; the event is marked via SetRefresh and handed to WatcherHub.add instead of notify
}
    73  
// store is the implementation of Store in this file.
//
// The exported fields (Root, WatcherHub, CurrentIndex, Stats, CurrentVersion)
// are what encoding/json sees when the store is marshalled by Save and
// SaveNoCopy and unmarshalled by Recovery; the unexported fields are not part
// of that state. clock is set by New; readonlySet holds the paths ("/" and the
// initial namespaces) that the write operations refuse to modify.
type store struct {
	Root           *node
	WatcherHub     *watcherHub
	CurrentIndex   uint64
	Stats          *Stats
	CurrentVersion int
	ttlKeyHeap     *ttlKeyHeap  // not serialized; Recovery replaces it with a fresh heap
	worldLock      sync.RWMutex // stop-the-world lock taken by the exported operations
	clock          clockwork.Clock
	readonlySet    types.Set
}
    85  
    86  // New creates a store where the given namespaces will be created as initial directories.
    87  func New(namespaces ...string) Store {
    88  	s := newStore(namespaces...)
    89  	s.clock = clockwork.NewRealClock()
    90  	return s
    91  }
    92  
    93  func newStore(namespaces ...string) *store {
    94  	s := new(store)
    95  	s.CurrentVersion = defaultVersion
    96  	s.Root = newDir(s, "/", s.CurrentIndex, nil, Permanent)
    97  	for _, namespace := range namespaces {
    98  		s.Root.Add(newDir(s, namespace, s.CurrentIndex, s.Root, Permanent))
    99  	}
   100  	s.Stats = newStats()
   101  	s.WatcherHub = newWatchHub(1000)
   102  	s.ttlKeyHeap = newTtlKeyHeap()
   103  	s.readonlySet = types.NewUnsafeSet(append(namespaces, "/")...)
   104  	return s
   105  }
   106  
   107  // Version retrieves current version of the store.
   108  func (s *store) Version() int {
   109  	return s.CurrentVersion
   110  }
   111  
   112  // Index retrieves the current index of the store.
   113  func (s *store) Index() uint64 {
   114  	s.worldLock.RLock()
   115  	defer s.worldLock.RUnlock()
   116  	return s.CurrentIndex
   117  }
   118  
   119  // Get returns a get event.
   120  // If recursive is true, it will return all the content under the node path.
   121  // If sorted is true, it will sort the content by keys.
   122  func (s *store) Get(nodePath string, recursive, sorted bool) (*Event, error) {
   123  	var err *v2error.Error
   124  
   125  	s.worldLock.RLock()
   126  	defer s.worldLock.RUnlock()
   127  
   128  	defer func() {
   129  		if err == nil {
   130  			s.Stats.Inc(GetSuccess)
   131  			if recursive {
   132  				reportReadSuccess(GetRecursive)
   133  			} else {
   134  				reportReadSuccess(Get)
   135  			}
   136  			return
   137  		}
   138  
   139  		s.Stats.Inc(GetFail)
   140  		if recursive {
   141  			reportReadFailure(GetRecursive)
   142  		} else {
   143  			reportReadFailure(Get)
   144  		}
   145  	}()
   146  
   147  	n, err := s.internalGet(nodePath)
   148  	if err != nil {
   149  		return nil, err
   150  	}
   151  
   152  	e := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
   153  	e.EtcdIndex = s.CurrentIndex
   154  	e.Node.loadInternalNode(n, recursive, sorted, s.clock)
   155  
   156  	return e, nil
   157  }
   158  
   159  // Create creates the node at nodePath. Create will help to create intermediate directories with no ttl.
   160  // If the node has already existed, create will fail.
   161  // If any node on the path is a file, create will fail.
   162  func (s *store) Create(nodePath string, dir bool, value string, unique bool, expireOpts TTLOptionSet) (*Event, error) {
   163  	var err *v2error.Error
   164  
   165  	s.worldLock.Lock()
   166  	defer s.worldLock.Unlock()
   167  
   168  	defer func() {
   169  		if err == nil {
   170  			s.Stats.Inc(CreateSuccess)
   171  			reportWriteSuccess(Create)
   172  			return
   173  		}
   174  
   175  		s.Stats.Inc(CreateFail)
   176  		reportWriteFailure(Create)
   177  	}()
   178  
   179  	e, err := s.internalCreate(nodePath, dir, value, unique, false, expireOpts.ExpireTime, Create)
   180  	if err != nil {
   181  		return nil, err
   182  	}
   183  
   184  	e.EtcdIndex = s.CurrentIndex
   185  	s.WatcherHub.notify(e)
   186  
   187  	return e, nil
   188  }
   189  
   190  // Set creates or replace the node at nodePath.
   191  func (s *store) Set(nodePath string, dir bool, value string, expireOpts TTLOptionSet) (*Event, error) {
   192  	var err *v2error.Error
   193  
   194  	s.worldLock.Lock()
   195  	defer s.worldLock.Unlock()
   196  
   197  	defer func() {
   198  		if err == nil {
   199  			s.Stats.Inc(SetSuccess)
   200  			reportWriteSuccess(Set)
   201  			return
   202  		}
   203  
   204  		s.Stats.Inc(SetFail)
   205  		reportWriteFailure(Set)
   206  	}()
   207  
   208  	// Get prevNode value
   209  	n, getErr := s.internalGet(nodePath)
   210  	if getErr != nil && getErr.ErrorCode != v2error.EcodeKeyNotFound {
   211  		err = getErr
   212  		return nil, err
   213  	}
   214  
   215  	if expireOpts.Refresh {
   216  		if getErr != nil {
   217  			err = getErr
   218  			return nil, err
   219  		}
   220  		value = n.Value
   221  	}
   222  
   223  	// Set new value
   224  	e, err := s.internalCreate(nodePath, dir, value, false, true, expireOpts.ExpireTime, Set)
   225  	if err != nil {
   226  		return nil, err
   227  	}
   228  	e.EtcdIndex = s.CurrentIndex
   229  
   230  	// Put prevNode into event
   231  	if getErr == nil {
   232  		prev := newEvent(Get, nodePath, n.ModifiedIndex, n.CreatedIndex)
   233  		prev.Node.loadInternalNode(n, false, false, s.clock)
   234  		e.PrevNode = prev.Node
   235  	}
   236  
   237  	if !expireOpts.Refresh {
   238  		s.WatcherHub.notify(e)
   239  	} else {
   240  		e.SetRefresh()
   241  		s.WatcherHub.add(e)
   242  	}
   243  
   244  	return e, nil
   245  }
   246  
   247  // returns user-readable cause of failed comparison
   248  func getCompareFailCause(n *node, which int, prevValue string, prevIndex uint64) string {
   249  	switch which {
   250  	case CompareIndexNotMatch:
   251  		return fmt.Sprintf("[%v != %v]", prevIndex, n.ModifiedIndex)
   252  	case CompareValueNotMatch:
   253  		return fmt.Sprintf("[%v != %v]", prevValue, n.Value)
   254  	default:
   255  		return fmt.Sprintf("[%v != %v] [%v != %v]", prevValue, n.Value, prevIndex, n.ModifiedIndex)
   256  	}
   257  }
   258  
   259  func (s *store) CompareAndSwap(nodePath string, prevValue string, prevIndex uint64,
   260  	value string, expireOpts TTLOptionSet) (*Event, error) {
   261  
   262  	var err *v2error.Error
   263  
   264  	s.worldLock.Lock()
   265  	defer s.worldLock.Unlock()
   266  
   267  	defer func() {
   268  		if err == nil {
   269  			s.Stats.Inc(CompareAndSwapSuccess)
   270  			reportWriteSuccess(CompareAndSwap)
   271  			return
   272  		}
   273  
   274  		s.Stats.Inc(CompareAndSwapFail)
   275  		reportWriteFailure(CompareAndSwap)
   276  	}()
   277  
   278  	nodePath = path.Clean(path.Join("/", nodePath))
   279  	// we do not allow the user to change "/"
   280  	if s.readonlySet.Contains(nodePath) {
   281  		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
   282  	}
   283  
   284  	n, err := s.internalGet(nodePath)
   285  	if err != nil {
   286  		return nil, err
   287  	}
   288  	if n.IsDir() { // can only compare and swap file
   289  		err = v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
   290  		return nil, err
   291  	}
   292  
   293  	// If both of the prevValue and prevIndex are given, we will test both of them.
   294  	// Command will be executed, only if both of the tests are successful.
   295  	if ok, which := n.Compare(prevValue, prevIndex); !ok {
   296  		cause := getCompareFailCause(n, which, prevValue, prevIndex)
   297  		err = v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
   298  		return nil, err
   299  	}
   300  
   301  	if expireOpts.Refresh {
   302  		value = n.Value
   303  	}
   304  
   305  	// update etcd index
   306  	s.CurrentIndex++
   307  
   308  	e := newEvent(CompareAndSwap, nodePath, s.CurrentIndex, n.CreatedIndex)
   309  	e.EtcdIndex = s.CurrentIndex
   310  	e.PrevNode = n.Repr(false, false, s.clock)
   311  	eNode := e.Node
   312  
   313  	// if test succeed, write the value
   314  	if err := n.Write(value, s.CurrentIndex); err != nil {
   315  		return nil, err
   316  	}
   317  	n.UpdateTTL(expireOpts.ExpireTime)
   318  
   319  	// copy the value for safety
   320  	valueCopy := value
   321  	eNode.Value = &valueCopy
   322  	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
   323  
   324  	if !expireOpts.Refresh {
   325  		s.WatcherHub.notify(e)
   326  	} else {
   327  		e.SetRefresh()
   328  		s.WatcherHub.add(e)
   329  	}
   330  
   331  	return e, nil
   332  }
   333  
   334  // Delete deletes the node at the given path.
   335  // If the node is a directory, recursive must be true to delete it.
   336  func (s *store) Delete(nodePath string, dir, recursive bool) (*Event, error) {
   337  	var err *v2error.Error
   338  
   339  	s.worldLock.Lock()
   340  	defer s.worldLock.Unlock()
   341  
   342  	defer func() {
   343  		if err == nil {
   344  			s.Stats.Inc(DeleteSuccess)
   345  			reportWriteSuccess(Delete)
   346  			return
   347  		}
   348  
   349  		s.Stats.Inc(DeleteFail)
   350  		reportWriteFailure(Delete)
   351  	}()
   352  
   353  	nodePath = path.Clean(path.Join("/", nodePath))
   354  	// we do not allow the user to change "/"
   355  	if s.readonlySet.Contains(nodePath) {
   356  		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
   357  	}
   358  
   359  	// recursive implies dir
   360  	if recursive {
   361  		dir = true
   362  	}
   363  
   364  	n, err := s.internalGet(nodePath)
   365  	if err != nil { // if the node does not exist, return error
   366  		return nil, err
   367  	}
   368  
   369  	nextIndex := s.CurrentIndex + 1
   370  	e := newEvent(Delete, nodePath, nextIndex, n.CreatedIndex)
   371  	e.EtcdIndex = nextIndex
   372  	e.PrevNode = n.Repr(false, false, s.clock)
   373  	eNode := e.Node
   374  
   375  	if n.IsDir() {
   376  		eNode.Dir = true
   377  	}
   378  
   379  	callback := func(path string) { // notify function
   380  		// notify the watchers with deleted set true
   381  		s.WatcherHub.notifyWatchers(e, path, true)
   382  	}
   383  
   384  	err = n.Remove(dir, recursive, callback)
   385  	if err != nil {
   386  		return nil, err
   387  	}
   388  
   389  	// update etcd index
   390  	s.CurrentIndex++
   391  
   392  	s.WatcherHub.notify(e)
   393  
   394  	return e, nil
   395  }
   396  
   397  func (s *store) CompareAndDelete(nodePath string, prevValue string, prevIndex uint64) (*Event, error) {
   398  	var err *v2error.Error
   399  
   400  	s.worldLock.Lock()
   401  	defer s.worldLock.Unlock()
   402  
   403  	defer func() {
   404  		if err == nil {
   405  			s.Stats.Inc(CompareAndDeleteSuccess)
   406  			reportWriteSuccess(CompareAndDelete)
   407  			return
   408  		}
   409  
   410  		s.Stats.Inc(CompareAndDeleteFail)
   411  		reportWriteFailure(CompareAndDelete)
   412  	}()
   413  
   414  	nodePath = path.Clean(path.Join("/", nodePath))
   415  
   416  	n, err := s.internalGet(nodePath)
   417  	if err != nil { // if the node does not exist, return error
   418  		return nil, err
   419  	}
   420  	if n.IsDir() { // can only compare and delete file
   421  		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, s.CurrentIndex)
   422  	}
   423  
   424  	// If both of the prevValue and prevIndex are given, we will test both of them.
   425  	// Command will be executed, only if both of the tests are successful.
   426  	if ok, which := n.Compare(prevValue, prevIndex); !ok {
   427  		cause := getCompareFailCause(n, which, prevValue, prevIndex)
   428  		return nil, v2error.NewError(v2error.EcodeTestFailed, cause, s.CurrentIndex)
   429  	}
   430  
   431  	// update etcd index
   432  	s.CurrentIndex++
   433  
   434  	e := newEvent(CompareAndDelete, nodePath, s.CurrentIndex, n.CreatedIndex)
   435  	e.EtcdIndex = s.CurrentIndex
   436  	e.PrevNode = n.Repr(false, false, s.clock)
   437  
   438  	callback := func(path string) { // notify function
   439  		// notify the watchers with deleted set true
   440  		s.WatcherHub.notifyWatchers(e, path, true)
   441  	}
   442  
   443  	err = n.Remove(false, false, callback)
   444  	if err != nil {
   445  		return nil, err
   446  	}
   447  
   448  	s.WatcherHub.notify(e)
   449  
   450  	return e, nil
   451  }
   452  
   453  func (s *store) Watch(key string, recursive, stream bool, sinceIndex uint64) (Watcher, error) {
   454  	s.worldLock.RLock()
   455  	defer s.worldLock.RUnlock()
   456  
   457  	key = path.Clean(path.Join("/", key))
   458  	if sinceIndex == 0 {
   459  		sinceIndex = s.CurrentIndex + 1
   460  	}
   461  	// WatcherHub does not know about the current index, so we need to pass it in
   462  	w, err := s.WatcherHub.watch(key, recursive, stream, sinceIndex, s.CurrentIndex)
   463  	if err != nil {
   464  		return nil, err
   465  	}
   466  
   467  	return w, nil
   468  }
   469  
   470  // walk walks all the nodePath and apply the walkFunc on each directory
   471  func (s *store) walk(nodePath string, walkFunc func(prev *node, component string) (*node, *v2error.Error)) (*node, *v2error.Error) {
   472  	components := strings.Split(nodePath, "/")
   473  
   474  	curr := s.Root
   475  	var err *v2error.Error
   476  
   477  	for i := 1; i < len(components); i++ {
   478  		if len(components[i]) == 0 { // ignore empty string
   479  			return curr, nil
   480  		}
   481  
   482  		curr, err = walkFunc(curr, components[i])
   483  		if err != nil {
   484  			return nil, err
   485  		}
   486  	}
   487  
   488  	return curr, nil
   489  }
   490  
   491  // Update updates the value/ttl of the node.
   492  // If the node is a file, the value and the ttl can be updated.
   493  // If the node is a directory, only the ttl can be updated.
   494  func (s *store) Update(nodePath string, newValue string, expireOpts TTLOptionSet) (*Event, error) {
   495  	var err *v2error.Error
   496  
   497  	s.worldLock.Lock()
   498  	defer s.worldLock.Unlock()
   499  
   500  	defer func() {
   501  		if err == nil {
   502  			s.Stats.Inc(UpdateSuccess)
   503  			reportWriteSuccess(Update)
   504  			return
   505  		}
   506  
   507  		s.Stats.Inc(UpdateFail)
   508  		reportWriteFailure(Update)
   509  	}()
   510  
   511  	nodePath = path.Clean(path.Join("/", nodePath))
   512  	// we do not allow the user to change "/"
   513  	if s.readonlySet.Contains(nodePath) {
   514  		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", s.CurrentIndex)
   515  	}
   516  
   517  	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
   518  
   519  	n, err := s.internalGet(nodePath)
   520  	if err != nil { // if the node does not exist, return error
   521  		return nil, err
   522  	}
   523  	if n.IsDir() && len(newValue) != 0 {
   524  		// if the node is a directory, we cannot update value to non-empty
   525  		return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
   526  	}
   527  
   528  	if expireOpts.Refresh {
   529  		newValue = n.Value
   530  	}
   531  
   532  	e := newEvent(Update, nodePath, nextIndex, n.CreatedIndex)
   533  	e.EtcdIndex = nextIndex
   534  	e.PrevNode = n.Repr(false, false, s.clock)
   535  	eNode := e.Node
   536  
   537  	if err := n.Write(newValue, nextIndex); err != nil {
   538  		return nil, fmt.Errorf("nodePath %v : %v", nodePath, err)
   539  	}
   540  
   541  	if n.IsDir() {
   542  		eNode.Dir = true
   543  	} else {
   544  		// copy the value for safety
   545  		newValueCopy := newValue
   546  		eNode.Value = &newValueCopy
   547  	}
   548  
   549  	// update ttl
   550  	n.UpdateTTL(expireOpts.ExpireTime)
   551  
   552  	eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
   553  
   554  	if !expireOpts.Refresh {
   555  		s.WatcherHub.notify(e)
   556  	} else {
   557  		e.SetRefresh()
   558  		s.WatcherHub.add(e)
   559  	}
   560  
   561  	s.CurrentIndex = nextIndex
   562  
   563  	return e, nil
   564  }
   565  
   566  func (s *store) internalCreate(nodePath string, dir bool, value string, unique, replace bool,
   567  	expireTime time.Time, action string) (*Event, *v2error.Error) {
   568  
   569  	currIndex, nextIndex := s.CurrentIndex, s.CurrentIndex+1
   570  
   571  	if unique { // append unique item under the node path
   572  		nodePath += "/" + fmt.Sprintf("%020s", strconv.FormatUint(nextIndex, 10))
   573  	}
   574  
   575  	nodePath = path.Clean(path.Join("/", nodePath))
   576  
   577  	// we do not allow the user to change "/"
   578  	if s.readonlySet.Contains(nodePath) {
   579  		return nil, v2error.NewError(v2error.EcodeRootROnly, "/", currIndex)
   580  	}
   581  
   582  	// Assume expire times that are way in the past are
   583  	// This can occur when the time is serialized to JS
   584  	if expireTime.Before(minExpireTime) {
   585  		expireTime = Permanent
   586  	}
   587  
   588  	dirName, nodeName := path.Split(nodePath)
   589  
   590  	// walk through the nodePath, create dirs and get the last directory node
   591  	d, err := s.walk(dirName, s.checkDir)
   592  
   593  	if err != nil {
   594  		s.Stats.Inc(SetFail)
   595  		reportWriteFailure(action)
   596  		err.Index = currIndex
   597  		return nil, err
   598  	}
   599  
   600  	e := newEvent(action, nodePath, nextIndex, nextIndex)
   601  	eNode := e.Node
   602  
   603  	n, _ := d.GetChild(nodeName)
   604  
   605  	// force will try to replace an existing file
   606  	if n != nil {
   607  		if replace {
   608  			if n.IsDir() {
   609  				return nil, v2error.NewError(v2error.EcodeNotFile, nodePath, currIndex)
   610  			}
   611  			e.PrevNode = n.Repr(false, false, s.clock)
   612  
   613  			if err := n.Remove(false, false, nil); err != nil {
   614  				return nil, err
   615  			}
   616  		} else {
   617  			return nil, v2error.NewError(v2error.EcodeNodeExist, nodePath, currIndex)
   618  		}
   619  	}
   620  
   621  	if !dir { // create file
   622  		// copy the value for safety
   623  		valueCopy := value
   624  		eNode.Value = &valueCopy
   625  
   626  		n = newKV(s, nodePath, value, nextIndex, d, expireTime)
   627  
   628  	} else { // create directory
   629  		eNode.Dir = true
   630  
   631  		n = newDir(s, nodePath, nextIndex, d, expireTime)
   632  	}
   633  
   634  	// we are sure d is a directory and does not have the children with name n.Name
   635  	if err := d.Add(n); err != nil {
   636  		return nil, err
   637  	}
   638  
   639  	// node with TTL
   640  	if !n.IsPermanent() {
   641  		s.ttlKeyHeap.push(n)
   642  
   643  		eNode.Expiration, eNode.TTL = n.expirationAndTTL(s.clock)
   644  	}
   645  
   646  	s.CurrentIndex = nextIndex
   647  
   648  	return e, nil
   649  }
   650  
   651  // InternalGet gets the node of the given nodePath.
   652  func (s *store) internalGet(nodePath string) (*node, *v2error.Error) {
   653  	nodePath = path.Clean(path.Join("/", nodePath))
   654  
   655  	walkFunc := func(parent *node, name string) (*node, *v2error.Error) {
   656  
   657  		if !parent.IsDir() {
   658  			err := v2error.NewError(v2error.EcodeNotDir, parent.Path, s.CurrentIndex)
   659  			return nil, err
   660  		}
   661  
   662  		child, ok := parent.Children[name]
   663  		if ok {
   664  			return child, nil
   665  		}
   666  
   667  		return nil, v2error.NewError(v2error.EcodeKeyNotFound, path.Join(parent.Path, name), s.CurrentIndex)
   668  	}
   669  
   670  	f, err := s.walk(nodePath, walkFunc)
   671  
   672  	if err != nil {
   673  		return nil, err
   674  	}
   675  	return f, nil
   676  }
   677  
   678  // DeleteExpiredKeys will delete all expired keys
   679  func (s *store) DeleteExpiredKeys(cutoff time.Time) {
   680  	s.worldLock.Lock()
   681  	defer s.worldLock.Unlock()
   682  
   683  	for {
   684  		node := s.ttlKeyHeap.top()
   685  		if node == nil || node.ExpireTime.After(cutoff) {
   686  			break
   687  		}
   688  
   689  		s.CurrentIndex++
   690  		e := newEvent(Expire, node.Path, s.CurrentIndex, node.CreatedIndex)
   691  		e.EtcdIndex = s.CurrentIndex
   692  		e.PrevNode = node.Repr(false, false, s.clock)
   693  		if node.IsDir() {
   694  			e.Node.Dir = true
   695  		}
   696  
   697  		callback := func(path string) { // notify function
   698  			// notify the watchers with deleted set true
   699  			s.WatcherHub.notifyWatchers(e, path, true)
   700  		}
   701  
   702  		s.ttlKeyHeap.pop()
   703  		node.Remove(true, true, callback)
   704  
   705  		reportExpiredKey()
   706  		s.Stats.Inc(ExpireCount)
   707  
   708  		s.WatcherHub.notify(e)
   709  	}
   710  
   711  }
   712  
   713  // checkDir will check whether the component is a directory under parent node.
   714  // If it is a directory, this function will return the pointer to that node.
   715  // If it does not exist, this function will create a new directory and return the pointer to that node.
   716  // If it is a file, this function will return error.
   717  func (s *store) checkDir(parent *node, dirName string) (*node, *v2error.Error) {
   718  	node, ok := parent.Children[dirName]
   719  
   720  	if ok {
   721  		if node.IsDir() {
   722  			return node, nil
   723  		}
   724  
   725  		return nil, v2error.NewError(v2error.EcodeNotDir, node.Path, s.CurrentIndex)
   726  	}
   727  
   728  	n := newDir(s, path.Join(parent.Path, dirName), s.CurrentIndex+1, parent, Permanent)
   729  
   730  	parent.Children[dirName] = n
   731  
   732  	return n, nil
   733  }
   734  
   735  // Save saves the static state of the store system.
   736  // It will not be able to save the state of watchers.
   737  // It will not save the parent field of the node. Or there will
   738  // be cyclic dependencies issue for the json package.
   739  func (s *store) Save() ([]byte, error) {
   740  	b, err := json.Marshal(s.Clone())
   741  	if err != nil {
   742  		return nil, err
   743  	}
   744  
   745  	return b, nil
   746  }
   747  
   748  func (s *store) SaveNoCopy() ([]byte, error) {
   749  	b, err := json.Marshal(s)
   750  	if err != nil {
   751  		return nil, err
   752  	}
   753  
   754  	return b, nil
   755  }
   756  
   757  func (s *store) Clone() Store {
   758  	s.worldLock.RLock()
   759  
   760  	clonedStore := newStore()
   761  	clonedStore.CurrentIndex = s.CurrentIndex
   762  	clonedStore.Root = s.Root.Clone()
   763  	clonedStore.WatcherHub = s.WatcherHub.clone()
   764  	clonedStore.Stats = s.Stats.clone()
   765  	clonedStore.CurrentVersion = s.CurrentVersion
   766  
   767  	s.worldLock.RUnlock()
   768  	return clonedStore
   769  }
   770  
   771  // Recovery recovers the store system from a static state
   772  // It needs to recover the parent field of the nodes.
   773  // It needs to delete the expired nodes since the saved time and also
   774  // needs to create monitoring goroutines.
   775  func (s *store) Recovery(state []byte) error {
   776  	s.worldLock.Lock()
   777  	defer s.worldLock.Unlock()
   778  	err := json.Unmarshal(state, s)
   779  
   780  	if err != nil {
   781  		return err
   782  	}
   783  
   784  	s.ttlKeyHeap = newTtlKeyHeap()
   785  
   786  	s.Root.recoverAndclean()
   787  	return nil
   788  }
   789  
   790  func (s *store) JsonStats() []byte {
   791  	s.Stats.Watchers = uint64(s.WatcherHub.count)
   792  	return s.Stats.toJson()
   793  }
   794  
   795  func (s *store) HasTTLKeys() bool {
   796  	s.worldLock.RLock()
   797  	defer s.worldLock.RUnlock()
   798  	return s.ttlKeyHeap.Len() != 0
   799  }
   800  

View as plain text