# Copyright (C) 2023 The Qt Company Ltd. # SPDX-License-Identifier: LicenseRef-Qt-Commercial OR LGPL-3.0-only OR GPL-2.0-only OR GPL-3.0-only import datetime import json import logging import tempfile import unittest from typing import List, Optional, cast import common import coordinator import database import git import host import qt import runner import storage class Repository(git.Repository): """ Dummy repository. It is used to test the retry feature. """ def __init__(self, timeouts: int) -> None: super().__init__(directory=".") self.timeouts = timeouts async def reset(self, revision: str, log_directory: str) -> Optional[common.Error]: if self.timeouts > 0: self.timeouts -= 1 return common.CommandTimedOut("Git reset command timed out") else: return None class Database(storage.Environment): """ Dummy implementation of the database. It is used to test the retry feature for data uploads. """ def __init__(self, timeouts: int) -> None: self.timeouts = timeouts async def store( self, results: List[qt.TestFileResult], issues: List[qt.TestFileIssue], work_item: coordinator.WorkItem, host_info: host.Info, logger: Optional[logging.Logger], ) -> Optional[common.Error]: if self.timeouts > 0: self.timeouts -= 1 return database.UploadTimedOut("Data upload timed out") else: return None class Test(unittest.IsolatedAsyncioTestCase): async def test_reset_git_repository(self) -> None: """ If the Git reset command times out, we try again. """ async def reset(timeouts: int, retries: int) -> Optional[common.Error]: repository = Repository(timeouts) return await runner.reset_git_repository( git_repository=repository, revision="abcd", log_directory=".", retries=retries, retry_delay=0, logger=None, ) self.assertIsNone(await reset(timeouts=0, retries=0)) self.assertIsInstance(await reset(timeouts=1, retries=0), common.CommandTimedOut) self.assertIsNone(await reset(timeouts=1, retries=1)) self.assertIsInstance(await reset(timeouts=2, retries=1), common.CommandTimedOut) self.assertIsNone(await reset(timeouts=2, retries=2)) async def test_store_results(self) -> None: """ If the data upload times out, we try again. """ async def upload(timeouts: int, retries: int) -> Optional[common.Error]: database = Database(timeouts) return await runner.store_results( storage_environment=database, results=[], issues=[], work_item=coordinator.WorkItem( integration_id=1, integration_url=None, integration_timestamp=datetime.datetime(1970, 1, 1), integration_data=[], branch="main", revision="abcd", ), host_info=host.Info( name="host", os="Ubuntu", cpu="amd", ), retries=retries, retry_delay=0, logger=None, ) self.assertIsNone(await upload(timeouts=0, retries=0)) self.assertIsInstance(await upload(timeouts=1, retries=0), database.UploadTimedOut) self.assertIsNone(await upload(timeouts=1, retries=1)) self.assertIsInstance(await upload(timeouts=2, retries=1), database.UploadTimedOut) self.assertIsNone(await upload(timeouts=2, retries=2)) class TestArguments(unittest.TestCase): def test_parse(self) -> None: """ We can parse the required arguments. """ arguments = runner.Arguments.parse(["--configuration", "file", "--output", "directory"]) self.assertEqual(arguments.configuration_file, "file") self.assertEqual(arguments.output_directory, "directory") class TestConfiguration(unittest.TestCase): def test_load(self) -> None: """ We can detect errors in a configuration file. """ with tempfile.NamedTemporaryFile(mode="w") as f: json.dump( { # Empty fields are errors. "coordinator_info": {"url": "https://coordinator.com/", "secret": ""}, "qtbase_git_remote": {"url": "ssh://codereview.qt-project.org/qt/qtbase"}, }, f, ) f.seek(0) configuration = runner.Configuration.load(file=f.name, skip_upload=True) self.assertIsInstance(configuration, common.Error) error = cast(common.Error, configuration) self.assertEqual(error.message.splitlines()[0], "Configuration file contains errors:")