1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 package longrunning
23
24 import (
25 "context"
26 "errors"
27 "fmt"
28 "time"
29
30 autogen "cloud.google.com/go/longrunning/autogen"
31 pb "cloud.google.com/go/longrunning/autogen/longrunningpb"
32 gax "github.com/googleapis/gax-go/v2"
33 "github.com/googleapis/gax-go/v2/apierror"
34 "google.golang.org/grpc/status"
35 "google.golang.org/protobuf/proto"
36 "google.golang.org/protobuf/protoadapt"
37 "google.golang.org/protobuf/types/known/anypb"
38 )
39
40
41 var ErrNoMetadata = errors.New("operation contains no metadata")
42
43
44 type Operation struct {
45 c operationsClient
46 proto *pb.Operation
47 }
48
49 type operationsClient interface {
50 GetOperation(context.Context, *pb.GetOperationRequest, ...gax.CallOption) (*pb.Operation, error)
51 CancelOperation(context.Context, *pb.CancelOperationRequest, ...gax.CallOption) error
52 DeleteOperation(context.Context, *pb.DeleteOperationRequest, ...gax.CallOption) error
53 }
54
55
56
57
58
59 func InternalNewOperation(inner *autogen.OperationsClient, proto *pb.Operation) *Operation {
60 return &Operation{
61 c: inner,
62 proto: proto,
63 }
64 }
65
66
67
68
69 func (op *Operation) Name() string {
70 return op.proto.Name
71 }
72
73
74 func (op *Operation) Done() bool {
75 return op.proto.Done
76 }
77
78
79
80 func (op *Operation) Metadata(meta protoadapt.MessageV1) error {
81 if m := op.proto.Metadata; m != nil {
82 metav2 := protoadapt.MessageV2Of(meta)
83 return anypb.UnmarshalTo(m, metav2, proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true})
84 }
85 return ErrNoMetadata
86 }
87
88
89
90
91
92
93
94
95
96 func (op *Operation) Poll(ctx context.Context, resp protoadapt.MessageV1, opts ...gax.CallOption) error {
97 if !op.Done() {
98 p, err := op.c.GetOperation(ctx, &pb.GetOperationRequest{Name: op.Name()}, opts...)
99 if err != nil {
100 return err
101 }
102 op.proto = p
103 }
104 if !op.Done() {
105 return nil
106 }
107
108 switch r := op.proto.Result.(type) {
109 case *pb.Operation_Error:
110 err, _ := apierror.FromError(status.ErrorProto(r.Error))
111 return err
112 case *pb.Operation_Response:
113 if resp == nil {
114 return nil
115 }
116 respv2 := protoadapt.MessageV2Of(resp)
117 return anypb.UnmarshalTo(r.Response, respv2, proto.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true})
118 default:
119 return fmt.Errorf("unsupported result type %[1]T: %[1]v", r)
120 }
121 }
122
123
124 const DefaultWaitInterval = 60 * time.Second
125
126
127 func (op *Operation) Wait(ctx context.Context, resp protoadapt.MessageV1, opts ...gax.CallOption) error {
128 return op.WaitWithInterval(ctx, resp, DefaultWaitInterval, opts...)
129 }
130
131
132
133
134
135
136
137 func (op *Operation) WaitWithInterval(ctx context.Context, resp protoadapt.MessageV1, interval time.Duration, opts ...gax.CallOption) error {
138 bo := gax.Backoff{
139 Initial: 1 * time.Second,
140 Max: interval,
141 }
142 if bo.Max < bo.Initial {
143 bo.Max = bo.Initial
144 }
145 return op.wait(ctx, resp, &bo, gax.Sleep, opts...)
146 }
147
148 type sleeper func(context.Context, time.Duration) error
149
150
151 func (op *Operation) wait(ctx context.Context, resp protoadapt.MessageV1, bo *gax.Backoff, sl sleeper, opts ...gax.CallOption) error {
152 for {
153 if err := op.Poll(ctx, resp, opts...); err != nil {
154 return err
155 }
156 if op.Done() {
157 return nil
158 }
159 if err := sl(ctx, bo.Pause()); err != nil {
160 return err
161 }
162 }
163 }
164
165
166
167
168
169
170
171
172
173 func (op *Operation) Cancel(ctx context.Context, opts ...gax.CallOption) error {
174 return op.c.CancelOperation(ctx, &pb.CancelOperationRequest{Name: op.Name()}, opts...)
175 }
176
177
178
179
180 func (op *Operation) Delete(ctx context.Context, opts ...gax.CallOption) error {
181 return op.c.DeleteOperation(ctx, &pb.DeleteOperationRequest{Name: op.Name()}, opts...)
182 }
183
View as plain text