...

Source file src/github.com/docker/distribution/registry/handlers/app.go

Documentation: github.com/docker/distribution/registry/handlers

     1  package handlers
     2  
     3  import (
     4  	"context"
     5  	"crypto/rand"
     6  	"expvar"
     7  	"fmt"
     8  	"math"
     9  	"math/big"
    10  	"net"
    11  	"net/http"
    12  	"net/url"
    13  	"os"
    14  	"regexp"
    15  	"runtime"
    16  	"strings"
    17  	"time"
    18  
    19  	"github.com/distribution/reference"
    20  	"github.com/docker/distribution"
    21  	"github.com/docker/distribution/configuration"
    22  	dcontext "github.com/docker/distribution/context"
    23  	"github.com/docker/distribution/health"
    24  	"github.com/docker/distribution/health/checks"
    25  	prometheus "github.com/docker/distribution/metrics"
    26  	"github.com/docker/distribution/notifications"
    27  	"github.com/docker/distribution/registry/api/errcode"
    28  	v2 "github.com/docker/distribution/registry/api/v2"
    29  	"github.com/docker/distribution/registry/auth"
    30  	registrymiddleware "github.com/docker/distribution/registry/middleware/registry"
    31  	repositorymiddleware "github.com/docker/distribution/registry/middleware/repository"
    32  	"github.com/docker/distribution/registry/proxy"
    33  	"github.com/docker/distribution/registry/storage"
    34  	memorycache "github.com/docker/distribution/registry/storage/cache/memory"
    35  	rediscache "github.com/docker/distribution/registry/storage/cache/redis"
    36  	storagedriver "github.com/docker/distribution/registry/storage/driver"
    37  	"github.com/docker/distribution/registry/storage/driver/factory"
    38  	storagemiddleware "github.com/docker/distribution/registry/storage/driver/middleware"
    39  	"github.com/docker/distribution/version"
    40  	"github.com/docker/go-metrics"
    41  	"github.com/docker/libtrust"
    42  	"github.com/garyburd/redigo/redis"
    43  	"github.com/gorilla/mux"
    44  	"github.com/sirupsen/logrus"
    45  )
    46  
    47  // randomSecretSize is the number of random bytes to generate if no secret
    48  // was specified.
    49  const randomSecretSize = 32
    50  
    51  // defaultCheckInterval is the default time in between health checks
    52  const defaultCheckInterval = 10 * time.Second
    53  
    54  // App is a global registry application object. Shared resources can be placed
    55  // on this object that will be accessible from all requests. Any writable
    56  // fields should be protected.
    57  type App struct {
    58  	context.Context
    59  
    60  	Config *configuration.Configuration
    61  
    62  	router           *mux.Router                    // main application router, configured with dispatchers
    63  	driver           storagedriver.StorageDriver    // driver maintains the app global storage driver instance.
    64  	registry         distribution.Namespace         // registry is the primary registry backend for the app instance.
    65  	repoRemover      distribution.RepositoryRemover // repoRemover provides ability to delete repos
    66  	accessController auth.AccessController          // main access controller for application
    67  
    68  	// httpHost is a parsed representation of the http.host parameter from
    69  	// the configuration. Only the Scheme and Host fields are used.
    70  	httpHost url.URL
    71  
    72  	// events contains notification related configuration.
    73  	events struct {
    74  		sink   notifications.Sink
    75  		source notifications.SourceRecord
    76  	}
    77  
    78  	redis *redis.Pool
    79  
    80  	// trustKey is a deprecated key used to sign manifests converted to
    81  	// schema1 for backward compatibility. It should not be used for any
    82  	// other purposes.
    83  	trustKey libtrust.PrivateKey
    84  
    85  	// isCache is true if this registry is configured as a pull through cache
    86  	isCache bool
    87  
    88  	// readOnly is true if the registry is in a read-only maintenance mode
    89  	readOnly bool
    90  }
    91  
    92  // NewApp takes a configuration and returns a configured app, ready to serve
    93  // requests. The app only implements ServeHTTP and can be wrapped in other
    94  // handlers accordingly.
    95  func NewApp(ctx context.Context, config *configuration.Configuration) *App {
    96  	app := &App{
    97  		Config:  config,
    98  		Context: ctx,
    99  		router:  v2.RouterWithPrefix(config.HTTP.Prefix),
   100  		isCache: config.Proxy.RemoteURL != "",
   101  	}
   102  
   103  	// Register the handler dispatchers.
   104  	app.register(v2.RouteNameBase, func(ctx *Context, r *http.Request) http.Handler {
   105  		return http.HandlerFunc(apiBase)
   106  	})
   107  	app.register(v2.RouteNameManifest, manifestDispatcher)
   108  	app.register(v2.RouteNameCatalog, catalogDispatcher)
   109  	app.register(v2.RouteNameTags, tagsDispatcher)
   110  	app.register(v2.RouteNameBlob, blobDispatcher)
   111  	app.register(v2.RouteNameBlobUpload, blobUploadDispatcher)
   112  	app.register(v2.RouteNameBlobUploadChunk, blobUploadDispatcher)
   113  
   114  	// override the storage driver's UA string for registry outbound HTTP requests
   115  	storageParams := config.Storage.Parameters()
   116  	if storageParams == nil {
   117  		storageParams = make(configuration.Parameters)
   118  	}
   119  	storageParams["useragent"] = fmt.Sprintf("docker-distribution/%s %s", version.Version, runtime.Version())
   120  
   121  	var err error
   122  	app.driver, err = factory.Create(config.Storage.Type(), storageParams)
   123  	if err != nil {
   124  		// TODO(stevvooe): Move the creation of a service into a protected
   125  		// method, where this is created lazily. Its status can be queried via
   126  		// a health check.
   127  		panic(err)
   128  	}
   129  
   130  	purgeConfig := uploadPurgeDefaultConfig()
   131  	if mc, ok := config.Storage["maintenance"]; ok {
   132  		if v, ok := mc["uploadpurging"]; ok {
   133  			purgeConfig, ok = v.(map[interface{}]interface{})
   134  			if !ok {
   135  				panic("uploadpurging config key must contain additional keys")
   136  			}
   137  		}
   138  		if v, ok := mc["readonly"]; ok {
   139  			readOnly, ok := v.(map[interface{}]interface{})
   140  			if !ok {
   141  				panic("readonly config key must contain additional keys")
   142  			}
   143  			if readOnlyEnabled, ok := readOnly["enabled"]; ok {
   144  				app.readOnly, ok = readOnlyEnabled.(bool)
   145  				if !ok {
   146  					panic("readonly's enabled config key must have a boolean value")
   147  				}
   148  			}
   149  		}
   150  	}
   151  
   152  	startUploadPurger(app, app.driver, dcontext.GetLogger(app), purgeConfig)
   153  
   154  	app.driver, err = applyStorageMiddleware(app.driver, config.Middleware["storage"])
   155  	if err != nil {
   156  		panic(err)
   157  	}
   158  
   159  	app.configureSecret(config)
   160  	app.configureEvents(config)
   161  	app.configureRedis(config)
   162  	app.configureLogHook(config)
   163  
   164  	options := registrymiddleware.GetRegistryOptions()
   165  	if config.Compatibility.Schema1.TrustKey != "" {
   166  		app.trustKey, err = libtrust.LoadKeyFile(config.Compatibility.Schema1.TrustKey)
   167  		if err != nil {
   168  			panic(fmt.Sprintf(`could not load schema1 "signingkey" parameter: %v`, err))
   169  		}
   170  	} else {
   171  		// Generate an ephemeral key to be used for signing converted manifests
   172  		// for clients that don't support schema2.
   173  		app.trustKey, err = libtrust.GenerateECP256PrivateKey()
   174  		if err != nil {
   175  			panic(err)
   176  		}
   177  	}
   178  
   179  	options = append(options, storage.Schema1SigningKey(app.trustKey))
   180  
   181  	if config.Compatibility.Schema1.Enabled {
   182  		options = append(options, storage.EnableSchema1)
   183  	}
   184  
   185  	if config.HTTP.Host != "" {
   186  		u, err := url.Parse(config.HTTP.Host)
   187  		if err != nil {
   188  			panic(fmt.Sprintf(`could not parse http "host" parameter: %v`, err))
   189  		}
   190  		app.httpHost = *u
   191  	}
   192  
   193  	if app.isCache {
   194  		options = append(options, storage.DisableDigestResumption)
   195  	}
   196  
   197  	// configure deletion
   198  	if d, ok := config.Storage["delete"]; ok {
   199  		e, ok := d["enabled"]
   200  		if ok {
   201  			if deleteEnabled, ok := e.(bool); ok && deleteEnabled {
   202  				options = append(options, storage.EnableDelete)
   203  			}
   204  		}
   205  	}
   206  
   207  	// configure redirects
   208  	var redirectDisabled bool
   209  	if redirectConfig, ok := config.Storage["redirect"]; ok {
   210  		v := redirectConfig["disable"]
   211  		switch v := v.(type) {
   212  		case bool:
   213  			redirectDisabled = v
   214  		default:
   215  			panic(fmt.Sprintf("invalid type for redirect config: %#v", redirectConfig))
   216  		}
   217  	}
   218  	if redirectDisabled {
   219  		dcontext.GetLogger(app).Infof("backend redirection disabled")
   220  	} else {
   221  		options = append(options, storage.EnableRedirect)
   222  	}
   223  
   224  	if !config.Validation.Enabled {
   225  		config.Validation.Enabled = !config.Validation.Disabled
   226  	}
   227  
   228  	// configure validation
   229  	if config.Validation.Enabled {
   230  		if len(config.Validation.Manifests.URLs.Allow) == 0 && len(config.Validation.Manifests.URLs.Deny) == 0 {
   231  			// If Allow and Deny are empty, allow nothing.
   232  			options = append(options, storage.ManifestURLsAllowRegexp(regexp.MustCompile("^$")))
   233  		} else {
   234  			if len(config.Validation.Manifests.URLs.Allow) > 0 {
   235  				for i, s := range config.Validation.Manifests.URLs.Allow {
   236  					// Validate via compilation.
   237  					if _, err := regexp.Compile(s); err != nil {
   238  						panic(fmt.Sprintf("validation.manifests.urls.allow: %s", err))
   239  					}
   240  					// Wrap with non-capturing group.
   241  					config.Validation.Manifests.URLs.Allow[i] = fmt.Sprintf("(?:%s)", s)
   242  				}
   243  				re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Allow, "|"))
   244  				options = append(options, storage.ManifestURLsAllowRegexp(re))
   245  			}
   246  			if len(config.Validation.Manifests.URLs.Deny) > 0 {
   247  				for i, s := range config.Validation.Manifests.URLs.Deny {
   248  					// Validate via compilation.
   249  					if _, err := regexp.Compile(s); err != nil {
   250  						panic(fmt.Sprintf("validation.manifests.urls.deny: %s", err))
   251  					}
   252  					// Wrap with non-capturing group.
   253  					config.Validation.Manifests.URLs.Deny[i] = fmt.Sprintf("(?:%s)", s)
   254  				}
   255  				re := regexp.MustCompile(strings.Join(config.Validation.Manifests.URLs.Deny, "|"))
   256  				options = append(options, storage.ManifestURLsDenyRegexp(re))
   257  			}
   258  		}
   259  	}
   260  
   261  	// configure storage caches
   262  	if cc, ok := config.Storage["cache"]; ok {
   263  		v, ok := cc["blobdescriptor"]
   264  		if !ok {
   265  			// Backwards compatible: "layerinfo" == "blobdescriptor"
   266  			v = cc["layerinfo"]
   267  		}
   268  
   269  		switch v {
   270  		case "redis":
   271  			if app.redis == nil {
   272  				panic("redis configuration required to use for layerinfo cache")
   273  			}
   274  			cacheProvider := rediscache.NewRedisBlobDescriptorCacheProvider(app.redis)
   275  			localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
   276  			app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
   277  			if err != nil {
   278  				panic("could not create registry: " + err.Error())
   279  			}
   280  			dcontext.GetLogger(app).Infof("using redis blob descriptor cache")
   281  		case "inmemory":
   282  			cacheProvider := memorycache.NewInMemoryBlobDescriptorCacheProvider()
   283  			localOptions := append(options, storage.BlobDescriptorCacheProvider(cacheProvider))
   284  			app.registry, err = storage.NewRegistry(app, app.driver, localOptions...)
   285  			if err != nil {
   286  				panic("could not create registry: " + err.Error())
   287  			}
   288  			dcontext.GetLogger(app).Infof("using inmemory blob descriptor cache")
   289  		default:
   290  			if v != "" {
   291  				dcontext.GetLogger(app).Warnf("unknown cache type %q, caching disabled", config.Storage["cache"])
   292  			}
   293  		}
   294  	}
   295  
   296  	if app.registry == nil {
   297  		// configure the registry if no cache section is available.
   298  		app.registry, err = storage.NewRegistry(app.Context, app.driver, options...)
   299  		if err != nil {
   300  			panic("could not create registry: " + err.Error())
   301  		}
   302  	}
   303  
   304  	app.registry, err = applyRegistryMiddleware(app, app.registry, config.Middleware["registry"])
   305  	if err != nil {
   306  		panic(err)
   307  	}
   308  
   309  	authType := config.Auth.Type()
   310  
   311  	if authType != "" && !strings.EqualFold(authType, "none") {
   312  		accessController, err := auth.GetAccessController(config.Auth.Type(), config.Auth.Parameters())
   313  		if err != nil {
   314  			panic(fmt.Sprintf("unable to configure authorization (%s): %v", authType, err))
   315  		}
   316  		app.accessController = accessController
   317  		dcontext.GetLogger(app).Debugf("configured %q access controller", authType)
   318  	}
   319  
   320  	// configure as a pull through cache
   321  	if config.Proxy.RemoteURL != "" {
   322  		app.registry, err = proxy.NewRegistryPullThroughCache(ctx, app.registry, app.driver, config.Proxy)
   323  		if err != nil {
   324  			panic(err.Error())
   325  		}
   326  		app.isCache = true
   327  		dcontext.GetLogger(app).Info("Registry configured as a proxy cache to ", config.Proxy.RemoteURL)
   328  	}
   329  	var ok bool
   330  	app.repoRemover, ok = app.registry.(distribution.RepositoryRemover)
   331  	if !ok {
   332  		dcontext.GetLogger(app).Warnf("Registry does not implement RempositoryRemover. Will not be able to delete repos and tags")
   333  	}
   334  
   335  	return app
   336  }
   337  
   338  // RegisterHealthChecks is an awful hack to defer health check registration
   339  // control to callers. This should only ever be called once per registry
   340  // process, typically in a main function. The correct way would be register
   341  // health checks outside of app, since multiple apps may exist in the same
   342  // process. Because the configuration and app are tightly coupled,
   343  // implementing this properly will require a refactor. This method may panic
   344  // if called twice in the same process.
   345  func (app *App) RegisterHealthChecks(healthRegistries ...*health.Registry) {
   346  	if len(healthRegistries) > 1 {
   347  		panic("RegisterHealthChecks called with more than one registry")
   348  	}
   349  	healthRegistry := health.DefaultRegistry
   350  	if len(healthRegistries) == 1 {
   351  		healthRegistry = healthRegistries[0]
   352  	}
   353  
   354  	if app.Config.Health.StorageDriver.Enabled {
   355  		interval := app.Config.Health.StorageDriver.Interval
   356  		if interval == 0 {
   357  			interval = defaultCheckInterval
   358  		}
   359  
   360  		storageDriverCheck := func() error {
   361  			_, err := app.driver.Stat(app, "/") // "/" should always exist
   362  			if _, ok := err.(storagedriver.PathNotFoundError); ok {
   363  				err = nil // pass this through, backend is responding, but this path doesn't exist.
   364  			}
   365  			return err
   366  		}
   367  
   368  		if app.Config.Health.StorageDriver.Threshold != 0 {
   369  			healthRegistry.RegisterPeriodicThresholdFunc("storagedriver_"+app.Config.Storage.Type(), interval, app.Config.Health.StorageDriver.Threshold, storageDriverCheck)
   370  		} else {
   371  			healthRegistry.RegisterPeriodicFunc("storagedriver_"+app.Config.Storage.Type(), interval, storageDriverCheck)
   372  		}
   373  	}
   374  
   375  	for _, fileChecker := range app.Config.Health.FileCheckers {
   376  		interval := fileChecker.Interval
   377  		if interval == 0 {
   378  			interval = defaultCheckInterval
   379  		}
   380  		dcontext.GetLogger(app).Infof("configuring file health check path=%s, interval=%d", fileChecker.File, interval/time.Second)
   381  		healthRegistry.Register(fileChecker.File, health.PeriodicChecker(checks.FileChecker(fileChecker.File), interval))
   382  	}
   383  
   384  	for _, httpChecker := range app.Config.Health.HTTPCheckers {
   385  		interval := httpChecker.Interval
   386  		if interval == 0 {
   387  			interval = defaultCheckInterval
   388  		}
   389  
   390  		statusCode := httpChecker.StatusCode
   391  		if statusCode == 0 {
   392  			statusCode = 200
   393  		}
   394  
   395  		checker := checks.HTTPChecker(httpChecker.URI, statusCode, httpChecker.Timeout, httpChecker.Headers)
   396  
   397  		if httpChecker.Threshold != 0 {
   398  			dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d, threshold=%d", httpChecker.URI, interval/time.Second, httpChecker.Threshold)
   399  			healthRegistry.Register(httpChecker.URI, health.PeriodicThresholdChecker(checker, interval, httpChecker.Threshold))
   400  		} else {
   401  			dcontext.GetLogger(app).Infof("configuring HTTP health check uri=%s, interval=%d", httpChecker.URI, interval/time.Second)
   402  			healthRegistry.Register(httpChecker.URI, health.PeriodicChecker(checker, interval))
   403  		}
   404  	}
   405  
   406  	for _, tcpChecker := range app.Config.Health.TCPCheckers {
   407  		interval := tcpChecker.Interval
   408  		if interval == 0 {
   409  			interval = defaultCheckInterval
   410  		}
   411  
   412  		checker := checks.TCPChecker(tcpChecker.Addr, tcpChecker.Timeout)
   413  
   414  		if tcpChecker.Threshold != 0 {
   415  			dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d, threshold=%d", tcpChecker.Addr, interval/time.Second, tcpChecker.Threshold)
   416  			healthRegistry.Register(tcpChecker.Addr, health.PeriodicThresholdChecker(checker, interval, tcpChecker.Threshold))
   417  		} else {
   418  			dcontext.GetLogger(app).Infof("configuring TCP health check addr=%s, interval=%d", tcpChecker.Addr, interval/time.Second)
   419  			healthRegistry.Register(tcpChecker.Addr, health.PeriodicChecker(checker, interval))
   420  		}
   421  	}
   422  }
   423  
   424  // register a handler with the application, by route name. The handler will be
   425  // passed through the application filters and context will be constructed at
   426  // request time.
   427  func (app *App) register(routeName string, dispatch dispatchFunc) {
   428  	handler := app.dispatcher(dispatch)
   429  
   430  	// Chain the handler with prometheus instrumented handler
   431  	if app.Config.HTTP.Debug.Prometheus.Enabled {
   432  		namespace := metrics.NewNamespace(prometheus.NamespacePrefix, "http", nil)
   433  		httpMetrics := namespace.NewDefaultHttpMetrics(strings.Replace(routeName, "-", "_", -1))
   434  		metrics.Register(namespace)
   435  		handler = metrics.InstrumentHandler(httpMetrics, handler)
   436  	}
   437  
   438  	// TODO(stevvooe): This odd dispatcher/route registration is by-product of
   439  	// some limitations in the gorilla/mux router. We are using it to keep
   440  	// routing consistent between the client and server, but we may want to
   441  	// replace it with manual routing and structure-based dispatch for better
   442  	// control over the request execution.
   443  
   444  	app.router.GetRoute(routeName).Handler(handler)
   445  }
   446  
   447  // configureEvents prepares the event sink for action.
   448  func (app *App) configureEvents(configuration *configuration.Configuration) {
   449  	// Configure all of the endpoint sinks.
   450  	var sinks []notifications.Sink
   451  	for _, endpoint := range configuration.Notifications.Endpoints {
   452  		if endpoint.Disabled {
   453  			dcontext.GetLogger(app).Infof("endpoint %s disabled, skipping", endpoint.Name)
   454  			continue
   455  		}
   456  
   457  		dcontext.GetLogger(app).Infof("configuring endpoint %v (%v), timeout=%s, headers=%v", endpoint.Name, endpoint.URL, endpoint.Timeout, endpoint.Headers)
   458  		endpoint := notifications.NewEndpoint(endpoint.Name, endpoint.URL, notifications.EndpointConfig{
   459  			Timeout:           endpoint.Timeout,
   460  			Threshold:         endpoint.Threshold,
   461  			Backoff:           endpoint.Backoff,
   462  			Headers:           endpoint.Headers,
   463  			IgnoredMediaTypes: endpoint.IgnoredMediaTypes,
   464  			Ignore:            endpoint.Ignore,
   465  		})
   466  
   467  		sinks = append(sinks, endpoint)
   468  	}
   469  
   470  	// NOTE(stevvooe): Moving to a new queuing implementation is as easy as
   471  	// replacing broadcaster with a rabbitmq implementation. It's recommended
   472  	// that the registry instances also act as the workers to keep deployment
   473  	// simple.
   474  	app.events.sink = notifications.NewBroadcaster(sinks...)
   475  
   476  	// Populate registry event source
   477  	hostname, err := os.Hostname()
   478  	if err != nil {
   479  		hostname = configuration.HTTP.Addr
   480  	} else {
   481  		// try to pick the port off the config
   482  		_, port, err := net.SplitHostPort(configuration.HTTP.Addr)
   483  		if err == nil {
   484  			hostname = net.JoinHostPort(hostname, port)
   485  		}
   486  	}
   487  
   488  	app.events.source = notifications.SourceRecord{
   489  		Addr:       hostname,
   490  		InstanceID: dcontext.GetStringValue(app, "instance.id"),
   491  	}
   492  }
   493  
   494  type redisStartAtKey struct{}
   495  
   496  func (app *App) configureRedis(configuration *configuration.Configuration) {
   497  	if configuration.Redis.Addr == "" {
   498  		dcontext.GetLogger(app).Infof("redis not configured")
   499  		return
   500  	}
   501  
   502  	pool := &redis.Pool{
   503  		Dial: func() (redis.Conn, error) {
   504  			// TODO(stevvooe): Yet another use case for contextual timing.
   505  			ctx := context.WithValue(app, redisStartAtKey{}, time.Now())
   506  
   507  			done := func(err error) {
   508  				logger := dcontext.GetLoggerWithField(ctx, "redis.connect.duration",
   509  					dcontext.Since(ctx, redisStartAtKey{}))
   510  				if err != nil {
   511  					logger.Errorf("redis: error connecting: %v", err)
   512  				} else {
   513  					logger.Infof("redis: connect %v", configuration.Redis.Addr)
   514  				}
   515  			}
   516  
   517  			conn, err := redis.DialTimeout("tcp",
   518  				configuration.Redis.Addr,
   519  				configuration.Redis.DialTimeout,
   520  				configuration.Redis.ReadTimeout,
   521  				configuration.Redis.WriteTimeout)
   522  			if err != nil {
   523  				dcontext.GetLogger(app).Errorf("error connecting to redis instance %s: %v",
   524  					configuration.Redis.Addr, err)
   525  				done(err)
   526  				return nil, err
   527  			}
   528  
   529  			// authorize the connection
   530  			if configuration.Redis.Password != "" {
   531  				if _, err = conn.Do("AUTH", configuration.Redis.Password); err != nil {
   532  					defer conn.Close()
   533  					done(err)
   534  					return nil, err
   535  				}
   536  			}
   537  
   538  			// select the database to use
   539  			if configuration.Redis.DB != 0 {
   540  				if _, err = conn.Do("SELECT", configuration.Redis.DB); err != nil {
   541  					defer conn.Close()
   542  					done(err)
   543  					return nil, err
   544  				}
   545  			}
   546  
   547  			done(nil)
   548  			return conn, nil
   549  		},
   550  		MaxIdle:     configuration.Redis.Pool.MaxIdle,
   551  		MaxActive:   configuration.Redis.Pool.MaxActive,
   552  		IdleTimeout: configuration.Redis.Pool.IdleTimeout,
   553  		TestOnBorrow: func(c redis.Conn, t time.Time) error {
   554  			// TODO(stevvooe): We can probably do something more interesting
   555  			// here with the health package.
   556  			_, err := c.Do("PING")
   557  			return err
   558  		},
   559  		Wait: false, // if a connection is not available, proceed without cache.
   560  	}
   561  
   562  	app.redis = pool
   563  
   564  	// setup expvar
   565  	registry := expvar.Get("registry")
   566  	if registry == nil {
   567  		registry = expvar.NewMap("registry")
   568  	}
   569  
   570  	registry.(*expvar.Map).Set("redis", expvar.Func(func() interface{} {
   571  		return map[string]interface{}{
   572  			"Config": configuration.Redis,
   573  			"Active": app.redis.ActiveCount(),
   574  		}
   575  	}))
   576  }
   577  
   578  // configureLogHook prepares logging hook parameters.
   579  func (app *App) configureLogHook(configuration *configuration.Configuration) {
   580  	entry, ok := dcontext.GetLogger(app).(*logrus.Entry)
   581  	if !ok {
   582  		// somehow, we are not using logrus
   583  		return
   584  	}
   585  
   586  	logger := entry.Logger
   587  
   588  	for _, configHook := range configuration.Log.Hooks {
   589  		if !configHook.Disabled {
   590  			switch configHook.Type {
   591  			case "mail":
   592  				hook := &logHook{}
   593  				hook.LevelsParam = configHook.Levels
   594  				hook.Mail = &mailer{
   595  					Addr:     configHook.MailOptions.SMTP.Addr,
   596  					Username: configHook.MailOptions.SMTP.Username,
   597  					Password: configHook.MailOptions.SMTP.Password,
   598  					Insecure: configHook.MailOptions.SMTP.Insecure,
   599  					From:     configHook.MailOptions.From,
   600  					To:       configHook.MailOptions.To,
   601  				}
   602  				logger.Hooks.Add(hook)
   603  			default:
   604  			}
   605  		}
   606  	}
   607  }
   608  
   609  // configureSecret creates a random secret if a secret wasn't included in the
   610  // configuration.
   611  func (app *App) configureSecret(configuration *configuration.Configuration) {
   612  	if configuration.HTTP.Secret == "" {
   613  		var secretBytes [randomSecretSize]byte
   614  		if _, err := rand.Read(secretBytes[:]); err != nil {
   615  			panic(fmt.Sprintf("could not generate random bytes for HTTP secret: %v", err))
   616  		}
   617  		configuration.HTTP.Secret = string(secretBytes[:])
   618  		dcontext.GetLogger(app).Warn("No HTTP secret provided - generated random secret. This may cause problems with uploads if multiple registries are behind a load-balancer. To provide a shared secret, fill in http.secret in the configuration file or set the REGISTRY_HTTP_SECRET environment variable.")
   619  	}
   620  }
   621  
   622  func (app *App) ServeHTTP(w http.ResponseWriter, r *http.Request) {
   623  	// Prepare the context with our own little decorations.
   624  	ctx := r.Context()
   625  	ctx = dcontext.WithRequest(ctx, r)
   626  	ctx, w = dcontext.WithResponseWriter(ctx, w)
   627  	ctx = dcontext.WithLogger(ctx, dcontext.GetRequestLogger(ctx))
   628  	r = r.WithContext(ctx)
   629  
   630  	defer func() {
   631  		status, ok := ctx.Value("http.response.status").(int)
   632  		if ok && status >= 200 && status <= 399 {
   633  			dcontext.GetResponseLogger(r.Context()).Infof("response completed")
   634  		}
   635  	}()
   636  
   637  	// Set a header with the Docker Distribution API Version for all responses.
   638  	w.Header().Add("Docker-Distribution-API-Version", "registry/2.0")
   639  	app.router.ServeHTTP(w, r)
   640  }
   641  
   642  // dispatchFunc takes a context and request and returns a constructed handler
   643  // for the route. The dispatcher will use this to dynamically create request
   644  // specific handlers for each endpoint without creating a new router for each
   645  // request.
   646  type dispatchFunc func(ctx *Context, r *http.Request) http.Handler
   647  
   648  // TODO(stevvooe): dispatchers should probably have some validation error
   649  // chain with proper error reporting.
   650  
   651  // dispatcher returns a handler that constructs a request specific context and
   652  // handler, using the dispatch factory function.
   653  func (app *App) dispatcher(dispatch dispatchFunc) http.Handler {
   654  	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   655  		for headerName, headerValues := range app.Config.HTTP.Headers {
   656  			for _, value := range headerValues {
   657  				w.Header().Add(headerName, value)
   658  			}
   659  		}
   660  
   661  		context := app.context(w, r)
   662  
   663  		if err := app.authorized(w, r, context); err != nil {
   664  			dcontext.GetLogger(context).Warnf("error authorizing context: %v", err)
   665  			return
   666  		}
   667  
   668  		// Add username to request logging
   669  		context.Context = dcontext.WithLogger(context.Context, dcontext.GetLogger(context.Context, auth.UserNameKey))
   670  
   671  		// sync up context on the request.
   672  		r = r.WithContext(context)
   673  
   674  		if app.nameRequired(r) {
   675  			nameRef, err := reference.WithName(getName(context))
   676  			if err != nil {
   677  				dcontext.GetLogger(context).Errorf("error parsing reference from context: %v", err)
   678  				context.Errors = append(context.Errors, distribution.ErrRepositoryNameInvalid{
   679  					Name:   getName(context),
   680  					Reason: err,
   681  				})
   682  				if err := errcode.ServeJSON(w, context.Errors); err != nil {
   683  					dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   684  				}
   685  				return
   686  			}
   687  			repository, err := app.registry.Repository(context, nameRef)
   688  
   689  			if err != nil {
   690  				dcontext.GetLogger(context).Errorf("error resolving repository: %v", err)
   691  
   692  				switch err := err.(type) {
   693  				case distribution.ErrRepositoryUnknown:
   694  					context.Errors = append(context.Errors, v2.ErrorCodeNameUnknown.WithDetail(err))
   695  				case distribution.ErrRepositoryNameInvalid:
   696  					context.Errors = append(context.Errors, v2.ErrorCodeNameInvalid.WithDetail(err))
   697  				case errcode.Error:
   698  					context.Errors = append(context.Errors, err)
   699  				}
   700  
   701  				if err := errcode.ServeJSON(w, context.Errors); err != nil {
   702  					dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   703  				}
   704  				return
   705  			}
   706  
   707  			// assign and decorate the authorized repository with an event bridge.
   708  			context.Repository, context.RepositoryRemover = notifications.Listen(
   709  				repository,
   710  				context.App.repoRemover,
   711  				app.eventBridge(context, r))
   712  
   713  			context.Repository, err = applyRepoMiddleware(app, context.Repository, app.Config.Middleware["repository"])
   714  			if err != nil {
   715  				dcontext.GetLogger(context).Errorf("error initializing repository middleware: %v", err)
   716  				context.Errors = append(context.Errors, errcode.ErrorCodeUnknown.WithDetail(err))
   717  
   718  				if err := errcode.ServeJSON(w, context.Errors); err != nil {
   719  					dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   720  				}
   721  				return
   722  			}
   723  		}
   724  
   725  		dispatch(context, r).ServeHTTP(w, r)
   726  		// Automated error response handling here. Handlers may return their
   727  		// own errors if they need different behavior (such as range errors
   728  		// for layer upload).
   729  		if context.Errors.Len() > 0 {
   730  			if err := errcode.ServeJSON(w, context.Errors); err != nil {
   731  				dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   732  			}
   733  
   734  			app.logError(context, context.Errors)
   735  		}
   736  	})
   737  }
   738  
   739  type errCodeKey struct{}
   740  
   741  func (errCodeKey) String() string { return "err.code" }
   742  
   743  type errMessageKey struct{}
   744  
   745  func (errMessageKey) String() string { return "err.message" }
   746  
   747  type errDetailKey struct{}
   748  
   749  func (errDetailKey) String() string { return "err.detail" }
   750  
   751  func (app *App) logError(ctx context.Context, errors errcode.Errors) {
   752  	for _, e1 := range errors {
   753  		var c context.Context
   754  
   755  		switch e := e1.(type) {
   756  		case errcode.Error:
   757  			c = context.WithValue(ctx, errCodeKey{}, e.Code)
   758  			c = context.WithValue(c, errMessageKey{}, e.Message)
   759  			c = context.WithValue(c, errDetailKey{}, e.Detail)
   760  		case errcode.ErrorCode:
   761  			c = context.WithValue(ctx, errCodeKey{}, e)
   762  			c = context.WithValue(c, errMessageKey{}, e.Message())
   763  		default:
   764  			// just normal go 'error'
   765  			c = context.WithValue(ctx, errCodeKey{}, errcode.ErrorCodeUnknown)
   766  			c = context.WithValue(c, errMessageKey{}, e.Error())
   767  		}
   768  
   769  		c = dcontext.WithLogger(c, dcontext.GetLogger(c,
   770  			errCodeKey{},
   771  			errMessageKey{},
   772  			errDetailKey{}))
   773  		dcontext.GetResponseLogger(c).Errorf("response completed with error")
   774  	}
   775  }
   776  
   777  // context constructs the context object for the application. This only be
   778  // called once per request.
   779  func (app *App) context(w http.ResponseWriter, r *http.Request) *Context {
   780  	ctx := r.Context()
   781  	ctx = dcontext.WithVars(ctx, r)
   782  	ctx = dcontext.WithLogger(ctx, dcontext.GetLogger(ctx,
   783  		"vars.name",
   784  		"vars.reference",
   785  		"vars.digest",
   786  		"vars.uuid"))
   787  
   788  	context := &Context{
   789  		App:     app,
   790  		Context: ctx,
   791  	}
   792  
   793  	if app.httpHost.Scheme != "" && app.httpHost.Host != "" {
   794  		// A "host" item in the configuration takes precedence over
   795  		// X-Forwarded-Proto and X-Forwarded-Host headers, and the
   796  		// hostname in the request.
   797  		context.urlBuilder = v2.NewURLBuilder(&app.httpHost, false)
   798  	} else {
   799  		context.urlBuilder = v2.NewURLBuilderFromRequest(r, app.Config.HTTP.RelativeURLs)
   800  	}
   801  
   802  	return context
   803  }
   804  
   805  // authorized checks if the request can proceed with access to the requested
   806  // repository. If it succeeds, the context may access the requested
   807  // repository. An error will be returned if access is not available.
   808  func (app *App) authorized(w http.ResponseWriter, r *http.Request, context *Context) error {
   809  	dcontext.GetLogger(context).Debug("authorizing request")
   810  	repo := getName(context)
   811  
   812  	if app.accessController == nil {
   813  		return nil // access controller is not enabled.
   814  	}
   815  
   816  	var accessRecords []auth.Access
   817  
   818  	if repo != "" {
   819  		accessRecords = appendAccessRecords(accessRecords, r.Method, repo)
   820  		if fromRepo := r.FormValue("from"); fromRepo != "" {
   821  			// mounting a blob from one repository to another requires pull (GET)
   822  			// access to the source repository.
   823  			accessRecords = appendAccessRecords(accessRecords, "GET", fromRepo)
   824  		}
   825  	} else {
   826  		// Only allow the name not to be set on the base route.
   827  		if app.nameRequired(r) {
   828  			// For this to be properly secured, repo must always be set for a
   829  			// resource that may make a modification. The only condition under
   830  			// which name is not set and we still allow access is when the
   831  			// base route is accessed. This section prevents us from making
   832  			// that mistake elsewhere in the code, allowing any operation to
   833  			// proceed.
   834  			if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized); err != nil {
   835  				dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   836  			}
   837  			return fmt.Errorf("forbidden: no repository name")
   838  		}
   839  		accessRecords = appendCatalogAccessRecord(accessRecords, r)
   840  	}
   841  
   842  	ctx, err := app.accessController.Authorized(context.Context, accessRecords...)
   843  	if err != nil {
   844  		switch err := err.(type) {
   845  		case auth.Challenge:
   846  			// Add the appropriate WWW-Auth header
   847  			err.SetHeaders(r, w)
   848  
   849  			if err := errcode.ServeJSON(w, errcode.ErrorCodeUnauthorized.WithDetail(accessRecords)); err != nil {
   850  				dcontext.GetLogger(context).Errorf("error serving error json: %v (from %v)", err, context.Errors)
   851  			}
   852  		default:
   853  			// This condition is a potential security problem either in
   854  			// the configuration or whatever is backing the access
   855  			// controller. Just return a bad request with no information
   856  			// to avoid exposure. The request should not proceed.
   857  			dcontext.GetLogger(context).Errorf("error checking authorization: %v", err)
   858  			w.WriteHeader(http.StatusBadRequest)
   859  		}
   860  
   861  		return err
   862  	}
   863  
   864  	dcontext.GetLogger(ctx).Info("authorized request")
   865  	// TODO(stevvooe): This pattern needs to be cleaned up a bit. One context
   866  	// should be replaced by another, rather than replacing the context on a
   867  	// mutable object.
   868  	context.Context = ctx
   869  	return nil
   870  }
   871  
   872  // eventBridge returns a bridge for the current request, configured with the
   873  // correct actor and source.
   874  func (app *App) eventBridge(ctx *Context, r *http.Request) notifications.Listener {
   875  	actor := notifications.ActorRecord{
   876  		Name: getUserName(ctx, r),
   877  	}
   878  	request := notifications.NewRequestRecord(dcontext.GetRequestID(ctx), r)
   879  
   880  	return notifications.NewBridge(ctx.urlBuilder, app.events.source, actor, request, app.events.sink, app.Config.Notifications.EventConfig.IncludeReferences)
   881  }
   882  
   883  // nameRequired returns true if the route requires a name.
   884  func (app *App) nameRequired(r *http.Request) bool {
   885  	route := mux.CurrentRoute(r)
   886  	if route == nil {
   887  		return true
   888  	}
   889  	routeName := route.GetName()
   890  	return routeName != v2.RouteNameBase && routeName != v2.RouteNameCatalog
   891  }
   892  
   893  // apiBase implements a simple yes-man for doing overall checks against the
   894  // api. This can support auth roundtrips to support docker login.
   895  func apiBase(w http.ResponseWriter, r *http.Request) {
   896  	const emptyJSON = "{}"
   897  	// Provide a simple /v2/ 200 OK response with empty json response.
   898  	w.Header().Set("Content-Type", "application/json; charset=utf-8")
   899  	w.Header().Set("Content-Length", fmt.Sprint(len(emptyJSON)))
   900  
   901  	fmt.Fprint(w, emptyJSON)
   902  }
   903  
   904  // appendAccessRecords checks the method and adds the appropriate Access records to the records list.
   905  func appendAccessRecords(records []auth.Access, method string, repo string) []auth.Access {
   906  	resource := auth.Resource{
   907  		Type: "repository",
   908  		Name: repo,
   909  	}
   910  
   911  	switch method {
   912  	case "GET", "HEAD":
   913  		records = append(records,
   914  			auth.Access{
   915  				Resource: resource,
   916  				Action:   "pull",
   917  			})
   918  	case "POST", "PUT", "PATCH":
   919  		records = append(records,
   920  			auth.Access{
   921  				Resource: resource,
   922  				Action:   "pull",
   923  			},
   924  			auth.Access{
   925  				Resource: resource,
   926  				Action:   "push",
   927  			})
   928  	case "DELETE":
   929  		records = append(records,
   930  			auth.Access{
   931  				Resource: resource,
   932  				Action:   "delete",
   933  			})
   934  	}
   935  	return records
   936  }
   937  
   938  // Add the access record for the catalog if it's our current route
   939  func appendCatalogAccessRecord(accessRecords []auth.Access, r *http.Request) []auth.Access {
   940  	route := mux.CurrentRoute(r)
   941  	routeName := route.GetName()
   942  
   943  	if routeName == v2.RouteNameCatalog {
   944  		resource := auth.Resource{
   945  			Type: "registry",
   946  			Name: "catalog",
   947  		}
   948  
   949  		accessRecords = append(accessRecords,
   950  			auth.Access{
   951  				Resource: resource,
   952  				Action:   "*",
   953  			})
   954  	}
   955  	return accessRecords
   956  }
   957  
   958  // applyRegistryMiddleware wraps a registry instance with the configured middlewares
   959  func applyRegistryMiddleware(ctx context.Context, registry distribution.Namespace, middlewares []configuration.Middleware) (distribution.Namespace, error) {
   960  	for _, mw := range middlewares {
   961  		rmw, err := registrymiddleware.Get(ctx, mw.Name, mw.Options, registry)
   962  		if err != nil {
   963  			return nil, fmt.Errorf("unable to configure registry middleware (%s): %s", mw.Name, err)
   964  		}
   965  		registry = rmw
   966  	}
   967  	return registry, nil
   968  
   969  }
   970  
   971  // applyRepoMiddleware wraps a repository with the configured middlewares
   972  func applyRepoMiddleware(ctx context.Context, repository distribution.Repository, middlewares []configuration.Middleware) (distribution.Repository, error) {
   973  	for _, mw := range middlewares {
   974  		rmw, err := repositorymiddleware.Get(ctx, mw.Name, mw.Options, repository)
   975  		if err != nil {
   976  			return nil, err
   977  		}
   978  		repository = rmw
   979  	}
   980  	return repository, nil
   981  }
   982  
   983  // applyStorageMiddleware wraps a storage driver with the configured middlewares
   984  func applyStorageMiddleware(driver storagedriver.StorageDriver, middlewares []configuration.Middleware) (storagedriver.StorageDriver, error) {
   985  	for _, mw := range middlewares {
   986  		smw, err := storagemiddleware.Get(mw.Name, mw.Options, driver)
   987  		if err != nil {
   988  			return nil, fmt.Errorf("unable to configure storage middleware (%s): %v", mw.Name, err)
   989  		}
   990  		driver = smw
   991  	}
   992  	return driver, nil
   993  }
   994  
   995  // uploadPurgeDefaultConfig provides a default configuration for upload
   996  // purging to be used in the absence of configuration in the
   997  // configuration file
   998  func uploadPurgeDefaultConfig() map[interface{}]interface{} {
   999  	config := map[interface{}]interface{}{}
  1000  	config["enabled"] = true
  1001  	config["age"] = "168h"
  1002  	config["interval"] = "24h"
  1003  	config["dryrun"] = false
  1004  	return config
  1005  }
  1006  
  1007  func badPurgeUploadConfig(reason string) {
  1008  	panic(fmt.Sprintf("Unable to parse upload purge configuration: %s", reason))
  1009  }
  1010  
  1011  // startUploadPurger schedules a goroutine which will periodically
  1012  // check upload directories for old files and delete them
  1013  func startUploadPurger(ctx context.Context, storageDriver storagedriver.StorageDriver, log dcontext.Logger, config map[interface{}]interface{}) {
  1014  	if config["enabled"] == false {
  1015  		return
  1016  	}
  1017  
  1018  	var purgeAgeDuration time.Duration
  1019  	var err error
  1020  	purgeAge, ok := config["age"]
  1021  	if ok {
  1022  		ageStr, ok := purgeAge.(string)
  1023  		if !ok {
  1024  			badPurgeUploadConfig("age is not a string")
  1025  		}
  1026  		purgeAgeDuration, err = time.ParseDuration(ageStr)
  1027  		if err != nil {
  1028  			badPurgeUploadConfig(fmt.Sprintf("Cannot parse duration: %s", err.Error()))
  1029  		}
  1030  	} else {
  1031  		badPurgeUploadConfig("age missing")
  1032  	}
  1033  
  1034  	var intervalDuration time.Duration
  1035  	interval, ok := config["interval"]
  1036  	if ok {
  1037  		intervalStr, ok := interval.(string)
  1038  		if !ok {
  1039  			badPurgeUploadConfig("interval is not a string")
  1040  		}
  1041  
  1042  		intervalDuration, err = time.ParseDuration(intervalStr)
  1043  		if err != nil {
  1044  			badPurgeUploadConfig(fmt.Sprintf("Cannot parse interval: %s", err.Error()))
  1045  		}
  1046  	} else {
  1047  		badPurgeUploadConfig("interval missing")
  1048  	}
  1049  
  1050  	var dryRunBool bool
  1051  	dryRun, ok := config["dryrun"]
  1052  	if ok {
  1053  		dryRunBool, ok = dryRun.(bool)
  1054  		if !ok {
  1055  			badPurgeUploadConfig("cannot parse dryrun")
  1056  		}
  1057  	} else {
  1058  		badPurgeUploadConfig("dryrun missing")
  1059  	}
  1060  
  1061  	go func() {
  1062  		randInt, err := rand.Int(rand.Reader, new(big.Int).SetInt64(math.MaxInt64))
  1063  		if err != nil {
  1064  			log.Infof("Failed to generate random jitter: %v", err)
  1065  			// sleep 30min for failure case
  1066  			randInt = big.NewInt(30)
  1067  		}
  1068  		jitter := time.Duration(randInt.Int64()%60) * time.Minute
  1069  		log.Infof("Starting upload purge in %s", jitter)
  1070  		time.Sleep(jitter)
  1071  
  1072  		for {
  1073  			storage.PurgeUploads(ctx, storageDriver, time.Now().Add(-purgeAgeDuration), !dryRunBool)
  1074  			log.Infof("Starting upload purge in %s", intervalDuration)
  1075  			time.Sleep(intervalDuration)
  1076  		}
  1077  	}()
  1078  }
  1079  

View as plain text