Source code for repobee.git

"""Wrapper functions for git commands.

.. module:: git
    :synopsis: Wrapper functions for git CLI commands, such as push and clone.

.. moduleauthor:: Simon Larsén
"""
import asyncio
import os
import subprocess
import collections
import daiquiri
from typing import Iterable, List, Any, Callable

from repobee import util
from repobee import exception

CONCURRENT_TASKS = 20

LOGGER = daiquiri.getLogger(__file__)

Push = collections.namedtuple("Push", ("local_path", "repo_url", "branch"))


def _insert_token(url: str, token: str) -> str:
    """Insert a token into the url as described here:
        https://blog.github.com/2012-09-21-easier-builds-and-deployments-using-git-over-https-and-oauth/

    Args:
        url: A url to a git repo.
        token: A GitHub OAUTH token, with or without username (e.g. on the form
        `<token>` or `<username>:<token>`)

    Returns:
        The provided url with the token inserted
    """
    if not token:
        raise ValueError("invalid token, empty token not allowed")
    return url.replace("https://", "https://{}@".format(token))


def _insert_user_and_token(https_url: str, user: str, token: str) -> str:
    """Insert a username and an oauth token into the https url as described here:
        https://blog.github.com/2012-09-21-easier-builds-and-deployments-using-git-over-https-and-oauth/

    Args:
        https_url: A url on the form `https://host.topdomain`
        user: A GitHub username.
        token: A GitHub OAUTH token.

    Returns:
        The provided url with the username and token inserted
    """
    return _insert_token(https_url, "{}:{}".format(user, token))


