"""Wrapper functions for git commands.
.. module:: git
:synopsis: Wrapper functions for git CLI commands, such as push and clone.
.. moduleauthor:: Simon Larsén
"""
import asyncio
import os
import subprocess
import collections
import pathlib
import enum
import shutil
from typing import Iterable, List, Any, Callable, Tuple
import more_itertools
import git
import repobee_plug as plug
from _repobee import exception
from _repobee import util
# Number of arguments handed out per batch when executing git commands
# asynchronously, i.e. the maximum amount of tasks created at a time.
CONCURRENT_TASKS = 20

# Describes a single push: the directory of the local repository, the URL of
# the remote repository to push to, and the branch to push.
Push = collections.namedtuple("Push", ["local_path", "repo_url", "branch"])
def _ensure_repo_dir_exists(repo_url: str, cwd: str) -> pathlib.Path:
    """Make sure that a git-initialized directory exists for the repo url.

    The directory is named after the repository (as derived from the url by
    ``util.repo_name``) and is placed directly inside ``cwd``. It is created
    if it does not exist, and ``git init`` is then run in it regardless of
    whether it was just created or already existed.

    Args:
        repo_url: URL to the repository.
        cwd: Directory in which the repository directory should be located.
    Returns:
        Path to the repository directory.
    """
    repo_dir = pathlib.Path(cwd, util.repo_name(repo_url))
    if not repo_dir.exists():
        repo_dir.mkdir()
    _git_init(repo_dir)
    return repo_dir
def _git_init(dirpath):
    """Run ``git init`` with ``dirpath`` as the working directory.

    The result of the command (return code and output) is not inspected.
    """
    init_command = ["git", "init"]
    captured_run(init_command, cwd=str(dirpath))
