# Source code for repobee.git

"""Wrapper functions for git commands.

.. module:: git
    :synopsis: Wrapper functions for git CLI commands, such as push and clone.

.. moduleauthor:: Simon Larsén
"""
import asyncio
import os
import subprocess
import collections
import pathlib
import daiquiri
from typing import Iterable, List, Any, Callable

from repobee import exception
from repobee import util

# Maximum number of subprocess tasks that _batch_execution schedules at once.
CONCURRENT_TASKS = 20

# NOTE(review): the logger is named after __file__ (a filesystem path) rather
# than __name__ — presumably intentional; confirm against the project's
# logging convention.
LOGGER = daiquiri.getLogger(__file__)

# Describes a single push operation: the local repo directory, the remote
# repository url to push to, and the branch to push.
Push = collections.namedtuple("Push", ("local_path", "repo_url", "branch"))


def _ensure_repo_dir_exists(repo_url: str, cwd: str) -> pathlib.Path:
    """Check if a dir for the repo url exists, and if it does not, create it.
    Also initialize (or reinitialize, if it already exists) it as a git repo.

    Args:
        repo_url: Url of the repository; its repo name determines the
            directory name.
        cwd: Directory under which the repo directory is created.

    Returns:
        Path to the (git-initialized) repo directory.
    """
    repo_name = util.repo_name(repo_url)
    dirpath = pathlib.Path(cwd) / repo_name
    # exist_ok avoids the racy exists()-then-mkdir() check.
    dirpath.mkdir(exist_ok=True)
    _git_init(dirpath)
    return dirpath


def _git_init(dirpath):
    """Run ``git init`` in the given directory."""
    init_command = ["git", "init"]
    captured_run(init_command, cwd=str(dirpath))


def _pull_clone(repo_url: str, branch: str = "", cwd: str = "."):
    """Simulate a clone with a pull to avoid writing remotes (that could
    include secure tokens) to disk.

    Args:
        repo_url: Url of the repository to pull.
        branch: Branch to pull. If empty, git's default branch is used.
        cwd: Working directory. Defaults to the current directory.

    Returns:
        A (returncode, stderr) tuple from the git pull invocation.
    """
    dirpath = _ensure_repo_dir_exists(repo_url, cwd)

    # Build the argv list directly rather than formatting and splitting a
    # string, so a url or branch containing whitespace is not split into
    # bogus arguments.
    pull_command = ["git", "pull", repo_url]
    if branch:
        pull_command.append(branch)

    rc, _, stderr = captured_run(pull_command, cwd=str(dirpath))
    return rc, stderr