[docs]def captured_run(*args, **kwargs): """Run a subprocess and capture the output.""" proc = subprocess.run( *args, **kwargs, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) return proc.returncode, proc.stdout, proc.stderr
[docs]def clone_single( repo_url: str, token: str, single_branch: bool = True, branch: str = None, cwd: str = ".", ): """Clone a git repository. Args: repo_url: HTTPS url to repository on the form https://<host>/<owner>/<repo>. single_branch: Whether or not to clone a single branch. branch: The branch to clone. cwd: Working directory. Defaults to the current directory. token: A GitHub OAUTH token. """ util.validate_types( repo_url=(repo_url, str), single_branch=(single_branch, bool), branch=(branch, (str, type(None))), cwd=(cwd, (str)), ) if isinstance(branch, str) and not branch: raise ValueError("branch must not be empty") options = [] if single_branch: options.append("--single-branch") if branch is not None: options += ["-b", branch] clone_command = ["git", "clone", _insert_token(repo_url, token), *options] rc, _, stderr = captured_run(clone_command, cwd=cwd) if rc != 0: raise exception.CloneFailedError( "Failed to clone", rc, stderr, repo_url )
async def _clone_async( repo_url: str, token: str, single_branch: bool = True, branch: str = None, cwd=".", ): """Clone git repositories asynchronously. Args: repo_url: A url to clone. single_branch: Whether to clone a single branch or not. branch: Which branch to clone. cwd: Working directory. token: A GitHub OAUTH token. """ command = ["git", "clone", _insert_token(repo_url, token)] if single_branch: command.append("--single-branch") proc = await asyncio.create_subprocess_exec( *command, cwd=cwd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) _, stderr = await proc.communicate() if proc.returncode != 0: raise exception.CloneFailedError( "Failed to clone {}".format(repo_url), returncode=proc.returncode, stderr=stderr, url=repo_url, ) else: LOGGER.info("Cloned into {}".format(repo_url))
[docs]def clone( repo_urls: Iterable[str], token: str, single_branch: bool = True, cwd: str = ".", ) -> List[Exception]: """Clone all repos asynchronously. Args: repo_urls: URLs to repos to clone. single_branch: Whether or not to clone only the default branch. cwd: Working directory. Defaults to the current directory. token: A GitHub OAUTH token. Returns: URLs from which cloning failed. """ # TODO valdate repo_urls util.validate_types(single_branch=(single_branch, bool), cwd=(cwd, str)) util.validate_non_empty(repo_urls=repo_urls, single_branch=single_branch) return [ exc.url for exc in _batch_execution( _clone_async, repo_urls, token, single_branch, cwd=cwd ) if isinstance(exc, exception.CloneFailedError) ]
async def _push_async(pt: Push, user: str, token: str): """Asynchronous call to git push, pushing directly to the repo_url and branch. Args: pt: A Push namedtuple. user: The username to use in the push. token: A GitHub OAUTH token. """ util.validate_types(push_tuple=(pt, Push), user=(user, str)) util.validate_non_empty(user=user) command = [ "git", "push", _insert_user_and_token(pt.repo_url, user, token), pt.branch, ] proc = await asyncio.create_subprocess_exec( *command, cwd=os.path.abspath(pt.local_path), stdout=subprocess.PIPE, stderr=subprocess.PIPE ) _, stderr = await proc.communicate() if proc.returncode != 0: raise exception.PushFailedError( "Failed to push to {}".format(pt.repo_url), proc.returncode, stderr, pt.repo_url, ) elif b"Everything up-to-date" in stderr: LOGGER.info("{} is up-to-date".format(pt.repo_url)) else: LOGGER.info("Pushed files to {} {}".format(pt.repo_url, pt.branch)) def _push_no_retry( push_tuples: Iterable[Push], user: str, token: str ) -> List[str]: """Push to all repos defined in push_tuples asynchronously. Amount of concurrent tasks is limited by CONCURRENT_TASKS. Pushes once and only once to each repo. Args: push_tuples: Push namedtuples defining local and remote repos. user: The username to put in the push. token: A GitHub OAUTH token. Returns: urls to which pushes failed with exception.PushFailedError. Other errors are only logged. """ # TODO valdate push_tuples util.validate_types(user=(user, str)) util.validate_non_empty(push_tuples=push_tuples, user=user) return [ exc.url for exc in _batch_execution(_push_async, push_tuples, user, token) if isinstance(exc, exception.PushFailedError) ]
[docs]def push( push_tuples: Iterable[Push], user: str, token: str, tries: int = 3 ) -> List[str]: """Push to all repos defined in push_tuples asynchronously. Amount of concurrent tasks is limited by CONCURRENT_TASKS. Pushing to repos is tried a maximum of ``tries`` times (i.e. pushing is _retried_ ``tries - 1`` times.) Args: push_tuples: Push namedtuples defining local and remote repos. user: The username to put in the push. token: A GitHub OAUTH token. tries: Amount of times to try to push (including initial push). Returns: urls to which pushes failed with exception.PushFailedError. Other errors are only logged. """ util.validate_types(tries=(tries, int)) if tries < 1: raise ValueError("tries must be larger than 0") # confusing, but failed_pts needs an initial value failed_pts = list(push_tuples) for i in range(tries): LOGGER.info("pushing, attempt {}/{}".format(i + 1, tries)) failed_urls = set(_push_no_retry(failed_pts, user, token)) failed_pts = [pt for pt in push_tuples if pt.repo_url in failed_urls] if not failed_pts: break LOGGER.warning("{} pushes failed ...".format(len(failed_pts))) return [pt.repo_url for pt in failed_pts]
def _batch_execution( batch_func: Callable[[Iterable[Any], Any], List[asyncio.Task]], arg_list: List[Any], *batch_func_args, **batch_func_kwargs ) -> List[Exception]: """Take a batch function (any function whos first argument is an iterable) and send in send in CONCURRENT_TASKS amount of arguments from the arg_list until it is exhausted. The batch_func_kwargs are provided on each call. Args: batch_func: A function that takes an iterable as a first argument and returns a list of asyncio.Task objects. arg_list: A list of objects that are of the same type as the batch_func's first argument. batch_func_kwargs: Additional keyword arguments to the batch_func. Returns: a list of exceptions raised in the tasks returned by the batch function. """ completed_tasks = [] loop = asyncio.get_event_loop() for i in range(0, len(arg_list), CONCURRENT_TASKS): tasks = [ loop.create_task( batch_func(list_arg, *batch_func_args, **batch_func_kwargs) ) for list_arg in arg_list[i : i + CONCURRENT_TASKS] ] loop.run_until_complete(asyncio.wait(tasks)) completed_tasks += tasks exceptions = [ task.exception() for task in completed_tasks if task.exception() ] for exc in exceptions: LOGGER.error(str(exc)) return exceptions