diff --git a/tools/ci/python_packages/gitlab_api.py b/tools/ci/python_packages/gitlab_api.py index 6ac0ffae1f8..cdce2ed2467 100644 --- a/tools/ci/python_packages/gitlab_api.py +++ b/tools/ci/python_packages/gitlab_api.py @@ -1,6 +1,7 @@ -# SPDX-FileCopyrightText: 2022-2024 Espressif Systems (Shanghai) CO LTD +# SPDX-FileCopyrightText: 2022-2026 Espressif Systems (Shanghai) CO LTD # SPDX-License-Identifier: Apache-2.0 import argparse +import copy import logging import os import re @@ -9,13 +10,9 @@ import tarfile import tempfile import time import zipfile +from collections.abc import Callable from functools import wraps from typing import Any -from typing import Callable -from typing import Dict -from typing import List -from typing import Optional -from typing import Union import gitlab @@ -36,7 +33,7 @@ def retry(func: TR) -> TR: while True: try: res = func(self, *args, **kwargs) - except (IOError, EOFError, gitlab.exceptions.GitlabError) as e: + except (OSError, EOFError, gitlab.exceptions.GitlabError) as e: if isinstance(e, gitlab.exceptions.GitlabError): if e.response_code == 500: # retry on this error @@ -52,9 +49,11 @@ def retry(func: TR) -> TR: raise e # get out of the loop else: logging.warning( - 'Network failure in {}, retrying ({})'.format(getattr(func, '__name__', '(unknown callable)'), - retried)) - time.sleep(2 ** retried) # wait a bit more after each retry + 'Network failure in {}, retrying ({})'.format( + getattr(func, '__name__', '(unknown callable)'), retried + ) + ) + time.sleep(2**retried) # wait a bit more after each retry continue else: break @@ -63,26 +62,26 @@ def retry(func: TR) -> TR: return wrapper -class Gitlab(object): +class Gitlab: JOB_NAME_PATTERN = re.compile(r'(\w+)(\s+(\d+)/(\d+))?') DOWNLOAD_ERROR_MAX_RETRIES = 3 DEFAULT_BUILD_CHILD_PIPELINE_NAME = 'Build Child Pipeline' - def __init__(self, project_id: Union[int, str, None] = None): + def __init__(self, project_id: int | str | None = None): config_data_from_env = os.getenv('PYTHON_GITLAB_CONFIG') if config_data_from_env: # prefer to load config from env variable with tempfile.NamedTemporaryFile('w', delete=False) as temp_file: temp_file.write(config_data_from_env) - config_files = [temp_file.name] # type: Optional[List[str]] + config_files = [temp_file.name] else: # otherwise try to use config file at local filesystem config_files = None self._init_gitlab_inst(project_id, config_files) @retry - def _init_gitlab_inst(self, project_id: Optional[int], config_files: Optional[List[str]]) -> None: + def _init_gitlab_inst(self, project_id: int | None, config_files: list[str] | None) -> None: gitlab_id = os.getenv('LOCAL_GITLAB_HTTPS_HOST') # if None, will use the default gitlab server self.gitlab_inst = gitlab.Gitlab.from_config(gitlab_id=gitlab_id, config_files=config_files) @@ -109,7 +108,7 @@ class Gitlab(object): self.project = None @retry - def get_project_id(self, name: str, namespace: Optional[str] = None) -> int: + def get_project_id(self, name: str, namespace: str | None = None) -> int: """ search project ID by name @@ -152,7 +151,7 @@ class Gitlab(object): archive_file.extractall(destination) @retry - def download_artifact(self, job_id: int, artifact_path: List[str], destination: Optional[str] = None) -> List[bytes]: + def download_artifact(self, job_id: int, artifact_path: list[str], destination: str | None = None) -> list[bytes]: """ download specific path of job artifacts and extract to destination. @@ -169,7 +168,7 @@ class Gitlab(object): try: data = job.artifact(a_path) # type: bytes except gitlab.GitlabGetError as e: - logging.error("Failed to download '{}' from job {}".format(a_path, job_id)) + logging.error(f"Failed to download '{a_path}' from job {job_id}") raise e raw_data_list.append(data) if destination: @@ -185,7 +184,7 @@ class Gitlab(object): return raw_data_list @retry - def find_job_id(self, job_name: str, pipeline_id: Optional[str] = None, job_status: str = 'success') -> List[Dict]: + def find_job_id(self, job_name: str, pipeline_id: str | None = None, job_status: str = 'success') -> list[dict]: """ Get Job ID from job name of specific pipeline @@ -208,8 +207,9 @@ class Gitlab(object): return job_id_list @retry - def download_archive(self, ref: str, destination: str, project_id: Optional[int] = None, - cache_dir: Optional[str] = None) -> str: + def download_archive( + self, ref: str, destination: str, project_id: int | None = None, cache_dir: str | None = None + ) -> str: """ Download archive of certain commit of a repository and extract to destination path @@ -233,10 +233,11 @@ class Gitlab(object): try: project.repository_archive(sha=ref, streamed=True, action=fw.write) except gitlab.GitlabGetError as e: - logging.error('Failed to archive from project {}'.format(project_id)) + logging.error(f'Failed to archive from project {project_id}') raise e - logging.info('Downloaded archive size: {:.03f}MB'.format( - float(os.path.getsize(local_archive_file)) / (1024 * 1024))) + logging.info( + f'Downloaded archive size: {float(os.path.getsize(local_archive_file)) / (1024 * 1024):.03f}MB' + ) return self.decompress_archive(local_archive_file, destination) @@ -245,25 +246,39 @@ class Gitlab(object): try: project.repository_archive(sha=ref, streamed=True, action=temp_file.write) except gitlab.GitlabGetError as e: - logging.error('Failed to archive from project {}'.format(project_id)) + logging.error(f'Failed to archive from project {project_id}') raise e - logging.info('Downloaded archive size: {:.03f}MB'.format(float(os.path.getsize(temp_file.name)) / (1024 * 1024))) + logging.info(f'Downloaded archive size: {float(os.path.getsize(temp_file.name)) / (1024 * 1024):.03f}MB') return self.decompress_archive(temp_file.name, destination) + @staticmethod + def _to_win32_long_path(path: str) -> str: + normalized_path = os.path.normpath(os.path.abspath(path)) + if normalized_path.startswith('\\\\?\\'): + return normalized_path + if normalized_path.startswith('\\\\'): + return '\\\\?\\UNC\\' + normalized_path[2:] + return '\\\\?\\' + normalized_path + @staticmethod def decompress_archive(path: str, destination: str) -> str: full_destination = os.path.abspath(destination) - # By default max path length is set to 260 characters - # Prefix `\\?\` extends it to 32,767 characters - if sys.platform == 'win32': - full_destination = '\\\\?\\' + full_destination try: with tarfile.open(path, 'r') as archive_file: - root_name = archive_file.getnames()[0] - archive_file.extractall(full_destination) + members = archive_file.getmembers() + root_name = members[0].name + if sys.platform == 'win32': + # tarfile keeps archive member names in POSIX form. Normalize them before + # combining with a long-path-prefixed destination to avoid invalid mixed separators. + full_destination = Gitlab._to_win32_long_path(full_destination) + members = [copy.copy(member) for member in members] + for member in members: + member.name = member.name.replace('/', '\\') + member.linkname = member.linkname.replace('/', '\\') + archive_file.extractall(full_destination, members=members) except tarfile.TarError as e: logging.error(f'Error while decompressing archive {path}') raise e @@ -280,7 +295,7 @@ class Gitlab(object): job = self.project.jobs.get(job_id) return ','.join(job.tag_list) - def get_downstream_pipeline_ids(self, main_pipeline_id: int) -> List[int]: + def get_downstream_pipeline_ids(self, main_pipeline_id: int) -> list[int]: """ Retrieve the IDs of all downstream child pipelines for a given main pipeline. @@ -313,7 +328,7 @@ class Gitlab(object): return [pid for pid in child_pipeline_ids if pid is not None] - def retry_failed_jobs(self, pipeline_id: int, retry_allowed_failures: bool = False) -> List[int]: + def retry_failed_jobs(self, pipeline_id: int, retry_allowed_failures: bool = False) -> list[int]: """ Retry failed jobs for a specific pipeline. Optionally include jobs marked as 'allowed failures'. @@ -370,10 +385,10 @@ def main() -> None: gitlab_inst.download_archive(args.ref, args.destination) elif args.action == 'get_project_id': ret = gitlab_inst.get_project_id(args.project_name) - print('project id: {}'.format(ret)) + print(f'project id: {ret}') elif args.action == 'retry_failed_jobs': res = gitlab_inst.retry_failed_jobs(args.pipeline_id, args.retry_allowed_failures) - print('jobs retried successfully: {}'.format(res)) + print(f'jobs retried successfully: {res}') elif args.action == 'get_job_tags': ret = gitlab_inst.get_job_tags(args.job_id) print(ret)