Source code for cookieninja.vcs

"""Helper functions for working with version control systems."""
import logging
import os
import subprocess  # nosec
from pathlib import Path
from shutil import which
from typing import Optional

from .exceptions import (
    RepositoryCloneFailed,
    RepositoryNotFound,
    UnknownRepoType,
    VCSNotInstalled,
)
from .utils import make_sure_path_exists, prompt_and_delete

logger = logging.getLogger(__name__)


BRANCH_ERRORS = [
    "error: pathspec",
    "unknown revision",
]


[docs]def identify_repo(repo_url): """Determine if `repo_url` should be treated as a URL to a git or hg repo. Repos can be identified by prepending "hg+" or "git+" to the repo URL. :param repo_url: Repo URL of unknown type. :returns: ('git', repo_url), ('hg', repo_url), or None. """ repo_url_values = repo_url.split("+") if len(repo_url_values) == 2: repo_type = repo_url_values[0] if repo_type in ["git", "hg"]: return repo_type, repo_url_values[1] else: raise UnknownRepoType else: if "git" in repo_url: return "git", repo_url elif "codecommit" in repo_url: return "git-remote-codecommit", repo_url elif "bitbucket" in repo_url: return "hg", repo_url else: raise UnknownRepoType
[docs]def is_vcs_installed(repo_type): """ Check if the version control system for a repo type is installed. :param repo_type: """ return bool(which(repo_type))
[docs]def clone( repo_url: str, checkout: Optional[str] = None, recurse_submodules: bool = False, clone_to_dir: "os.PathLike[str]" = ".", no_input: bool = False, ): """Clone a repo to the current directory. :param repo_url: Repo URL of unknown type. :param checkout: The branch, tag or commit ID to checkout after clone. :param recurse_submodules: Clone submodules if set to `True` :param clone_to_dir: The directory to clone to. Defaults to the current directory. :param no_input: Do not prompt for user input and eventually force a refresh of cached resources. :returns: str with path to the new directory of the repository. """ # Ensure that clone_to_dir exists clone_to_dir = Path(clone_to_dir).expanduser() make_sure_path_exists(clone_to_dir) # identify the repo_type repo_type, repo_url = identify_repo(repo_url) # check that the appropriate VCS for the repo_type is installed if not is_vcs_installed(repo_type): msg = f"'{repo_type}' is not installed." raise VCSNotInstalled(msg) repo_url = repo_url.rstrip("/") repo_name = os.path.split(repo_url)[1] repo_args = { "git": ["git", "clone"], "git-remote-codecommit": ["git", "clone"], "hg": ["hg", "clone"], } clone_command = repo_args[repo_type] # avoid warning if defined in if-elif if "git" == repo_type: # git repository. repo_name = repo_name.split(":")[-1].rsplit(".git")[0] repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name)) if recurse_submodules: clone_command.append("--recurse-submodules") elif "git-remote-codecommit" == repo_type: repo_type = "git" repo_name = repo_name.split("@")[-1] repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name)) if recurse_submodules: clone_command.append("--recurse-submodules") else: # hg repository. repo_dir = os.path.normpath(os.path.join(clone_to_dir, repo_name)) clone_command.append(repo_url) logger.debug(f"repo_dir is {repo_dir}") if os.path.isdir(repo_dir): clone = prompt_and_delete(repo_dir, no_input=no_input) else: clone = True if clone: try: subprocess.check_output( # nosec clone_command, cwd=clone_to_dir, stderr=subprocess.STDOUT, ) if checkout is not None: checkout_params = [checkout] # Avoid Mercurial "--config" and "--debugger" injection vulnerability if repo_type == "hg": checkout_params.insert(0, "--") subprocess.check_output( # nosec [repo_type, "checkout", *checkout_params], cwd=repo_dir, stderr=subprocess.STDOUT, ) except subprocess.CalledProcessError as clone_error: output = clone_error.output.decode("utf-8") if "not found" in output.lower(): raise RepositoryNotFound( f"The repository {repo_url} could not be found, " "have you made a typo?" ) from clone_error if any(error in output for error in BRANCH_ERRORS): raise RepositoryCloneFailed( f"The {checkout} branch of repository " f"{repo_url} could not found, have you made a typo?" ) from clone_error logger.error("git clone failed with error: %s", output) raise return repo_dir