@@ -1736,6 +1739,62 @@ loading of the container at first boot
+
+
+
+
+
+
+
+
+ suse
+ rhel
+ debian
+ archlinux
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
Syncing: {cert_file} to {ca_chroot_path}'
+ )
+ DataSync(cert_file, ca_chroot_path).sync_data()
+ has_new_certs = True
+ else:
+ log.warning(
+ f'--> Skipping: {cert_file}: '
+ 'does not exist or is directory'
+ )
+ if has_new_certs:
+ ca_tool = ca_update_info['tool']
+ log.info(
+ f'--> Updating CA certificate store with: {ca_tool}'
+ )
+ Command.run(
+ ['chroot', self.root_dir, ca_tool]
+ )
+ except Exception as issue:
+ raise KiwiBootStrapPhaseFailed(
+ self.issue_message.format(
+ headline='Failed to setup custom CA certificates',
+ reason=issue
+ )
+ )
+ else:
+ log.warning(
+ 'Could not determine CA update tool, skipping setup'
+ )
+
def _install_archives(self, archive_list, archive_target_dir_dict):
log.info("Installing archives")
for archive in archive_list:
diff --git a/kiwi/tasks/system_build.py b/kiwi/tasks/system_build.py
index a42aa463d7f..755b1cbd029 100644
--- a/kiwi/tasks/system_build.py
+++ b/kiwi/tasks/system_build.py
@@ -28,6 +28,8 @@
[--add-repo-credentials=...]
[--add-package=...]
[--add-bootstrap-package=...]
+ [--ca-cert=...]
+ [--ca-target-distribution=]
[--delete-package=...]
[--set-container-derived-from=]
[--set-container-tag=]
@@ -71,6 +73,15 @@
--clear-cache
delete repository cache for each of the used repositories
before installing any package
+ --ca-cert=
+ include additional CA certificate to import immediately after
+ bootstrap and make available during the build process.
+ --ca-target-distribution=
+ Specify target distribution for the import of certificates
+ via the --ca-cert options(s) and/or the provided
+ from the image description. The selected distribution is used
+ in KIWI to map the distribution specific CA storage path and
+ update tool for the import process.
--delete-package=
delete the given package name
--description=
@@ -133,6 +144,10 @@
from kiwi.privileges import Privileges
from kiwi.path import Path
+from kiwi.exceptions import (
+ KiwiCATargetDistributionError
+)
+
log = logging.getLogger('kiwi')
@@ -252,6 +267,28 @@ def process(self):
self.command_args['--set-container-derived-from']
)
+ if self.command_args['--ca-cert']:
+ ca_certs = self.command_args['--ca-cert']
+ if ca_certs:
+ target_distribution = \
+ self.command_args['--ca-target-distribution'] or \
+ self.xml_state.get_certificates_target_distribution()
+ if not target_distribution or not Defaults.get_ca_update_map(
+ target_distribution
+ ):
+ raise KiwiCATargetDistributionError(
+ 'No or invalid CA target distribution, {} {}'.format(
+ 'set via --ca-target-distribution.',
+ 'allowed values are {}'.format(
+ Defaults.get_ca_target_distributions()
+ )
+ )
+ )
+ for certificate in ca_certs:
+ self.xml_state.add_certificate(
+ certificate, target_distribution
+ )
+
self.run_checks(self.checks_after_command_args)
log.info('Preparing new root system')
@@ -279,6 +316,9 @@ def process(self):
# call post_bootstrap.sh script if present
setup.call_post_bootstrap_script()
+ # Setup custom CA certificates after bootstrap package install
+ system.setup_ca_certificates()
+
system.install_system(
manager
)
diff --git a/kiwi/tasks/system_prepare.py b/kiwi/tasks/system_prepare.py
index 42a0e4b5cae..b66c9711751 100644
--- a/kiwi/tasks/system_prepare.py
+++ b/kiwi/tasks/system_prepare.py
@@ -28,6 +28,8 @@
[--add-repo-credentials=...]
[--add-package=...]
[--add-bootstrap-package=...]
+ [--ca-cert=...]
+ [--ca-target-distribution=]
[--delete-package=...]
[--set-container-derived-from=]
[--set-container-tag=]
@@ -69,6 +71,15 @@
--clear-cache
delete repository cache for each of the used repositories
before installing any package
+ --ca-cert=
+ include additional CA certificate to import immediately after
+ bootstrap and make available during the build process.
+ --ca-target-distribution=
+ Specify target distribution for the import of certificates
+ via the --ca-cert options(s) and/or the provided
+ from the image description. The selected distribution is used
+ in KIWI to map the distribution specific CA storage path and
+ update tool for the import process.
--delete-package=
delete the given package name
--description=
@@ -129,6 +140,10 @@
from kiwi.defaults import Defaults
from kiwi.system.profile import Profile
+from kiwi.exceptions import (
+ KiwiCATargetDistributionError
+)
+
log = logging.getLogger('kiwi')
@@ -235,6 +250,28 @@ def process(self):
self.command_args['--set-container-derived-from']
)
+ if self.command_args['--ca-cert']:
+ ca_certs = self.command_args['--ca-cert']
+ if ca_certs:
+ target_distribution = \
+ self.command_args['--ca-target-distribution'] or \
+ self.xml_state.get_certificates_target_distribution()
+ if not target_distribution or not Defaults.get_ca_update_map(
+ target_distribution
+ ):
+ raise KiwiCATargetDistributionError(
+ 'No or invalid CA target distribution, {} {}'.format(
+ 'set via --ca-target-distribution.',
+ 'allowed values are {}'.format(
+ Defaults.get_ca_target_distributions()
+ )
+ )
+ )
+ for certificate in ca_certs:
+ self.xml_state.add_certificate(
+ certificate, target_distribution
+ )
+
self.run_checks(self.checks_after_command_args)
log.info('Preparing system')
@@ -262,6 +299,9 @@ def process(self):
# call post_bootstrap.sh script if present
setup.call_post_bootstrap_script()
+ # Setup custom CA certificates after bootstrap package install
+ system.setup_ca_certificates()
+
system.install_system(
manager
)
diff --git a/kiwi/xml_parse.py b/kiwi/xml_parse.py
index 2103d6ce518..9b0ecbb6295 100644
--- a/kiwi/xml_parse.py
+++ b/kiwi/xml_parse.py
@@ -16,7 +16,7 @@
# kiwi/schema/kiwi_for_generateDS.xsd
#
# Command line:
-# /home/ms/.cache/pypoetry/virtualenvs/kiwi-Btua-i95-py3.11/bin/generateDS.py -f --external-encoding="utf-8" --no-dates --no-warnings -o "kiwi/xml_parse.py" kiwi/schema/kiwi_for_generateDS.xsd
+# /home/ms/.cache/pypoetry/virtualenvs/kiwi-ZjQOFkNX-py3.11/bin/generateDS.py -f --external-encoding="utf-8" --no-dates --no-warnings -o "kiwi/xml_parse.py" kiwi/schema/kiwi_for_generateDS.xsd
#
# Current working directory (os.getcwd()):
# kiwi
@@ -813,7 +813,7 @@ class image(GeneratedsSuper):
"""The root element of the configuration file"""
subclass = None
superclass = None
- def __init__(self, name=None, displayname=None, id=None, schemaversion=None, noNamespaceSchemaLocation=None, schemaLocation=None, include=None, description=None, preferences=None, profiles=None, users=None, drivers=None, strip=None, repository=None, containers=None, packages=None, extension=None):
+ def __init__(self, name=None, displayname=None, id=None, schemaversion=None, noNamespaceSchemaLocation=None, schemaLocation=None, include=None, certificates=None, description=None, preferences=None, profiles=None, users=None, drivers=None, strip=None, repository=None, containers=None, packages=None, extension=None):
self.original_tagname_ = None
self.name = _cast(None, name)
self.displayname = _cast(None, displayname)
@@ -825,6 +825,10 @@ def __init__(self, name=None, displayname=None, id=None, schemaversion=None, noN
self.include = []
else:
self.include = include
+ if certificates is None:
+ self.certificates = []
+ else:
+ self.certificates = certificates
if description is None:
self.description = []
else:
@@ -881,6 +885,11 @@ def set_include(self, include): self.include = include
def add_include(self, value): self.include.append(value)
def insert_include_at(self, index, value): self.include.insert(index, value)
def replace_include_at(self, index, value): self.include[index] = value
+ def get_certificates(self): return self.certificates
+ def set_certificates(self, certificates): self.certificates = certificates
+ def add_certificates(self, value): self.certificates.append(value)
+ def insert_certificates_at(self, index, value): self.certificates.insert(index, value)
+ def replace_certificates_at(self, index, value): self.certificates[index] = value
def get_description(self): return self.description
def set_description(self, description): self.description = description
def add_description(self, value): self.description.append(value)
@@ -953,6 +962,7 @@ def validate_safe_posix_name(self, value):
def hasContent_(self):
if (
self.include or
+ self.certificates or
self.description or
self.preferences or
self.profiles or
@@ -1014,6 +1024,8 @@ def exportChildren(self, outfile, level, namespaceprefix_='', name_='image', fro
eol_ = ''
for include_ in self.include:
include_.export(outfile, level, namespaceprefix_, name_='include', pretty_print=pretty_print)
+ for certificates_ in self.certificates:
+ certificates_.export(outfile, level, namespaceprefix_, name_='certificates', pretty_print=pretty_print)
for description_ in self.description:
description_.export(outfile, level, namespaceprefix_, name_='description', pretty_print=pretty_print)
for preferences_ in self.preferences:
@@ -1075,6 +1087,11 @@ def buildChildren(self, child_, node, nodeName_, fromsubclass_=False):
obj_.build(child_)
self.include.append(obj_)
obj_.original_tagname_ = 'include'
+ elif nodeName_ == 'certificates':
+ obj_ = certificates.factory()
+ obj_.build(child_)
+ self.certificates.append(obj_)
+ obj_.original_tagname_ = 'certificates'
elif nodeName_ == 'description':
obj_ = description.factory()
obj_.build(child_)
@@ -2707,6 +2724,174 @@ def buildChildren(self, child_, node, nodeName_, fromsubclass_=False):
# end class container
+class certificates(GeneratedsSuper):
+ subclass = None
+ superclass = None
+ def __init__(self, profiles=None, target_distribution=None, certificate=None):
+ self.original_tagname_ = None
+ self.profiles = _cast(None, profiles)
+ self.target_distribution = _cast(None, target_distribution)
+ if certificate is None:
+ self.certificate = []
+ else:
+ self.certificate = certificate
+ def factory(*args_, **kwargs_):
+ if CurrentSubclassModule_ is not None:
+ subclass = getSubclassFromModule_(
+ CurrentSubclassModule_, certificates)
+ if subclass is not None:
+ return subclass(*args_, **kwargs_)
+ if certificates.subclass:
+ return certificates.subclass(*args_, **kwargs_)
+ else:
+ return certificates(*args_, **kwargs_)
+ factory = staticmethod(factory)
+ def get_certificate(self): return self.certificate
+ def set_certificate(self, certificate): self.certificate = certificate
+ def add_certificate(self, value): self.certificate.append(value)
+ def insert_certificate_at(self, index, value): self.certificate.insert(index, value)
+ def replace_certificate_at(self, index, value): self.certificate[index] = value
+ def get_profiles(self): return self.profiles
+ def set_profiles(self, profiles): self.profiles = profiles
+ def get_target_distribution(self): return self.target_distribution
+ def set_target_distribution(self, target_distribution): self.target_distribution = target_distribution
+ def hasContent_(self):
+ if (
+ self.certificate
+ ):
+ return True
+ else:
+ return False
+ def export(self, outfile, level, namespaceprefix_='', name_='certificates', namespacedef_='', pretty_print=True):
+ imported_ns_def_ = GenerateDSNamespaceDefs_.get('certificates')
+ if imported_ns_def_ is not None:
+ namespacedef_ = imported_ns_def_
+ if pretty_print:
+ eol_ = '\n'
+ else:
+ eol_ = ''
+ if self.original_tagname_ is not None:
+ name_ = self.original_tagname_
+ showIndent(outfile, level, pretty_print)
+ outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', ))
+ already_processed = set()
+ self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='certificates')
+ if self.hasContent_():
+ outfile.write('>%s' % (eol_, ))
+ self.exportChildren(outfile, level + 1, namespaceprefix_='', name_='certificates', pretty_print=pretty_print)
+ showIndent(outfile, level, pretty_print)
+ outfile.write('%s%s>%s' % (namespaceprefix_, name_, eol_))
+ else:
+ outfile.write('/>%s' % (eol_, ))
+ def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='certificates'):
+ if self.profiles is not None and 'profiles' not in already_processed:
+ already_processed.add('profiles')
+ outfile.write(' profiles=%s' % (self.gds_encode(self.gds_format_string(quote_attrib(self.profiles), input_name='profiles')), ))
+ if self.target_distribution is not None and 'target_distribution' not in already_processed:
+ already_processed.add('target_distribution')
+ outfile.write(' target_distribution=%s' % (self.gds_encode(self.gds_format_string(quote_attrib(self.target_distribution), input_name='target_distribution')), ))
+ def exportChildren(self, outfile, level, namespaceprefix_='', name_='certificates', fromsubclass_=False, pretty_print=True):
+ if pretty_print:
+ eol_ = '\n'
+ else:
+ eol_ = ''
+ for certificate_ in self.certificate:
+ certificate_.export(outfile, level, namespaceprefix_, name_='certificate', pretty_print=pretty_print)
+ def build(self, node):
+ already_processed = set()
+ self.buildAttributes(node, node.attrib, already_processed)
+ for child in node:
+ nodeName_ = Tag_pattern_.match(child.tag).groups()[-1]
+ self.buildChildren(child, node, nodeName_)
+ return self
+ def buildAttributes(self, node, attrs, already_processed):
+ value = find_attr_value_('profiles', node)
+ if value is not None and 'profiles' not in already_processed:
+ already_processed.add('profiles')
+ self.profiles = value
+ value = find_attr_value_('target_distribution', node)
+ if value is not None and 'target_distribution' not in already_processed:
+ already_processed.add('target_distribution')
+ self.target_distribution = value
+ self.target_distribution = ' '.join(self.target_distribution.split())
+ def buildChildren(self, child_, node, nodeName_, fromsubclass_=False):
+ if nodeName_ == 'certificate':
+ obj_ = certificate.factory()
+ obj_.build(child_)
+ self.certificate.append(obj_)
+ obj_.original_tagname_ = 'certificate'
+# end class certificates
+
+
+class certificate(GeneratedsSuper):
+ subclass = None
+ superclass = None
+ def __init__(self, name=None):
+ self.original_tagname_ = None
+ self.name = _cast(None, name)
+ def factory(*args_, **kwargs_):
+ if CurrentSubclassModule_ is not None:
+ subclass = getSubclassFromModule_(
+ CurrentSubclassModule_, certificate)
+ if subclass is not None:
+ return subclass(*args_, **kwargs_)
+ if certificate.subclass:
+ return certificate.subclass(*args_, **kwargs_)
+ else:
+ return certificate(*args_, **kwargs_)
+ factory = staticmethod(factory)
+ def get_name(self): return self.name
+ def set_name(self, name): self.name = name
+ def hasContent_(self):
+ if (
+
+ ):
+ return True
+ else:
+ return False
+ def export(self, outfile, level, namespaceprefix_='', name_='certificate', namespacedef_='', pretty_print=True):
+ imported_ns_def_ = GenerateDSNamespaceDefs_.get('certificate')
+ if imported_ns_def_ is not None:
+ namespacedef_ = imported_ns_def_
+ if pretty_print:
+ eol_ = '\n'
+ else:
+ eol_ = ''
+ if self.original_tagname_ is not None:
+ name_ = self.original_tagname_
+ showIndent(outfile, level, pretty_print)
+ outfile.write('<%s%s%s' % (namespaceprefix_, name_, namespacedef_ and ' ' + namespacedef_ or '', ))
+ already_processed = set()
+ self.exportAttributes(outfile, level, already_processed, namespaceprefix_, name_='certificate')
+ if self.hasContent_():
+ outfile.write('>%s' % (eol_, ))
+ self.exportChildren(outfile, level + 1, namespaceprefix_='', name_='certificate', pretty_print=pretty_print)
+ outfile.write('%s%s>%s' % (namespaceprefix_, name_, eol_))
+ else:
+ outfile.write('/>%s' % (eol_, ))
+ def exportAttributes(self, outfile, level, already_processed, namespaceprefix_='', name_='certificate'):
+ if self.name is not None and 'name' not in already_processed:
+ already_processed.add('name')
+ outfile.write(' name=%s' % (self.gds_encode(self.gds_format_string(quote_attrib(self.name), input_name='name')), ))
+ def exportChildren(self, outfile, level, namespaceprefix_='', name_='certificate', fromsubclass_=False, pretty_print=True):
+ pass
+ def build(self, node):
+ already_processed = set()
+ self.buildAttributes(node, node.attrib, already_processed)
+ for child in node:
+ nodeName_ = Tag_pattern_.match(child.tag).groups()[-1]
+ self.buildChildren(child, node, nodeName_)
+ return self
+ def buildAttributes(self, node, attrs, already_processed):
+ value = find_attr_value_('name', node)
+ if value is not None and 'name' not in already_processed:
+ already_processed.add('name')
+ self.name = value
+ def buildChildren(self, child_, node, nodeName_, fromsubclass_=False):
+ pass
+# end class certificate
+
+
class repository(k_source):
"""The Name of the Repository"""
subclass = None
@@ -10355,6 +10540,8 @@ def main():
"argument",
"bootloader",
"bootloadersettings",
+ "certificate",
+ "certificates",
"collectionModule",
"configoption",
"container",
diff --git a/kiwi/xml_state.py b/kiwi/xml_state.py
index 8edaf7b699f..eecb1bb56b3 100644
--- a/kiwi/xml_state.py
+++ b/kiwi/xml_state.py
@@ -2453,6 +2453,60 @@ def add_repository(
)
)
+ def add_certificate(self, cert_file: str, target_distribution: str) -> None:
+ """
+ Add to main section
+ The main section will be created if it does not exist. Also
+ setup the target_distribution in the resulting main section.
+ """
+ certificates_section = self._profiled(
+ self.xml_data.get_certificates()
+ )
+ if not certificates_section:
+ self.xml_data.set_certificates(
+ [
+ xml_parse.certificates(
+ target_distribution=target_distribution,
+ certificate=[xml_parse.certificate(name=cert_file)]
+ )
+ ]
+ )
+ else:
+ certificates_section[0].set_target_distribution(
+ target_distribution
+ )
+ certificates_section[0].add_certificate(
+ xml_parse.certificate(
+ name=cert_file
+ )
+ )
+
+ def get_certificates(self) -> List[str]:
+ """
+ Read list of certificates
+ """
+ cert_list = []
+ certificates_section = self._profiled(
+ self.xml_data.get_certificates()
+ )
+ if certificates_section:
+ for certificate in certificates_section[0].get_certificate():
+ cert_list.append(certificate.get_name())
+ return sorted(list(set(cert_list)))
+
+ def get_certificates_target_distribution(self) -> str:
+ """
+ Read CA target distribution
+ """
+ target_distribution = ''
+ certificates_section = self._profiled(
+ self.xml_data.get_certificates()
+ )
+ if certificates_section:
+ target_distribution = \
+ certificates_section[0].get_target_distribution()
+ return target_distribution
+
def resolve_this_path(self) -> None:
"""
Resolve any this:// repo source path into the path
diff --git a/test/data/example_config.xml b/test/data/example_config.xml
index 8f98eb4446f..a82eabfe3b3 100644
--- a/test/data/example_config.xml
+++ b/test/data/example_config.xml
@@ -1,6 +1,9 @@
+
+
+
diff --git a/test/unit/cli_test.py b/test/unit/cli_test.py
index d4484688953..a511d55baba 100644
--- a/test/unit/cli_test.py
+++ b/test/unit/cli_test.py
@@ -62,6 +62,8 @@ def setup(self):
'--set-repo-credentials': None,
'--add-package': [],
'--add-bootstrap-package': [],
+ '--ca-cert': [],
+ '--ca-target-distribution': None,
'--delete-package': [],
'--set-container-derived-from': None,
'--set-container-tag': None,
diff --git a/test/unit/system/prepare_test.py b/test/unit/system/prepare_test.py
index 3908f29f232..ebbf508e20f 100644
--- a/test/unit/system/prepare_test.py
+++ b/test/unit/system/prepare_test.py
@@ -530,3 +530,73 @@ def test_clean_package_manager_leftovers(self, mock_manager, mock_repo):
mock_manager.return_value.__enter__.return_value = manager
self.system.clean_package_manager_leftovers()
manager.clean_leftovers.assert_called_once_with()
+
+ @patch('os.path.isdir', return_value=True)
+ @patch('os.path.exists', return_value=True)
+ def test_setup_ca_certificates_name_exists_but_is_dir(
+ self, mock_os_path_exists, mock_os_path_isdir
+ ):
+ with self._caplog.at_level(logging.WARNING):
+ self.system.setup_ca_certificates()
+ assert '/some/ca/filename: does not exist or is directory' in \
+ self._caplog.text
+
+ def test_setup_ca_certificates_no_update_info(self):
+ self.state.get_certificates_target_distribution = Mock(
+ return_value=None
+ )
+ with self._caplog.at_level(logging.WARNING):
+ self.system.setup_ca_certificates()
+ assert 'Could not determine CA update tool, skipping setup' in \
+ self._caplog.text
+
+ @patch('kiwi.system.prepare.Command.run')
+ @patch('kiwi.system.prepare.DataSync')
+ @patch('os.path.exists', return_value=True)
+ @patch('kiwi.system.prepare.Path')
+ @patch('os.path.isdir', return_value=False)
+ def test_setup_ca_certificates_success(
+ self,
+ mock_os_path_isdir,
+ mock_Path,
+ mock_os_path_exists,
+ mock_DataSync,
+ mock_Command_run
+ ):
+ self.state.get_certificates_target_distribution = Mock(
+ return_value='suse'
+ )
+ mock_DataSync_instance = Mock()
+ mock_DataSync.return_value = mock_DataSync_instance
+
+ self.system.setup_ca_certificates()
+
+ ca_chroot_path = 'root_dir/etc/pki/trust/anchors'
+ mock_Path.create.assert_called_once_with(ca_chroot_path)
+ mock_DataSync.assert_called_once_with(
+ '/some/ca/filename', ca_chroot_path
+ )
+ mock_Command_run.assert_called_once_with(
+ ['chroot', 'root_dir', 'update-ca-certificates']
+ )
+
+ @patch('kiwi.system.prepare.Command.run')
+ @patch('kiwi.system.prepare.DataSync')
+ @patch('os.path.exists', return_value=True)
+ @patch('kiwi.system.prepare.Path')
+ @patch('os.path.isdir', return_value=False)
+ def test_setup_ca_certificates_raises(
+ self,
+ mock_os_path_isdir,
+ mock_Path,
+ mock_os_path_exists,
+ mock_DataSync,
+ mock_Command_run
+ ):
+ self.state.get_certificates_target_distribution = Mock(
+ return_value='suse'
+ )
+ mock_DataSync.side_effect = Exception
+
+ with raises(KiwiBootStrapPhaseFailed):
+ self.system.setup_ca_certificates()
diff --git a/test/unit/tasks/system_build_test.py b/test/unit/tasks/system_build_test.py
index 7a16e9ba670..2cb3c345678 100644
--- a/test/unit/tasks/system_build_test.py
+++ b/test/unit/tasks/system_build_test.py
@@ -1,12 +1,15 @@
import logging
import sys
import os
-from pytest import fixture
+from pytest import (
+ fixture, raises
+)
from unittest.mock import (
patch, call, Mock, MagicMock
)
import kiwi
+from kiwi.exceptions import KiwiCATargetDistributionError
from ..test_helper import argv_kiwi_tests
@@ -93,6 +96,8 @@ def _init_command_args(self):
self.task.command_args['--add-container-label'] = []
self.task.command_args['--clear-cache'] = False
self.task.command_args['--signing-key'] = []
+ self.task.command_args['--ca-cert'] = []
+ self.task.command_args['--ca-target-distribution'] = None
@patch('kiwi.logger.Logger.set_logfile')
@patch('kiwi.xml_state.XMLState.get_repositories_signing_keys')
@@ -162,6 +167,7 @@ def test_process_system_build(
system_prepare.install_bootstrap.assert_called_once_with(
manager.__enter__.return_value, []
)
+ system_prepare.setup_ca_certificates.assert_called_once_with()
system_prepare.install_system.assert_called_once_with(
manager.__enter__.return_value
)
@@ -445,3 +451,25 @@ def test_process_system_prepare_ignore_repos_used_for_build(
self.task.command_args['--ignore-repos-used-for-build'] = True
self.task.process()
mock_delete_repos.assert_called_once_with()
+
+ @patch('kiwi.xml_state.XMLState.add_certificate')
+ @patch('kiwi.tasks.system_build.SystemPrepare')
+ @patch('kiwi.logger.Logger.set_logfile')
+ def test_process_system_prepare_with_custom_ca_certs(
+ self, mock_log, mock_SystemPrepare, mock_add_certificate
+ ):
+ system_prepare = Mock()
+ system_prepare.setup_repositories = Mock(
+ return_value=MagicMock()
+ )
+ mock_SystemPrepare.return_value.__enter__.return_value = system_prepare
+ self._init_command_args()
+ self.task.command_args['--ca-cert'] = ['/some/ca/filename']
+ with raises(KiwiCATargetDistributionError):
+ self.task.process()
+ self.task.command_args['--ca-target-distribution'] = 'suse'
+ self.task.process()
+ mock_add_certificate.assert_called_once_with(
+ '/some/ca/filename', 'suse'
+ )
+ system_prepare.setup_ca_certificates.assert_called_once_with()
diff --git a/test/unit/tasks/system_prepare_test.py b/test/unit/tasks/system_prepare_test.py
index 3c963140f80..1251edaf86a 100644
--- a/test/unit/tasks/system_prepare_test.py
+++ b/test/unit/tasks/system_prepare_test.py
@@ -1,12 +1,15 @@
import logging
import sys
import os
-from pytest import fixture
+from pytest import (
+ fixture, raises
+)
from unittest.mock import (
patch, call, Mock, MagicMock
)
import kiwi
+from kiwi.exceptions import KiwiCATargetDistributionError
from ..test_helper import argv_kiwi_tests
@@ -86,6 +89,8 @@ def _init_command_args(self):
self.task.command_args['--set-container-tag'] = None
self.task.command_args['--add-container-label'] = []
self.task.command_args['--signing-key'] = []
+ self.task.command_args['--ca-cert'] = []
+ self.task.command_args['--ca-target-distribution'] = None
@patch('kiwi.xml_state.XMLState.get_repositories_signing_keys')
@patch('kiwi.tasks.system_prepare.SystemPrepare')
@@ -410,3 +415,25 @@ def test_process_system_prepare_delete_repos_used_for_build(
self.task.command_args['--ignore-repos-used-for-build'] = True
self.task.process()
mock_delete_repos.assert_called_once_with()
+
+ @patch('kiwi.xml_state.XMLState.add_certificate')
+ @patch('kiwi.tasks.system_prepare.SystemPrepare')
+ @patch('kiwi.logger.Logger.set_logfile')
+ def test_ca_certs_path_handling(
+ self, mock_log, mock_SystemPrepare, mock_add_certificate
+ ):
+ system_prepare = Mock()
+ system_prepare.setup_repositories = Mock(
+ return_value=MagicMock()
+ )
+ mock_SystemPrepare.return_value.__enter__.return_value = system_prepare
+ self._init_command_args()
+ self.task.command_args['--ca-cert'] = ['/some/ca/filename']
+ with raises(KiwiCATargetDistributionError):
+ self.task.process()
+ self.task.command_args['--ca-target-distribution'] = 'suse'
+ self.task.process()
+ mock_add_certificate.assert_called_once_with(
+ '/some/ca/filename', 'suse'
+ )
+ system_prepare.setup_ca_certificates.assert_called_once_with()
diff --git a/test/unit/xml_state_test.py b/test/unit/xml_state_test.py
index cc8a2ef9fc0..5da9fcccec4 100644
--- a/test/unit/xml_state_test.py
+++ b/test/unit/xml_state_test.py
@@ -1398,3 +1398,21 @@ def test_btrfs_default_volume_requested(
assert self.state.btrfs_default_volume_requested() is False
mock_get_btrfs_set_default_volume.return_value = None
assert self.state.btrfs_default_volume_requested() is True
+
+ def test_get_certificates(self):
+ assert self.state.get_certificates() == ['/some/ca/filename']
+ assert self.state.get_certificates_target_distribution() == 'suse'
+ self.state.add_certificate('/new/file', 'rhel')
+ assert self.state.get_certificates() == [
+ '/new/file', '/some/ca/filename'
+ ]
+ assert self.state.get_certificates_target_distribution() == 'rhel'
+ self.state.add_certificate('/new/file', 'rhel')
+ assert self.state.get_certificates() == [
+ '/new/file', '/some/ca/filename'
+ ]
+ assert self.apt_state.get_certificates() == []
+ assert self.apt_state.get_certificates_target_distribution() == ''
+ self.apt_state.add_certificate('/new/file', 'debian')
+ assert self.apt_state.get_certificates() == ['/new/file']
+ assert self.apt_state.get_certificates_target_distribution() == 'debian'