const ( // Use ConcurrentReadTx and the txReadBuffer is copied ConcurrentReadTxMode = ReadTxMode(1) // Use backend ReadTx and txReadBuffer is not copied = ReadTxMode(2) )
// Sentinel errors describing why a requested revision cannot be served.
var (
	// ErrCompacted reports that the required revision has been compacted.
	ErrCompacted = errors.New("mvcc: required revision has been compacted")
	// ErrFutureRev reports that the required revision is a future revision.
	ErrFutureRev = errors.New("mvcc: required revision is a future revision")
)
// Sentinel errors related to watchers on a WatchStream.
var (
	// ErrWatcherNotExist reports that the watcher does not exist.
	ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
	// ErrEmptyWatcherRange reports that the watcher range is empty.
	ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
	// ErrWatcherDuplicateID reports that a duplicate watch ID was provided
	// on the WatchStream.
	ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
)
// ErrRevisionNotFound reports that a revision was not found.
var ErrRevisionNotFound = errors.New("mvcc: revision not found")
func NewStore(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) *store
NewStore returns a new store. It is useful to create a store inside mvcc pkg. It should only be used for testing externally.
func ReportEventReceived(n int)
ReportEventReceived reports that an event is received. This function should be called when an external system has received an event from mvcc.Watcher.
func UnsafeReadFinishedCompact(tx backend.ReadTx) (int64, bool)
func UnsafeReadScheduledCompact(tx backend.ReadTx) (int64, bool)
func UnsafeSetScheduledCompact(tx backend.BatchTx, value int64)
func WriteKV(be backend.Backend, kv mvccpb.KeyValue)
FilterFunc returns true if the given event should be filtered out.
// FilterFunc is a predicate over a single mvccpb.Event; a true result means
// the event is filtered out. WatchStream.Watch accepts any number of filters
// through its variadic fcs parameter.
type FilterFunc func(e mvccpb.Event) bool
type HashStorage interface { // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) // HashByRev computes the hash of all MVCC revisions up to a given revision. HashByRev(rev int64) (hash KeyValueHash, currentRev int64, err error) // Store adds hash value in local cache, allowing it can be returned by HashByRev. Store(valueHash KeyValueHash) // Hashes returns list of up to `hashStorageMaxSize` newest previously stored hashes. Hashes() []KeyValueHash }
type KV interface { ReadView WriteView // Read creates a read transaction. Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite // HashStorage returns HashStorage interface for KV storage. HashStorage() HashStorage // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() // Restore restores the KV store from a backend. Restore(b backend.Backend) error Close() error }
// KeyValueHash is the hash value exchanged through HashStorage: it is
// returned by HashByRev, accepted by Store and listed by Hashes.
type KeyValueHash struct {
	// Hash is the 32-bit hash value.
	Hash uint32
	// CompactRevision is presumably the compaction revision in effect when
	// the hash was computed — TODO confirm.
	CompactRevision int64
	// Revision is presumably the revision up to which the hash was
	// computed — TODO confirm.
	Revision int64
}
// RangeOptions carries the optional settings of ReadView.Range.
type RangeOptions struct {
	// Limit limits the number of keys returned.
	Limit int64
	// Rev is the revision to range at; per ReadView.Range, a value <= 0
	// ranges at the current revision.
	Rev int64
	// Count presumably requests a count-only result — TODO confirm.
	Count bool
}
type RangeResult struct { KVs []mvccpb.KeyValue Rev int64 Count int }
// ReadTxMode is the mode argument of KV.Read. Its values are declared as
// constants of this type (for example ConcurrentReadTxMode).
type ReadTxMode uint32
type ReadView interface { // FirstRev returns the first KV revision at the time of opening the txn. // After a compaction, the first revision increases to the compaction // revision. FirstRev() int64 // Rev returns the revision of the KV at the time of opening the txn. Rev() int64 // Range gets the keys in the range at rangeRev. // The returned rev is the current revision of the KV when the operation is executed. // If rangeRev <=0, range gets the keys at currentRev. // If `end` is nil, the request returns the key. // If `end` is not nil and not empty, it gets the keys in range [key, range_end). // If `end` is not nil and empty, it gets the keys greater than or equal to key. // Limit limits the number of keys returned. // If the required rev is compacted, ErrCompacted will be returned. Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) }
// StoreConfig is the configuration argument accepted by New and NewStore.
type StoreConfig struct {
	// CompactionBatchLimit presumably bounds how much work a single
	// compaction batch performs — TODO confirm.
	CompactionBatchLimit int
}
TxnRead represents a read-only transaction with operations that will not block other read transactions.
type TxnRead interface { ReadView // End marks the transaction is complete and ready to commit. End() }
TxnWrite represents a transaction that can modify the store.
type TxnWrite interface { TxnRead WriteView // Changes gets the changes made since opening the write txn. Changes() []mvccpb.KeyValue }
func NewReadOnlyTxnWrite(txn TxnRead) TxnWrite
// WatchID identifies a watcher on a WatchStream. It is the id accepted and
// returned by WatchStream.Watch and carried in WatchResponse.WatchID.
type WatchID int64
type WatchResponse struct { // WatchID is the WatchID of the watcher this response sent to. WatchID WatchID // Events contains all the events that needs to send. Events []mvccpb.Event // Revision is the revision of the KV when the watchResponse is created. // For a normal response, the revision should be the same as the last // modified revision inside Events. For a delayed response to a unsynced // watcher, the revision is greater than the last modified revision // inside Events. Revision int64 // CompactRevision is set when the watcher is cancelled due to compaction. CompactRevision int64 }
type WatchStream interface { // Watch creates a watcher. The watcher watches the events happening or // happened on the given key or range [key, end) from the given startRev. // // The whole event history can be watched unless compacted. // If "startRev" <=0, watch observes events after currentRev. // // The returned "id" is the ID of this watcher. It appears as WatchID // in events that are sent to the created watcher through stream channel. // The watch ID is used when it's not equal to AutoWatchID. Otherwise, // an auto-generated watch ID is returned. Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) // Chan returns a chan. All watch response will be sent to the returned chan. Chan() <-chan WatchResponse // RequestProgress requests the progress of the watcher with given ID. The response // will only be sent if the watcher is currently synced. // The responses will be sent through the WatchRespone Chan attached // with this stream to ensure correct ordering. // The responses contains no events. The revision in the response is the progress // of the watchers since the watcher is currently synced. RequestProgress(id WatchID) // RequestProgressAll requests a progress notification for all // watchers sharing the stream. If all watchers are synced, a // progress notification with watch ID -1 will be sent to an // arbitrary watcher of this stream, and the function returns // true. RequestProgressAll() bool // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be // returned. Cancel(id WatchID) error // Close closes Chan and release all related resources. Close() // Rev returns the current revision of the KV the stream watches on. Rev() int64 }
Watchable is the interface that wraps the NewWatchStream function.
type Watchable interface { // NewWatchStream returns a WatchStream that can be used to // watch events happened or happening on the KV. NewWatchStream() WatchStream }
WatchableKV is a KV that can be watched.
type WatchableKV interface { KV Watchable }
func New(lg *zap.Logger, b backend.Backend, le lease.Lessor, cfg StoreConfig) WatchableKV
type WriteView interface { // DeleteRange deletes the given range from the store. // A deleteRange increases the rev of the store if any key in the range exists. // The number of key deleted will be returned. // The returned rev is the current revision of the KV when the operation is executed. // It also generates one event for each key delete in the event history. // if the `end` is nil, deleteRange deletes the key. // if the `end` is not nil, deleteRange deletes the keys in range [key, range_end). DeleteRange(key, end []byte) (n, rev int64) // Put puts the given key, value into the store. Put also takes additional argument lease to // attach a lease to a key-value pair as meta-data. KV implementation does not validate the lease // id. // A put also increases the rev of the store, and generates one event in the event history. // The returned rev is the current revision of the KV when the operation is executed. Put(key, value []byte, lease lease.LeaseID) (rev int64) }