1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "fmt"
20 "strings"
21 "time"
22
23 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
24 fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
25 "google.golang.org/protobuf/types/known/timestamppb"
26 )
27
28
29 type Snapshot struct {
30 c *Client
31
32
33 name string
34 }
35
36
37 func (s *Snapshot) ID() string {
38 slash := strings.LastIndex(s.name, "/")
39 if slash == -1 {
40
41 panic("bad snapshot name")
42 }
43 return s.name[slash+1:]
44 }
45
46
47 func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error) {
48 sc, err := s.c.subc.UpdateSnapshot(ctx, &pb.UpdateSnapshotRequest{
49 Snapshot: &pb.Snapshot{
50 Name: s.name,
51 Labels: label,
52 },
53 UpdateMask: &fmpb.FieldMask{
54 Paths: []string{"labels"},
55 },
56 })
57 if err != nil {
58 return nil, err
59 }
60 return toSnapshotConfig(sc, s.c)
61 }
62
63
64 type SnapshotConfig struct {
65 *Snapshot
66 Topic *Topic
67 Expiration time.Time
68
69 Labels map[string]string
70 }
71
72
73 func (c *Client) Snapshot(id string) *Snapshot {
74 return &Snapshot{
75 c: c,
76 name: fmt.Sprintf("projects/%s/snapshots/%s", c.projectID, id),
77 }
78 }
79
80
81 func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator {
82 it := c.subc.ListSnapshots(ctx, &pb.ListSnapshotsRequest{
83 Project: c.fullyQualifiedProjectName(),
84 })
85 next := func() (*SnapshotConfig, error) {
86 snap, err := it.Next()
87 if err != nil {
88 return nil, err
89 }
90 return toSnapshotConfig(snap, c)
91 }
92 return &SnapshotConfigIterator{next: next}
93 }
94
95
96 type SnapshotConfigIterator struct {
97 next func() (*SnapshotConfig, error)
98 }
99
100
101
102 func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error) {
103 return snaps.next()
104 }
105
106
107 func (s *Snapshot) Delete(ctx context.Context) error {
108 return s.c.subc.DeleteSnapshot(ctx, &pb.DeleteSnapshotRequest{Snapshot: s.name})
109 }
110
111
112
113
114
115
116
117
118
119
120
121
122 func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error {
123 ts := timestamppb.New(t)
124 _, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
125 Subscription: s.name,
126 Target: &pb.SeekRequest_Time{Time: ts},
127 })
128 return err
129 }
130
131
132
133
134
135
136
137
138
139
140
141
142 func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error) {
143 if name != "" {
144 name = fmt.Sprintf("projects/%s/snapshots/%s", strings.Split(s.name, "/")[1], name)
145 }
146 snap, err := s.c.subc.CreateSnapshot(ctx, &pb.CreateSnapshotRequest{
147 Name: name,
148 Subscription: s.name,
149 })
150 if err != nil {
151 return nil, err
152 }
153 return toSnapshotConfig(snap, s.c)
154 }
155
156
157
158
159
160 func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error {
161 _, err := s.c.subc.Seek(ctx, &pb.SeekRequest{
162 Subscription: s.name,
163 Target: &pb.SeekRequest_Snapshot{Snapshot: snap.name},
164 })
165 return err
166 }
167
168 func toSnapshotConfig(snap *pb.Snapshot, c *Client) (*SnapshotConfig, error) {
169 exp := snap.ExpireTime.AsTime()
170 return &SnapshotConfig{
171 Snapshot: &Snapshot{c: c, name: snap.Name},
172 Topic: newTopic(c, snap.Topic),
173 Expiration: exp,
174 Labels: snap.Labels,
175 }, nil
176 }
177
View as plain text