...

Source file src/go.etcd.io/etcd/server/v3/etcdserver/apply_v2.go

Documentation: go.etcd.io/etcd/server/v3/etcdserver

     1  // Copyright 2016 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package etcdserver
    16  
    17  import (
    18  	"encoding/json"
    19  	"fmt"
    20  	"path"
    21  	"strconv"
    22  	"time"
    23  
    24  	"github.com/coreos/go-semver/semver"
    25  	"go.etcd.io/etcd/pkg/v3/pbutil"
    26  	"go.etcd.io/etcd/server/v3/etcdserver/api"
    27  	"go.etcd.io/etcd/server/v3/etcdserver/api/membership"
    28  	"go.etcd.io/etcd/server/v3/etcdserver/api/v2store"
    29  
    30  	"go.uber.org/zap"
    31  )
    32  
// v2Version is the first label value passed to applySec.WithLabelValues when
// applyV2Request records how long a request took to apply.
const v2Version = "v2"
    34  
// ApplierV2 is the interface for processing V2 raft messages.
//
// EtcdServer.applyV2Request selects one of these methods based on the
// request's Method string.
type ApplierV2 interface {
	// Delete handles a request whose Method is "DELETE".
	Delete(r *RequestV2) Response
	// Post handles a request whose Method is "POST".
	Post(r *RequestV2) Response
	// Put handles a request whose Method is "PUT". shouldApplyV3 is passed
	// through from the caller of applyV2Request.
	Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response
	// QGet handles a request whose Method is "QGET".
	QGet(r *RequestV2) Response
	// Sync handles a request whose Method is "SYNC".
	Sync(r *RequestV2) Response
}
    43  
    44  func NewApplierV2(lg *zap.Logger, s v2store.Store, c *membership.RaftCluster) ApplierV2 {
    45  	if lg == nil {
    46  		lg = zap.NewNop()
    47  	}
    48  	return &applierV2store{lg: lg, store: s, cluster: c}
    49  }
    50  
// applierV2store is the ApplierV2 implementation constructed by NewApplierV2.
type applierV2store struct {
	// lg is the logger; NewApplierV2 substitutes a no-op logger for nil.
	lg *zap.Logger
	// store is the v2 store every request is executed against.
	store v2store.Store
	// cluster may be nil; Put skips its cluster updates in that case.
	cluster *membership.RaftCluster
}
    56  
    57  func (a *applierV2store) Delete(r *RequestV2) Response {
    58  	switch {
    59  	case r.PrevIndex > 0 || r.PrevValue != "":
    60  		return toResponse(a.store.CompareAndDelete(r.Path, r.PrevValue, r.PrevIndex))
    61  	default:
    62  		return toResponse(a.store.Delete(r.Path, r.Dir, r.Recursive))
    63  	}
    64  }
    65  
    66  func (a *applierV2store) Post(r *RequestV2) Response {
    67  	return toResponse(a.store.Create(r.Path, r.Dir, r.Val, true, r.TTLOptions()))
    68  }
    69  
    70  func (a *applierV2store) Put(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) Response {
    71  	ttlOptions := r.TTLOptions()
    72  	exists, existsSet := pbutil.GetBool(r.PrevExist)
    73  	switch {
    74  	case existsSet:
    75  		if exists {
    76  			if r.PrevIndex == 0 && r.PrevValue == "" {
    77  				return toResponse(a.store.Update(r.Path, r.Val, ttlOptions))
    78  			}
    79  			return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
    80  		}
    81  		return toResponse(a.store.Create(r.Path, r.Dir, r.Val, false, ttlOptions))
    82  	case r.PrevIndex > 0 || r.PrevValue != "":
    83  		return toResponse(a.store.CompareAndSwap(r.Path, r.PrevValue, r.PrevIndex, r.Val, ttlOptions))
    84  	default:
    85  		if storeMemberAttributeRegexp.MatchString(r.Path) {
    86  			id := membership.MustParseMemberIDFromKey(a.lg, path.Dir(r.Path))
    87  			var attr membership.Attributes
    88  			if err := json.Unmarshal([]byte(r.Val), &attr); err != nil {
    89  				a.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err))
    90  			}
    91  			if a.cluster != nil {
    92  				a.cluster.UpdateAttributes(id, attr, shouldApplyV3)
    93  			}
    94  			// return an empty response since there is no consumer.
    95  			return Response{}
    96  		}
    97  		// TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6
    98  		if r.Path == membership.StoreClusterVersionKey() {
    99  			if a.cluster != nil {
   100  				// persist to backend given v2store can be very stale
   101  				a.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3)
   102  			}
   103  			return Response{}
   104  		}
   105  		return toResponse(a.store.Set(r.Path, r.Dir, r.Val, ttlOptions))
   106  	}
   107  }
   108  
   109  func (a *applierV2store) QGet(r *RequestV2) Response {
   110  	return toResponse(a.store.Get(r.Path, r.Recursive, r.Sorted))
   111  }
   112  
   113  func (a *applierV2store) Sync(r *RequestV2) Response {
   114  	a.store.DeleteExpiredKeys(time.Unix(0, r.Time))
   115  	return Response{}
   116  }
   117  
   118  // applyV2Request interprets r as a call to v2store.X
   119  // and returns a Response interpreted from v2store.Event
   120  func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) (resp Response) {
   121  	stringer := panicAlternativeStringer{
   122  		stringer:    r,
   123  		alternative: func() string { return fmt.Sprintf("id:%d,method:%s,path:%s", r.ID, r.Method, r.Path) },
   124  	}
   125  	defer func(start time.Time) {
   126  		success := resp.Err == nil
   127  		applySec.WithLabelValues(v2Version, r.Method, strconv.FormatBool(success)).Observe(time.Since(start).Seconds())
   128  		warnOfExpensiveRequest(s.Logger(), s.Cfg.WarningApplyDuration, start, stringer, nil, nil)
   129  	}(time.Now())
   130  
   131  	switch r.Method {
   132  	case "POST":
   133  		return s.applyV2.Post(r)
   134  	case "PUT":
   135  		return s.applyV2.Put(r, shouldApplyV3)
   136  	case "DELETE":
   137  		return s.applyV2.Delete(r)
   138  	case "QGET":
   139  		return s.applyV2.QGet(r)
   140  	case "SYNC":
   141  		return s.applyV2.Sync(r)
   142  	default:
   143  		// This should never be reached, but just in case:
   144  		return Response{Err: ErrUnknownMethod}
   145  	}
   146  }
   147  
   148  func (r *RequestV2) TTLOptions() v2store.TTLOptionSet {
   149  	refresh, _ := pbutil.GetBool(r.Refresh)
   150  	ttlOptions := v2store.TTLOptionSet{Refresh: refresh}
   151  	if r.Expiration != 0 {
   152  		ttlOptions.ExpireTime = time.Unix(0, r.Expiration)
   153  	}
   154  	return ttlOptions
   155  }
   156  
   157  func toResponse(ev *v2store.Event, err error) Response {
   158  	return Response{Event: ev, Err: err}
   159  }
   160  

View as plain text