...

Source file src/cloud.google.com/go/bigquery/storage/managedwriter/routers.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2023 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package managedwriter
    16  
    17  import (
    18  	"fmt"
    19  	"sort"
    20  	"sync"
    21  	"time"
    22  )
    23  
// poolRouter is the set of hooks a router exposes: attach/detach notifications
// for the pool and for individual writers, plus connection selection for a
// pending write.
type poolRouter interface {

	// poolAttach is called once to signal a router that it is responsible for a given pool.
	poolAttach(pool *connectionPool) error

	// poolDetach is called as part of clean connectionPool shutdown.
	// It provides an opportunity for the router to shut down internal state.
	poolDetach() error

	// writerAttach is a hook to notify the router that a new writer is being attached to the pool.
	// It provides an opportunity for the router to allocate resources and update internal state.
	writerAttach(writer *ManagedStream) error

	// writerDetach signals the router that a given writer is being removed from the pool.  The router
	// does not have responsibility for closing the writer, but this is called as part of writer close.
	writerDetach(writer *ManagedStream) error

	// pickConnection is used to select a connection for a given pending write.
	pickConnection(pw *pendingWrite) (*connection, error)
}
    44  
// simpleRouter is a primitive traffic router that routes all traffic to its single connection instance.
//
// This router is designed for our migration case, where a single ManagedStream writer has a 1:1 relationship
// with a connectionPool.  You can multiplex with this router, but it will never scale beyond a single connection.
type simpleRouter struct {
	// mode is handed to newConnection when the connection is created.
	mode connectionMode
	// pool is assigned by poolAttach.
	pool *connectionPool

	// mu guards conn and writers.  conn is created when a writer attaches and
	// no connection exists yet; writers is the set of attached writer IDs.
	mu      sync.RWMutex
	conn    *connection
	writers map[string]struct{}
}
    57  
    58  func (rtr *simpleRouter) poolAttach(pool *connectionPool) error {
    59  	if rtr.pool == nil {
    60  		rtr.pool = pool
    61  		return nil
    62  	}
    63  	return fmt.Errorf("router already attached to pool %q", rtr.pool.id)
    64  }
    65  
    66  func (rtr *simpleRouter) poolDetach() error {
    67  	rtr.mu.Lock()
    68  	defer rtr.mu.Unlock()
    69  	if rtr.conn != nil {
    70  		rtr.conn.close()
    71  		rtr.conn = nil
    72  	}
    73  	return nil
    74  }
    75  
    76  func (rtr *simpleRouter) writerAttach(writer *ManagedStream) error {
    77  	if writer.id == "" {
    78  		return fmt.Errorf("writer has no ID")
    79  	}
    80  	rtr.mu.Lock()
    81  	defer rtr.mu.Unlock()
    82  	rtr.writers[writer.id] = struct{}{}
    83  	if rtr.conn == nil {
    84  		rtr.conn = newConnection(rtr.pool, rtr.mode, nil)
    85  	}
    86  	return nil
    87  }
    88  
    89  func (rtr *simpleRouter) writerDetach(writer *ManagedStream) error {
    90  	if writer.id == "" {
    91  		return fmt.Errorf("writer has no ID")
    92  	}
    93  	rtr.mu.Lock()
    94  	defer rtr.mu.Unlock()
    95  	delete(rtr.writers, writer.id)
    96  	if len(rtr.writers) == 0 && rtr.conn != nil {
    97  		// no attached writers, cleanup and remove connection.
    98  		defer rtr.conn.close()
    99  		rtr.conn = nil
   100  	}
   101  	return nil
   102  }
   103  
   104  // Picking a connection is easy; there's only one.
   105  func (rtr *simpleRouter) pickConnection(pw *pendingWrite) (*connection, error) {
   106  	rtr.mu.RLock()
   107  	defer rtr.mu.RUnlock()
   108  	if rtr.conn != nil {
   109  		return rtr.conn, nil
   110  	}
   111  	return nil, fmt.Errorf("no connection available")
   112  }
   113  
   114  func newSimpleRouter(mode connectionMode) *simpleRouter {
   115  	return &simpleRouter{
   116  		// We don't add a connection until writers attach.
   117  		mode:    mode,
   118  		writers: make(map[string]struct{}),
   119  	}
   120  }
   121  
