diff --git a/qubesadmin/tests/tools/qvm_template.py b/qubesadmin/tests/tools/qvm_template.py index f25ee0e1..f73e89ee 100644 --- a/qubesadmin/tests/tools/qvm_template.py +++ b/qubesadmin/tests/tools/qvm_template.py @@ -5559,6 +5559,205 @@ def execute(pubkey, packagename): gen_rpm(False, execute) self.assertAllCalled() + @mock.patch("qubesadmin.tools.qvm_template._is_file_in_repo_templates_keys_dir") + def test_260_gpg_key_and_ssl_cert_in_payload(self, mock_file_in_keysdir): + with tempfile.NamedTemporaryFile() as repo_conf1, \ + tempfile.NamedTemporaryFile() as repo_conf2, \ + tempfile.NamedTemporaryFile(prefix="gpg-") as gpg_key_primary, \ + tempfile.NamedTemporaryFile(prefix="sslcert-") as ssl_cert, \ + tempfile.NamedTemporaryFile(prefix="sslkey-") as ssl_key: + mock_file_in_keysdir.return_value = True + repo_str1 = \ +'''[qubes-templates-itl] +name = Qubes Templates repository +#baseurl = https://yum.qubes-os.org/r$releasever/templates-itl +#baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl +metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink +enabled = 1 +fastestmirror = 1 +metadata_expire = 7d +gpgcheck = 1 +gpgkey = file://{} +'''.format(gpg_key_primary.name) + repo_str2 = \ +'''[qubes-templates-itl-testing] +name = Qubes Templates repository +#baseurl = https://yum.qubes-os.org/r$releasever/templates-itl-testing +#baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl-testing +metalink = https://yum.qubes-os.org/r$releasever/templates-itl-testing/repodata/repomd.xml.metalink +enabled = 0 +fastestmirror = 1 +gpgcheck = 1 +gpgkey = file://{} +sslclientcert = {} +sslclientkey = {} +'''.format(gpg_key_primary.name, + ssl_cert.name, + ssl_key.name) + repo_conf1.write(repo_str1.encode()) + repo_conf1.flush() + repo_conf2.write(repo_str2.encode()) + repo_conf2.flush() + gpg_key_primary.write(b"ABC") + gpg_key_primary.flush() + ssl_cert.write(b"BCD") + ssl_cert.flush() + ssl_key.write(b"CDE") + ssl_key.flush() + wrapper = ''' +###!Q!BEGIN-QUBES-WRAPPER!Q!### +#{} +#QkNE +#{} +#Q0RF +#{} +#QUJD +###!Q!END-QUBES-WRAPPER!Q!###'''.format(ssl_cert.name, + ssl_key.name, + gpg_key_primary.name) + args = argparse.Namespace( + repos=[('enablerepo', 'repo1'), ('enablerepo', 'repo2'), + ('disablerepo', 'repo3'), ('disablerepo', 'repo4'), + ('disablerepo', 'repo5')], + releasever='4.2', + repo_files=[repo_conf1.name, repo_conf2.name] + ) + res = qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, + 'qubes-template-fedora-32', + True) + self.assertEqual(res, +'''--enablerepo=repo1 +--enablerepo=repo2 +--disablerepo=repo3 +--disablerepo=repo4 +--disablerepo=repo5 +--refresh +--releasever=4.2 +qubes-template-fedora-32 +--- +''' + repo_str1 + '\n' + repo_str2 + '\n' + wrapper) + self.assertAllCalled() + + @mock.patch("qubesadmin.tools.qvm_template._is_file_in_repo_templates_keys_dir") + def test_261_gpg_key_not_found_should_not_raise_error(self, mock_file_in_keysdir): + with tempfile.NamedTemporaryFile() as repo_conf: + mock_file_in_keysdir.return_value = False + repo_str = \ +'''[qubes-templates-itl] +name = Qubes Templates repository +#baseurl = https://yum.qubes-os.org/r$releasever/templates-itl +#baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl +metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink +enabled = 1 +fastestmirror = 1 +metadata_expire = 7d +gpgcheck = 1 +gpgkey = file:///path/to/non-existing/path +''' + repo_conf.write(repo_str.encode()) + repo_conf.flush() + args = argparse.Namespace( + repos=[('enablerepo', 'repo1'), ('disablerepo', 'repo2'), + ('disablerepo', 'repo3'), ('disablerepo', 'repo4'), + ('disablerepo', 'repo5')], + releasever='4.2', + repo_files=[repo_conf.name] + ) + res = qubesadmin.tools.qvm_template.qrexec_payload(args, self.app, + 'qubes-template-fedora-32', + True) + self.assertEqual(res, +'''--enablerepo=repo1 +--disablerepo=repo2 +--disablerepo=repo3 +--disablerepo=repo4 +--disablerepo=repo5 +--refresh +--releasever=4.2 +qubes-template-fedora-32 +--- +''' + repo_str + '\n') + self.assertAllCalled() + + @mock.patch("qubesadmin.tools.qvm_template._encode_key") + def test_262_gpg_key_with_releasever(self, mock_encode_key): + with tempfile.NamedTemporaryFile() as repo_conf: + mock_encode_key.return_value = "" + repo_str = \ +'''[qubes-templates-itl] +name = Qubes Templates repository +#baseurl = https://yum.qubes-os.org/r$releasever/templates-itl +#baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl +metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink +enabled = 1 +fastestmirror = 1 +metadata_expire = 7d +gpgcheck = 1 +gpgkey = file:///etc/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-$releasever-primary +''' + repo_conf.write(repo_str.encode()) + repo_conf.flush() + args = argparse.Namespace( + repos=[('enablerepo', 'repo1'), ('disablerepo', 'repo2'), + ('disablerepo', 'repo3'), ('disablerepo', 'repo4'), + ('disablerepo', 'repo5')], + releasever='4.2', + repo_files=[repo_conf.name] + ) + qubesadmin.tools.qvm_template.qrexec_payload(args, + self.app, + 'qubes-template-fedora-32', + True) + mock_encode_key.assert_called_with( + "file:///etc/qubes/repo-templates/keys/RPM-GPG-KEY-qubes-4.2-primary") + self.assertAllCalled() + + def test_263_invalid_keys_paths_must_be_ignored(self): + with tempfile.NamedTemporaryFile() as repo_conf, \ + tempfile.NamedTemporaryFile() as gpg_key: + repo_str = \ +'''[qubes-templates-itl] +name = Qubes Templates repository +#baseurl = https://yum.qubes-os.org/r$releasever/templates-itl +#baseurl = http://yum.qubesosfasa4zl44o4tws22di6kepyzfeqv3tg4e3ztknltfxqrymdad.onion/r$releasever/templates-itl +metalink = https://yum.qubes-os.org/r$releasever/templates-itl/repodata/repomd.xml.metalink +enabled = 1 +fastestmirror = 1 +metadata_expire = 7d +gpgcheck = 1 +gpgkey = file://{} +'''.format(gpg_key.name) + + repo_conf.write(repo_str.encode()) + repo_conf.flush() + gpg_key.write(b"ABC") + gpg_key.flush() + self.maxDiff = None + args = argparse.Namespace( + repos=[('enablerepo', 'repo1'), ('disablerepo', 'repo2'), + ('disablerepo', 'repo3'), ('disablerepo', 'repo4'), + ('disablerepo', 'repo5')], + releasever='4.2', + repo_files=[repo_conf.name] + ) + res = qubesadmin.tools.qvm_template.qrexec_payload(args, + self.app, + 'qubes-template-fedora-32', + True) + self.assertTrue(os.path.exists(gpg_key.name)) + self.assertEqual(res, +'''--enablerepo=repo1 +--disablerepo=repo2 +--disablerepo=repo3 +--disablerepo=repo4 +--disablerepo=repo5 +--refresh +--releasever=4.2 +qubes-template-fedora-32 +--- +''' + repo_str + '\n') + self.assertAllCalled() + @mock.patch('qubesadmin.tools.qvm_template.repolist') def test_300_repo_files_glob(self, mock_repolist): with tempfile.TemporaryDirectory() as temp_dir: diff --git a/qubesadmin/tools/qvm_template.py b/qubesadmin/tools/qvm_template.py index e645088a..7c87e0d3 100644 --- a/qubesadmin/tools/qvm_template.py +++ b/qubesadmin/tools/qvm_template.py @@ -20,6 +20,7 @@ """Tool for managing VM templates.""" import argparse +import base64 import collections import configparser import datetime @@ -59,6 +60,8 @@ LOCK_FILE = '/var/tmp/qvm-template.lck' DATE_FMT = '%Y-%m-%d %H:%M:%S' TAR_HEADER_BYTES = 512 +WRAPPER_PAYLOAD_BEGIN = "###!Q!BEGIN-QUBES-WRAPPER!Q!###" +WRAPPER_PAYLOAD_END = "###!Q!END-QUBES-WRAPPER!Q!###" UPDATEVM = str('global UpdateVM') @@ -465,6 +468,52 @@ def qrexec_popen( stderr=subprocess.PIPE ) +def _is_file_in_repo_templates_keys_dir(path: str) -> bool: + """Check if the given path is a file located repo-template keys dir""" + return os.path.isfile(path) and path.startswith( + "/etc/qubes/repo-templates/keys/") + +def _encode_key(path): + """Base64-encoe a file to be placed in qvm-template payload""" + if path.startswith("file://"): + path = path[7:] + + if not _is_file_in_repo_templates_keys_dir(path): + return "" + + encoded_key = "#" + path + "\n" + with open(path, "rb") as key: + encoded_key += f"#{base64.b64encode(key.read()).decode('ascii')}\n" + return encoded_key + +def _replace_dnf_vars(path, releasever): + """Replace supported dnf variables in repo""" + for var in ["$releasever", "${releasever}"]: + path = path.replace(var, releasever) + return path + +def _append_keys(payload, releasever): + """Add GPG key and SSL cert/keys to qvm-template payload""" + config = configparser.ConfigParser() + try: + config.read_string(payload) + except RuntimeError: + return "" + + file_list = set() + for section in config.sections(): + for option in ["gpgkey", "sslclientcert", "sslclientkey"]: + if config.has_option(section, option): + file_list.add( + _replace_dnf_vars(config.get(section, option), + releasever)) + + encoded_keys = "".join( + [_encode_key(file_path) for file_path in sorted(file_list)]) + if not encoded_keys: + return "" + + return f"\n{WRAPPER_PAYLOAD_BEGIN}\n{encoded_keys}{WRAPPER_PAYLOAD_END}" def qrexec_payload(args: argparse.Namespace, app: qubesadmin.app.QubesBase, spec: str, refresh: bool) -> str: @@ -502,9 +551,14 @@ def check_newline(string, name): check_newline(spec, 'template name') payload += spec + '\n' payload += '---\n' + + repo_config = "" for path in args.repo_files: with open(path, 'r', encoding='utf-8') as fd: - payload += fd.read() + '\n' + repo_config += fd.read() + '\n' + payload += repo_config + + payload += _append_keys(repo_config, args.releasever) return payload