...

Source file src/go.etcd.io/etcd/client/v3/concurrency/mutex.go

Documentation: go.etcd.io/etcd/client/v3/concurrency

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package concurrency
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"sync"
    22  
    23  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    24  	v3 "go.etcd.io/etcd/client/v3"
    25  )
    26  
    27  // ErrLocked is returned by TryLock when Mutex is already locked by another session.
    28  var ErrLocked = errors.New("mutex: Locked by another session")
    29  var ErrSessionExpired = errors.New("mutex: session is expired")
    30  
    31  // Mutex implements the sync Locker interface with etcd
    32  type Mutex struct {
    33  	s *Session
    34  
    35  	pfx   string
    36  	myKey string
    37  	myRev int64
    38  	hdr   *pb.ResponseHeader
    39  }
    40  
    41  func NewMutex(s *Session, pfx string) *Mutex {
    42  	return &Mutex{s, pfx + "/", "", -1, nil}
    43  }
    44  
    45  // TryLock locks the mutex if not already locked by another session.
    46  // If lock is held by another session, return immediately after attempting necessary cleanup
    47  // The ctx argument is used for the sending/receiving Txn RPC.
    48  func (m *Mutex) TryLock(ctx context.Context) error {
    49  	resp, err := m.tryAcquire(ctx)
    50  	if err != nil {
    51  		return err
    52  	}
    53  	// if no key on prefix / the minimum rev is key, already hold the lock
    54  	ownerKey := resp.Responses[1].GetResponseRange().Kvs
    55  	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    56  		m.hdr = resp.Header
    57  		return nil
    58  	}
    59  	client := m.s.Client()
    60  	// Cannot lock, so delete the key
    61  	if _, err := client.Delete(ctx, m.myKey); err != nil {
    62  		return err
    63  	}
    64  	m.myKey = "\x00"
    65  	m.myRev = -1
    66  	return ErrLocked
    67  }
    68  
    69  // Lock locks the mutex with a cancelable context. If the context is canceled
    70  // while trying to acquire the lock, the mutex tries to clean its stale lock entry.
    71  func (m *Mutex) Lock(ctx context.Context) error {
    72  	resp, err := m.tryAcquire(ctx)
    73  	if err != nil {
    74  		return err
    75  	}
    76  	// if no key on prefix / the minimum rev is key, already hold the lock
    77  	ownerKey := resp.Responses[1].GetResponseRange().Kvs
    78  	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
    79  		m.hdr = resp.Header
    80  		return nil
    81  	}
    82  	client := m.s.Client()
    83  	// wait for deletion revisions prior to myKey
    84  	// TODO: early termination if the session key is deleted before other session keys with smaller revisions.
    85  	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    86  	// release lock key if wait failed
    87  	if werr != nil {
    88  		m.Unlock(client.Ctx())
    89  		return werr
    90  	}
    91  
    92  	// make sure the session is not expired, and the owner key still exists.
    93  	gresp, werr := client.Get(ctx, m.myKey)
    94  	if werr != nil {
    95  		m.Unlock(client.Ctx())
    96  		return werr
    97  	}
    98  
    99  	if len(gresp.Kvs) == 0 { // is the session key lost?
   100  		return ErrSessionExpired
   101  	}
   102  	m.hdr = gresp.Header
   103  
   104  	return nil
   105  }
   106  
   107  func (m *Mutex) tryAcquire(ctx context.Context) (*v3.TxnResponse, error) {
   108  	s := m.s
   109  	client := m.s.Client()
   110  
   111  	m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
   112  	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
   113  	// put self in lock waiters via myKey; oldest waiter holds lock
   114  	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
   115  	// reuse key in case this session already holds the lock
   116  	get := v3.OpGet(m.myKey)
   117  	// fetch current holder to complete uncontended path with only one RPC
   118  	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
   119  	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
   120  	if err != nil {
   121  		return nil, err
   122  	}
   123  	m.myRev = resp.Header.Revision
   124  	if !resp.Succeeded {
   125  		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
   126  	}
   127  	return resp, nil
   128  }
   129  
   130  func (m *Mutex) Unlock(ctx context.Context) error {
   131  	client := m.s.Client()
   132  	if _, err := client.Delete(ctx, m.myKey); err != nil {
   133  		return err
   134  	}
   135  	m.myKey = "\x00"
   136  	m.myRev = -1
   137  	return nil
   138  }
   139  
   140  func (m *Mutex) IsOwner() v3.Cmp {
   141  	return v3.Compare(v3.CreateRevision(m.myKey), "=", m.myRev)
   142  }
   143  
   144  func (m *Mutex) Key() string { return m.myKey }
   145  
   146  // Header is the response header received from etcd on acquiring the lock.
   147  func (m *Mutex) Header() *pb.ResponseHeader { return m.hdr }
   148  
   149  type lockerMutex struct{ *Mutex }
   150  
   151  func (lm *lockerMutex) Lock() {
   152  	client := lm.s.Client()
   153  	if err := lm.Mutex.Lock(client.Ctx()); err != nil {
   154  		panic(err)
   155  	}
   156  }
   157  func (lm *lockerMutex) Unlock() {
   158  	client := lm.s.Client()
   159  	if err := lm.Mutex.Unlock(client.Ctx()); err != nil {
   160  		panic(err)
   161  	}
   162  }
   163  
   164  // NewLocker creates a sync.Locker backed by an etcd mutex.
   165  func NewLocker(s *Session, pfx string) sync.Locker {
   166  	return &lockerMutex{NewMutex(s, pfx)}
   167  }
   168  

View as plain text