...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v2store/event_history.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v2store

     1  // Copyright 2015 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package v2store
    16  
    17  import (
    18  	"fmt"
    19  	"path"
    20  	"strings"
    21  	"sync"
    22  
    23  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2error"
    24  )
    25  
    26  type EventHistory struct {
    27  	Queue      eventQueue
    28  	StartIndex uint64
    29  	LastIndex  uint64
    30  	rwl        sync.RWMutex
    31  }
    32  
    33  func newEventHistory(capacity int) *EventHistory {
    34  	return &EventHistory{
    35  		Queue: eventQueue{
    36  			Capacity: capacity,
    37  			Events:   make([]*Event, capacity),
    38  		},
    39  	}
    40  }
    41  
    42  // addEvent function adds event into the eventHistory
    43  func (eh *EventHistory) addEvent(e *Event) *Event {
    44  	eh.rwl.Lock()
    45  	defer eh.rwl.Unlock()
    46  
    47  	eh.Queue.insert(e)
    48  
    49  	eh.LastIndex = e.Index()
    50  
    51  	eh.StartIndex = eh.Queue.Events[eh.Queue.Front].Index()
    52  
    53  	return e
    54  }
    55  
    56  // scan enumerates events from the index history and stops at the first point
    57  // where the key matches.
    58  func (eh *EventHistory) scan(key string, recursive bool, index uint64) (*Event, *v2error.Error) {
    59  	eh.rwl.RLock()
    60  	defer eh.rwl.RUnlock()
    61  
    62  	// index should be after the event history's StartIndex
    63  	if index < eh.StartIndex {
    64  		return nil,
    65  			v2error.NewError(v2error.EcodeEventIndexCleared,
    66  				fmt.Sprintf("the requested history has been cleared [%v/%v]",
    67  					eh.StartIndex, index), 0)
    68  	}
    69  
    70  	// the index should come before the size of the queue minus the duplicate count
    71  	if index > eh.LastIndex { // future index
    72  		return nil, nil
    73  	}
    74  
    75  	offset := index - eh.StartIndex
    76  	i := (eh.Queue.Front + int(offset)) % eh.Queue.Capacity
    77  
    78  	for {
    79  		e := eh.Queue.Events[i]
    80  
    81  		if !e.Refresh {
    82  			ok := e.Node.Key == key
    83  
    84  			if recursive {
    85  				// add tailing slash
    86  				nkey := path.Clean(key)
    87  				if nkey[len(nkey)-1] != '/' {
    88  					nkey = nkey + "/"
    89  				}
    90  
    91  				ok = ok || strings.HasPrefix(e.Node.Key, nkey)
    92  			}
    93  
    94  			if (e.Action == Delete || e.Action == Expire) && e.PrevNode != nil && e.PrevNode.Dir {
    95  				ok = ok || strings.HasPrefix(key, e.PrevNode.Key)
    96  			}
    97  
    98  			if ok {
    99  				return e, nil
   100  			}
   101  		}
   102  
   103  		i = (i + 1) % eh.Queue.Capacity
   104  
   105  		if i == eh.Queue.Back {
   106  			return nil, nil
   107  		}
   108  	}
   109  }
   110  
   111  // clone will be protected by a stop-world lock
   112  // do not need to obtain internal lock
   113  func (eh *EventHistory) clone() *EventHistory {
   114  	clonedQueue := eventQueue{
   115  		Capacity: eh.Queue.Capacity,
   116  		Events:   make([]*Event, eh.Queue.Capacity),
   117  		Size:     eh.Queue.Size,
   118  		Front:    eh.Queue.Front,
   119  		Back:     eh.Queue.Back,
   120  	}
   121  
   122  	copy(clonedQueue.Events, eh.Queue.Events)
   123  	return &EventHistory{
   124  		StartIndex: eh.StartIndex,
   125  		Queue:      clonedQueue,
   126  		LastIndex:  eh.LastIndex,
   127  	}
   128  
   129  }
   130  

View as plain text