// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package managedwriter

import (
	"fmt"
	"sort"
	"sync"
	"time"
)

// poolRouter describes the hooks through which a router is attached to a
// connection pool, notified of writers coming and going, and asked to select
// a connection for each pending write.
type poolRouter interface {

	// poolAttach is called once to signal a router that it is responsible for a given pool.
	poolAttach(pool *connectionPool) error

	// poolDetach is called as part of clean connectionPool shutdown.
	// It provides an opportunity for the router to shut down internal state.
	poolDetach() error

	// writerAttach is a hook to notify the router that a new writer is being attached to the pool.
	// It provides an opportunity for the router to allocate resources and update internal state.
	writerAttach(writer *ManagedStream) error

	// writerDetach signals the router that a given writer is being removed from the pool. The router
	// does not have responsibility for closing the writer, but this is called as part of writer close.
	writerDetach(writer *ManagedStream) error

	// pickConnection is used to select a connection for a given pending write.
	pickConnection(pw *pendingWrite) (*connection, error)
}

// simpleRouter is a primitive traffic router that routes all traffic to its single connection instance.
//
// This router is designed for our migration case, where a single ManagedStream writer has a 1:1 relationship
// with a connectionPool. You can multiplex with this router, but it will never scale beyond a single connection.
type simpleRouter struct { mode connectionMode pool *connectionPool mu sync.RWMutex conn *connection writers map[string]struct{} } func (rtr *simpleRouter) poolAttach(pool *connectionPool) error { if rtr.pool == nil { rtr.pool = pool return nil } return fmt.Errorf("router already attached to pool %q", rtr.pool.id) } func (rtr *simpleRouter) poolDetach() error { rtr.mu.Lock() defer rtr.mu.Unlock() if rtr.conn != nil { rtr.conn.close() rtr.conn = nil } return nil } func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error { if writer.id == "" { return fmt.Errorf("writer has no ID") } rtr.mu.Lock() defer rtr.mu.Unlock() rtr.writers[writer.id] = struct{}{} if rtr.conn == nil { rtr.conn = newConnection(rtr.pool, rtr.mode, nil) } return nil } func (rtr *simpleRouter) writerDetach(writer *ManagedStream) error { if writer.id == "" { return fmt.Errorf("writer has no ID") } rtr.mu.Lock() defer rtr.mu.Unlock() delete(rtr.writers, writer.id) if len(rtr.writers) == 0 && rtr.conn != nil { // no attached writers, cleanup and remove connection. defer rtr.conn.close() rtr.conn = nil } return nil } // Picking a connection is easy; there's only one. func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) { rtr.mu.RLock() defer rtr.mu.RUnlock() if rtr.conn != nil { return rtr.conn, nil } return nil, fmt.Errorf("no connection available") } func newSimpleRouter(mode connectionMode) *simpleRouter { return &simpleRouter{ // We don't add a connection until writers attach. mode: mode, writers: make(map[string]struct{}), } } // sharedRouter is a more comprehensive router for a connection pool. // // It maintains state for both exclusive and shared connections, but doesn't commingle the // two. If the router is configured to allow multiplex, it also runs a watchdog goroutine // that allows is to curate traffic there by reassigning writers to different connections. 
// // Multiplexing routing here is designed for connection sharing among more idle writers, // and does NOT yet handle the use case where a single writer produces enough traffic to // warrant fanout across multiple connections. type sharedRouter struct { pool *connectionPool multiplex bool maxConns int // multiplex limit. close chan struct{} // for shutting down watchdog // mu guards access to exclusive connections mu sync.RWMutex // keyed by writer ID exclusiveConns map[string]*connection // multiMu guards access to multiplex mappings. multiMu sync.RWMutex // keyed by writer ID multiMap map[string]*connection // keyed by connection ID invertedMultiMap map[string][]*ManagedStream multiConns []*connection } type connPair struct { writer *ManagedStream conn *connection } // attaches the router to the connection pool. The watchdog goroutine // only curates multiplex connections, so we don't start it if the // router isn't going to process that traffic. func (sr *sharedRouter) poolAttach(pool *connectionPool) error { if sr.pool == nil { sr.pool = pool sr.close = make(chan struct{}) if sr.multiplex { go sr.watchdog() } return nil } return fmt.Errorf("router already attached to pool %q", sr.pool.id) } // poolDetach gives us an opportunity to cleanup connections during // shutdown/close. 
func (sr *sharedRouter) poolDetach() error { sr.mu.Lock() // cleanup explicit connections for writerID, conn := range sr.exclusiveConns { conn.close() delete(sr.exclusiveConns, writerID) } sr.mu.Unlock() // cleanup multiplex resources sr.multiMu.Lock() for _, co := range sr.multiConns { co.close() } sr.multiMap = make(map[string]*connection) sr.multiConns = nil close(sr.close) // trigger watchdog shutdown sr.multiMu.Unlock() return nil } func (sr *sharedRouter) writerAttach(writer *ManagedStream) error { if writer == nil { return fmt.Errorf("invalid writer") } if writer.id == "" { return fmt.Errorf("writer has empty ID") } if sr.multiplex && canMultiplex(writer.StreamName()) { return sr.writerAttachMulti(writer) } // Handle non-multiplex writer. sr.mu.Lock() defer sr.mu.Unlock() if pair := sr.exclusiveConns[writer.id]; pair != nil { return fmt.Errorf("writer %q already attached", writer.id) } sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings) return nil } // multiAttach is the multiplex-specific logic for writerAttach. // It should only be called from writerAttach. We use the same // orderAndGrow as watchdog, and simply attach the new writer to // the most idle connection. func (sr *sharedRouter) writerAttachMulti(writer *ManagedStream) error { sr.multiMu.Lock() defer sr.multiMu.Unlock() // order any existing connections sr.orderAndGrowMultiConns() conn := sr.multiConns[0] sr.multiMap[writer.id] = conn var writers []*ManagedStream if w, ok := sr.invertedMultiMap[conn.id]; ok { writers = append(w, writer) } else { // first connection writers = []*ManagedStream{writer} } sr.invertedMultiMap[conn.id] = writers return nil } // orderMultiConns orders the connection slice by current load, and will grow // the connections if necessary. // // Should only be called with R/W lock. 
func (sr *sharedRouter) orderAndGrowMultiConns() { sort.SliceStable(sr.multiConns, func(i, j int) bool { return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad() }) if len(sr.multiConns) == 0 { sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)} } else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns { sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...) } } var ( // Used by rebalanceWriters to avoid rebalancing if the load difference is within the threshold range. connLoadDeltaThreshold = 1.2 watchDogInterval = 500 * time.Millisecond ) // rebalanceWriters looks for opportunities to redistribute traffic load. // // This is run as part of a heartbeat, when the connections have been ordered // by load. // // Should only be called with the multiplex mutex r/w lock. func (sr *sharedRouter) rebalanceWriters() { mostIdleIdx := 0 leastIdleIdx := len(sr.multiConns) - 1 mostIdleConn := sr.multiConns[0] mostIdleLoad := mostIdleConn.curLoad() if mostIdleConn.isLoaded() { // Don't rebalance if all connections are loaded. return } // only look for rebalance opportunies between different connections. for mostIdleIdx != leastIdleIdx { targetConn := sr.multiConns[leastIdleIdx] if targetConn.curLoad() < mostIdleLoad*connLoadDeltaThreshold { // the load delta isn't significant enough between most and least idle connections // to warrant moving traffic. Done for this heartbeat. return } candidates, ok := sr.invertedMultiMap[targetConn.id] if !ok { leastIdleIdx = leastIdleIdx - 1 continue } if len(candidates) == 1 { leastIdleIdx = leastIdleIdx - 1 continue } // Multiple writers, relocate one. 
candidate, remaining := candidates[0], candidates[1:] // update the moved forward map sr.multiMap[candidate.id] = mostIdleConn // update the inverse map sr.invertedMultiMap[targetConn.id] = remaining idleWriters, ok := sr.invertedMultiMap[mostIdleConn.id] if ok { sr.invertedMultiMap[mostIdleConn.id] = append(idleWriters, candidate) } else { sr.invertedMultiMap[mostIdleConn.id] = []*ManagedStream{candidate} } return } } func (sr *sharedRouter) writerDetach(writer *ManagedStream) error { if writer == nil { return fmt.Errorf("invalid writer") } if sr.multiplex && canMultiplex(writer.StreamName()) { return sr.writerDetachMulti(writer) } // Handle non-multiplex writer. sr.mu.Lock() defer sr.mu.Unlock() conn := sr.exclusiveConns[writer.id] if conn == nil { return fmt.Errorf("writer not currently attached") } conn.close() delete(sr.exclusiveConns, writer.id) return nil } // writerDetachMulti is the multiplex-specific logic for writerDetach. // It should only be called from writerDetach. func (sr *sharedRouter) writerDetachMulti(writer *ManagedStream) error { sr.multiMu.Lock() defer sr.multiMu.Unlock() delete(sr.multiMap, writer.id) // If the number of writers drops to zero, close all open connections. if len(sr.multiMap) == 0 { for _, co := range sr.multiConns { co.close() } sr.multiConns = nil } return nil } // pickConnection either routes a write to a connection for explicit streams, // or delegates too pickMultiplexConnection for the multiplex case. 
func (sr *sharedRouter) pickConnection(pw *pendingWrite) (*connection, error) { if pw.writer == nil { return nil, fmt.Errorf("no writer present pending write") } if sr.multiplex && canMultiplex(pw.writer.StreamName()) { return sr.pickMultiplexConnection(pw) } sr.mu.RLock() defer sr.mu.RUnlock() conn := sr.exclusiveConns[pw.writer.id] if conn == nil { return nil, fmt.Errorf("writer %q unknown", pw.writer.id) } return conn, nil } func (sr *sharedRouter) pickMultiplexConnection(pw *pendingWrite) (*connection, error) { sr.multiMu.RLock() defer sr.multiMu.RUnlock() conn := sr.multiMap[pw.writer.id] if conn == nil { // TODO: update map return nil, fmt.Errorf("no multiplex connection assigned") } return conn, nil } // watchdog is intended to run as a goroutine where multiplex features are enabled. // // Our goals during a heartbeat are simple: // * ensure we have sufficient connections. // * ensure traffic from writers is well distributed across connections. // // Our rebalancing strategy in this iteration is modest. We order the connections by // current load, and then examine the busiest connection(s) looking for opportunities // to redistribute traffic. func (sr *sharedRouter) watchdog() { for { select { case <-sr.close: return case <-time.After(watchDogInterval): sr.watchdogPulse() } } } // an individual pulse of the watchdog loop. func (sr *sharedRouter) watchdogPulse() { sr.multiMu.Lock() defer sr.multiMu.Unlock() sr.orderAndGrowMultiConns() sr.rebalanceWriters() } func newSharedRouter(multiplex bool, maxConns int) *sharedRouter { return &sharedRouter{ multiplex: multiplex, maxConns: maxConns, exclusiveConns: make(map[string]*connection), multiMap: make(map[string]*connection), invertedMultiMap: make(map[string][]*ManagedStream), } }