...

Source file src/cloud.google.com/go/logging/logadmin/sinks.go

Documentation: cloud.google.com/go/logging/logadmin

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package logadmin
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  
    22  	vkit "cloud.google.com/go/logging/apiv2"
    23  	logpb "cloud.google.com/go/logging/apiv2/loggingpb"
    24  	"google.golang.org/api/iterator"
    25  	maskpb "google.golang.org/protobuf/types/known/fieldmaskpb"
    26  )
    27  
    28  // Sink describes a sink used to export log entries outside Cloud
    29  // Logging. Incoming log entries matching a filter are exported to a
    30  // destination (a Cloud Storage bucket, BigQuery dataset or Cloud Pub/Sub
    31  // topic).
    32  //
    33  // For more information, see https://cloud.google.com/logging/docs/export/using_exported_logs.
    34  // (The Sinks in this package are what the documentation refers to as "project sinks".)
    35  type Sink struct {
    36  	// ID is a client-assigned sink identifier. Example:
    37  	// "my-severe-errors-to-pubsub".
    38  	// Sink identifiers are limited to 1000 characters
    39  	// and can include only the following characters: A-Z, a-z,
    40  	// 0-9, and the special characters "_-.".
    41  	ID string
    42  
    43  	// Destination is the export destination. See
    44  	// https://cloud.google.com/logging/docs/api/tasks/exporting-logs.
    45  	// Examples: "storage.googleapis.com/a-bucket",
    46  	// "bigquery.googleapis.com/projects/a-project-id/datasets/a-dataset".
    47  	Destination string
    48  
    49  	// Filter optionally specifies an advanced logs filter (see
    50  	// https://cloud.google.com/logging/docs/view/advanced_filters) that
    51  	// defines the log entries to be exported. Example: "logName:syslog AND
    52  	// severity>=ERROR". If omitted, all entries are returned.
    53  	Filter string
    54  
    55  	// WriterIdentity must be a service account name. When exporting logs, Logging
    56  	// adopts this identity for authorization. The export destination's owner must
    57  	// give this service account permission to write to the export destination.
    58  	WriterIdentity string
    59  
    60  	// IncludeChildren, when set to true, allows the sink to export log entries from
    61  	// the organization or folder, plus (recursively) from any contained folders, billing
    62  	// accounts, or projects. IncludeChildren is false by default. You can use the sink's
    63  	// filter to choose log entries from specific projects, specific resource types, or
    64  	// specific named logs.
    65  	//
    66  	// Caution: If you enable this feature, your aggregated export sink might export
    67  	// a very large number of log entries. To avoid exporting too many log entries,
    68  	// design your aggregated export sink filter carefully, as described on
    69  	// https://cloud.google.com/logging/docs/export/aggregated_exports.
    70  	IncludeChildren bool
    71  }
    72  
    73  // CreateSink creates a Sink. It returns an error if the Sink already exists.
    74  // Requires AdminScope.
    75  func (c *Client) CreateSink(ctx context.Context, sink *Sink) (*Sink, error) {
    76  	return c.CreateSinkOpt(ctx, sink, SinkOptions{})
    77  }
    78  
    79  // CreateSinkOpt creates a Sink using the provided options. It returns an
    80  // error if the Sink already exists. Requires AdminScope.
    81  func (c *Client) CreateSinkOpt(ctx context.Context, sink *Sink, opts SinkOptions) (*Sink, error) {
    82  	ls, err := c.sClient.CreateSink(ctx, &logpb.CreateSinkRequest{
    83  		Parent:               c.parent,
    84  		Sink:                 toLogSink(sink),
    85  		UniqueWriterIdentity: opts.UniqueWriterIdentity,
    86  	})
    87  	if err != nil {
    88  		return nil, err
    89  	}
    90  	return fromLogSink(ls), nil
    91  }
    92  
    93  // SinkOptions define options to be used when creating or updating a sink.
    94  type SinkOptions struct {
    95  	// Determines the kind of IAM identity returned as WriterIdentity in the new
    96  	// sink. If this value is omitted or set to false, and if the sink's parent is a
    97  	// project, then the value returned as WriterIdentity is the same group or
    98  	// service account used by Cloud Logging before the addition of writer
    99  	// identities to the API. The sink's destination must be in the same project as
   100  	// the sink itself.
   101  	//
   102  	// If this field is set to true, or if the sink is owned by a non-project
   103  	// resource such as an organization, then the value of WriterIdentity will
   104  	// be a unique service account used only for exports from the new sink.
   105  	UniqueWriterIdentity bool
   106  
   107  	// These fields apply only to UpdateSinkOpt calls. The corresponding sink field
   108  	// is updated if and only if the Update field is true.
   109  	UpdateDestination     bool
   110  	UpdateFilter          bool
   111  	UpdateIncludeChildren bool
   112  }
   113  
   114  // DeleteSink deletes a sink. The provided sinkID is the sink's identifier, such as
   115  // "my-severe-errors-to-pubsub".
   116  // Requires AdminScope.
   117  func (c *Client) DeleteSink(ctx context.Context, sinkID string) error {
   118  	return c.sClient.DeleteSink(ctx, &logpb.DeleteSinkRequest{
   119  		SinkName: c.sinkPath(sinkID),
   120  	})
   121  }
   122  
   123  // Sink gets a sink. The provided sinkID is the sink's identifier, such as
   124  // "my-severe-errors-to-pubsub".
   125  // Requires ReadScope or AdminScope.
   126  func (c *Client) Sink(ctx context.Context, sinkID string) (*Sink, error) {
   127  	ls, err := c.sClient.GetSink(ctx, &logpb.GetSinkRequest{
   128  		SinkName: c.sinkPath(sinkID),
   129  	})
   130  	if err != nil {
   131  		return nil, err
   132  	}
   133  	return fromLogSink(ls), nil
   134  }
   135  
   136  // UpdateSink updates an existing Sink. Requires AdminScope.
   137  //
   138  // WARNING: UpdateSink will always update the Destination, Filter and IncludeChildren
   139  // fields of the sink, even if they have their zero values. Use UpdateSinkOpt
   140  // for more control over which fields to update.
   141  func (c *Client) UpdateSink(ctx context.Context, sink *Sink) (*Sink, error) {
   142  	return c.UpdateSinkOpt(ctx, sink, SinkOptions{
   143  		UpdateDestination:     true,
   144  		UpdateFilter:          true,
   145  		UpdateIncludeChildren: true,
   146  	})
   147  }
   148  
   149  // UpdateSinkOpt updates an existing Sink, using the provided options. Requires AdminScope.
   150  //
   151  // To change a sink's writer identity to a service account unique to the sink, set
   152  // opts.UniqueWriterIdentity to true. It is not possible to change a sink's writer identity
   153  // from a unique service account to a non-unique writer identity.
   154  func (c *Client) UpdateSinkOpt(ctx context.Context, sink *Sink, opts SinkOptions) (*Sink, error) {
   155  	mask := &maskpb.FieldMask{}
   156  	if opts.UpdateDestination {
   157  		mask.Paths = append(mask.Paths, "destination")
   158  	}
   159  	if opts.UpdateFilter {
   160  		mask.Paths = append(mask.Paths, "filter")
   161  	}
   162  	if opts.UpdateIncludeChildren {
   163  		mask.Paths = append(mask.Paths, "include_children")
   164  	}
   165  	if opts.UniqueWriterIdentity && len(mask.Paths) == 0 {
   166  		// Hack: specify a deprecated, unchangeable field so that we have a non-empty
   167  		// field mask. (An empty field mask would cause the destination, filter and include_children
   168  		// fields to be changed.)
   169  		mask.Paths = append(mask.Paths, "output_version_format")
   170  	}
   171  	if len(mask.Paths) == 0 {
   172  		return nil, errors.New("logadmin: UpdateSinkOpt: nothing to update")
   173  	}
   174  	ls, err := c.sClient.UpdateSink(ctx, &logpb.UpdateSinkRequest{
   175  		SinkName:             c.sinkPath(sink.ID),
   176  		Sink:                 toLogSink(sink),
   177  		UniqueWriterIdentity: opts.UniqueWriterIdentity,
   178  		UpdateMask:           mask,
   179  	})
   180  	if err != nil {
   181  		return nil, err
   182  	}
   183  	return fromLogSink(ls), err
   184  }
   185  
   186  func (c *Client) sinkPath(sinkID string) string {
   187  	return fmt.Sprintf("%s/sinks/%s", c.parent, sinkID)
   188  }
   189  
   190  // Sinks returns a SinkIterator for iterating over all Sinks in the Client's project.
   191  // Requires ReadScope or AdminScope.
   192  func (c *Client) Sinks(ctx context.Context) *SinkIterator {
   193  	it := &SinkIterator{
   194  		it: c.sClient.ListSinks(ctx, &logpb.ListSinksRequest{Parent: c.parent}),
   195  	}
   196  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(
   197  		it.fetch,
   198  		func() int { return len(it.items) },
   199  		func() interface{} { b := it.items; it.items = nil; return b })
   200  	return it
   201  }
   202  
   203  // A SinkIterator iterates over Sinks.
   204  type SinkIterator struct {
   205  	it       *vkit.LogSinkIterator
   206  	pageInfo *iterator.PageInfo
   207  	nextFunc func() error
   208  	items    []*Sink
   209  }
   210  
   211  // PageInfo supports pagination. See the google.golang.org/api/iterator package for details.
   212  func (it *SinkIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
   213  
   214  // Next returns the next result. Its second return value is Done if there are
   215  // no more results. Once Next returns Done, all subsequent calls will return
   216  // Done.
   217  func (it *SinkIterator) Next() (*Sink, error) {
   218  	if err := it.nextFunc(); err != nil {
   219  		return nil, err
   220  	}
   221  	item := it.items[0]
   222  	it.items = it.items[1:]
   223  	return item, nil
   224  }
   225  
   226  func (it *SinkIterator) fetch(pageSize int, pageToken string) (string, error) {
   227  	return iterFetch(pageSize, pageToken, it.it.PageInfo(), func() error {
   228  		item, err := it.it.Next()
   229  		if err != nil {
   230  			return err
   231  		}
   232  		it.items = append(it.items, fromLogSink(item))
   233  		return nil
   234  	})
   235  }
   236  
   237  func toLogSink(s *Sink) *logpb.LogSink {
   238  	return &logpb.LogSink{
   239  		Name:                s.ID,
   240  		Destination:         s.Destination,
   241  		Filter:              s.Filter,
   242  		IncludeChildren:     s.IncludeChildren,
   243  		OutputVersionFormat: logpb.LogSink_V2,
   244  		// omit WriterIdentity because it is output-only.
   245  	}
   246  }
   247  
   248  func fromLogSink(ls *logpb.LogSink) *Sink {
   249  	return &Sink{
   250  		ID:              ls.Name,
   251  		Destination:     ls.Destination,
   252  		Filter:          ls.Filter,
   253  		WriterIdentity:  ls.WriterIdentity,
   254  		IncludeChildren: ls.IncludeChildren,
   255  	}
   256  }
   257  

View as plain text