...

Source file src/go.etcd.io/etcd/client/v3/experimental/recipes/rwmutex.go

Documentation: go.etcd.io/etcd/client/v3/experimental/recipes

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package recipe
    16  
    17  import (
    18  	"context"
    19  
    20  	"go.etcd.io/etcd/api/v3/mvccpb"
    21  	v3 "go.etcd.io/etcd/client/v3"
    22  	"go.etcd.io/etcd/client/v3/concurrency"
    23  )
    24  
    25  type RWMutex struct {
    26  	s   *concurrency.Session
    27  	ctx context.Context
    28  
    29  	pfx   string
    30  	myKey *EphemeralKV
    31  }
    32  
    33  func NewRWMutex(s *concurrency.Session, prefix string) *RWMutex {
    34  	return &RWMutex{s, context.TODO(), prefix + "/", nil}
    35  }
    36  
    37  func (rwm *RWMutex) RLock() error {
    38  	rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"read")
    39  	if err != nil {
    40  		return err
    41  	}
    42  	rwm.myKey = rk
    43  	// wait until nodes with "write-" and a lower revision number than myKey are gone
    44  	for {
    45  		if done, werr := rwm.waitOnLastRev(rwm.pfx + "write"); done || werr != nil {
    46  			return werr
    47  		}
    48  	}
    49  }
    50  
    51  func (rwm *RWMutex) Lock() error {
    52  	rk, err := newUniqueEphemeralKey(rwm.s, rwm.pfx+"write")
    53  	if err != nil {
    54  		return err
    55  	}
    56  	rwm.myKey = rk
    57  	// wait until all keys of lower revision than myKey are gone
    58  	for {
    59  		if done, werr := rwm.waitOnLastRev(rwm.pfx); done || werr != nil {
    60  			return werr
    61  		}
    62  		//  get the new lowest key until this is the only one left
    63  	}
    64  }
    65  
    66  // waitOnLowest will wait on the last key with a revision < rwm.myKey.Revision with a
    67  // given prefix. If there are no keys left to wait on, return true.
    68  func (rwm *RWMutex) waitOnLastRev(pfx string) (bool, error) {
    69  	client := rwm.s.Client()
    70  	// get key that's blocking myKey
    71  	opts := append(v3.WithLastRev(), v3.WithMaxModRev(rwm.myKey.Revision()-1))
    72  	lastKey, err := client.Get(rwm.ctx, pfx, opts...)
    73  	if err != nil {
    74  		return false, err
    75  	}
    76  	if len(lastKey.Kvs) == 0 {
    77  		return true, nil
    78  	}
    79  	// wait for release on blocking key
    80  	_, err = WaitEvents(
    81  		client,
    82  		string(lastKey.Kvs[0].Key),
    83  		rwm.myKey.Revision(),
    84  		[]mvccpb.Event_EventType{mvccpb.DELETE})
    85  	return false, err
    86  }
    87  
    88  func (rwm *RWMutex) RUnlock() error { return rwm.myKey.Delete() }
    89  func (rwm *RWMutex) Unlock() error  { return rwm.myKey.Delete() }
    90  

View as plain text