# Copyright (C) 2024 The Qt Company Ltd. # Contact: https://www.qt.io/licensing/ # # You may use this file under the terms of the 3-clause BSD license. # See the file LICENSE in qt/qtrepotools for details. # """This script listens for incoming webhook requests of patchset-created type from Gerrit, checks out the patch locally, and checks security headers on it. It then posts a comment for each issue identified to Gerrit with the results. """ import asyncio import base64 import fnmatch import json import logging import os import re import shutil import sys import time import traceback from functools import wraps from logging.handlers import TimedRotatingFileHandler from subprocess import CalledProcessError import aiohttp from aiohttp import web # Configure logging LOG_DIR = "logging" os.makedirs(LOG_DIR, exist_ok=True) LOG_FILE = os.path.join(LOG_DIR, "qtsecuritybot.log") handler = TimedRotatingFileHandler(LOG_FILE, when='midnight', backupCount=90) handler.setFormatter(logging.Formatter( '%(asctime)s - %(levelname)s - %(message)s')) logging.basicConfig(level=logging.INFO, handlers=[handler]) log = logging.getLogger() GERRIT_USERNAME = os.environ.get('QTSECURITYBOT_GERRIT_USERNAME') GERRIT_PASSWORD = os.environ.get('QTSECURITYBOT_GERRIT_PASSWORD') if not GERRIT_USERNAME or not GERRIT_PASSWORD: log.info( 'Please set the QTSECURITYBOT_GERRIT_USERNAME and QTSECURITYBOT_GERRIT_PASSWORD environment variables.') sys.exit(1) # Base64 encode the username and password GERRIT_AUTH = GERRIT_USERNAME + ':' + GERRIT_PASSWORD GERRIT_AUTH = GERRIT_AUTH.encode('utf-8') GERRIT_AUTH = base64.b64encode(GERRIT_AUTH).decode('utf-8') CONFIG = { 'CLONE_TIMEOUT': 240, 'CHECKOUT_TIMEOUT': 60, 'MAX_RETRIES': 10, 'RETRY_DELAY': 5, 'MAX_LOCK_WAIT': 900, # 15 minutes, 'TEAMS_URL': os.environ.get('QTSECURITYBOT_TEAMS_WEBHOOK_URL'), 'TEAMS_ERROR_URL': os.environ.get('QTSECURITYBOT_TEAMS_ERROR_WEBHOOK_URL'), } class Lock: """A simple lock class to prevent multiple events from being handled at the same time. """ def __init__(self): self.locked = False self._lock_time = 0 async def acquire(self): """Acquire the lock with timeout.""" start_time = time.time() while self.locked: if time.time() - start_time > CONFIG['MAX_LOCK_WAIT']: raise TimeoutError("Lock acquisition timed out") await asyncio.sleep(1) self.locked = True self._lock_time = time.time() def release(self): """Release the lock.""" self.locked = False self._lock_time = 0 def log_errors(f): """Decorator to log any unhandled errors in a function.""" @wraps(f) async def wrapper(*args, **kwargs): try: return await f(*args, **kwargs) except Exception as e: log.error("Error in %s: %s\n%s", f.__name__, str(e), traceback.format_exc()) raise return wrapper semaphore = Lock() async def clone_repo(data): """Clone the target repo and check out the branch if needed.""" log.info("Cloning repo %s", data['change']['project']) repo_name = data['repo_name'] if os.path.exists(repo_name): # return if the repo already exists return repo_url = "https://codereview.qt-project.org/" + \ data['change']['project'] + ".git" if not shutil.which('git'): raise FileNotFoundError("Git executable not found in PATH") try: p = await asyncio.create_subprocess_exec( 'git', 'clone', repo_url, repo_name, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(p.communicate(), timeout=CONFIG['CLONE_TIMEOUT']) if p.returncode != 0: raise CalledProcessError( p.returncode, 'git clone', output=stdout, stderr=stderr) log.info("Repository cloned successfully: %s", stdout.decode()) except FileNotFoundError as e: log.error("Error: %s", e) except asyncio.TimeoutError: log.error("Error: Subprocess timed out") except CalledProcessError as e: raise e async def checkout_patch(data): """Check out the patch.""" log.info("%s: Checking out patch", data['change']['number']) # Check out the patch repo_dir = data['repo_name'] # git clean -fdx first to remove any untracked files p = await asyncio.create_subprocess_exec('git', '-C', repo_dir, 'clean', '-fdx') await p.communicate() try: # Fetch the patch first p = await asyncio.create_subprocess_exec( 'git', '-C', repo_dir, 'fetch', 'origin', data['patchSet']['ref'], stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(p.communicate(), timeout=240) if p.returncode != 0: raise CalledProcessError( p.returncode, 'git fetch', output=stdout, stderr=stderr) log.info("Fetch successful: %s", stdout.decode()) p = await asyncio.create_subprocess_exec( 'git', '-C', repo_dir, 'checkout', 'FETCH_HEAD', stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await asyncio.wait_for(p.communicate(), timeout=240) if p.returncode != 0: raise CalledProcessError( p.returncode, 'git checkout', output=stdout, stderr=stderr) log.info("Checkout successful: %s", stdout.decode()) except FileNotFoundError as e: log.error("Error: %s", e) except asyncio.TimeoutError: log.error("Error: Subprocess timed out") except CalledProcessError as e: raise e async def run_security_header_check(data): """Run securityHeader on the patch.""" log.info("%s: Running Security Header checks", data['change']['number']) comments_per_file = {} def _check_header_lines(lines, file_name, is_new_patch): line_number = None # Start line numbering at 1 for _line_number, line in enumerate(lines, 1): if _line_number > 50: break # Only check the first 50 lines # Check for security headers. Match lazily. Enforcement is done via sanity bot. comment_pattern = re.compile( r'^\s*(#|\/\/|\/\*|\*|