// sharedRouter is a more comprehensive router for a connection pool.
//
// It maintains state for both exclusive and shared connections, but doesn't commingle the
// two.  If the router is configured to allow multiplex, it also runs a watchdog goroutine
// that allows it to curate traffic there by reassigning writers to different connections.
//
// Multiplexing routing here is designed for connection sharing among more idle writers,
// and does NOT yet handle the use case where a single writer produces enough traffic to
// warrant fanout across multiple connections.
type sharedRouter struct {
	// pool and close are assigned by poolAttach; multiplex and maxConns are
	// fixed at construction time.
	pool      *connectionPool
	multiplex bool
	maxConns  int           // multiplex limit.
	close     chan struct{} // for shutting down watchdog

	// mu guards access to exclusive connections
	mu sync.RWMutex
	// keyed by writer ID
	exclusiveConns map[string]*connection

	// multiMu guards access to multiplex mappings.
	multiMu sync.RWMutex
	// keyed by writer ID
	multiMap map[string]*connection
	// invertedMultiMap is keyed by connection ID and lists the writers assigned
	// to that connection.  multiConns holds the multiplex connections; it is
	// re-sorted by load whenever orderAndGrowMultiConns runs.
	invertedMultiMap map[string][]*ManagedStream
	multiConns       []*connection
}
   150  
// connPair associates a writer with a connection.
//
// NOTE(review): nothing in the visible portion of this file references
// connPair; confirm whether it is still used elsewhere in the package.
type connPair struct {
	writer *ManagedStream
	conn   *connection
}
   155  
   156  // attaches the router to the connection pool.  The watchdog goroutine
   157  // only curates multiplex connections, so we don't start it if the
   158  // router isn't going to process that traffic.
   159  func (sr *sharedRouter) poolAttach(pool *connectionPool) error {
   160  	if sr.pool == nil {
   161  		sr.pool = pool
   162  		sr.close = make(chan struct{})
   163  		if sr.multiplex {
   164  			go sr.watchdog()
   165  		}
   166  		return nil
   167  	}
   168  	return fmt.Errorf("router already attached to pool %q", sr.pool.id)
   169  }
   170  
   171  // poolDetach gives us an opportunity to cleanup connections during
   172  // shutdown/close.
   173  func (sr *sharedRouter) poolDetach() error {
   174  	sr.mu.Lock()
   175  	// cleanup explicit connections
   176  	for writerID, conn := range sr.exclusiveConns {
   177  		conn.close()
   178  		delete(sr.exclusiveConns, writerID)
   179  	}
   180  	sr.mu.Unlock()
   181  	// cleanup multiplex resources
   182  	sr.multiMu.Lock()
   183  	for _, co := range sr.multiConns {
   184  		co.close()
   185  	}
   186  	sr.multiMap = make(map[string]*connection)
   187  	sr.multiConns = nil
   188  	close(sr.close) // trigger watchdog shutdown
   189  	sr.multiMu.Unlock()
   190  	return nil
   191  }
   192  
   193  func (sr *sharedRouter) writerAttach(writer *ManagedStream) error {
   194  	if writer == nil {
   195  		return fmt.Errorf("invalid writer")
   196  	}
   197  	if writer.id == "" {
   198  		return fmt.Errorf("writer has empty ID")
   199  	}
   200  	if sr.multiplex && canMultiplex(writer.StreamName()) {
   201  		return sr.writerAttachMulti(writer)
   202  	}
   203  	// Handle non-multiplex writer.
   204  	sr.mu.Lock()
   205  	defer sr.mu.Unlock()
   206  	if pair := sr.exclusiveConns[writer.id]; pair != nil {
   207  		return fmt.Errorf("writer %q already attached", writer.id)
   208  	}
   209  	sr.exclusiveConns[writer.id] = newConnection(sr.pool, simplexConnectionMode, writer.streamSettings)
   210  	return nil
   211  }
   212  
   213  // multiAttach is the multiplex-specific logic for writerAttach.
   214  // It should only be called from writerAttach.  We use the same
   215  // orderAndGrow as watchdog, and simply attach the new writer to
   216  // the most idle connection.
   217  func (sr *sharedRouter) writerAttachMulti(writer *ManagedStream) error {
   218  	sr.multiMu.Lock()
   219  	defer sr.multiMu.Unlock()
   220  	// order any existing connections
   221  	sr.orderAndGrowMultiConns()
   222  	conn := sr.multiConns[0]
   223  	sr.multiMap[writer.id] = conn
   224  	var writers []*ManagedStream
   225  	if w, ok := sr.invertedMultiMap[conn.id]; ok {
   226  		writers = append(w, writer)
   227  	} else {
   228  		// first connection
   229  		writers = []*ManagedStream{writer}
   230  	}
   231  	sr.invertedMultiMap[conn.id] = writers
   232  	return nil
   233  }
   234  
   235  // orderMultiConns orders the connection slice by current load, and will grow
   236  // the connections if necessary.
   237  //
   238  // Should only be called with R/W lock.
   239  func (sr *sharedRouter) orderAndGrowMultiConns() {
   240  	sort.SliceStable(sr.multiConns,
   241  		func(i, j int) bool {
   242  			return sr.multiConns[i].curLoad() < sr.multiConns[j].curLoad()
   243  		})
   244  	if len(sr.multiConns) == 0 {
   245  		sr.multiConns = []*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}
   246  	} else if sr.multiConns[0].isLoaded() && len(sr.multiConns) < sr.maxConns {
   247  		sr.multiConns = append([]*connection{newConnection(sr.pool, multiplexConnectionMode, nil)}, sr.multiConns...)
   248  	}
   249  }
   250  