async def _pull_clone_async(repo_url: str, branch: str = "", cwd: str = "."):
    """Simulate a clone with a pull to avoid writing remotes (that could
    include secure tokens) to disk.

    Args:
        repo_url: URL to pull from.
        branch: Branch to pull. If empty, no branch is passed to ``git pull``.
        cwd: Directory in which the repository directory is located (it is
            created and git-initialized if needed).
    Returns:
        A tuple ``(returncode, stderr)``, where ``stderr`` is the captured
        standard error of the ``git pull`` process.
    """
    dirpath = _ensure_repo_dir_exists(repo_url, cwd)
    # Build the argument vector directly instead of formatting a string and
    # splitting it on whitespace: splitting tears a URL or path that contains
    # spaces into several arguments. Surrounding whitespace is still stripped
    # and empty values are still left out.
    pull_args = [arg.strip() for arg in (repo_url, branch)]
    pull_command = ["git", "pull", *(arg for arg in pull_args if arg)]
    proc = await asyncio.create_subprocess_exec(
        *pull_command,
        cwd=str(dirpath),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    _, stderr = await proc.communicate()
    return proc.returncode, stderr
def captured_run(*args, **kwargs):
    """Run a subprocess to completion and capture what it prints.

    All arguments are forwarded to ``subprocess.run``, with stdout and stderr
    set to be piped.

    Returns:
        A tuple ``(returncode, stdout, stderr)`` taken from the completed
        process.
    """
    completed = subprocess.run(
        *args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs
    )
    return completed.returncode, completed.stdout, completed.stderr
def clone_single(repo_url: str, branch: str = "", cwd: str = "."):
    """Clone a git repository with ``git clone``.

    This should only be used for temporary cloning, as any secure tokens in the
    repo URL are stored in the repository.

    Args:
        repo_url: HTTPS url to repository on the form
            https://<host>/<owner>/<repo>.
        branch: The branch to clone. If empty, no branch is specified.
        cwd: Working directory. Defaults to the current directory.
    Raises:
        exception.CloneFailedError: If ``git clone`` exits with a non-zero
            return code.
    """
    command = ["git", "clone", "--single-branch"]
    if branch:
        # git treats a bare positional argument after the URL as the target
        # directory, so the branch must be passed with --branch.
        command += ["--branch", branch]
    command.append(repo_url)

    rc, _, stderr = captured_run(command, cwd=cwd)
    if rc != 0:
        raise exception.CloneFailedError(
            "Failed to clone", rc, stderr, repo_url,
        )
async def _clone_async(repo_url: str, branch: str = "", cwd="."):
    """Clone a single git repository asynchronously.

    Args:
        repo_url: A url to clone.
        branch: Which branch to clone.
        cwd: Working directory.
    Raises:
        exception.CloneFailedError: If the underlying pull exits with a
            non-zero return code.
    """
    returncode, stderr = await _pull_clone_async(repo_url, branch, cwd)
    if returncode == 0:
        plug.log.info("Cloned into {}".format(repo_url))
        return

    raise exception.CloneFailedError(
        "Failed to clone {}".format(repo_url),
        returncode=returncode,
        stderr=stderr,
        url=repo_url,
    )
@enum.unique
class CloneStatus(enum.Enum):
    """Outcome of an attempt to clone a student repository."""

    # The repo was cloned.
    CLONED = enum.auto()
    # The repo was already present locally, so it was not cloned.
    EXISTED = enum.auto()
    # Cloning the repo failed.
    FAILED = enum.auto()
def clone_student_repos(
    repos: List[plug.StudentRepo],
    clone_dir: pathlib.Path,
    api: plug.PlatformAPI,
) -> Iterable[Tuple[CloneStatus, plug.StudentRepo]]:
    """Clone the student repos that are not already present on disk.

    Repos whose ``path`` already exists are skipped with a warning. The
    remaining repos are cloned into ``clone_dir`` using URLs produced by
    ``api.insert_auth``, and every repo that was cloned without failure is
    then copied from ``clone_dir / repo.name`` to ``repo.path``.

    Args:
        repos: The student repos to clone. Each repo must have its ``path``
            set.
        clone_dir: Directory to clone into before copying to the final
            destinations.
        api: Platform API used to insert authentication into the repo URLs.
    Returns:
        A list of ``(status, repo)`` pairs, with already existing repos
        first, then cloned repos, then failed repos. The cloned and failed
        repos are collected in sets, so the order within those two groups is
        not defined.
    """
    assert all(repo.path is not None for repo in repos)

    already_present = [repo for repo in repos if repo.path.exists()]
    if already_present:
        local_repo_ids = [
            f"{repo.team.name}/{repo.name}" for repo in already_present
        ]
        plug.log.warning(
            f"Found local repos, skipping: {', '.join(local_repo_ids)}"
        )

    to_clone = [repo for repo in repos if not repo.path.exists()]
    plug.log.info(f"Cloning into {to_clone}")

    # Map the authenticated URLs back to their repos, so that the URLs
    # reported as failed can be translated into repos.
    repo_by_auth_url = {}
    for repo in to_clone:
        repo_by_auth_url[api.insert_auth(repo.url)] = repo

    failed_urls = clone(repo_by_auth_url.keys(), cwd=str(clone_dir))
    failed = {repo_by_auth_url[url] for url in failed_urls}
    cloned = {repo for repo in to_clone if repo not in failed}

    for repo in cloned:
        shutil.copytree(src=clone_dir / repo.name, dst=repo.path)

    results = [(CloneStatus.EXISTED, repo) for repo in already_present]
    results.extend((CloneStatus.CLONED, repo) for repo in cloned)
    results.extend((CloneStatus.FAILED, repo) for repo in failed)
    return results
def clone(repo_urls: Iterable[str], cwd: str = ".") -> List[str]:
    """Clone all repos asynchronously.

    Args:
        repo_urls: URLs to repos to clone.
        cwd: Working directory. Defaults to the current directory.
    Returns:
        URLs from which cloning failed with exception.CloneFailedError.
    """
    errors = _batch_execution(_clone_async, repo_urls, cwd=cwd)
    clone_failures = (
        error
        for error in errors
        if isinstance(error, exception.CloneFailedError)
    )
    return [failure.url for failure in clone_failures]
async def _push_async(pt: Push):
    """Run ``git push`` asynchronously, pushing directly to the repo url and
    branch of the Push namedtuple.

    Args:
        pt: A Push namedtuple.
    Raises:
        exception.PushFailedError: If ``git push`` exits with a non-zero
            return code.
    """
    proc = await asyncio.create_subprocess_exec(
        "git",
        "push",
        pt.repo_url,
        pt.branch,
        cwd=os.path.abspath(pt.local_path),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    _, stderr = await proc.communicate()

    if proc.returncode != 0:
        raise exception.PushFailedError(
            "Failed to push to {}".format(pt.repo_url),
            proc.returncode,
            stderr,
            pt.repo_url,
        )

    # The captured stderr is checked for git's "nothing to push" message.
    if b"Everything up-to-date" in stderr:
        plug.log.info("{} is up-to-date".format(pt.repo_url))
    else:
        plug.log.info("Pushed files to {} {}".format(pt.repo_url, pt.branch))
def _push_no_retry(push_tuples: Iterable[Push]) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously, in batches of
    CONCURRENT_TASKS. Each repo is pushed to exactly once.

    Args:
        push_tuples: Push namedtuples defining local and remote repos.
    Returns:
        urls to which pushes failed with exception.PushFailedError.
    """
    errors = _batch_execution(_push_async, push_tuples)
    push_failures = (
        error
        for error in errors
        if isinstance(error, exception.PushFailedError)
    )
    return [failure.url for failure in push_failures]
def push(push_tuples: Iterable[Push], tries: int = 3) -> List[str]:
    """Push to all repos defined in push_tuples asynchronously, retrying
    failed pushes.

    Every attempt pushes only to the repos for which the previous attempt
    failed. Pushing is tried at most ``tries`` times (i.e. it is _retried_ at
    most ``tries - 1`` times), and stops early once no pushes fail.

    Args:
        push_tuples: Push namedtuples defining local and remote repos.
        tries: Amount of times to try to push (including initial push).
    Returns:
        urls to which pushes still failed with exception.PushFailedError
        after the last attempt.
    Raises:
        ValueError: If ``tries`` is less than 1.
    """
    if tries < 1:
        raise ValueError("tries must be larger than 0")

    # Before the first attempt, every push counts as remaining.
    remaining = list(push_tuples)
    for attempt in range(1, tries + 1):
        plug.log.info("Pushing, attempt {}/{}".format(attempt, tries))
        failed_urls = set(_push_no_retry(remaining))
        remaining = [pt for pt in remaining if pt.repo_url in failed_urls]
        if not remaining:
            break
        plug.log.warning("{} pushes failed ...".format(len(remaining)))

    return [pt.repo_url for pt in remaining]
def _batch_execution(
    batch_func: Callable[..., Any],
    arg_list: Iterable[Any],
    *batch_func_args,
    **batch_func_kwargs,
) -> List[Exception]:
    """Synchronously run ``batch_func`` once for each argument in arg_list.

    This is a blocking wrapper that drives ``_batch_execution_async`` to
    completion on an event loop. See that function for how the arguments are
    split into batches of CONCURRENT_TASKS.

    Args:
        batch_func: A coroutine function that is called with one element of
            arg_list as its first argument.
        arg_list: The arguments to call batch_func with, one call per
            element.
        batch_func_args: Additional positional arguments passed to batch_func
            on each call.
        batch_func_kwargs: Additional keyword arguments passed to batch_func
            on each call.
    Returns:
        a list of the exceptions collected by ``_batch_execution_async``.
    """
    # A bare get_event_loop() raises RuntimeError when there is no current
    # event loop (e.g. in a non-main thread, and on recent Python versions),
    # and a closed loop cannot run anything. Fall back to a fresh loop in
    # both cases.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None
    if loop is None or loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    return loop.run_until_complete(
        _batch_execution_async(
            batch_func, arg_list, *batch_func_args, **batch_func_kwargs
        )
    )
async def _batch_execution_async(
    batch_func: Callable[..., Any],
    arg_list: Iterable[Any],
    *batch_func_args,
    **batch_func_kwargs,
) -> List[Exception]:
    """Run ``batch_func`` once for each argument in arg_list, in batches.

    The arguments are consumed in chunks of CONCURRENT_TASKS. A task is
    created for every argument in a chunk, and all tasks of the chunk are
    awaited (with a progress bar) before the next chunk is started.

    Args:
        batch_func: A coroutine function that is called with one element of
            arg_list as its first argument.
        arg_list: The arguments to call batch_func with, one call per
            element.
        batch_func_args: Additional positional arguments passed to batch_func
            on each call.
        batch_func_kwargs: Additional keyword arguments passed to batch_func
            on each call.
    Returns:
        a list of the exception.GitError instances raised by the tasks. Each
        of them is also logged as an error once all batches are done. Any
        other exception raised by a task is not caught here.
    """
    import tqdm.asyncio

    loop = asyncio.get_event_loop()
    git_errors = []

    chunks = more_itertools.ichunked(arg_list, CONCURRENT_TASKS)
    for batch_number, chunk in enumerate(chunks, start=1):
        tasks = []
        for arg in chunk:
            coroutine = batch_func(
                arg, *batch_func_args, **batch_func_kwargs
            )
            tasks.append(loop.create_task(coroutine))

        finished_tasks = tqdm.asyncio.tqdm_asyncio.as_completed(
            tasks, desc=f"Progress batch {batch_number}"
        )
        for finished in finished_tasks:
            try:
                await finished
            except exception.GitError as exc:
                git_errors.append(exc)

    for error in git_errors:
        plug.log.error(str(error))

    return git_errors
def active_branch(repo_path: pathlib.Path) -> str:
    """Look up the name of the branch that is active in a local repo.

    Args:
        repo_path: Path to a repo.
    Returns:
        The name of the active branch of the repo.
    """
    repo = git.Repo(repo_path)
    return repo.active_branch.name