...

Source file src/go.etcd.io/etcd/client/v3/experimental/recipes/double_barrier.go

Documentation: go.etcd.io/etcd/client/v3/experimental/recipes

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package recipe
    16  
    17  import (
    18  	"context"
    19  
    20  	"go.etcd.io/etcd/api/v3/mvccpb"
    21  	"go.etcd.io/etcd/client/v3"
    22  	"go.etcd.io/etcd/client/v3/concurrency"
    23  )
    24  
    25  // DoubleBarrier blocks processes on Enter until an expected count enters, then
    26  // blocks again on Leave until all processes have left.
    27  type DoubleBarrier struct {
    28  	s   *concurrency.Session
    29  	ctx context.Context
    30  
    31  	key   string // key for the collective barrier
    32  	count int
    33  	myKey *EphemeralKV // current key for this process on the barrier
    34  }
    35  
    36  func NewDoubleBarrier(s *concurrency.Session, key string, count int) *DoubleBarrier {
    37  	return &DoubleBarrier{
    38  		s:     s,
    39  		ctx:   context.TODO(),
    40  		key:   key,
    41  		count: count,
    42  	}
    43  }
    44  
    45  // Enter waits for "count" processes to enter the barrier then returns
    46  func (b *DoubleBarrier) Enter() error {
    47  	client := b.s.Client()
    48  
    49  	// Check the entered clients before creating the UniqueEphemeralKey,
    50  	// fail the request if there are already too many clients.
    51  	if resp1, err := b.enteredClients(client); err != nil {
    52  		return err
    53  	} else if len(resp1.Kvs) >= b.count {
    54  		return ErrTooManyClients
    55  	}
    56  
    57  	ek, err := newUniqueEphemeralKey(b.s, b.key+"/waiters")
    58  	if err != nil {
    59  		return err
    60  	}
    61  	b.myKey = ek
    62  
    63  	// Check the entered clients after creating the UniqueEphemeralKey
    64  	resp2, err := b.enteredClients(client)
    65  	if err != nil {
    66  		return err
    67  	}
    68  	if len(resp2.Kvs) >= b.count {
    69  		lastWaiter := resp2.Kvs[b.count-1]
    70  		if ek.rev > lastWaiter.CreateRevision {
    71  			// delete itself now, otherwise other processes may need to wait
    72  			// until these keys are automatically deleted when the related
    73  			// lease expires.
    74  			if err = b.myKey.Delete(); err != nil {
    75  				// Nothing to do here. We have to wait for the key to be
    76  				// deleted when the lease expires.
    77  			}
    78  			return ErrTooManyClients
    79  		}
    80  
    81  		if ek.rev == lastWaiter.CreateRevision {
    82  			// TODO(ahrtr): we might need to compare ek.key and
    83  			// string(lastWaiter.Key), they should be equal.
    84  			// unblock all other waiters
    85  			_, err = client.Put(b.ctx, b.key+"/ready", "")
    86  			return err
    87  		}
    88  	}
    89  
    90  	_, err = WaitEvents(
    91  		client,
    92  		b.key+"/ready",
    93  		ek.Revision(),
    94  		[]mvccpb.Event_EventType{mvccpb.PUT})
    95  	return err
    96  }
    97  
    98  // enteredClients gets all the entered clients, which are ordered by the
    99  // createRevision in ascending order.
   100  func (b *DoubleBarrier) enteredClients(cli *clientv3.Client) (*clientv3.GetResponse, error) {
   101  	resp, err := cli.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix(),
   102  		clientv3.WithSort(clientv3.SortByCreateRevision, clientv3.SortAscend))
   103  	if err != nil {
   104  		return nil, err
   105  	}
   106  
   107  	return resp, nil
   108  }
   109  
   110  // Leave waits for "count" processes to leave the barrier then returns
   111  func (b *DoubleBarrier) Leave() error {
   112  	client := b.s.Client()
   113  	resp, err := client.Get(b.ctx, b.key+"/waiters", clientv3.WithPrefix())
   114  	if err != nil {
   115  		return err
   116  	}
   117  	if len(resp.Kvs) == 0 {
   118  		return nil
   119  	}
   120  
   121  	lowest, highest := resp.Kvs[0], resp.Kvs[0]
   122  	for _, k := range resp.Kvs {
   123  		if k.ModRevision < lowest.ModRevision {
   124  			lowest = k
   125  		}
   126  		if k.ModRevision > highest.ModRevision {
   127  			highest = k
   128  		}
   129  	}
   130  	isLowest := string(lowest.Key) == b.myKey.Key()
   131  
   132  	if len(resp.Kvs) == 1 && isLowest {
   133  		// this is the only node in the barrier; finish up
   134  		if _, err = client.Delete(b.ctx, b.key+"/ready"); err != nil {
   135  			return err
   136  		}
   137  		return b.myKey.Delete()
   138  	}
   139  
   140  	// this ensures that if a process fails, the ephemeral lease will be
   141  	// revoked, its barrier key is removed, and the barrier can resume
   142  
   143  	// lowest process in node => wait on highest process
   144  	if isLowest {
   145  		_, err = WaitEvents(
   146  			client,
   147  			string(highest.Key),
   148  			highest.ModRevision,
   149  			[]mvccpb.Event_EventType{mvccpb.DELETE})
   150  		if err != nil {
   151  			return err
   152  		}
   153  		return b.Leave()
   154  	}
   155  
   156  	// delete self and wait on lowest process
   157  	if err = b.myKey.Delete(); err != nil {
   158  		return err
   159  	}
   160  
   161  	key := string(lowest.Key)
   162  	_, err = WaitEvents(
   163  		client,
   164  		key,
   165  		lowest.ModRevision,
   166  		[]mvccpb.Event_EventType{mvccpb.DELETE})
   167  	if err != nil {
   168  		return err
   169  	}
   170  	return b.Leave()
   171  }
   172  

View as plain text