...

Text file src/github.com/GoogleCloudPlatform/k8s-config-connector/scripts/dclsampleconverter/config.py

Documentation: github.com/GoogleCloudPlatform/k8s-config-connector/scripts/dclsampleconverter

     1# Copyright 2022 Google LLC
     2#
     3# Licensed under the Apache License, Version 2.0 (the "License");
     4# you may not use this file except in compliance with the License.
     5# You may obtain a copy of the License at
     6#
     7#      http://www.apache.org/licenses/LICENSE-2.0
     8#
     9# Unless required by applicable law or agreed to in writing, software
    10# distributed under the License is distributed on an "AS IS" BASIS,
    11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12# See the License for the specific language governing permissions and
    13# limitations under the License.
    14"""This file provides the Config class which manages a single resource config
    15
    16file in a sample.
    17"""
    18from __future__ import annotations
    19
    20import os
    21import re
    22import ruamel.yaml
    23
    24from absl import logging
    25from collections.abc import Mapping
    26from collections.abc import Set
    27from dataclasses import dataclass
    28from typing import Any, Callable
    29
    30import strings
    31
    32# This regex matches a dcl resource reference. The submatch is the resource
    33# name.
    34REFERENCE_REGEXP = re.compile(
    35    r'{{\s*ref:([a-z0-9_]*\.[a-z_]*\.[a-z_]*(\.[a-z_]*)?):[a-zA-Z0-9_\.\[\]]*\s*}}'
    36)
    37
    38# Maps variable names to the comment associated with them in doc samples.
    39VARIABLE_COMMENTS = {
    40    '${ORG_ID?}':
    41        '# Replace "${ORG_ID?}" with the numeric ID for your organization',
    42    '${PROJECT_ID?}':
    43        '# Replace "${PROJECT_ID?}" with your project ID',
    44}
    45
    46# These suffixes should be removed from reference field names.
    47# TODO(kcc-eng) Source this constant directly from
    48# pkg/dcl/extension/extension.go
    49TRIMMABLE_REFERENCE_SUFFIXES = [
    50    'Name', 'Id', 'IdOrNum', 'Email', 'Link', 'Reference'
    51]
    52
    53
    54def add_dependency(dependencies: Set[str], value: str) -> None:
    55  """Add the file name of the sample the given field value depends on, if any."""
    56  # Take only the last DCL reference in the string as this is most likely the
    57  # referenced resource.
    58  reference_match = None
    59  for reference_match in re.finditer(REFERENCE_REGEXP, value):
    60    pass
    61  if reference_match:
    62    dependencies.add(f'samples/{reference_match.group(1)}')
    63
    64
    65def find_dependencies(base: Mapping[str, Any]) -> Set[str]:
    66  """Returns a set of config file names that the given config depends on."""
    67  dependencies = set()
    68  for key, value in base.items():
    69    if isinstance(value, dict):
    70      dependencies |= find_dependencies(value)
    71    elif isinstance(value, list):
    72      for element in value:
    73        if isinstance(element, dict):
    74          dependencies |= find_dependencies(element)
    75        elif isinstance(element, str):
    76          add_dependency(dependencies, element)
    77    elif isinstance(value, str):
    78      add_dependency(dependencies, value)
    79  return dependencies
    80
    81
    82@dataclass
    83class Config:
    84  """Corresponds to a single resource json file in DCL and a yaml file in KCC."""
    85
    86  def __init__(self, service: str, dcl_file: str):
    87    self.dcl_file = dcl_file
    88    # Infer service and resource name from filename.
    89    file_name = os.path.split(self.dcl_file)[1]
    90    file_name_parts = file_name.split('.')
    91    if len(file_name_parts) == 3:
    92      # This config belongs to the main service of the sample.
    93      self.service = service
    94    elif len(file_name_parts) == 4:
    95      self.service = file_name_parts[-3]
    96    else:
    97      logging.warning(
    98          f'{strings.colors.WARNING}Unable to infer service and resource name from {file_name}.{strings.colors.ENDC}'
    99      )
   100      return
   101    self.resource = file_name_parts[-2]
   102    self.kcc_kind = strings.snake_to_title(f'{self.service}_{self.resource}')
   103    logging.info(f'Loading a {self.kcc_kind} from:\n{self.dcl_file}')
   104    yaml = ruamel.yaml.YAML(typ='safe', pure=True)
   105    with open(self.dcl_file) as file_object:
   106      self.dcl_config = yaml.load(file_object)
   107
   108  def get(self, field) -> Any:
   109    """Returns the value of the given field in this resource's dcl config."""
   110    return self.dcl_config.get(field)
   111
   112  def depends_on(self) -> Set[str]:
   113    return find_dependencies(self.dcl_config)
   114
   115  def convert_project(
   116      self, variables_replaced: Mapping[str, Any]) -> Mapping[str, Any]:
   117    """Convert the config of a project resource file from DCL to KCC format."""
   118    metadata = {'name': variables_replaced['name']}
   119    parent = variables_replaced['parent']
   120    parent_type, parent_id = parent.split('s/')
   121    spec = {
   122        f'{parent_type}Ref': {
   123            'external': parent_id,
   124        },
   125    }
   126    display_name = variables_replaced.get('displayName')
   127    if display_name:
   128      spec['name'] = display_name
   129    else:
   130      # No display name, use project ID.
   131      spec['name'] = variables_replaced['name']
   132    return {
   133        'apiVersion': 'resourcemanager.cnrm.cloud.google.com/v1beta1',
   134        'kind': 'Project',
   135        'metadata': metadata,
   136        'spec': spec,
   137    }
   138
   139  def convert(self, dependencies: Mapping[str, Config],
   140              replacements: Mapping[str, str]) -> Mapping[str, Any]:
   141    """Convert the config of a single resource file from DCL to KCC format."""
   142    # Replace references before variables because the reference value will
   143    # become a variable then be converted to a KCC value.
   144    references_replaced = replace_strings(
   145        self.dcl_config, lambda s: resolve_reference(s, dependencies))
   146    variables_replaced = replace_strings(
   147        references_replaced, lambda s: strings.replace_map(replacements, s))
   148    if self.kcc_kind == 'Project':
   149      return self.convert_project(variables_replaced)
   150    if 'name' not in variables_replaced:
   151      variables_replaced['name'] = f'{self.kcc_kind.lower()}-${{uniqueId}}'
   152    # TODO(kcc-eng): Handle cases where 'labels' is not the metadata
   153    # labels field.
   154    metadata = {
   155        key: variables_replaced[key]
   156        for key in ('name', 'labels')
   157        if key in variables_replaced
   158    }
   159    spec = {
   160        key: value
   161        for key, value in variables_replaced.items()
   162        if key not in ('name', 'labels', 'project')
   163    }
   164    project = variables_replaced.get('project')
   165    if isinstance(project, dict):
   166      # The resource is in a project other than the base test project.
   167      spec['projectRef'] = project
   168    elif isinstance(project, str):
   169      spec['projectRef'] = {'external': f'projects/{project}'}
   170    converted = {
   171        'apiVersion': f'{self.service}.cnrm.cloud.google.com/v1beta1',
   172        'kind': self.kcc_kind,
   173    }
   174    if metadata:
   175      converted['metadata'] = metadata
   176    if spec:
   177      converted['spec'] = spec
   178    return converted
   179
   180
   181def replace_strings(base: Mapping[str, Any],
   182                    replacer: Callable[str, Any]) -> Mapping[str, Any]:
   183  """Recursively replace string values in the given mapping according to the given function, then return the result.
   184
   185  Also renames keys when the function returns a dict instead of a str.
   186  """
   187  replaced_base = {}
   188  for key, value in base.items():
   189    if isinstance(value, dict):
   190      replaced_base[key] = replace_strings(value, replacer)
   191    elif isinstance(value, list):
   192      replaced_elements = []
   193      for element in value:
   194        if isinstance(element, dict):
   195          replaced_elements.append(replace_strings(element, replacer))
   196        elif isinstance(element, str):
   197          replaced_elements.append(replacer(element))
   198        else:
   199          replaced_elements.append(element)
   200      replaced_base[key] = replaced_elements
   201    elif isinstance(value, str):
   202      resolved = replacer(value)
   203      if isinstance(resolved, dict):
   204        # Only a resolved reference will return a dict from the replacer
   205        # function.
   206        for reference_suffix in TRIMMABLE_REFERENCE_SUFFIXES:
   207          key = key.removesuffix(reference_suffix)
   208        # TODO(kcc-eng): Handle references with multiple referenced types.
   209        replaced_base[f'{key}Ref'] = resolved
   210      else:
   211        replaced_base[key] = resolved
   212    else:
   213      replaced_base[key] = value
   214  return replaced_base
   215
   216
   217def resolve_reference(value: str, dependencies: Mapping[str, Config]) -> Any:
   218  """Return a KCC reference to the sample indicated by the given reference value."""
   219  reference_match = re.search(REFERENCE_REGEXP, value)
   220  if not reference_match:
   221    return value
   222  return {
   223      'name':
   224          dependencies.get(f'samples/{reference_match.group(1)}').get('name')
   225  }
   226
   227
   228def prepare_sample(config: Mapping[str, Any],
   229                   replacer: Callable[str, Any]) -> Mapping[str, Any]:
   230  """Return the given KCC config in the format for a user sample."""
   231  return add_comments(quote_spec(replace_strings(config, replacer)))
   232
   233
   234def quote_spec(config: Mapping[str, Any]) -> Mapping[str, Any]:
   235  """Return the given KCC config with all strings in spec quoted."""
   236  quoted = {key: value for key, value in config.items() if key != 'spec'}
   237  spec = config.get('spec')
   238  if not spec:
   239    return quoted
   240  quoted['spec'] = replace_strings(
   241      spec, lambda s: ruamel.yaml.scalarstring.DoubleQuotedScalarString(s))
   242  return quoted
   243
   244
   245def add_comments(base: Mapping[str, Any], depth: int = 0) -> Mapping[str, Any]:
   246  """Return the given KCC config with comments needed for user samples."""
   247  commented = ruamel.yaml.comments.CommentedMap()
   248  for key, value in base.items():
   249    if isinstance(value, dict):
   250      commented[key] = add_comments(value, depth + 2)
   251    else:
   252      commented[key] = value
   253      if isinstance(value, str):
   254        for variable, comment in VARIABLE_COMMENTS.items():
   255          if variable in value:
   256            commented.yaml_set_start_comment(comment, depth)
   257  return commented

View as plain text