smeserver-pungi/build_scripts/post_actions.py
2024-07-05 09:44:42 +10:00

799 lines
24 KiB
Python

#!/usr/bin/env python3
# coding=utf-8
import json
import os
import logging
from collections import defaultdict
from shutil import (
rmtree,
copytree,
copy,
)
import argparse
import requests
from subprocess import check_call
from configparser import ConfigParser
from time import time
from typing import (
List,
Optional,
Dict,
)
from pathlib import Path
from urllib.parse import urljoin
from requests.auth import AuthBase
PUNGI_RESULTS = 'pungi-results'
KICKSTART_REPO = 'BaseOS'
INCLUDED_TO_KICKSTART_REPO = 'AppStream'
PATH_DICTIONARY = defaultdict(lambda: None, **{
'debug': 'debug/{arch}',
'source': 'Source',
'source_packages': 'Source/Packages',
'source_repository': 'Source',
'source_tree': 'Source',
'debug_packages': 'debug/{arch}/Packages',
'debug_repository': 'debug/{arch}',
'debug_tree': 'debug/{arch}',
}) # type: Dict[str ,Optional[str]]
logging.basicConfig(level=logging.INFO)
class BearerAuth(AuthBase):
def __init__(self, token):
self.token = token
def __call__(self, request):
request.headers["Authorization"] = f'Bearer {self.token}'
return request
class Signer:
__session__ = None # type: Optional[requests.Session]
def get_token(self, username, password) -> str:
data = {
'email': username,
'password': password
}
response = requests.post(
url=urljoin(self.endpoint + '/', 'token'),
json=data,
)
response.raise_for_status()
return response.json()['token']
def __init__(self, username, password, endpoint):
self.endpoint = endpoint
self.token = self.get_token(
username=username,
password=password,
)
def sign(
self,
file_path: Path,
keyid: str,
sign_type: str = 'detach-sign',
) -> Path:
auth = BearerAuth(token=self.token)
params = {
'keyid': keyid,
'sign_type': sign_type,
}
files = {
'file': file_path.open('rb'),
}
response = requests.post(
url=urljoin(self.endpoint + '/', 'sign'),
params=params,
files=files,
auth=auth,
)
response.raise_for_status()
if sign_type == 'detach-sign':
file_path = file_path.with_suffix(file_path.suffix + '.asc')
with file_path.open('w') as fd:
fd.write(response.text)
return file_path
@staticmethod
def verify(file_path: Path):
if file_path.suffix == '.asc':
command = f'gpg --verify {file_path} {file_path.with_suffix("")}'
else:
command = f'gpg --verify {file_path}'
check_call(
command,
shell=True,
universal_newlines=True,
)
def move_sources_folder_to_right_place(
latest_path: Path,
repo_name: str,
):
src_of_sources = latest_path.joinpath(repo_name, 'source')
dst_of_sources = latest_path.joinpath(repo_name, PATH_DICTIONARY['source'])
if src_of_sources.exists():
logging.info(
'Move sources to right place for result '
'dir "%s" and repo "%s"',
latest_path,
repo_name,
)
src_of_sources.joinpath('tree').rename(dst_of_sources)
rmtree(src_of_sources)
def move_debug_folder_to_right_place(
latest_path: Path,
repo_name: str,
arch: str,
):
src_of_debug = latest_path.joinpath(repo_name, arch, 'debug')
dst_of_debug = latest_path.joinpath(
repo_name,
PATH_DICTIONARY['debug'].format(arch=arch),
)
if src_of_debug.exists():
logging.info(
'Move a folder with debug rpms to right places for '
'result dir "%s", arch "%s" and repo "%s"',
latest_path,
arch,
repo_name,
)
os.makedirs(dst_of_debug.parent, exist_ok=True)
src_of_debug.joinpath('tree').rename(dst_of_debug)
rmtree(src_of_debug)
def copy_updateinfo_from_platform_repos(
src_repos_path: Path,
latest_path: Path,
repo_name: str,
arch: str,
):
dst_path = latest_path.joinpath(repo_name, arch, 'os')
if not dst_path.exists():
return
for path in src_repos_path.glob(
f'platform-almalinux-[0-9]-{repo_name.lower()}-'
f'{arch}/repodata/*updateinfo*'
):
dst_file = dst_path.joinpath('repodata', path.name)
if dst_file.exists():
return
logging.info('Copy updateinfo.xml for repo "%s"', repo_name)
copy(
path,
dst_file,
)
dst_repodata_path = dst_path.joinpath('repodata')
logging.info('Modify repo "%s" with updateinfo.xml', repo_name)
check_call(
f'modifyrepo_c --mdtype=updateinfo {dst_file} {dst_repodata_path}',
shell=True,
)
return
logging.warning(
'Updateinfo.xml for repo "%s" does not exist',
repo_name,
)
def sign_repomd_xml(
latest_path: Path,
repo_name: str,
arch: str,
pgp_keyid: str,
username: str,
password: str,
endpoint: str,
):
repomd_xml_path_suffix = 'repodata/repomd.xml'
os_repomd_xml_path = latest_path.joinpath(
repo_name,
arch,
'os',
repomd_xml_path_suffix,
)
kickstart_repomd_xml_path = latest_path.joinpath(
repo_name,
arch,
'kickstart',
repomd_xml_path_suffix,
)
source_repomd_xml_path = latest_path.joinpath(
repo_name,
PATH_DICTIONARY['source'],
repomd_xml_path_suffix,
)
debug_repomd_xml_path = latest_path.joinpath(
repo_name,
PATH_DICTIONARY['debug'].format(arch=arch),
repomd_xml_path_suffix
)
logging.info(
'Sign repomd.xml files for "%s" and verify signatures',
repo_name,
)
for repomd_xml_path in (
os_repomd_xml_path,
kickstart_repomd_xml_path,
source_repomd_xml_path,
debug_repomd_xml_path,
):
if not repomd_xml_path.exists():
continue
signer = Signer(
username=username,
password=password,
endpoint=endpoint,
)
file_path = signer.sign(
file_path=repomd_xml_path,
keyid=pgp_keyid,
)
signer.verify(file_path=file_path)
def create_kickstart_folder(
latest_path: Path,
repo_name: str,
arch: str,
):
src_kickstart = latest_path.joinpath(repo_name, arch, 'os')
dst_kickstart = latest_path.joinpath(repo_name, arch, 'kickstart')
if src_kickstart.exists() and not dst_kickstart.exists():
logging.info(
'Make kickstart repo for result dir "%s", '
'repo "%s" and arch "%s"',
latest_path,
repo_name,
arch,
)
copytree(
src_kickstart,
dst_kickstart,
copy_function=os.link,
)
logging.info(
'Copy repodata for a kickstart without using hardlinks'
)
repodata_dst_path = dst_kickstart.joinpath('repodata')
repodata_src_path = src_kickstart.joinpath('repodata')
rmtree(repodata_dst_path)
copytree(
repodata_src_path,
repodata_dst_path,
)
def update_timestamp_in_treeinfo(
tree_info_path: Path,
timestamp: int,
):
replaced_values = {
'general': 'timestamp',
'tree': 'build_timestamp',
}
if not tree_info_path.exists():
return
tree_info_config = ConfigParser()
tree_info_config.read(tree_info_path)
for section, key in replaced_values.items():
tree_info_config.set(
section=section,
option=key,
value=str(timestamp),
)
logging.info(
'Update timestamp "%s" in .treeinfo for "%s"',
timestamp,
tree_info_path,
)
with open(tree_info_path, 'w') as tree_info_fp:
tree_info_config.write(tree_info_fp)
def update_kickstart_treeinfo_file(
tree_info_path: str,
):
replaced_values = {
'packages': None,
'repository': None,
}
tree_info_config = ConfigParser()
tree_info_config.read(tree_info_path)
logging.info(
'Update .treeinfo file "%s": replace path-suffix `os` by `kickstart`',
tree_info_path,
)
section_name = f'variant-{INCLUDED_TO_KICKSTART_REPO}'
for key in replaced_values:
if section_name not in tree_info_config.sections():
continue
replaced_values[key] = tree_info_config.get(
section=section_name,
option=key
).replace('os', 'kickstart')
for key, value in replaced_values.items():
if section_name not in tree_info_config.sections():
continue
tree_info_config.set(
section=section_name,
option=key,
value=value,
)
# because it's hardlink and could be modified
# both files (in dirs `os` and `kickstart`)
os.remove(tree_info_path)
with open(tree_info_path, 'w') as tree_info_fp:
tree_info_config.write(tree_info_fp)
def sign_isos_checksum(
latest_path: Path,
arch: str,
pgp_keyid: str,
username: str,
password: str,
endpoint: str,
):
logging.info('Sign ISOs CHECKSUM and verify signature')
checksum_path = latest_path.joinpath('isos', arch, 'CHECKSUM')
if not checksum_path.exists():
logging.warning('File CHECKSUM is absent')
return
signer = Signer(
username=username,
password=password,
endpoint=endpoint,
)
file_path = signer.sign(
file_path=checksum_path,
keyid=pgp_keyid,
sign_type='clear-sign',
)
signer.verify(file_path=file_path)
def post_processing_images_json_metadata(
latest_path: Path,
arch: str,
):
images_json_path = latest_path.joinpath(
'metadata', arch, 'images.json'
)
logging.info('Post-processing images.json')
if not images_json_path.exists():
logging.warning('images.json is absent')
return
with open(images_json_path, 'r') as images_metadata_fd:
content = json.load(images_metadata_fd)
images = content['payload']['images']
for variant in images:
variant_data = images[variant] # type: Dict[str, List[Dict]]
for arch, images_list in variant_data.items():
variant_data[arch] = [
dict(
image,
**{
'path': str(Path('isos').joinpath(
arch,
Path(image['path']).name,
))
}
) for image in images_list
]
with open(images_json_path, 'w') as images_metadata_fd:
json.dump(
content,
images_metadata_fd,
indent=4,
)
def post_processing_rpms_json_metadata(
latest_path: Path,
arch: str,
):
rpms_json_path = latest_path.joinpath(
'metadata', arch, 'rpms.json'
)
logging.info('Post-processing rpms.json')
if not rpms_json_path.exists():
logging.warning('rpms.json is absent')
return
with open(rpms_json_path, 'r') as rpms_metadata_fd:
content = json.load(rpms_metadata_fd)
rpms = content['payload']['rpms']
for variant in rpms:
variant_data = rpms[variant]
if variant == 'Minimal':
continue
for arch in variant_data:
arch_data = variant_data[arch]
for srpm in arch_data:
srpm_data = arch_data[srpm]
for artifact in srpm_data:
artifact_data = srpm_data[artifact]
path_suffix = PATH_DICTIONARY[
artifact_data['category']
] # type: Optional[str]
if path_suffix is None:
continue
else:
path_suffix = path_suffix.format(arch=arch)
artifact_path = Path(artifact_data['path'])
artifact_data['path'] = str(Path(variant).joinpath(
path_suffix,
artifact_path.parent.name,
artifact_path.name,
))
with open(rpms_json_path, 'w') as rpms_metadata_fd:
if 'Minimal' in content['payload']['rpms']:
del content['payload']['rpms']['Minimal']
json.dump(
content,
rpms_metadata_fd,
indent=4,
)
def post_processing_compose_info_json_metadata(
latest_path: Path,
arch: str,
):
composeinfo_json_path = latest_path.joinpath(
'metadata', arch, 'composeinfo.json'
)
logging.info('Post-processing composeinfo.json')
if not composeinfo_json_path.exists():
logging.warning('composeinfo.json is absent')
return
with open(composeinfo_json_path, 'r') as composeinfo_metadata_fd:
content = json.load(composeinfo_metadata_fd)
variants = content['payload']['variants']
for variant in variants:
variant_paths = variants[variant]['paths']
if variant == 'Minimal':
continue
for path_type in variant_paths:
path_data = variant_paths[path_type] # type: Dict
for arch, path in path_data.items():
path_suffix = PATH_DICTIONARY[
path_type] # type: Optional[str]
if path_suffix is None:
continue
else:
path_suffix = path_suffix.format(arch=arch)
path = Path(variant).joinpath(path_suffix)
path_data[arch] = str(path)
with open(composeinfo_json_path, 'w') as composeinfo_metadata_fd:
if 'Minimal' in content['payload']['variants']:
del content['payload']['variants']['Minimal']
json.dump(
content,
composeinfo_metadata_fd,
indent=4,
)
def move_json_metadata_to_arch_folder(
latest_path: Path,
arch: str,
):
extension = '*.json'
metadata_path = latest_path.joinpath('metadata')
metadata_arch_path = metadata_path.joinpath(arch)
os.makedirs(metadata_arch_path, exist_ok=True)
for json_metadata in Path(metadata_path).glob(extension):
logging.info(
'Copy "%s" to arch directory "%s"',
json_metadata,
metadata_arch_path,
)
json_metadata.rename(metadata_arch_path.joinpath(json_metadata.name))
def move_iso_and_its_artifacts_to_isos_arch_folder(
src_latest_path: Path,
repo_name: str,
arch: str,
exts_of_files: List[str],
dst_latest_path: Optional[Path] = None,
):
src_iso_folder_path = src_latest_path.joinpath(repo_name, arch, 'iso')
isos_arch_folder = (dst_latest_path or src_latest_path).joinpath(
'isos',
arch,
)
os.makedirs(isos_arch_folder, exist_ok=True)
if not src_iso_folder_path.exists():
return
for ext_of_file in exts_of_files:
for src_file_path in src_iso_folder_path.glob(ext_of_file):
dst_file_path = isos_arch_folder.joinpath(src_file_path.name)
if not src_file_path.exists():
continue
if dst_file_path.exists():
continue
logging.info(
'Move iso or iso\'s artifacts from "%s" to "%s"',
src_file_path,
dst_file_path,
)
src_file_path.rename(dst_file_path)
src_checksum_path = src_latest_path.joinpath(
repo_name,
arch,
'iso',
'CHECKSUM',
)
dst_checksum_path = isos_arch_folder.joinpath('CHECKSUM')
if src_checksum_path.exists():
logging.info(
'Write CHEKSUM from "%s" to "%s"',
src_checksum_path,
dst_checksum_path,
)
with open(src_checksum_path, 'r') as src_checksum_file, \
open(dst_checksum_path, 'a+') as dst_checksum_path:
src_checksum_content = src_checksum_file.read()
dst_checksum_path.write(src_checksum_content)
rmtree(src_iso_folder_path)
def rename_latest_dir(
latest_path: Path,
):
old_real_name = Path(os.readlink(latest_path.parent)).name
if not old_real_name.startswith('last_compose_dir'):
return
old_real_name_path = latest_path.parent.parent.joinpath(old_real_name)
new_real_name = f'{int(time())}-{old_real_name}'
new_real_name_path = latest_path.parent.parent.joinpath(new_real_name)
logging.info('New real name path %s', new_real_name_path)
logging.info('Old real name path %s', old_real_name_path)
if not old_real_name_path.exists():
return
logging.info(
'Add the timestamp to name of a '
'real directory with latest result: "%s"',
new_real_name_path,
)
old_real_name_path.rename(new_real_name_path)
os.unlink(latest_path.parent)
os.symlink(new_real_name_path, latest_path.parent)
def create_parser():
parser = argparse.ArgumentParser()
parser.add_argument(
'--env-path',
action='store',
help='A path to folder which will be used '
'for building new distribution',
required=True,
)
parser.add_argument(
'--sign-service-username',
action='store',
help='An username of a sign service',
default=None,
required=False,
)
parser.add_argument(
'--sign-service-password',
action='store',
help='A password of a sign service',
default=None,
required=False,
)
parser.add_argument(
'--sign-service-endpoint',
action='store',
help='An endpoint of a sign service',
default=None,
required=False,
)
parser.add_argument(
'--pgp-sign-keyid',
action='store',
help='PGP sign key ID. Used for signing building artifacts',
required=False,
default=None,
)
parser.add_argument(
'--middle-result-directory',
action='store',
help='A directory with middle result. '
'E.g. a directory contains Minimal iso',
default=None,
required=False,
)
parser.add_argument(
'--arch',
type=str,
help='Architecture of a distribution',
required=True,
)
parser.add_argument(
'--source-repos-folder',
type=str,
help='Path to folder there are stored source repos',
required=True,
)
parser.add_argument(
'--repos',
nargs='+',
type=str,
help='A list of repositories are contained in distribution',
required=True,
)
parser.add_argument(
'--middle-repos',
nargs='+',
type=str,
help='A list of repositories from a middle result '
'which will be used for getting ISOs',
required=False,
default=[],
)
parser.add_argument(
'--not-needed-repos',
nargs='*',
type=str,
help='A list of repositories which are not needed, e.g. Minimal',
required=False,
default=[],
)
return parser
def cli_main():
args = create_parser().parse_args()
pungi_results = Path(args.env_path).joinpath(PUNGI_RESULTS)
latest_result_paths = pungi_results.glob('latest-*')
logging.info(
'We have the following latest results "%s"',
latest_result_paths,
)
extensions_of_files = ['*.iso', '*.manifest']
for latest_path in latest_result_paths:
latest_path = latest_path.joinpath('compose')
build_timestamp = int(time())
for repo in args.repos:
if not latest_path.joinpath(repo).exists():
continue
if repo in args.middle_repos:
not_needed_repo_path = latest_path.joinpath(repo)
if not_needed_repo_path.exists():
rmtree(not_needed_repo_path)
continue
move_sources_folder_to_right_place(
latest_path=latest_path,
repo_name=repo,
)
move_debug_folder_to_right_place(
latest_path=latest_path,
repo_name=repo,
arch=args.arch,
)
copy_updateinfo_from_platform_repos(
src_repos_path=Path(args.source_repos_folder),
latest_path=latest_path,
repo_name=repo,
arch=args.arch,
)
create_kickstart_folder(
latest_path=latest_path,
repo_name=repo,
arch=args.arch,
)
if all(opt is not None for opt in (
args.sign_service_username,
args.sign_service_password,
args.sign_service_endpoint
)):
sign_repomd_xml(
latest_path=latest_path,
repo_name=repo,
arch=args.arch,
username=args.sign_service_username,
password=args.sign_service_password,
endpoint=args.sign_service_endpoint,
pgp_keyid=args.pgp_sign_keyid,
)
move_iso_and_its_artifacts_to_isos_arch_folder(
src_latest_path=latest_path,
repo_name=repo,
arch=args.arch,
exts_of_files=extensions_of_files,
)
repo_path = latest_path.joinpath(repo)
for path in Path(repo_path).rglob('.treeinfo'):
update_timestamp_in_treeinfo(
tree_info_path=path,
timestamp=build_timestamp,
)
if path.parent.name == 'kickstart':
update_kickstart_treeinfo_file(
tree_info_path=str(path),
)
move_json_metadata_to_arch_folder(
latest_path=latest_path,
arch=args.arch,
)
post_processing_compose_info_json_metadata(
latest_path=latest_path,
arch=args.arch,
)
post_processing_rpms_json_metadata(
latest_path=latest_path,
arch=args.arch,
)
post_processing_images_json_metadata(
latest_path=latest_path,
arch=args.arch,
)
if all(opt is not None for opt in (
args.sign_service_username,
args.sign_service_password,
args.sign_service_endpoint
)):
sign_isos_checksum(
latest_path=latest_path,
arch=args.arch,
username=args.sign_service_username,
password=args.sign_service_password,
endpoint=args.sign_service_endpoint,
pgp_keyid=args.pgp_sign_keyid,
)
for repo in args.middle_repos:
if args.middle_result_directory is not None:
move_iso_and_its_artifacts_to_isos_arch_folder(
src_latest_path=Path(
args.middle_result_directory,
).joinpath('compose'),
repo_name=repo,
arch=args.arch,
exts_of_files=extensions_of_files,
dst_latest_path=latest_path,
)
for repo in args.not_needed_repos:
not_needed_repo_path = latest_path.joinpath(repo)
if not_needed_repo_path.exists():
logging.info(
'Remove not needed variant "%s" by path "%s"',
repo,
not_needed_repo_path,
)
rmtree(not_needed_repo_path)
rename_latest_dir(latest_path=latest_path)
if args.middle_result_directory is not None and \
Path(args.middle_result_directory).exists():
rmtree(args.middle_result_directory)
if __name__ == '__main__':
cli_main()