var (
	// connLoadDeltaThreshold is used by rebalanceWriters to avoid rebalancing if the
	// load difference is within the threshold range.  watchDogInterval is the delay
	// the watchdog goroutine waits before each pulse.
	connLoadDeltaThreshold = 1.2
	watchDogInterval       = 500 * time.Millisecond
)
   256  
   257  // rebalanceWriters looks for opportunities to redistribute traffic load.
   258  //
   259  // This is run as part of a heartbeat, when the connections have been ordered
   260  // by load.
   261  //
   262  // Should only be called with the multiplex mutex r/w lock.
   263  func (sr *sharedRouter) rebalanceWriters() {
   264  	mostIdleIdx := 0
   265  	leastIdleIdx := len(sr.multiConns) - 1
   266  
   267  	mostIdleConn := sr.multiConns[0]
   268  	mostIdleLoad := mostIdleConn.curLoad()
   269  	if mostIdleConn.isLoaded() {
   270  		// Don't rebalance if all connections are loaded.
   271  		return
   272  	}
   273  	// only look for rebalance opportunies between different connections.
   274  	for mostIdleIdx != leastIdleIdx {
   275  		targetConn := sr.multiConns[leastIdleIdx]
   276  		if targetConn.curLoad() < mostIdleLoad*connLoadDeltaThreshold {
   277  			// the load delta isn't significant enough between most and least idle connections
   278  			// to warrant moving traffic.  Done for this heartbeat.
   279  			return
   280  		}
   281  		candidates, ok := sr.invertedMultiMap[targetConn.id]
   282  		if !ok {
   283  			leastIdleIdx = leastIdleIdx - 1
   284  			continue
   285  		}
   286  		if len(candidates) == 1 {
   287  			leastIdleIdx = leastIdleIdx - 1
   288  			continue
   289  		}
   290  		// Multiple writers, relocate one.
   291  		candidate, remaining := candidates[0], candidates[1:]
   292  		// update the moved forward map
   293  		sr.multiMap[candidate.id] = mostIdleConn
   294  		// update the inverse map
   295  		sr.invertedMultiMap[targetConn.id] = remaining
   296  		idleWriters, ok := sr.invertedMultiMap[mostIdleConn.id]
   297  		if ok {
   298  			sr.invertedMultiMap[mostIdleConn.id] = append(idleWriters, candidate)
   299  		} else {
   300  			sr.invertedMultiMap[mostIdleConn.id] = []*ManagedStream{candidate}
   301  		}
   302  		return
   303  	}
   304  
   305  }
   306  
   307  func (sr *sharedRouter) writerDetach(writer *ManagedStream) error {
   308  	if writer == nil {
   309  		return fmt.Errorf("invalid writer")
   310  	}
   311  	if sr.multiplex && canMultiplex(writer.StreamName()) {
   312  		return sr.writerDetachMulti(writer)
   313  	}
   314  	// Handle non-multiplex writer.
   315  	sr.mu.Lock()
   316  	defer sr.mu.Unlock()
   317  	conn := sr.exclusiveConns[writer.id]
   318  	if conn == nil {
   319  		return fmt.Errorf("writer not currently attached")
   320  	}
   321  	conn.close()
   322  	delete(sr.exclusiveConns, writer.id)
   323  	return nil
   324  }
   325  
   326  // writerDetachMulti is the multiplex-specific logic for writerDetach.
   327  // It should only be called from writerDetach.
   328  func (sr *sharedRouter) writerDetachMulti(writer *ManagedStream) error {
   329  	sr.multiMu.Lock()
   330  	defer sr.multiMu.Unlock()
   331  	delete(sr.multiMap, writer.id)
   332  	// If the number of writers drops to zero, close all open connections.
   333  	if len(sr.multiMap) == 0 {
   334  		for _, co := range sr.multiConns {
   335  			co.close()
   336  		}
   337  		sr.multiConns = nil
   338  	}
   339  	return nil
   340  }
   341  
   342  // pickConnection either routes a write to a connection for explicit streams,
   343  // or delegates too pickMultiplexConnection for the multiplex case.
   344  func (sr *sharedRouter) pickConnection(pw *pendingWrite) (*connection, error) {
   345  	if pw.writer == nil {
   346  		return nil, fmt.Errorf("no writer present pending write")
   347  	}
   348  	if sr.multiplex && canMultiplex(pw.writer.StreamName()) {
   349  		return sr.pickMultiplexConnection(pw)
   350  	}
   351  	sr.mu.RLock()
   352  	defer sr.mu.RUnlock()
   353  	conn := sr.exclusiveConns[pw.writer.id]
   354  	if conn == nil {
   355  		return nil, fmt.Errorf("writer %q unknown", pw.writer.id)
   356  	}
   357  	return conn, nil
   358  }
   359  
   360  func (sr *sharedRouter) pickMultiplexConnection(pw *pendingWrite) (*connection, error) {
   361  	sr.multiMu.RLock()
   362  	defer sr.multiMu.RUnlock()
   363  	conn := sr.multiMap[pw.writer.id]
   364  	if conn == nil {
   365  		// TODO: update map
   366  		return nil, fmt.Errorf("no multiplex connection assigned")
   367  	}
   368  	return conn, nil
   369  }
   370  
   371  // watchdog is intended to run as a goroutine where multiplex features are enabled.
   372  //
   373  // Our goals during a heartbeat are simple:
   374  // * ensure we have sufficient connections.
   375  // * ensure traffic from writers is well distributed across connections.
   376  //
   377  // Our rebalancing strategy in this iteration is modest.  We order the connections by
   378  // current load, and then examine the busiest connection(s) looking for opportunities
   379  // to redistribute traffic.
   380  func (sr *sharedRouter) watchdog() {
   381  	for {
   382  		select {
   383  		case <-sr.close:
   384  			return
   385  		case <-time.After(watchDogInterval):
   386  			sr.watchdogPulse()
   387  		}
   388  	}
   389  }
   390  
// watchdogPulse is an individual pulse of the watchdog loop.  While holding
// the multiplex write lock it re-orders (and possibly grows) the multiplex
// connections, then attempts a single rebalance.
//
// NOTE(review): orderAndGrowMultiConns creates a connection whenever none
// exist, so a pulse that runs while no writers are attached will allocate
// one — confirm this is intended.
func (sr *sharedRouter) watchdogPulse() {
	sr.multiMu.Lock()
	defer sr.multiMu.Unlock()
	// Ordering must come first: rebalanceWriters reads index 0 as the most
	// idle connection.
	sr.orderAndGrowMultiConns()
	sr.rebalanceWriters()
}
   398  
   399  func newSharedRouter(multiplex bool, maxConns int) *sharedRouter {
   400  	return &sharedRouter{
   401  		multiplex:        multiplex,
   402  		maxConns:         maxConns,
   403  		exclusiveConns:   make(map[string]*connection),
   404  		multiMap:         make(map[string]*connection),
   405  		invertedMultiMap: make(map[string][]*ManagedStream),
   406  	}
   407  }
   408  

View as plain text