async def _pull_clone_async(repo_url: str, branch: str = "", cwd: str = "."):
    """Same as _pull_clone, but asynchronously.

    Args:
        repo_url: Url of the repository to pull.
        branch: Branch to pull. If empty, git's default branch is used.
        cwd: Working directory. Defaults to the current directory.

    Returns:
        A (returncode, stderr) tuple from the git pull invocation.
    """
    dirpath = _ensure_repo_dir_exists(repo_url, cwd)

    # Build the argv list directly rather than formatting and splitting a
    # string, so a url or branch containing whitespace is not split into
    # bogus arguments.
    pull_command = ["git", "pull", repo_url]
    if branch:
        pull_command.append(branch)

    proc = await asyncio.create_subprocess_exec(
        *pull_command,
        cwd=str(dirpath),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    _, stderr = await proc.communicate()
    return proc.returncode, stderr


def captured_run(*args, **kwargs):
    """Run a subprocess with captured output.

    All positional and keyword arguments are forwarded to subprocess.run,
    with stdout and stderr captured as bytes.

    Returns:
        A (returncode, stdout, stderr) tuple from the completed process.
    """
    completed = subprocess.run(
        *args,
        **kwargs,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    return completed.returncode, completed.stdout, completed.stderr
def clone_single(repo_url: str, branch: str = "", cwd: str = "."):
    """Clone a git repository.

    Args:
        repo_url: HTTPS url to repository on the form
            https://<host>/<owner>/<repo>.
        branch: The branch to clone.
        cwd: Working directory. Defaults to the current directory.

    Raises:
        exception.CloneFailedError: If the underlying git pull exits with a
            non-zero code.
    """
    returncode, stderr = _pull_clone(repo_url, branch, cwd)
    if returncode:
        raise exception.CloneFailedError(
            "Failed to clone", returncode, stderr, repo_url
        )
async def _clone_async(repo_url: str, branch: str = "", cwd="."):
    """Clone a single git repository asynchronously.

    Args:
        repo_url: A url to clone.
        branch: Which branch to clone.
        cwd: Working directory.

    Raises:
        exception.CloneFailedError: If the underlying git pull exits with a
            non-zero code.
    """
    returncode, stderr = await _pull_clone_async(repo_url, branch, cwd)
    if returncode:
        raise exception.CloneFailedError(
            "Failed to clone {}".format(repo_url),
            returncode=returncode,
            stderr=stderr,
            url=repo_url,
        )
    LOGGER.info("Cloned into {}".format(repo_url))
def clone(repo_urls: Iterable[str], cwd: str = ".") -> List[str]:
    """Clone all repos asynchronously.

    Args:
        repo_urls: URLs to repos to clone.
        cwd: Working directory. Defaults to the current directory.

    Returns:
        URLs from which cloning failed.
    """
    # TODO: validate repo_urls
    errors = _batch_execution(_clone_async, repo_urls, cwd=cwd)
    # The return annotation was List[Exception], but the function has always
    # returned the failing urls (strings); the annotation is corrected here.
    return [
        error.url
        for error in errors
        if isinstance(error, exception.CloneFailedError)
    ]
async def _push_async(pt: Push):
    """Asynchronous call to git push, pushing directly to the repo_url and
    branch.

    Args:
        pt: A Push namedtuple.

    Raises:
        exception.PushFailedError: If the push exits with a non-zero code.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        "push",
        pt.repo_url,
        pt.branch,
        cwd=os.path.abspath(pt.local_path),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        raise exception.PushFailedError(
            "Failed to push to {}".format(pt.repo_url),
            proc.returncode,
            stderr,
            pt.repo_url,
        )
    if b"Everything up-to-date" in stderr:
        LOGGER.info("{} is up-to-date".format(pt.repo_url))
    else:
        LOGGER.info("Pushed files to {} {}".format(pt.repo_url, pt.branch))


def _push_no_retry(push_tuples: Iterable[Push]) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously, pushing once
    and only once to each repo. Amount of concurrent tasks is limited by
    CONCURRENT_TASKS.

    Args:
        push_tuples: Push namedtuples defining local and remote repos.

    Returns:
        urls to which pushes failed with exception.PushFailedError. Other
        errors are only logged.
    """
    errors = _batch_execution(_push_async, push_tuples)
    return [
        error.url
        for error in errors
        if isinstance(error, exception.PushFailedError)
    ]
def push(push_tuples: Iterable[Push], tries: int = 3) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously. Amount of
    concurrent tasks is limited by CONCURRENT_TASKS. Pushing to repos is tried
    a maximum of ``tries`` times (i.e. pushing is _retried_ ``tries - 1``
    times.)

    Args:
        push_tuples: Push namedtuples defining local and remote repos.
        tries: Amount of times to try to push (including initial push).

    Returns:
        urls to which pushes failed with exception.PushFailedError. Other
        errors are only logged.

    Raises:
        ValueError: If tries is less than 1.
    """
    if tries < 1:
        raise ValueError("tries must be larger than 0")
    # Materialize the iterable exactly once: it is re-scanned on every retry
    # attempt below, and a one-shot generator would otherwise be exhausted
    # after the first attempt, silently skipping all retries.
    all_pts = list(push_tuples)
    # confusing, but failed_pts needs an initial value
    failed_pts = list(all_pts)
    for i in range(tries):
        LOGGER.info("pushing, attempt {}/{}".format(i + 1, tries))
        failed_urls = set(_push_no_retry(failed_pts))
        failed_pts = [pt for pt in all_pts if pt.repo_url in failed_urls]
        if not failed_pts:
            break
        LOGGER.warning("{} pushes failed ...".format(len(failed_pts)))

    return [pt.repo_url for pt in failed_pts]
def _batch_execution(
    batch_func: Callable[[Iterable[Any], Any], List[asyncio.Task]],
    arg_list: List[Any],
    *batch_func_args,
    **batch_func_kwargs
) -> List[Exception]:
    """Feed the arguments in arg_list to batch_func in chunks of at most
    CONCURRENT_TASKS, waiting for each chunk of tasks to complete before
    scheduling the next. The batch_func_args and batch_func_kwargs are passed
    along on every call.

    Args:
        batch_func: A function that takes an iterable as a first argument
            and returns a list of asyncio.Task objects.
        arg_list: A list of objects that are of the same type as the
            batch_func's first argument.
        batch_func_kwargs: Additional keyword arguments to the batch_func.

    Returns:
        a list of exceptions raised in the tasks returned by the batch
        function.
    """
    loop = asyncio.get_event_loop()
    finished_tasks = []

    chunk_start = 0
    while chunk_start < len(arg_list):
        chunk = arg_list[chunk_start : chunk_start + CONCURRENT_TASKS]
        tasks = [
            loop.create_task(
                batch_func(chunk_arg, *batch_func_args, **batch_func_kwargs)
            )
            for chunk_arg in chunk
        ]
        loop.run_until_complete(asyncio.wait(tasks))
        finished_tasks.extend(tasks)
        chunk_start += CONCURRENT_TASKS

    exceptions = [
        task.exception() for task in finished_tasks if task.exception()
    ]
    for exc in exceptions:
        LOGGER.error(str(exc))
    return exceptions