1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26 package logadmin
27
28 import (
29 "context"
30 "fmt"
31 "net/http"
32 "net/url"
33 "strings"
34 "time"
35
36 "cloud.google.com/go/logging"
37 vkit "cloud.google.com/go/logging/apiv2"
38 logpb "cloud.google.com/go/logging/apiv2/loggingpb"
39 "cloud.google.com/go/logging/internal"
40 gax "github.com/googleapis/gax-go/v2"
41 "google.golang.org/api/iterator"
42 "google.golang.org/api/option"
43 _ "google.golang.org/genproto/googleapis/appengine/logging/v1"
44 _ "google.golang.org/genproto/googleapis/cloud/audit"
45 logtypepb "google.golang.org/genproto/googleapis/logging/type"
46 "google.golang.org/grpc/codes"
47 )
48
49
50 type Client struct {
51 lClient *vkit.Client
52 sClient *vkit.ConfigClient
53 mClient *vkit.MetricsClient
54 parent string
55 closed bool
56 }
57
58
59
60
61
62 func NewClient(ctx context.Context, parent string, opts ...option.ClientOption) (*Client, error) {
63 if !strings.ContainsRune(parent, '/') {
64 parent = "projects/" + parent
65 }
66 opts = append([]option.ClientOption{
67 option.WithScopes(logging.AdminScope),
68 }, opts...)
69 lc, err := vkit.NewClient(ctx, opts...)
70 if err != nil {
71 return nil, err
72 }
73
74 sc, err := vkit.NewConfigClient(ctx, option.WithGRPCConn(lc.Connection()))
75 if err != nil {
76 return nil, err
77 }
78 mc, err := vkit.NewMetricsClient(ctx, option.WithGRPCConn(lc.Connection()))
79 if err != nil {
80 return nil, err
81 }
82
83
84 retryerOnInternal := func() gax.Retryer {
85 return gax.OnCodes([]codes.Code{
86 codes.Internal,
87 }, gax.Backoff{
88 Initial: 100 * time.Millisecond,
89 Max: 1000 * time.Millisecond,
90 Multiplier: 1.2,
91 })
92 }
93 mc.CallOptions.CreateLogMetric = []gax.CallOption{gax.WithRetry(retryerOnInternal)}
94 mc.CallOptions.UpdateLogMetric = []gax.CallOption{gax.WithRetry(retryerOnInternal)}
95
96 lc.SetGoogleClientInfo("gccl", internal.Version)
97 sc.SetGoogleClientInfo("gccl", internal.Version)
98 mc.SetGoogleClientInfo("gccl", internal.Version)
99 client := &Client{
100 lClient: lc,
101 sClient: sc,
102 mClient: mc,
103 parent: parent,
104 }
105 return client, nil
106 }
107
108
109 func (c *Client) Close() error {
110 if c.closed {
111 return nil
112 }
113
114
115 err := c.lClient.Close()
116 _ = c.sClient.Close()
117 _ = c.mClient.Close()
118 c.closed = true
119 return err
120 }
121
122
123
124 func (c *Client) DeleteLog(ctx context.Context, logID string) error {
125 return c.lClient.DeleteLog(ctx, &logpb.DeleteLogRequest{
126 LogName: internal.LogPath(c.parent, logID),
127 })
128 }
129
130 func toHTTPRequest(p *logtypepb.HttpRequest) (*logging.HTTPRequest, error) {
131 if p == nil {
132 return nil, nil
133 }
134 u, err := url.Parse(p.RequestUrl)
135 if err != nil {
136 return nil, err
137 }
138 var dur time.Duration
139 if p.Latency != nil {
140 if err := p.GetLatency().CheckValid(); err != nil {
141 return nil, err
142 }
143 dur = p.GetLatency().AsDuration()
144 }
145 hr := &http.Request{
146 Method: p.RequestMethod,
147 URL: u,
148 Header: map[string][]string{},
149 }
150 if p.UserAgent != "" {
151 hr.Header.Set("User-Agent", p.UserAgent)
152 }
153 if p.Referer != "" {
154 hr.Header.Set("Referer", p.Referer)
155 }
156 return &logging.HTTPRequest{
157 Request: hr,
158 RequestSize: p.RequestSize,
159 Status: int(p.Status),
160 ResponseSize: p.ResponseSize,
161 Latency: dur,
162 LocalIP: p.ServerIp,
163 RemoteIP: p.RemoteIp,
164 CacheHit: p.CacheHit,
165 CacheValidatedWithOriginServer: p.CacheValidatedWithOriginServer,
166 CacheFillBytes: p.CacheFillBytes,
167 CacheLookup: p.CacheLookup,
168 }, nil
169 }
170
171
172 type EntriesOption interface {
173 set(*logpb.ListLogEntriesRequest)
174 }
175
176
177
178 func ProjectIDs(pids []string) EntriesOption { return projectIDs(pids) }
179
180 type projectIDs []string
181
182 func (p projectIDs) set(r *logpb.ListLogEntriesRequest) {
183 r.ResourceNames = make([]string, len(p))
184 for i, v := range p {
185 r.ResourceNames[i] = fmt.Sprintf("projects/%s", v)
186 }
187 }
188
189
190
191 func ResourceNames(rns []string) EntriesOption { return resourceNames(rns) }
192
193 type resourceNames []string
194
195 func (rn resourceNames) set(r *logpb.ListLogEntriesRequest) {
196 r.ResourceNames = append([]string(nil), rn...)
197 }
198
199
200
201
202
203
204
205
206
207
208
209
210
211 func Filter(f string) EntriesOption { return filter(f) }
212
213 type filter string
214
215 func (f filter) set(r *logpb.ListLogEntriesRequest) { r.Filter = string(f) }
216
217
218
219 func NewestFirst() EntriesOption { return newestFirst{} }
220
221 type newestFirst struct{}
222
223 func (newestFirst) set(r *logpb.ListLogEntriesRequest) { r.OrderBy = "timestamp desc" }
224
225
226 func PageSize(p int32) EntriesOption { return pageSize(p) }
227
228 type pageSize int32
229
230 func (p pageSize) set(r *logpb.ListLogEntriesRequest) { r.PageSize = int32(p) }
231
232
233
234
235 func (c *Client) Entries(ctx context.Context, opts ...EntriesOption) *EntryIterator {
236 it := &EntryIterator{
237 it: c.lClient.ListLogEntries(ctx, listLogEntriesRequest(c.parent, opts)),
238 }
239 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
240 it.fetch,
241 func() int { return len(it.items) },
242 func() interface{} { b := it.items; it.items = nil; return b })
243 return it
244 }
245
246 func listLogEntriesRequest(parent string, opts []EntriesOption) *logpb.ListLogEntriesRequest {
247 req := &logpb.ListLogEntriesRequest{
248 ResourceNames: []string{parent},
249 }
250 for _, opt := range opts {
251 opt.set(req)
252 }
253 req.Filter = defaultTimestampFilter(req.Filter)
254 return req
255 }
256
257
258
259
260 func defaultTimestampFilter(filter string) string {
261 dayAgo := time.Now().Add(-24 * time.Hour).UTC()
262 switch {
263 case len(filter) == 0:
264 return fmt.Sprintf(`timestamp >= "%s"`, dayAgo.Format(time.RFC3339))
265 case !strings.Contains(strings.ToLower(filter), "timestamp"):
266 return fmt.Sprintf(`%s AND timestamp >= "%s"`, filter, dayAgo.Format(time.RFC3339))
267 default:
268 return filter
269 }
270 }
271
272
273 type EntryIterator struct {
274 it *vkit.LogEntryIterator
275 pageInfo *iterator.PageInfo
276 nextFunc func() error
277 items []*logging.Entry
278 }
279
280
281 func (it *EntryIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
282
283
284
285
286 func (it *EntryIterator) Next() (*logging.Entry, error) {
287 if err := it.nextFunc(); err != nil {
288 return nil, err
289 }
290 item := it.items[0]
291 it.items = it.items[1:]
292 return item, nil
293 }
294
295 func (it *EntryIterator) fetch(pageSize int, pageToken string) (string, error) {
296 return iterFetch(pageSize, pageToken, it.it.PageInfo(), func() error {
297 item, err := it.it.Next()
298 if err != nil {
299 return err
300 }
301 e, err := fromLogEntry(item)
302 if err != nil {
303 return err
304 }
305 it.items = append(it.items, e)
306 return nil
307 })
308 }
309
310 var slashUnescaper = strings.NewReplacer("%2F", "/", "%2f", "/")
311
312 func fromLogEntry(le *logpb.LogEntry) (*logging.Entry, error) {
313 if err := le.GetTimestamp().CheckValid(); err != nil {
314 return nil, err
315 }
316 time := le.GetTimestamp().AsTime()
317 var payload interface{}
318 switch x := le.Payload.(type) {
319 case *logpb.LogEntry_TextPayload:
320 payload = x.TextPayload
321
322 case *logpb.LogEntry_ProtoPayload:
323 msg, err := x.ProtoPayload.UnmarshalNew()
324 if err != nil {
325 return nil, fmt.Errorf("logging: unmarshalling proto payload: %w", err)
326 }
327 payload = msg
328
329 case *logpb.LogEntry_JsonPayload:
330
331
332 payload = x.JsonPayload
333
334 case nil:
335 payload = nil
336
337 default:
338 return nil, fmt.Errorf("logging: unknown payload type: %T", le.Payload)
339 }
340 hr, err := toHTTPRequest(le.HttpRequest)
341 if err != nil {
342 return nil, err
343 }
344 return &logging.Entry{
345 Timestamp: time,
346 Severity: logging.Severity(le.Severity),
347 Payload: payload,
348 Labels: le.Labels,
349 InsertID: le.InsertId,
350 HTTPRequest: hr,
351 Operation: le.Operation,
352 LogName: slashUnescaper.Replace(le.LogName),
353 Resource: le.Resource,
354 Trace: le.Trace,
355 SourceLocation: le.SourceLocation,
356 }, nil
357 }
358
359
360 func (c *Client) Logs(ctx context.Context) *LogIterator {
361 it := &LogIterator{
362 parentResource: c.parent,
363 it: c.lClient.ListLogs(ctx, &logpb.ListLogsRequest{Parent: c.parent}),
364 }
365 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
366 it.fetch,
367 func() int { return len(it.items) },
368 func() interface{} { b := it.items; it.items = nil; return b })
369 return it
370 }
371
372
373 type LogIterator struct {
374 parentResource string
375 it *vkit.StringIterator
376 pageInfo *iterator.PageInfo
377 nextFunc func() error
378 items []string
379 }
380
381
382 func (it *LogIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
383
384
385
386
387 func (it *LogIterator) Next() (string, error) {
388 if err := it.nextFunc(); err != nil {
389 return "", err
390 }
391 item := it.items[0]
392 it.items = it.items[1:]
393 return item, nil
394 }
395
396 func (it *LogIterator) fetch(pageSize int, pageToken string) (string, error) {
397 return iterFetch(pageSize, pageToken, it.it.PageInfo(), func() error {
398 logPath, err := it.it.Next()
399 if err != nil {
400 return err
401 }
402 logID := internal.LogIDFromPath(it.parentResource, logPath)
403 it.items = append(it.items, logID)
404 return nil
405 })
406 }
407
408
409 func iterFetch(pageSize int, pageToken string, pi *iterator.PageInfo, next func() error) (string, error) {
410 pi.MaxSize = pageSize
411 pi.Token = pageToken
412
413 if err := next(); err != nil {
414 return "", err
415 }
416
417 for pi.Remaining() > 0 {
418 if err := next(); err != nil {
419 return "", err
420 }
421 }
422 return pi.Token, nil
423 }
424
View as plain text