...

Source file src/github.com/lestrrat-go/jwx/jwk/refresh.go

Documentation: github.com/lestrrat-go/jwx/jwk

     1  package jwk
     2  
     3  import (
     4  	"context"
     5  	"net/http"
     6  	"reflect"
     7  	"sync"
     8  	"time"
     9  
    10  	"github.com/lestrrat-go/backoff/v2"
    11  	"github.com/lestrrat-go/httpcc"
    12  	"github.com/pkg/errors"
    13  )
    14  
    15  // AutoRefresh is a container that keeps track of jwk.Set object by their source URLs.
    16  // The jwk.Set objects are refreshed automatically behind the scenes.
    17  //
    18  // Before retrieving the jwk.Set objects, the user must pre-register the
    19  // URLs they intend to use by calling `Configure()`
    20  //
    21  //  ar := jwk.NewAutoRefresh(ctx)
    22  //  ar.Configure(url, options...)
    23  //
    24  // Once registered, you can call `Fetch()` to retrieve the jwk.Set object.
    25  //
    26  // All JWKS objects that are retrieved via the auto-fetch mechanism should be
    27  // treated read-only, as they are shared among the consumers and this object.
    28  type AutoRefresh struct {
    29  	errSink      chan AutoRefreshError
    30  	cache        map[string]Set
    31  	configureCh  chan struct{}
    32  	removeCh     chan removeReq
    33  	fetching     map[string]chan struct{}
    34  	muErrSink    sync.Mutex
    35  	muCache      sync.RWMutex
    36  	muFetching   sync.Mutex
    37  	muRegistry   sync.RWMutex
    38  	registry     map[string]*target
    39  	resetTimerCh chan *resetTimerReq
    40  }
    41  
    42  type target struct {
    43  	// The backoff policy to use when fetching the JWKS fails
    44  	backoff backoff.Policy
    45  
    46  	// The HTTP client to use. The user may opt to use a client which is
    47  	// aware of HTTP caching, or one that goes through a proxy
    48  	httpcl HTTPClient
    49  
    50  	// Interval between refreshes are calculated two ways.
    51  	// 1) You can set an explicit refresh interval by using WithRefreshInterval().
    52  	//    In this mode, it doesn't matter what the HTTP response says in its
    53  	//    Cache-Control or Expires headers
    54  	// 2) You can let us calculate the time-to-refresh based on the key's
    55  	//	  Cache-Control or Expires headers.
    56  	//    First, the user provides us the absolute minimum interval before
    57  	//    refreshes. We will never check for refreshes before this specified
    58  	//    amount of time.
    59  	//
    60  	//    Next, max-age directive in the Cache-Control header is consulted.
    61  	//    If `max-age` is not present, we skip the following section, and
    62  	//    proceed to the next option.
    63  	//    If `max-age > user-supplied minimum interval`, then we use the max-age,
    64  	//    otherwise the user-supplied minimum interval is used.
    65  	//
    66  	//    Next, the value specified in Expires header is consulted.
    67  	//    If the header is not present, we skip the following seciont and
    68  	//    proceed to the next option.
    69  	//    We take the time until expiration `expires - time.Now()`, and
    70  	//	  if `time-until-expiration > user-supplied minimum interval`, then
    71  	//    we use the expires value, otherwise the user-supplied minimum interval is used.
    72  	//
    73  	//    If all of the above fails, we used the user-supplied minimum interval
    74  	refreshInterval    *time.Duration
    75  	minRefreshInterval time.Duration
    76  
    77  	url string
    78  
    79  	// The timer for refreshing the keyset. should not be set by anyone
    80  	// other than the refreshing goroutine
    81  	timer *time.Timer
    82  
    83  	// Semaphore to limit the number of concurrent refreshes in the background
    84  	sem chan struct{}
    85  
    86  	// for debugging, snapshoting
    87  	lastRefresh time.Time
    88  	nextRefresh time.Time
    89  
    90  	wl           Whitelist
    91  	parseOptions []ParseOption
    92  }
    93  
    94  type resetTimerReq struct {
    95  	t *target
    96  	d time.Duration
    97  }
    98  
    99  // NewAutoRefresh creates a container that keeps track of JWKS objects which
   100  // are automatically refreshed.
   101  //
   102  // The context object in the argument controls the life-span of the
   103  // auto-refresh worker. If you are using this in a long running process, this
   104  // should mostly be set to a context that ends when the main loop/part of your
   105  // program exits:
   106  //
   107  // func MainLoop() {
   108  //   ctx, cancel := context.WithCancel(context.Background())
   109  //   defer cancel()
   110  //   ar := jwk.AutoRefresh(ctx)
   111  //   for ... {
   112  //     ...
   113  //   }
   114  // }
   115  func NewAutoRefresh(ctx context.Context) *AutoRefresh {
   116  	af := &AutoRefresh{
   117  		cache:        make(map[string]Set),
   118  		configureCh:  make(chan struct{}),
   119  		removeCh:     make(chan removeReq),
   120  		fetching:     make(map[string]chan struct{}),
   121  		registry:     make(map[string]*target),
   122  		resetTimerCh: make(chan *resetTimerReq),
   123  	}
   124  	go af.refreshLoop(ctx)
   125  	return af
   126  }
   127  
   128  func (af *AutoRefresh) getCached(url string) (Set, bool) {
   129  	af.muCache.RLock()
   130  	ks, ok := af.cache[url]
   131  	af.muCache.RUnlock()
   132  	if ok {
   133  		return ks, true
   134  	}
   135  	return nil, false
   136  }
   137  
   138  type removeReq struct {
   139  	replyCh chan error
   140  	url     string
   141  }
   142  
   143  // Remove removes `url` from the list of urls being watched by jwk.AutoRefresh.
   144  // If the url is not already registered, returns an error.
   145  func (af *AutoRefresh) Remove(url string) error {
   146  	ch := make(chan error)
   147  	af.removeCh <- removeReq{replyCh: ch, url: url}
   148  	return <-ch
   149  }
   150  
   151  // Configure registers the url to be controlled by AutoRefresh, and also
   152  // sets any options associated to it.
   153  //
   154  // Note that options are treated as a whole -- you can't just update
   155  // one value. For example, if you did:
   156  //
   157  //   ar.Configure(url, jwk.WithHTTPClient(...))
   158  //   ar.Configure(url, jwk.WithRefreshInterval(...))
   159  // The the end result is that `url` is ONLY associated with the options
   160  // given in the second call to `Configure()`, i.e. `jwk.WithRefreshInterval`.
   161  // The other unspecified options, including the HTTP client, is set to
   162  // their default values.
   163  //
   164  // Configuration must propagate between goroutines, and therefore are
   165  // not atomic (But changes should be felt "soon enough" for practical
   166  // purposes)
   167  func (af *AutoRefresh) Configure(url string, options ...AutoRefreshOption) {
   168  	var httpcl HTTPClient = http.DefaultClient
   169  	var hasRefreshInterval bool
   170  	var refreshInterval time.Duration
   171  	var wl Whitelist
   172  	var parseOptions []ParseOption
   173  	minRefreshInterval := time.Hour
   174  	bo := backoff.Null()
   175  	for _, option := range options {
   176  		if v, ok := option.(ParseOption); ok {
   177  			parseOptions = append(parseOptions, v)
   178  			continue
   179  		}
   180  
   181  		//nolint:forcetypeassert
   182  		switch option.Ident() {
   183  		case identFetchBackoff{}:
   184  			bo = option.Value().(backoff.Policy)
   185  		case identRefreshInterval{}:
   186  			refreshInterval = option.Value().(time.Duration)
   187  			hasRefreshInterval = true
   188  		case identMinRefreshInterval{}:
   189  			minRefreshInterval = option.Value().(time.Duration)
   190  		case identHTTPClient{}:
   191  			httpcl = option.Value().(HTTPClient)
   192  		case identFetchWhitelist{}:
   193  			wl = option.Value().(Whitelist)
   194  		}
   195  	}
   196  
   197  	af.muRegistry.Lock()
   198  	t, ok := af.registry[url]
   199  	if ok {
   200  		if t.httpcl != httpcl {
   201  			t.httpcl = httpcl
   202  		}
   203  
   204  		if t.minRefreshInterval != minRefreshInterval {
   205  			t.minRefreshInterval = minRefreshInterval
   206  		}
   207  
   208  		if t.refreshInterval != nil {
   209  			if !hasRefreshInterval {
   210  				t.refreshInterval = nil
   211  			} else if *t.refreshInterval != refreshInterval {
   212  				*t.refreshInterval = refreshInterval
   213  			}
   214  		} else {
   215  			if hasRefreshInterval {
   216  				t.refreshInterval = &refreshInterval
   217  			}
   218  		}
   219  
   220  		if t.wl != wl {
   221  			t.wl = wl
   222  		}
   223  
   224  		t.parseOptions = parseOptions
   225  	} else {
   226  		t = &target{
   227  			backoff:            bo,
   228  			httpcl:             httpcl,
   229  			minRefreshInterval: minRefreshInterval,
   230  			url:                url,
   231  			sem:                make(chan struct{}, 1),
   232  			// This is a placeholder timer so we can call Reset() on it later
   233  			// Make it sufficiently in the future so that we don't have bogus
   234  			// events firing
   235  			timer:        time.NewTimer(24 * time.Hour),
   236  			wl:           wl,
   237  			parseOptions: parseOptions,
   238  		}
   239  		if hasRefreshInterval {
   240  			t.refreshInterval = &refreshInterval
   241  		}
   242  
   243  		// Record this in the registry
   244  		af.registry[url] = t
   245  	}
   246  	af.muRegistry.Unlock()
   247  
   248  	// Tell the backend to reconfigure itself
   249  	af.configureCh <- struct{}{}
   250  }
   251  
   252  func (af *AutoRefresh) releaseFetching(url string) {
   253  	// first delete the entry from the map, then close the channel or
   254  	// otherwise we may end up getting multiple groutines doing the fetch
   255  	af.muFetching.Lock()
   256  	fetchingCh, ok := af.fetching[url]
   257  	if !ok {
   258  		// Juuuuuuust in case. But shouldn't happen
   259  		af.muFetching.Unlock()
   260  		return
   261  	}
   262  	delete(af.fetching, url)
   263  	close(fetchingCh)
   264  	af.muFetching.Unlock()
   265  }
   266  
   267  // IsRegistered checks if `url` is registered already.
   268  func (af *AutoRefresh) IsRegistered(url string) bool {
   269  	_, ok := af.getRegistered(url)
   270  	return ok
   271  }
   272  
   273  // Fetch returns a jwk.Set from the given url.
   274  func (af *AutoRefresh) getRegistered(url string) (*target, bool) {
   275  	af.muRegistry.RLock()
   276  	t, ok := af.registry[url]
   277  	af.muRegistry.RUnlock()
   278  	return t, ok
   279  }
   280  
   281  // Fetch returns a jwk.Set from the given url.
   282  //
   283  // If it has previously been fetched, then a cached value is returned.
   284  //
   285  // If this the first time `url` was requested, an HTTP request will be
   286  // sent, synchronously.
   287  //
   288  // When accessed via multiple goroutines concurrently, and the cache
   289  // has not been populated yet, only the first goroutine is
   290  // allowed to perform the initialization (HTTP fetch and cache population).
   291  // All other goroutines will be blocked until the operation is completed.
   292  //
   293  // DO NOT modify the jwk.Set object returned by this method, as the
   294  // objects are shared among all consumers and the backend goroutine
   295  func (af *AutoRefresh) Fetch(ctx context.Context, url string) (Set, error) {
   296  	if _, ok := af.getRegistered(url); !ok {
   297  		return nil, errors.Errorf(`url %s must be configured using "Configure()" first`, url)
   298  	}
   299  
   300  	ks, found := af.getCached(url)
   301  	if found {
   302  		return ks, nil
   303  	}
   304  
   305  	return af.refresh(ctx, url)
   306  }
   307  
   308  // Refresh is the same as Fetch(), except that HTTP fetching is done synchronously.
   309  //
   310  // This is useful when you want to force an HTTP fetch instead of waiting
   311  // for the background goroutine to do it, for example when you want to
   312  // make sure the AutoRefresh cache is warmed up before starting your main loop
   313  func (af *AutoRefresh) Refresh(ctx context.Context, url string) (Set, error) {
   314  	if _, ok := af.getRegistered(url); !ok {
   315  		return nil, errors.Errorf(`url %s must be configured using "Configure()" first`, url)
   316  	}
   317  
   318  	return af.refresh(ctx, url)
   319  }
   320  
   321  func (af *AutoRefresh) refresh(ctx context.Context, url string) (Set, error) {
   322  	// To avoid a thundering herd, only one goroutine per url may enter into this
   323  	// initial fetch phase.
   324  	af.muFetching.Lock()
   325  	fetchingCh, fetching := af.fetching[url]
   326  	// unlock happens in each of the if/else clauses because we need to perform
   327  	// the channel initialization when there is no channel present
   328  	if fetching {
   329  		af.muFetching.Unlock()
   330  		select {
   331  		case <-ctx.Done():
   332  			return nil, ctx.Err()
   333  		case <-fetchingCh:
   334  		}
   335  	} else {
   336  		fetchingCh = make(chan struct{})
   337  		af.fetching[url] = fetchingCh
   338  		af.muFetching.Unlock()
   339  
   340  		// Register a cleanup handler, to make sure we always
   341  		defer af.releaseFetching(url)
   342  
   343  		// The first time around, we need to fetch the keyset
   344  		if err := af.doRefreshRequest(ctx, url, false); err != nil {
   345  			return nil, errors.Wrapf(err, `failed to fetch resource pointed by %s`, url)
   346  		}
   347  	}
   348  
   349  	// the cache should now be populated
   350  	ks, ok := af.getCached(url)
   351  	if !ok {
   352  		return nil, errors.New("cache was not populated after explicit refresh")
   353  	}
   354  
   355  	return ks, nil
   356  }
   357  
   358  // Keeps looping, while refreshing the KeySet.
   359  func (af *AutoRefresh) refreshLoop(ctx context.Context) {
   360  	// reflect.Select() is slow IF we are executing it over and over
   361  	// in a very fast iteration, but we assume here that refreshes happen
   362  	// seldom enough that being able to call one `select{}` with multiple
   363  	// targets / channels outweighs the speed penalty of using reflect.
   364  	//
   365  	const (
   366  		ctxDoneIdx = iota
   367  		configureIdx
   368  		resetTimerIdx
   369  		removeIdx
   370  		baseSelcasesLen
   371  	)
   372  
   373  	baseSelcases := make([]reflect.SelectCase, baseSelcasesLen)
   374  	baseSelcases[ctxDoneIdx] = reflect.SelectCase{
   375  		Dir:  reflect.SelectRecv,
   376  		Chan: reflect.ValueOf(ctx.Done()),
   377  	}
   378  	baseSelcases[configureIdx] = reflect.SelectCase{
   379  		Dir:  reflect.SelectRecv,
   380  		Chan: reflect.ValueOf(af.configureCh),
   381  	}
   382  	baseSelcases[resetTimerIdx] = reflect.SelectCase{
   383  		Dir:  reflect.SelectRecv,
   384  		Chan: reflect.ValueOf(af.resetTimerCh),
   385  	}
   386  	baseSelcases[removeIdx] = reflect.SelectCase{
   387  		Dir:  reflect.SelectRecv,
   388  		Chan: reflect.ValueOf(af.removeCh),
   389  	}
   390  
   391  	var targets []*target
   392  	var selcases []reflect.SelectCase
   393  	for {
   394  		// It seems silly, but it's much easier to keep track of things
   395  		// if we re-build the select cases every iteration
   396  
   397  		af.muRegistry.RLock()
   398  		if cap(targets) < len(af.registry) {
   399  			targets = make([]*target, 0, len(af.registry))
   400  		} else {
   401  			targets = targets[:0]
   402  		}
   403  
   404  		if cap(selcases) < len(af.registry) {
   405  			selcases = make([]reflect.SelectCase, 0, len(af.registry)+baseSelcasesLen)
   406  		} else {
   407  			selcases = selcases[:0]
   408  		}
   409  		selcases = append(selcases, baseSelcases...)
   410  
   411  		for _, data := range af.registry {
   412  			targets = append(targets, data)
   413  			selcases = append(selcases, reflect.SelectCase{
   414  				Dir:  reflect.SelectRecv,
   415  				Chan: reflect.ValueOf(data.timer.C),
   416  			})
   417  		}
   418  		af.muRegistry.RUnlock()
   419  
   420  		chosen, recv, recvOK := reflect.Select(selcases)
   421  		switch chosen {
   422  		case ctxDoneIdx:
   423  			// <-ctx.Done(). Just bail out of this loop
   424  			return
   425  		case configureIdx:
   426  			// <-configureCh. rebuild the select list from the registry.
   427  			// since we're rebuilding everything for each iteration,
   428  			// we just need to start the loop all over again
   429  			continue
   430  		case resetTimerIdx:
   431  			// <-resetTimerCh. interrupt polling, and reset the timer on
   432  			// a single target. this needs to be handled inside this select
   433  			if !recvOK {
   434  				continue
   435  			}
   436  
   437  			req := recv.Interface().(*resetTimerReq) //nolint:forcetypeassert
   438  			t := req.t
   439  			d := req.d
   440  			if !t.timer.Stop() {
   441  				select {
   442  				case <-t.timer.C:
   443  				default:
   444  				}
   445  			}
   446  			t.timer.Reset(d)
   447  		case removeIdx:
   448  			// <-removeCh. remove the URL from future fetching
   449  			//nolint:forcetypeassert
   450  			req := recv.Interface().(removeReq)
   451  			replyCh := req.replyCh
   452  			url := req.url
   453  			af.muRegistry.Lock()
   454  			if _, ok := af.registry[url]; !ok {
   455  				replyCh <- errors.Errorf(`invalid url %q (not registered)`, url)
   456  			} else {
   457  				delete(af.registry, url)
   458  				replyCh <- nil
   459  			}
   460  			af.muRegistry.Unlock()
   461  		default:
   462  			// Do not fire a refresh in case the channel was closed.
   463  			if !recvOK {
   464  				continue
   465  			}
   466  
   467  			// Time to refresh a target
   468  			t := targets[chosen-baseSelcasesLen]
   469  
   470  			// Check if there are other goroutines still doing the refresh asynchronously.
   471  			// This could happen if the refreshing goroutine is stuck on a backoff
   472  			// waiting for the HTTP request to complete.
   473  			select {
   474  			case t.sem <- struct{}{}:
   475  				// There can only be one refreshing goroutine
   476  			default:
   477  				continue
   478  			}
   479  
   480  			go func() {
   481  				//nolint:errcheck
   482  				af.doRefreshRequest(ctx, t.url, true)
   483  				<-t.sem
   484  			}()
   485  		}
   486  	}
   487  }
   488  
   489  func (af *AutoRefresh) doRefreshRequest(ctx context.Context, url string, enableBackoff bool) error {
   490  	af.muRegistry.RLock()
   491  	t, ok := af.registry[url]
   492  
   493  	if !ok {
   494  		af.muRegistry.RUnlock()
   495  		return errors.Errorf(`url "%s" is not registered`, url)
   496  	}
   497  
   498  	// In case the refresh fails due to errors in fetching/parsing the JWKS,
   499  	// we want to retry. Create a backoff object,
   500  	parseOptions := t.parseOptions
   501  	fetchOptions := []FetchOption{WithHTTPClient(t.httpcl)}
   502  	if enableBackoff {
   503  		fetchOptions = append(fetchOptions, WithFetchBackoff(t.backoff))
   504  	}
   505  	if t.wl != nil {
   506  		fetchOptions = append(fetchOptions, WithFetchWhitelist(t.wl))
   507  	}
   508  	af.muRegistry.RUnlock()
   509  
   510  	res, err := fetch(ctx, url, fetchOptions...)
   511  	if err == nil {
   512  		if res.StatusCode != http.StatusOK {
   513  			// now, can there be a remote resource that responds with a status code
   514  			// other than 200 and still be valid...? naaaaaaahhhhhh....
   515  			err = errors.Errorf(`bad response status code (%d)`, res.StatusCode)
   516  		} else {
   517  			defer res.Body.Close()
   518  			keyset, parseErr := ParseReader(res.Body, parseOptions...)
   519  			if parseErr == nil {
   520  				// Got a new key set. replace the keyset in the target
   521  				af.muCache.Lock()
   522  				af.cache[url] = keyset
   523  				af.muCache.Unlock()
   524  				af.muRegistry.RLock()
   525  				nextInterval := calculateRefreshDuration(res, t.refreshInterval, t.minRefreshInterval)
   526  				af.muRegistry.RUnlock()
   527  				rtr := &resetTimerReq{
   528  					t: t,
   529  					d: nextInterval,
   530  				}
   531  				select {
   532  				case <-ctx.Done():
   533  					return ctx.Err()
   534  				case af.resetTimerCh <- rtr:
   535  				}
   536  
   537  				now := time.Now()
   538  				af.muRegistry.Lock()
   539  				t.lastRefresh = now.Local()
   540  				t.nextRefresh = now.Add(nextInterval).Local()
   541  				af.muRegistry.Unlock()
   542  				return nil
   543  			}
   544  			err = parseErr
   545  		}
   546  	}
   547  
   548  	// At this point if err != nil, we know that there was something wrong
   549  	// in either the fetching or the parsing. Send this error to be processed,
   550  	// but take the extra mileage to not block regular processing by
   551  	// discarding the error if we fail to send it through the channel
   552  	if err != nil {
   553  		select {
   554  		case af.errSink <- AutoRefreshError{Error: err, URL: url}:
   555  		default:
   556  		}
   557  	}
   558  
   559  	// We either failed to perform the HTTP GET, or we failed to parse the
   560  	// JWK set. Even in case of errors, we don't delete the old key.
   561  	// We persist the old key set, even if it may be stale so the user has something to work with
   562  	// TODO: maybe this behavior should be customizable?
   563  
   564  	// If we failed to get a single time, then queue another fetch in the future.
   565  	rtr := &resetTimerReq{
   566  		t: t,
   567  		d: calculateRefreshDuration(res, t.refreshInterval, t.minRefreshInterval),
   568  	}
   569  	select {
   570  	case <-ctx.Done():
   571  		return ctx.Err()
   572  	case af.resetTimerCh <- rtr:
   573  	}
   574  
   575  	return err
   576  }
   577  
   578  // ErrorSink sets a channel to receive JWK fetch errors, if any.
   579  // Only the errors that occurred *after* the channel was set  will be sent.
   580  //
   581  // The user is responsible for properly draining the channel. If the channel
   582  // is not drained properly, errors will be discarded.
   583  //
   584  // To disable, set a nil channel.
   585  func (af *AutoRefresh) ErrorSink(ch chan AutoRefreshError) {
   586  	af.muErrSink.Lock()
   587  	af.errSink = ch
   588  	af.muErrSink.Unlock()
   589  }
   590  
   591  func calculateRefreshDuration(res *http.Response, refreshInterval *time.Duration, minRefreshInterval time.Duration) time.Duration {
   592  	// This always has precedence
   593  	if refreshInterval != nil {
   594  		return *refreshInterval
   595  	}
   596  
   597  	if res != nil {
   598  		if v := res.Header.Get(`Cache-Control`); v != "" {
   599  			dir, err := httpcc.ParseResponse(v)
   600  			if err == nil {
   601  				maxAge, ok := dir.MaxAge()
   602  				if ok {
   603  					resDuration := time.Duration(maxAge) * time.Second
   604  					if resDuration > minRefreshInterval {
   605  						return resDuration
   606  					}
   607  					return minRefreshInterval
   608  				}
   609  				// fallthrough
   610  			}
   611  			// fallthrough
   612  		}
   613  
   614  		if v := res.Header.Get(`Expires`); v != "" {
   615  			expires, err := http.ParseTime(v)
   616  			if err == nil {
   617  				resDuration := time.Until(expires)
   618  				if resDuration > minRefreshInterval {
   619  					return resDuration
   620  				}
   621  				return minRefreshInterval
   622  			}
   623  			// fallthrough
   624  		}
   625  	}
   626  
   627  	// Previous fallthroughs are a little redandunt, but hey, it's all good.
   628  	return minRefreshInterval
   629  }
   630  
   631  // TargetSnapshot is the structure returned by the Snapshot method.
   632  // It contains information about a url that has been configured
   633  // in AutoRefresh.
   634  type TargetSnapshot struct {
   635  	URL         string
   636  	NextRefresh time.Time
   637  	LastRefresh time.Time
   638  }
   639  
   640  func (af *AutoRefresh) Snapshot() <-chan TargetSnapshot {
   641  	af.muRegistry.Lock()
   642  	ch := make(chan TargetSnapshot, len(af.registry))
   643  	for url, t := range af.registry {
   644  		ch <- TargetSnapshot{
   645  			URL:         url,
   646  			NextRefresh: t.nextRefresh,
   647  			LastRefresh: t.lastRefresh,
   648  		}
   649  	}
   650  	af.muRegistry.Unlock()
   651  	close(ch)
   652  	return ch
   653  }
   654  

View as plain text