...

Source file src/github.com/aws/aws-sdk-go-v2/internal/sync/singleflight/singleflight.go

Documentation: github.com/aws/aws-sdk-go-v2/internal/sync/singleflight

     1  // Copyright 2013 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package singleflight
     6  
     7  import (
     8  	"bytes"
     9  	"errors"
    10  	"fmt"
    11  	"runtime"
    12  	"runtime/debug"
    13  	"sync"
    14  )
    15  
// errGoexit is the sentinel stored in call.err when the user-supplied
// function ended its goroutine by calling runtime.Goexit instead of
// returning or panicking.
var errGoexit = errors.New("runtime.Goexit was called")
    19  
    20  // A panicError is an arbitrary value recovered from a panic
    21  // with the stack trace during the execution of given function.
    22  type panicError struct {
    23  	value interface{}
    24  	stack []byte
    25  }
    26  
    27  // Error implements error interface.
    28  func (p *panicError) Error() string {
    29  	return fmt.Sprintf("%v\n\n%s", p.value, p.stack)
    30  }
    31  
    32  func newPanicError(v interface{}) error {
    33  	stack := debug.Stack()
    34  
    35  	// The first line of the stack trace is of the form "goroutine N [status]:"
    36  	// but by the time the panic reaches Do the goroutine may no longer exist
    37  	// and its status will have changed. Trim out the misleading line.
    38  	if line := bytes.IndexByte(stack[:], '\n'); line >= 0 {
    39  		stack = stack[line+1:]
    40  	}
    41  	return &panicError{value: v, stack: stack}
    42  }
    43  
// call is an in-flight or completed singleflight.Do call
type call struct {
	// wg is incremented once when the call is created and released by
	// doCall after the function has finished; duplicate Do callers block
	// on it until then.
	wg sync.WaitGroup

	// These fields are written once before the WaitGroup is done
	// and are only read after the WaitGroup is done.
	// val and err hold what the function returned; err may instead hold
	// a *panicError or errGoexit when the function did not return normally.
	val interface{}
	err error

	// forgotten indicates whether Forget was called with this call's key
	// while the call was still in flight.
	// When set, doCall leaves the map entry for the key untouched.
	forgotten bool

	// These fields are read and written with the singleflight
	// mutex held before the WaitGroup is done, and are read but
	// not written after the WaitGroup is done.
	// dups counts the callers that joined this call after it was created;
	// chans collects the channels handed out by DoChan for this call.
	dups  int
	chans []chan<- Result
}
    63  
// Group represents a class of work and forms a namespace in
// which units of work can be executed with duplicate suppression.
//
// The zero value is ready to use: the map is allocated on the first
// call to Do or DoChan.
type Group struct {
	mu sync.Mutex       // protects m
	m  map[string]*call // lazily initialized
}
    70  
// Result holds the results of Do, so they can be passed
// on a channel.
type Result struct {
	Val    interface{} // value produced by the function
	Err    error       // error produced by the function
	Shared bool        // true when at least one duplicate caller joined the call
}
    78  
    79  // Do executes and returns the results of the given function, making
    80  // sure that only one execution is in-flight for a given key at a
    81  // time. If a duplicate comes in, the duplicate caller waits for the
    82  // original to complete and receives the same results.
    83  // The return value shared indicates whether v was given to multiple callers.
    84  func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) {
    85  	g.mu.Lock()
    86  	if g.m == nil {
    87  		g.m = make(map[string]*call)
    88  	}
    89  	if c, ok := g.m[key]; ok {
    90  		c.dups++
    91  		g.mu.Unlock()
    92  		c.wg.Wait()
    93  
    94  		if e, ok := c.err.(*panicError); ok {
    95  			panic(e)
    96  		} else if c.err == errGoexit {
    97  			runtime.Goexit()
    98  		}
    99  		return c.val, c.err, true
   100  	}
   101  	c := new(call)
   102  	c.wg.Add(1)
   103  	g.m[key] = c
   104  	g.mu.Unlock()
   105  
   106  	g.doCall(c, key, fn)
   107  	return c.val, c.err, c.dups > 0
   108  }
   109  
   110  // DoChan is like Do but returns a channel that will receive the
   111  // results when they are ready.
   112  //
   113  // The returned channel will not be closed.
   114  func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result {
   115  	ch := make(chan Result, 1)
   116  	g.mu.Lock()
   117  	if g.m == nil {
   118  		g.m = make(map[string]*call)
   119  	}
   120  	if c, ok := g.m[key]; ok {
   121  		c.dups++
   122  		c.chans = append(c.chans, ch)
   123  		g.mu.Unlock()
   124  		return ch
   125  	}
   126  	c := &call{chans: []chan<- Result{ch}}
   127  	c.wg.Add(1)
   128  	g.m[key] = c
   129  	g.mu.Unlock()
   130  
   131  	go g.doCall(c, key, fn)
   132  
   133  	return ch
   134  }
   135  
   136  // doCall handles the single call for a key.
   137  func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) {
   138  	normalReturn := false
   139  	recovered := false
   140  
   141  	// use double-defer to distinguish panic from runtime.Goexit,
   142  	// more details see https://golang.org/cl/134395
   143  	defer func() {
   144  		// the given function invoked runtime.Goexit
   145  		if !normalReturn && !recovered {
   146  			c.err = errGoexit
   147  		}
   148  
   149  		c.wg.Done()
   150  		g.mu.Lock()
   151  		defer g.mu.Unlock()
   152  		if !c.forgotten {
   153  			delete(g.m, key)
   154  		}
   155  
   156  		if e, ok := c.err.(*panicError); ok {
   157  			// In order to prevent the waiting channels from being blocked forever,
   158  			// needs to ensure that this panic cannot be recovered.
   159  			if len(c.chans) > 0 {
   160  				go panic(e)
   161  				select {} // Keep this goroutine around so that it will appear in the crash dump.
   162  			} else {
   163  				panic(e)
   164  			}
   165  		} else if c.err == errGoexit {
   166  			// Already in the process of goexit, no need to call again
   167  		} else {
   168  			// Normal return
   169  			for _, ch := range c.chans {
   170  				ch <- Result{c.val, c.err, c.dups > 0}
   171  			}
   172  		}
   173  	}()
   174  
   175  	func() {
   176  		defer func() {
   177  			if !normalReturn {
   178  				// Ideally, we would wait to take a stack trace until we've determined
   179  				// whether this is a panic or a runtime.Goexit.
   180  				//
   181  				// Unfortunately, the only way we can distinguish the two is to see
   182  				// whether the recover stopped the goroutine from terminating, and by
   183  				// the time we know that, the part of the stack trace relevant to the
   184  				// panic has been discarded.
   185  				if r := recover(); r != nil {
   186  					c.err = newPanicError(r)
   187  				}
   188  			}
   189  		}()
   190  
   191  		c.val, c.err = fn()
   192  		normalReturn = true
   193  	}()
   194  
   195  	if !normalReturn {
   196  		recovered = true
   197  	}
   198  }
   199  
   200  // Forget tells the singleflight to forget about a key.  Future calls
   201  // to Do for this key will call the function rather than waiting for
   202  // an earlier call to complete.
   203  func (g *Group) Forget(key string) {
   204  	g.mu.Lock()
   205  	if c, ok := g.m[key]; ok {
   206  		c.forgotten = true
   207  	}
   208  	delete(g.m, key)
   209  	g.mu.Unlock()
   210  }
   211  

View as plain text