1# Copyright 2022 Google LLC
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14"""This file provides the Config class which manages a single resource config
15
16file in a sample.
17"""
18from __future__ import annotations
19
20import os
21import re
22import ruamel.yaml
23
24from absl import logging
25from collections.abc import Mapping
26from collections.abc import Set
27from dataclasses import dataclass
28from typing import Any, Callable
29
30import strings
31
32# This regex matches a dcl resource reference. The submatch is the resource
33# name.
34REFERENCE_REGEXP = re.compile(
35 r'{{\s*ref:([a-z0-9_]*\.[a-z_]*\.[a-z_]*(\.[a-z_]*)?):[a-zA-Z0-9_\.\[\]]*\s*}}'
36)
37
38# Maps variable names to the comment associated with them in doc samples.
39VARIABLE_COMMENTS = {
40 '${ORG_ID?}':
41 '# Replace "${ORG_ID?}" with the numeric ID for your organization',
42 '${PROJECT_ID?}':
43 '# Replace "${PROJECT_ID?}" with your project ID',
44}
45
46# These suffixes should be removed from reference field names.
47# TODO(kcc-eng) Source this constant directly from
48# pkg/dcl/extension/extension.go
49TRIMMABLE_REFERENCE_SUFFIXES = [
50 'Name', 'Id', 'IdOrNum', 'Email', 'Link', 'Reference'
51]
52
53
54def add_dependency(dependencies: Set[str], value: str) -> None:
55 """Add the file name of the sample the given field value depends on, if any."""
56 # Take only the last DCL reference in the string as this is most likely the
57 # referenced resource.
58 reference_match = None
59 for reference_match in re.finditer(REFERENCE_REGEXP, value):
60 pass
61 if reference_match:
62 dependencies.add(f'samples/{reference_match.group(1)}')
63
64
65def find_dependencies(base: Mapping[str, Any]) -> Set[str]:
66 """Returns a set of config file names that the given config depends on."""
67 dependencies = set()
68 for key, value in base.items():
69 if isinstance(value, dict):
70 dependencies |= find_dependencies(value)
71 elif isinstance(value, list):
72 for element in value:
73 if isinstance(element, dict):
74 dependencies |= find_dependencies(element)
75 elif isinstance(element, str):
76 add_dependency(dependencies, element)
77 elif isinstance(value, str):
78 add_dependency(dependencies, value)
79 return dependencies
80
81
82@dataclass
83class Config:
84 """Corresponds to a single resource json file in DCL and a yaml file in KCC."""
85
86 def __init__(self, service: str, dcl_file: str):
87 self.dcl_file = dcl_file
88 # Infer service and resource name from filename.
89 file_name = os.path.split(self.dcl_file)[1]
90 file_name_parts = file_name.split('.')
91 if len(file_name_parts) == 3:
92 # This config belongs to the main service of the sample.
93 self.service = service
94 elif len(file_name_parts) == 4:
95 self.service = file_name_parts[-3]
96 else:
97 logging.warning(
98 f'{strings.colors.WARNING}Unable to infer service and resource name from {file_name}.{strings.colors.ENDC}'
99 )
100 return
101 self.resource = file_name_parts[-2]
102 self.kcc_kind = strings.snake_to_title(f'{self.service}_{self.resource}')
103 logging.info(f'Loading a {self.kcc_kind} from:\n{self.dcl_file}')
104 yaml = ruamel.yaml.YAML(typ='safe', pure=True)
105 with open(self.dcl_file) as file_object:
106 self.dcl_config = yaml.load(file_object)
107
108 def get(self, field) -> Any:
109 """Returns the value of the given field in this resource's dcl config."""
110 return self.dcl_config.get(field)
111
112 def depends_on(self) -> Set[str]:
113 return find_dependencies(self.dcl_config)
114
115 def convert_project(
116 self, variables_replaced: Mapping[str, Any]) -> Mapping[str, Any]:
117 """Convert the config of a project resource file from DCL to KCC format."""
118 metadata = {'name': variables_replaced['name']}
119 parent = variables_replaced['parent']
120 parent_type, parent_id = parent.split('s/')
121 spec = {
122 f'{parent_type}Ref': {
123 'external': parent_id,
124 },
125 }
126 display_name = variables_replaced.get('displayName')
127 if display_name:
128 spec['name'] = display_name
129 else:
130 # No display name, use project ID.
131 spec['name'] = variables_replaced['name']
132 return {
133 'apiVersion': 'resourcemanager.cnrm.cloud.google.com/v1beta1',
134 'kind': 'Project',
135 'metadata': metadata,
136 'spec': spec,
137 }
138
139 def convert(self, dependencies: Mapping[str, Config],
140 replacements: Mapping[str, str]) -> Mapping[str, Any]:
141 """Convert the config of a single resource file from DCL to KCC format."""
142 # Replace references before variables because the reference value will
143 # become a variable then be converted to a KCC value.
144 references_replaced = replace_strings(
145 self.dcl_config, lambda s: resolve_reference(s, dependencies))
146 variables_replaced = replace_strings(
147 references_replaced, lambda s: strings.replace_map(replacements, s))
148 if self.kcc_kind == 'Project':
149 return self.convert_project(variables_replaced)
150 if 'name' not in variables_replaced:
151 variables_replaced['name'] = f'{self.kcc_kind.lower()}-${{uniqueId}}'
152 # TODO(kcc-eng): Handle cases where 'labels' is not the metadata
153 # labels field.
154 metadata = {
155 key: variables_replaced[key]
156 for key in ('name', 'labels')
157 if key in variables_replaced
158 }
159 spec = {
160 key: value
161 for key, value in variables_replaced.items()
162 if key not in ('name', 'labels', 'project')
163 }
164 project = variables_replaced.get('project')
165 if isinstance(project, dict):
166 # The resource is in a project other than the base test project.
167 spec['projectRef'] = project
168 elif isinstance(project, str):
169 spec['projectRef'] = {'external': f'projects/{project}'}
170 converted = {
171 'apiVersion': f'{self.service}.cnrm.cloud.google.com/v1beta1',
172 'kind': self.kcc_kind,
173 }
174 if metadata:
175 converted['metadata'] = metadata
176 if spec:
177 converted['spec'] = spec
178 return converted
179
180
181def replace_strings(base: Mapping[str, Any],
182 replacer: Callable[str, Any]) -> Mapping[str, Any]:
183 """Recursively replace string values in the given mapping according to the given function, then return the result.
184
185 Also renames keys when the function returns a dict instead of a str.
186 """
187 replaced_base = {}
188 for key, value in base.items():
189 if isinstance(value, dict):
190 replaced_base[key] = replace_strings(value, replacer)
191 elif isinstance(value, list):
192 replaced_elements = []
193 for element in value:
194 if isinstance(element, dict):
195 replaced_elements.append(replace_strings(element, replacer))
196 elif isinstance(element, str):
197 replaced_elements.append(replacer(element))
198 else:
199 replaced_elements.append(element)
200 replaced_base[key] = replaced_elements
201 elif isinstance(value, str):
202 resolved = replacer(value)
203 if isinstance(resolved, dict):
204 # Only a resolved reference will return a dict from the replacer
205 # function.
206 for reference_suffix in TRIMMABLE_REFERENCE_SUFFIXES:
207 key = key.removesuffix(reference_suffix)
208 # TODO(kcc-eng): Handle references with multiple referenced types.
209 replaced_base[f'{key}Ref'] = resolved
210 else:
211 replaced_base[key] = resolved
212 else:
213 replaced_base[key] = value
214 return replaced_base
215
216
217def resolve_reference(value: str, dependencies: Mapping[str, Config]) -> Any:
218 """Return a KCC reference to the sample indicated by the given reference value."""
219 reference_match = re.search(REFERENCE_REGEXP, value)
220 if not reference_match:
221 return value
222 return {
223 'name':
224 dependencies.get(f'samples/{reference_match.group(1)}').get('name')
225 }
226
227
228def prepare_sample(config: Mapping[str, Any],
229 replacer: Callable[str, Any]) -> Mapping[str, Any]:
230 """Return the given KCC config in the format for a user sample."""
231 return add_comments(quote_spec(replace_strings(config, replacer)))
232
233
234def quote_spec(config: Mapping[str, Any]) -> Mapping[str, Any]:
235 """Return the given KCC config with all strings in spec quoted."""
236 quoted = {key: value for key, value in config.items() if key != 'spec'}
237 spec = config.get('spec')
238 if not spec:
239 return quoted
240 quoted['spec'] = replace_strings(
241 spec, lambda s: ruamel.yaml.scalarstring.DoubleQuotedScalarString(s))
242 return quoted
243
244
245def add_comments(base: Mapping[str, Any], depth: int = 0) -> Mapping[str, Any]:
246 """Return the given KCC config with comments needed for user samples."""
247 commented = ruamel.yaml.comments.CommentedMap()
248 for key, value in base.items():
249 if isinstance(value, dict):
250 commented[key] = add_comments(value, depth + 2)
251 else:
252 commented[key] = value
253 if isinstance(value, str):
254 for variable, comment in VARIABLE_COMMENTS.items():
255 if variable in value:
256 commented.yaml_set_start_comment(comment, depth)
257 return commented
View as plain text