const ( ServerName = SubstitutionVar("{server_name}") ServerType = SubstitutionVar("{sever_type}") LaneNumber = SubstitutionVar("{lane_number}") Suffix = SubstitutionVar("{suffix}") CouchDBStatefulSet = SubstitutionVar("{couchdb_sts}") ChirpStatefulSet = SubstitutionVar("{chirp_sts}") ReplicationDB = SubstitutionVar("{replication_db}") ReplicationSecret = SubstitutionVar("{replication_secret}") ReplicationSecretNS = SubstitutionVar("{replication_secret_ns}") NodeUID = SubstitutionVar("{node_uid}") LaneNumberSubstitutionMaxLength = 12 ChirpName = "chirp" ChirpOldName = "data-sync-messaging" ConfigMapUID = "node-uuid" )
const (
ControllerNamespace = "couchctl"
)
const (
DatasyncFinalizer = "finalizers.datasync.edge.ncr.com"
)
const (
IniFileKey = "inifile"
)
var ( ErrServerNotReady = errors.New("server is not ready yet") ErrNewDatabaseNotFound = errors.New("new database not found") ErrNewDatabaseRolesAndUsersNotFound = errors.New("new database roles and users are not set") )
var ( DatabaseReplicationStatus = promauto.NewCounterVec(prometheus.CounterOpts{ Name: metrics.Name("couchctl", "database_replication_status"), Help: "information about database replication status", }, []string{"server_name", "db_name", "status", "message"}) DatabaseDocumentCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: metrics.Name("couchctl", "database_doc_count"), Help: "number of documents in a couchdb database", }, []string{"server_name", "db_name"}) DatabaseDiffDocumentCount = promauto.NewGaugeVec(prometheus.GaugeOpts{ Name: metrics.Name("couchctl", "database_diff_count"), Help: "difference in documents from the cloud couchdb database", }, []string{"server_name", "db_name"}) )
var ( // Substitutions a set of valid mockSubstitutions values Substitutions = map[SubstitutionVar]struct{}{ ServerName: {}, ServerType: {}, LaneNumber: {}, Suffix: {}, CouchDBStatefulSet: {}, ChirpStatefulSet: {}, ReplicationDB: {}, ReplicationSecret: {}, ReplicationSecretNS: {}, NodeUID: {}, } ErrInvalidSubstitutions = errors.New("invalid substitutions") ErrNoValueSubstitution = errors.New("no value found for mockSubstitutions") )
var ( ErrNewUserNotFoundInDB = errors.New("new user not found in users table") )
var ( ErrPodsNotReady = errors.New("pods arent ready") )
func ApplySubstitutions(res client.Object, su Substitution) (*unstructured.Unstructured, error)
ApplySubstitutions replace all mockSubstitutions values
func ChangeSetEntry(action sap.Action, obj client.Object) (*sap.ChangeSetEntry, error)
func CheckPod(ctx context.Context, cl client.Client, ns string, name string) (bool, error)
func ConfigMap(server dsapi.CouchDBServer) (*corev1.ConfigMap, error)
ConfigMap dynamically build couchdb config ini
func DatabaseDocumentCountSet(servername, dbname string, count float64)
func DatabaseDocumentDiffInc(servername, dbname string, count float64)
func ExistingChangeSetEntry(obj client.Object) (*sap.ChangeSetEntry, error)
ExistingChangeSetEntry used for backward compatibility
func IsCondition(conditions []corev1.PodCondition, cType string, cStatus corev1.ConditionStatus) bool
IsCondition determines if a K8s resource has a status matching the input type and status
func IsLeader(node corev1.Node) bool
func MemoryLessThan(node1 corev1.ResourceList, node2 corev1.ResourceList) bool
func ReplicationStatusInc(servername, dbname, status, message string)
func Run(o ...controller.Option) error
Run creates the manager, sets up the controller, and then starts everything. It returns the created manager for testing purposes
func SubstitutionVars(su Substitution) map[SubstitutionVar]string
func WatchCouchDBUser(ctx context.Context, cl client.WithWatch, user *dsapi.CouchDBUser, timeout time.Duration) error
WatchCouchDBUser watch couchdb user resource. TODO make generic to watch any couchdb resource.
type BulkDoc struct { State Doc map[string]interface{} Error error Dataset dsapi.Dataset }
func (d *BulkDoc) SetDone()
func (d *BulkDoc) SetError(err error)
type BulkDocs struct { Docs map[string]*BulkDoc }
func (b *BulkDocs) DoneProcessing() bool
func (b *BulkDocs) Equals(id string, doc map[string]interface{}) bool
func (b *BulkDocs) GetAllDocs() []*BulkDoc
func (b *BulkDocs) GetDocs(state State) []string
func (b *BulkDocs) JoinErrors() error
func (b *BulkDocs) ProcessingDocs() []interface{}
func (b *BulkDocs) Remove(dbs ...string)
func (b *BulkDocs) SetDone(id string) bool
func (b *BulkDocs) SetError(id string, err error) bool
func (b *BulkDocs) SetErrors(err error)
SetErrors can't no longer process doc, set all remaining doc as error
func (b *BulkDocs) SetProcessing(id string) bool
func (b *BulkDocs) SetRevision(id string, rev string) bool
func (b *BulkDocs) SetState(id string, state State) bool
func (b *BulkDocs) Stats() Stat
type Changes interface { Next() bool Err() error ID() string // document ID Changes() []string // list of revisions }
type ChangesFunc func(ctx context.Context, username, password, url string) (Changes, error)
type Config struct { CompactRatio float64 RequeueTime time.Duration // Duration use to reconcile random errors ServerNotReady time.Duration // Duration use to wait for server to become ready DatabaseNotFound time.Duration // Duration use to reconcile for database not found IngressNotReady time.Duration // Duration use to reconcile for ingress not ready PollingInterval time.Duration // Duration use to check for resource updates ReplicationPollingInterval time.Duration // Duration use to check for new DB replication ReplicationChangesInterval time.Duration // Duration use to check for new DB replication EnablementWatchInterval time.Duration // Duration use to check for banner couchdb enablement ReplicationDBCreated time.Duration // Duration use to check db created by replication DatasyncDNSName string CouchDBPort string FleetType string ClusterType string BannerEdgeID string ProjectID string SiteID string // bslInfo.ID, only in stores CouchNamespace string CouchCTLNamespace string InterlockAPIURL string NodeUID string NodeClass string NodeRole string ReplicationEventFromSource bool ReconcileConcurrency int }
func NewConfig() (*Config, error)
func (c *Config) BindFlags(fs *flag.FlagSet)
func (c *Config) CloudURL() string
func (c *Config) IsCPNode() bool
func (c *Config) IsDSDS() bool
func (c *Config) IsGKE() bool
func (c *Config) IsGeneric() bool
func (c *Config) IsStore() bool
func (c *Config) ReplicationDB() string
func (c *Config) Validate() error
type CouchDBDesignDocReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder Name string Config *Config Metrics metrics.Metrics // contains filtered or unexported fields }
func (r *CouchDBDesignDocReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchDBDesignDocReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up CouchDBDesignDocReconciler with the manager
type CouchDBPersistenceReconciler struct { client.Client LeaderElector kuberecorder.EventRecorder ResourceManager *sap.ResourceManager Name string Config *Config Metrics metrics.Metrics PersistenceLeaderElector // contains filtered or unexported fields }
func (r *CouchDBPersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchDBPersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error
type CouchDatabaseReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder Name string Config *Config Metrics metrics.Metrics ReconcileConcurrency int // contains filtered or unexported fields }
func (r *CouchDatabaseReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchDatabaseReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up ComputeAddressReconciler with the manager
type CouchIndexReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder Name string Config *Config Metrics metrics.Metrics // contains filtered or unexported fields }
func (r *CouchIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchIndexReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up CouchIndexReconciler with the manager
type CouchReplicationReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder Name string Config *Config Metrics metrics.Metrics // contains filtered or unexported fields }
func (r *CouchReplicationReconciler) EnQueue(_ HostState, queue workqueue.RateLimitingInterface)
func (r *CouchReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up CouchReplicationReconciler with the manager
type CouchServerReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder ResourceManager *sap.ResourceManager Name string Config *Config Metrics metrics.Metrics // contains filtered or unexported fields }
func (r *CouchServerReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchServerReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up CouchServerReconciler with the manager
type CouchUserReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder SecretManager secretManager Name string Config *Config Metrics metrics.Metrics ResourceManager *sap.ResourceManager // contains filtered or unexported fields }
func (r *CouchUserReconciler) CredsToSecretManager(ctx context.Context, user *dsapi.CouchDBUser, userCreds couchdb.CredentialsManager, couchReplicationCred bool) error
CredsToSecretManager takes in user and attempts to add the fields as secrets to secretmanager
func (r *CouchUserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error)
func (r *CouchUserReconciler) SetupWithManager(mgr ctrl.Manager) error
SetupWithManager sets up ComputeAddressReconciler with the manager
type Event struct { Topic string `json:"topic"` // must be `host` for HostState data HostState HostState `json:"data"` }
type HostNetwork struct { LanOutageDetected bool `json:"lan-outage-detected"` LanOutageMode bool `json:"lan-outage-mode"` }
type HostState struct { Hostname string `json:"hostname"` Network HostNetwork `json:"network"` }
func (h HostState) InLOM() bool
InterlockClient https://docs.edge-infra.dev/edge/sds/interlock/api/
type InterlockClient struct {
// contains filtered or unexported fields
}
func NewInterlockClient(baseURL string, cb func(e HostState, queue workqueue.RateLimitingInterface)) *InterlockClient
NewInterlockClient retrieves state of the cluster
func (c *InterlockClient) GetHostState(ctx context.Context) (*HostState, error)
func (c *InterlockClient) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error
type LeaderElection struct { Lock resourcelock.Interface // contains filtered or unexported fields }
func NewLeaderElection(cs *coordinationv1client.CoordinationV1Client, namespace, name, id string) *LeaderElection
func (l *LeaderElection) IsLeader() bool
func (l *LeaderElection) OnNewLeader(ctx context.Context, cb func(leader bool))
type LeaderElector interface { Elect([]corev1.Node) (*corev1.Node, error) }
type MockCouchDBCluster struct { sync.Mutex *utils.MockHTTPTestServer // contains filtered or unexported fields }
func NewMockCouchDBCluster(config *Config, client client.Client, server *dsapi.CouchDBServer, database *dsapi.CouchDBDatabase, user *dsapi.CouchDBUser, index *dsapi.CouchDBIndex, ddoc *dsapi.CouchDBDesignDoc, replication *dsapi.CouchDBReplicationSet) (*MockCouchDBCluster, error)
NewMockCouchDBCluster allows us to run all integration tests without connecting to couchdb. Note: this does not replace integration test for couchdb
func (c *MockCouchDBCluster) NotFound()
func (c *MockCouchDBCluster) Port() string
type NodeMemoryComparison []corev1.Node
func (n NodeMemoryComparison) Len() int
func (n NodeMemoryComparison) Less(i, j int) bool
func (n NodeMemoryComparison) Swap(i, j int)
type NodeMemoryElector struct{}
func (NodeMemoryElector) Elect(nodes []corev1.Node) (*corev1.Node, error)
Elect returns a copy of the node elected as leader
type NodeResourcePredicate interface { ShouldReconcile(*Config, client.Object) bool }
type NodeResourcePredicateFunc func(*Config, client.Object) bool
func (f NodeResourcePredicateFunc) ShouldReconcile(cfg *Config, obj client.Object) bool
type Payload struct { Action string `json:"action"` }
type PersistenceLeaderElector interface { IsLeader() bool }
type PersistenceLeaderElectorFunc func() bool
func (r PersistenceLeaderElectorFunc) IsLeader() bool
type ReplicationEvent struct { sync.Mutex // contains filtered or unexported fields }
func NewReplicationEvent(cfg *Config) *ReplicationEvent
func (c *ReplicationEvent) Listen(repl *dsapi.CouchDBReplicationSet, username, password, url string) error
func (c *ReplicationEvent) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error
func (c *ReplicationEvent) Stop()
type ReplicationInfo struct { SourceURI string SourceUsername string SourcePassword string TargetURI string // http://localhost:5984 TargetUsername string TargetPassword string }
type RetryableErrorFunc func(err error) bool
type ServerSetupResponse struct { State string `json:"state,omitempty"` Error string `json:"error,omitempty"` Reason string `json:"reason,omitempty"` }
type Stat struct {
// contains filtered or unexported fields
}
func (s Stat) Done() bool
func (s Stat) String() string
type State int
const ( Processing State = iota Done Error )
type Substitution struct { Leader bool DSDS bool ServerName string ServerType dsapi.ServerType LaneNumber string Suffix string CouchDBStatefulSet string ReplicationDB string ReplicationSecret string ReplicationSecretNS string NodeUID string // optional for generic cluster, required for dsds cluster NodeName string // optional for generic cluster, required for dsds cluster ChirpStatefulSet string NodeInfo *nameutils.NodeInfo }
func LaneSubstitution(ni *nameutils.NodeInfo, nodeMapping map[string]map[string]string, replDB, leaderNodeUID string) Substitution
LaneSubstitution based on node labels Node UID is used for suffix
func StoreSubstitution(replDB string) Substitution
StoreSubstitution generic cluster don't have node labels, so we can use hard coded values
func ToSubstitution(s map[SubstitutionVar]string) Substitution
func (s Substitution) IsTouchpoint() bool
func (s Substitution) NodeClass() v1ien.Class
func (s Substitution) NodeRole() v1ien.Role
type SubstitutionVar string
func ParseSubstitutions(yamlString string) ([]SubstitutionVar, error)
ParseSubstitutions return a set of valid substitutions or error
Name | Synopsis |
---|---|
.. | |
test_runner |