...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm/alarms.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package v3alarm manages health status alarms in etcd.
    16  package v3alarm
    17  
    18  import (
    19  	"sync"
    20  
    21  	pb "go.etcd.io/etcd/api/v3/etcdserverpb"
    22  	"go.etcd.io/etcd/client/pkg/v3/types"
    23  	"go.etcd.io/etcd/server/v3/mvcc/backend"
    24  	"go.etcd.io/etcd/server/v3/mvcc/buckets"
    25  
    26  	"go.uber.org/zap"
    27  )
    28  
    29  type BackendGetter interface {
    30  	Backend() backend.Backend
    31  }
    32  
    33  type alarmSet map[types.ID]*pb.AlarmMember
    34  
    35  // AlarmStore persists alarms to the backend.
    36  type AlarmStore struct {
    37  	lg    *zap.Logger
    38  	mu    sync.Mutex
    39  	types map[pb.AlarmType]alarmSet
    40  
    41  	bg BackendGetter
    42  }
    43  
    44  func NewAlarmStore(lg *zap.Logger, bg BackendGetter) (*AlarmStore, error) {
    45  	if lg == nil {
    46  		lg = zap.NewNop()
    47  	}
    48  	ret := &AlarmStore{lg: lg, types: make(map[pb.AlarmType]alarmSet), bg: bg}
    49  	err := ret.restore()
    50  	return ret, err
    51  }
    52  
    53  func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
    54  	a.mu.Lock()
    55  	defer a.mu.Unlock()
    56  
    57  	newAlarm := &pb.AlarmMember{MemberID: uint64(id), Alarm: at}
    58  	if m := a.addToMap(newAlarm); m != newAlarm {
    59  		return m
    60  	}
    61  
    62  	v, err := newAlarm.Marshal()
    63  	if err != nil {
    64  		a.lg.Panic("failed to marshal alarm member", zap.Error(err))
    65  	}
    66  
    67  	b := a.bg.Backend()
    68  	b.BatchTx().LockInsideApply()
    69  	b.BatchTx().UnsafePut(buckets.Alarm, v, nil)
    70  	b.BatchTx().Unlock()
    71  
    72  	return newAlarm
    73  }
    74  
    75  func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
    76  	a.mu.Lock()
    77  	defer a.mu.Unlock()
    78  
    79  	t := a.types[at]
    80  	if t == nil {
    81  		t = make(alarmSet)
    82  		a.types[at] = t
    83  	}
    84  	m := t[id]
    85  	if m == nil {
    86  		return nil
    87  	}
    88  
    89  	delete(t, id)
    90  
    91  	v, err := m.Marshal()
    92  	if err != nil {
    93  		a.lg.Panic("failed to marshal alarm member", zap.Error(err))
    94  	}
    95  
    96  	b := a.bg.Backend()
    97  	b.BatchTx().LockInsideApply()
    98  	b.BatchTx().UnsafeDelete(buckets.Alarm, v)
    99  	b.BatchTx().Unlock()
   100  
   101  	return m
   102  }
   103  
   104  func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
   105  	a.mu.Lock()
   106  	defer a.mu.Unlock()
   107  	if at == pb.AlarmType_NONE {
   108  		for _, t := range a.types {
   109  			for _, m := range t {
   110  				ret = append(ret, m)
   111  			}
   112  		}
   113  		return ret
   114  	}
   115  	for _, m := range a.types[at] {
   116  		ret = append(ret, m)
   117  	}
   118  	return ret
   119  }
   120  
   121  func (a *AlarmStore) restore() error {
   122  	b := a.bg.Backend()
   123  	tx := b.BatchTx()
   124  
   125  	tx.LockOutsideApply()
   126  	tx.UnsafeCreateBucket(buckets.Alarm)
   127  	err := tx.UnsafeForEach(buckets.Alarm, func(k, v []byte) error {
   128  		var m pb.AlarmMember
   129  		if err := m.Unmarshal(k); err != nil {
   130  			return err
   131  		}
   132  		a.addToMap(&m)
   133  		return nil
   134  	})
   135  	tx.Unlock()
   136  
   137  	b.ForceCommit()
   138  	return err
   139  }
   140  
   141  func (a *AlarmStore) addToMap(newAlarm *pb.AlarmMember) *pb.AlarmMember {
   142  	t := a.types[newAlarm.Alarm]
   143  	if t == nil {
   144  		t = make(alarmSet)
   145  		a.types[newAlarm.Alarm] = t
   146  	}
   147  	m := t[types.ID(newAlarm.MemberID)]
   148  	if m != nil {
   149  		return m
   150  	}
   151  	t[types.ID(newAlarm.MemberID)] = newAlarm
   152  	return newAlarm
   153  }
   154  

View as plain text