1 package forwarder
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "path"
8 "strings"
9
10 "cloud.google.com/go/pubsub"
11 "github.com/google/go-containerregistry/pkg/name"
12 "go.uber.org/multierr"
13
14 "edge-infra.dev/pkg/f8n/kinform/sql"
15 sovereign "edge-infra.dev/pkg/f8n/sovereign/model"
16 "edge-infra.dev/pkg/f8n/warehouse/oci/remote"
17 "edge-infra.dev/pkg/lib/fog"
18 )
19
// Values of Message.Action that this package gives meaning to.
const (
	// Insertion marks an insert event. Ingest records an artifact version for
	// it, and HandleMsg only forwards messages whose action is empty or
	// Insertion.
	Insertion = "INSERT"
	// Deletion marks a delete event. Ingest removes the artifact version for
	// it; HandleMsg does not forward such messages.
	Deletion = "DELETE"
)
24
// Fwder handles registry event messages: it records them through SQL and
// copies the referenced artifact to one or more destinations.
type Fwder struct {
	// DST is the default list of destinations. HandleMsg uses it whenever a
	// message does not carry both a project ID and a repository of its own.
	DST []Destination

	// SourceRepository, when non-empty, limits handling to messages whose
	// digest reference resolves to exactly this repository; other messages
	// are counted as skipped and ignored.
	SourceRepository string
	// SQL is the database handle used by Ingest and promote. Both return an
	// error when it is nil.
	SQL *sql.DBHandle
}
32
33
34
35
36
// Message is the JSON payload decoded by HandleMsg. HandleMsg also builds one
// Message per destination and hands it to forward and promote.
type Message struct {
	// Action is the kind of registry event. It is compared against Insertion
	// and Deletion; an empty value is forwarded the same way as Insertion,
	// while any other non-empty value is skipped by HandleMsg.
	Action string `json:"action,omitempty"`

	// Digest is the digest reference of the artifact. It is parsed with
	// name.NewDigest under strict validation, so it must be a complete
	// reference.
	Digest string `json:"digest"`

	// Tag is an optional tag reference. When non-empty it is parsed with
	// name.NewTag under strict validation, its registry replaces the one
	// taken from Digest, and the forwarded copy is tagged with its
	// identifier.
	Tag string `json:"tag"`

	// ProjectID selects the destination project. It only takes effect when
	// Repository is set as well; otherwise the forwarder's defaults are used.
	ProjectID string `json:"projectID,omitempty"`

	// Repository selects the destination repository. It only takes effect
	// when ProjectID is set as well. Note that its JSON key is "registry".
	Repository string `json:"registry,omitempty"`
}
66
67 func (f *Fwder) HandleMsg(ctx context.Context, msg *pubsub.Message) error {
68 log := fog.FromContext(ctx)
69
70 m := &Message{}
71 if err := json.Unmarshal(msg.Data, m); err != nil {
72 IncPackageForwardErrs(*m)
73 return err
74 }
75 log = log.WithValues("ref", m.Digest)
76
77 ref, err := name.NewDigest(m.Digest, name.StrictValidation)
78 if err != nil {
79 IncPackageForwardErrs(*m)
80 return fmt.Errorf("failed to parse reference from digest %s: %w", m.Digest, err)
81 }
82
83
84 registry, srcrepository, image := parseRef(ref)
85
86 if m.Tag != "" {
87 dstTag, err := name.NewTag(m.Tag, name.StrictValidation)
88 if err != nil {
89 IncPackageForwardErrs(*m)
90 return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
91 }
92
93
94 registry, _, _ = parseRef(dstTag)
95 }
96
97 if f.SourceRepository != "" && f.SourceRepository != srcrepository {
98 log.Info("ignoring because repository doesn't match configured source",
99 "repository", srcrepository,
100 "source", f.SourceRepository,
101 )
102 IncPackageForwardSkips(*m)
103 return nil
104 }
105
106
107
108 if err := f.Ingest(ctx, m.Action, image, m.Tag, ref); err != nil {
109 log.Info("failed to ingest registry event. forwarding anyways", "error", err)
110 }
111
112 var destinations []Destination
113
114
115 if m.ProjectID != "" && m.Repository != "" {
116 destinations = append(destinations, Destination{ProjectID: m.ProjectID, Repository: m.Repository})
117 } else {
118 if m.ProjectID != "" || m.Repository != "" {
119 log.Info("partial forwarder config given. ignoring and using defaults",
120 "projectID", m.ProjectID, "registry", m.Repository,
121 )
122 }
123 destinations = append(destinations, f.DST...)
124 }
125
126 if m.Action != "" && m.Action != Insertion {
127 log.Info("ignoring because action is present and not INSERT", "action", m.Action)
128 IncPackageForwardSkips(*m)
129 return nil
130 }
131
132 isPromotion := msg.Attributes["promotion"] == "true"
133 var fwdErrs []error
134 for _, d := range destinations {
135
136 fm := &Message{
137 ProjectID: d.ProjectID,
138 Repository: d.Repository,
139 Digest: m.Digest,
140 Tag: m.Tag,
141 }
142 if isPromotion {
143 if err := f.promote(ctx, fm, registry, image, ref); err != nil {
144 log.Info("failed to insert promotion record. forwarding anyways",
145 "error", err)
146 }
147 }
148 if err := f.forward(ctx, fm, registry, image, ref); err != nil {
149 fwdErrs = append(fwdErrs, err)
150 }
151 }
152 return multierr.Combine(fwdErrs...)
153 }
154
155 func (f *Fwder) forward(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
156 log := fog.FromContext(ctx)
157
158
159
160 dstRoot := path.Join(registry, m.ProjectID, m.Repository, image)
161
162 dst, err := name.ParseReference(fmt.Sprintf("%s@%s", dstRoot, ref.Identifier()))
163 if err != nil {
164 return fmt.Errorf("failed to parse new destination reference: %w", err)
165 }
166 log = log.WithValues("dst", dst.String())
167 log.Info("forwarding")
168
169 a, err := remote.Get(ref)
170 if err != nil {
171 IncPackageForwardErrs(*m)
172 return fmt.Errorf("failed to fetch %s: %w", m.Digest, err)
173 }
174
175 if err := remote.Write(a, dst); err != nil {
176 IncPackageForwardErrs(*m)
177 return fmt.Errorf("failed to write %s: %w", dst, err)
178 }
179
180 log.Info("forwarded")
181
182
183 if m.Tag != "" {
184 ref, err := name.NewTag(m.Tag, name.StrictValidation)
185 if err != nil {
186 return fmt.Errorf("failed to parse reference from tag %s: %w", m.Tag, err)
187 }
188 dstTag := fmt.Sprintf("%s:%s", dstRoot, ref.Identifier())
189 tag, err := name.NewTag(dstTag, name.StrictValidation)
190 if err != nil {
191 return fmt.Errorf("failed to parse updated tag reference %s: %W", tag, err)
192 }
193
194
195
196 trimmed, found := strings.CutSuffix(tag.TagStr(), "-rc")
197 if found {
198 tag = tag.Tag(trimmed)
199 }
200 if err := remote.Tag(tag, a); err != nil {
201 return fmt.Errorf("failed to tag %s: %w", dstTag, err)
202 }
203 log.Info("tagged forwarded artifact", "tag", ref.Identifier())
204 }
205
206 IncPackageForwards(*m)
207 return nil
208 }
209
210 func (f *Fwder) Ingest(ctx context.Context, action, image, tag string, ref name.Digest) error {
211 if f.SQL == nil {
212 return fmt.Errorf("no database connection, cant ingest artifact")
213 }
214 digestPts := strings.Split(ref.DigestStr(), ":")
215 if len(digestPts) != 2 && digestPts[0] != "sha256" {
216 return fmt.Errorf("choked on digest string")
217 }
218 digest := digestPts[1]
219
220 switch action {
221 case Insertion:
222 _, err := f.SQL.InsertArtifactVersion(ctx, image, tag, digest)
223 if err != nil {
224 return fmt.Errorf("failed to insert artifact_version. err: %v", err)
225 }
226 case Deletion:
227 if err := f.SQL.DeleteArtifactVersion(ctx, image, digest); err != nil {
228 return fmt.Errorf("failed to delete artifact_version. err: %v", err)
229 }
230 }
231
232 return nil
233 }
234
235 func (f *Fwder) promote(ctx context.Context, m *Message, registry, image string, ref name.Digest) error {
236 if f.SQL == nil {
237 return fmt.Errorf("no database connection, cant promote artifact")
238 }
239
240 digestPts := strings.Split(ref.DigestStr(), ":")
241 if len(digestPts) != 2 && digestPts[0] != "sha256" {
242 return fmt.Errorf("choked on digest string")
243 }
244 digest := digestPts[1]
245
246
247 artifactVersion, err := f.SQL.QueryArtifactVersion(ctx, image, digest)
248 if err != nil {
249 return fmt.Errorf("artifact_version not found, cant promote. err: %v", err)
250 }
251
252 repository := path.Join(registry, m.ProjectID, m.Repository, image)
253 a := sovereign.Artifact{
254 ProjectID: m.ProjectID,
255 Repository: repository,
256 ArtifactVersion: artifactVersion.ID,
257 }
258 _, err = f.SQL.InsertArtifact(ctx, a)
259 if err != nil {
260 return fmt.Errorf("failed to insert artifact. err: %v", err)
261 }
262 return nil
263 }
264
View as plain text