...

Source file src/github.com/LINBIT/golinstor/monitor/lostresourceuser.go

Documentation: github.com/LINBIT/golinstor/monitor

     1  package monitor
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  	"time"
     7  
     8  	"github.com/LINBIT/golinstor/client"
     9  	"github.com/LINBIT/golinstor/devicelayerkind"
    10  )
    11  
    12  type resourceState struct {
    13  	hasQuorum bool
    14  	isWatched bool
    15  }
    16  
    17  type haResources struct {
    18  	resources map[string]resourceState
    19  	sync.Mutex
    20  }
    21  
    22  // LostResourceUser is a struct that exposes the "may promote" state of a DRBD resource
    23  // If a resource may be promoted (i.e., may be switched to Primary) after some grace period, this usually means that its user (that had the resource promoted) failed. It could also happen that the user just terminated/gets rescheduled,... It is up to the user of this API to decide.
    24  // This also means that the user (e.g., some k8s pod) needs to be restarted/rescheduled.
    25  // The LostResourceUser is generic, it sends the names of resources that lost their user on the channel C.
    26  type LostResourceUser struct {
    27  	ctx              context.Context
    28  	cancel           context.CancelFunc
    29  	client           *client.Client
    30  	mayPromoteStream *client.DRBDMayPromoteStream
    31  	haResources      haResources
    32  	initialDelay     time.Duration
    33  	existingDelay    time.Duration
    34  
    35  	C chan string // DRBD resource names of resources that may be promoted.
    36  }
    37  
    38  const (
    39  	INITIAL_DELAY_DEFAULT  = 1 * time.Minute
    40  	EXISTING_DELAY_DEFAULT = 45 * time.Second
    41  )
    42  
    43  // Option represents a configuration option of the LostResourceUser
    44  type Option func(*LostResourceUser) error
    45  
    46  // WithDelay sets the "initial delay" (for not yet seen resources) and the "existing delay" (for already known resources).
    47  func WithDelay(initial, existing time.Duration) Option {
    48  	return func(ha *LostResourceUser) error {
    49  		ha.initialDelay = initial
    50  		ha.existingDelay = existing
    51  		return nil
    52  	}
    53  }
    54  
    55  // NewLostResourceUser creates a new LostResourceUser. It takes a context, a Go LINSTOR client, and options as its input.
    56  func NewLostResourceUser(ctx context.Context, client *client.Client, options ...Option) (*LostResourceUser, error) {
    57  	// TODO: we only add to the map, we should have a GC that iterates over all the non-watched(?) and rms them.
    58  	mayPromoteStream, err := client.Events.DRBDPromotion(ctx, "current")
    59  	if err != nil {
    60  		return nil, err
    61  	}
    62  
    63  	ctx, cancel := context.WithCancel(ctx)
    64  
    65  	lr := &LostResourceUser{
    66  		ctx:              ctx,
    67  		cancel:           cancel,
    68  		client:           client,
    69  		mayPromoteStream: mayPromoteStream,
    70  		// haResources:      haResources,
    71  		haResources: haResources{
    72  			resources: make(map[string]resourceState),
    73  		},
    74  		initialDelay:  INITIAL_DELAY_DEFAULT,
    75  		existingDelay: EXISTING_DELAY_DEFAULT,
    76  		C:             make(chan string),
    77  	}
    78  
    79  	for _, opt := range options {
    80  		if err := opt(lr); err != nil {
    81  			return nil, err
    82  		}
    83  	}
    84  
    85  	go func() {
    86  		for {
    87  			select {
    88  			case ev, ok := <-lr.mayPromoteStream.Events:
    89  				if !ok {
    90  					lr.Stop()
    91  					close(lr.C)
    92  					return
    93  				}
    94  				if !ev.MayPromote {
    95  					continue
    96  				}
    97  
    98  				resName := ev.ResourceName
    99  
   100  				watch, dur := lr.resShouldWatch(resName)
   101  				if !watch {
   102  					continue
   103  				}
   104  				go lr.watch(resName, dur)
   105  			case <-lr.ctx.Done():
   106  				lr.mayPromoteStream.Close()
   107  				// would now receive in the event case, so
   108  				close(lr.C)
   109  				return
   110  			}
   111  		}
   112  	}()
   113  
   114  	return lr, nil
   115  }
   116  
   117  // Stop terminates all helper Go routines and closes the connection to the events stream.
   118  func (rl *LostResourceUser) Stop() {
   119  	rl.cancel()
   120  }
   121  
   122  func (lr *LostResourceUser) watch(resName string, dur time.Duration) {
   123  	ticker := time.NewTicker(dur)
   124  	defer ticker.Stop()
   125  
   126  	select {
   127  	case <-ticker.C:
   128  		break
   129  	case <-lr.ctx.Done():
   130  		return
   131  	}
   132  
   133  	// reevaluate the current state
   134  	ress, err := lr.client.Resources.GetAll(lr.ctx, resName)
   135  	// here we might delete it, or reset isWatched
   136  	lr.haResources.Lock()
   137  	defer lr.haResources.Unlock()
   138  
   139  	if err == client.NotFoundError {
   140  		// looks like it got deleted. but anyways, nothing we can do, rm it from our dict
   141  		delete(lr.haResources.resources, resName)
   142  		return
   143  	} else if err != nil {
   144  		lr.Stop()
   145  		return
   146  	}
   147  
   148  	oneMayPromote := false
   149  	for _, r := range ress {
   150  		if r.LayerObject.Type != devicelayerkind.Drbd {
   151  			delete(lr.haResources.resources, resName)
   152  			return
   153  		}
   154  		if r.LayerObject.Drbd.MayPromote {
   155  			oneMayPromote = true
   156  			break
   157  		}
   158  	}
   159  
   160  	if oneMayPromote {
   161  		lr.C <- resName
   162  	}
   163  
   164  	res := lr.haResources.resources[resName]
   165  	// if we introduce a GC we need to check for ok here ^^
   166  	// but currently all the deletes are here under this lock
   167  	res.isWatched = false
   168  	lr.haResources.resources[resName] = res
   169  }
   170  
   171  func (ha *LostResourceUser) resHasQuorum(resName string) (bool, error) {
   172  	rd, err := ha.client.ResourceDefinitions.Get(ha.ctx, resName)
   173  	if err != nil {
   174  		return false, err
   175  	}
   176  
   177  	val, ok := rd.Props["DrbdOptions/Resource/quorum"]
   178  	if !ok || val == "off" {
   179  		return false, nil
   180  	}
   181  
   182  	return true, nil
   183  }
   184  
   185  func (ha *LostResourceUser) resShouldWatch(resName string) (bool, time.Duration) {
   186  	long, short := ha.initialDelay, ha.existingDelay
   187  
   188  	ha.haResources.Lock()
   189  	defer ha.haResources.Unlock()
   190  
   191  	res, ok := ha.haResources.resources[resName]
   192  
   193  	if ok { // existing resource
   194  		if res.isWatched {
   195  			return false, 0
   196  		}
   197  
   198  		if !res.hasQuorum {
   199  			return false, 0
   200  		}
   201  
   202  		res.isWatched = true
   203  		ha.haResources.resources[resName] = res
   204  		return true, short
   205  	}
   206  
   207  	// new resource
   208  	hasQuorum, err := ha.resHasQuorum(resName)
   209  	if err != nil {
   210  		// hope for better times...
   211  		return false, 0
   212  	}
   213  	// create the map entry
   214  	ha.haResources.resources[resName] = resourceState{
   215  		hasQuorum: hasQuorum,
   216  		isWatched: hasQuorum, // not a typo, if it hasQuorum, we will watch it
   217  	}
   218  	if !hasQuorum {
   219  		return false, 0
   220  	}
   221  	// new one with quorum, give it some time...
   222  	return true, long
   223  }
   224  

View as plain text