...

Source file src/cloud.google.com/go/pubsub/snapshot.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"strings"
    21  	"time"
    22  
    23  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    24  	fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
    25  	"google.golang.org/protobuf/types/known/timestamppb"
    26  )
    27  
    28  // Snapshot is a reference to a PubSub snapshot.
    29  type Snapshot struct {
    30  	c *Client
    31  
    32  	// The fully qualified identifier for the snapshot, in the format "projects/<projid>/snapshots/<snap>"
    33  	name string
    34  }
    35  
    36  // ID returns the unique identifier of the snapshot within its project.
    37  func (s *Snapshot) ID() string {
    38  	slash := strings.LastIndex(s.name, "/")
    39  	if slash == -1 {
    40  		// name is not a fully-qualified name.
    41  		panic("bad snapshot name")
    42  	}
    43  	return s.name[slash+1:]
    44  }
    45  
    46  // SetLabels sets or replaces the labels on a given snapshot.
    47  func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error) {
    48  	sc, err := s.c.subc.UpdateSnapshot(ctx, &pb.UpdateSnapshotRequest{
    49  		Snapshot: &pb.Snapshot{
    50  			Name:   s.name,
    51  			Labels: label,
    52  		},
    53  		UpdateMask: &fmpb.FieldMask{
    54  			Paths: []string{"labels"},
    55  		},
    56  	})
    57  	if err != nil {
    58  		return nil, err
    59  	}
    60  	return toSnapshotConfig(sc, s.c)
    61  }
    62  
    63  // SnapshotConfig contains the details of a Snapshot.
    64  type SnapshotConfig struct {
    65  	*Snapshot
    66  	Topic      *Topic
    67  	Expiration time.Time
    68  	// The set of labels for the snapshot.
    69  	Labels map[string]string
    70  }
    71  
    72  // Snapshot creates a reference to a snapshot.
    73  func (c *Client) Snapshot(id string) *Snapshot {
    74  	return &Snapshot{
    75  		c:    c,
    76  		name: fmt.Sprintf("projects/%s/snapshots/%s", c.projectID, id),
    77  	}
    78  }
    79  
    80  // Snapshots returns an iterator which returns snapshots for this project.
    81  func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator {
    82  	it := c.subc.ListSnapshots(ctx, &pb.ListSnapshotsRequest{
    83  		Project: c.fullyQualifiedProjectName(),
    84  	})
    85  	next := func() (*SnapshotConfig, error) {
    86  		snap, err := it.Next()
    87  		if err != nil {
    88  			return nil, err
    89  		}
    90  		return toSnapshotConfig(snap, c)
    91  	}
    92  	return &SnapshotConfigIterator{next: next}
    93  }
    94  
    95  // SnapshotConfigIterator is an iterator that returns a series of snapshots.
    96  type SnapshotConfigIterator struct {
    97  	next func() (*SnapshotConfig, error)
    98  }
    99  
   100  // Next returns the next SnapshotConfig. Its second return value is iterator.Done if there are no more results.
   101  // Once Next returns iterator.Done, all subsequent calls will return iterator.Done.
   102  func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error) {
   103  	return snaps.next()
   104  }
   105  
   106  // Delete deletes a snapshot.
   107  func (s *Snapshot) Delete(ctx context.Context) error {
   108  	return s.c.subc.DeleteSnapshot(ctx, &pb.DeleteSnapshotRequest{Snapshot: s.name})
   109  }
   110  
   111  // SeekToTime seeks the subscription to a point in time.
   112  //
   113  // Messages retained in the subscription that were published before this
   114  // time are marked as acknowledged, and messages retained in the
   115  // subscription that were published after this time are marked as
   116  // unacknowledged. Note that this operation affects only those messages
   117  // retained in the subscription (configured by SnapshotConfig). For example,
   118  // if `time` corresponds to a point before the message retention
   119  // window (or to a point before the system's notion of the subscription
   120  // creation time), only retained messages will be marked as unacknowledged,
   121  // and already-expunged messages will not be restored.
   122  func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error {
   123  	ts := timestamppb.New(t)
   124  	_, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
   125  		Subscription: s.name,
   126  		Target:       &pb.SeekRequest_Time{Time: ts},
   127  	})
   128  	return err
   129  }
   130  
   131  // CreateSnapshot creates a new snapshot from this subscription.
   132  // The snapshot will be for the topic this subscription is subscribed to.
   133  // If the name is empty string, a unique name is assigned.
   134  //
   135  // The created snapshot is guaranteed to retain:
   136  //
   137  //	(a) The existing backlog on the subscription. More precisely, this is
   138  //	    defined as the messages in the subscription's backlog that are
   139  //	    unacknowledged when Snapshot returns without error.
   140  //	(b) Any messages published to the subscription's topic following
   141  //	    Snapshot returning without error.
   142  func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error) {
   143  	if name != "" {
   144  		name = fmt.Sprintf("projects/%s/snapshots/%s", strings.Split(s.name, "/")[1], name)
   145  	}
   146  	snap, err := s.c.subc.CreateSnapshot(ctx, &pb.CreateSnapshotRequest{
   147  		Name:         name,
   148  		Subscription: s.name,
   149  	})
   150  	if err != nil {
   151  		return nil, err
   152  	}
   153  	return toSnapshotConfig(snap, s.c)
   154  }
   155  
   156  // SeekToSnapshot seeks the subscription to a snapshot.
   157  //
   158  // The snapshot need not be created from this subscription,
   159  // but it must be for the topic this subscription is subscribed to.
   160  func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error {
   161  	_, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
   162  		Subscription: s.name,
   163  		Target:       &pb.SeekRequest_Snapshot{Snapshot: snap.name},
   164  	})
   165  	return err
   166  }
   167  
   168  func toSnapshotConfig(snap *pb.Snapshot, c *Client) (*SnapshotConfig, error) {
   169  	exp := snap.ExpireTime.AsTime()
   170  	return &SnapshotConfig{
   171  		Snapshot:   &Snapshot{c: c, name: snap.Name},
   172  		Topic:      newTopic(c, snap.Topic),
   173  		Expiration: exp,
   174  		Labels:     snap.Labels,
   175  	}, nil
   176  }
   177  

View as plain text