Source code for multigit.gitrepo

# -*- coding: utf-8 -*-

# Import stuff
import sys
from git import Repo, exc as git_exception


[docs] class Gitrepo(object): '''Processes a single git repo entity as defined by multigit.''' def __init__(self): pass
[docs] def status(self, repoconf): ''' Finds the status of the repo configuration that gets as param. :param repoconf: the configuration dictionary of the git repository to be processed. :return: returns the same repoconf dictionary provided as parameter with a new 'status' key populated and, optionally a 'extra_info' key. ''' #print(str(repoconf)) repostatus = repoconf repostatus['status'] = 'UNPROCESSED' if not 'gitref_type' in repostatus: repostatus['gitref_type'] = None # Let's check if it's at least cloned try: repo = Repo(repoconf['path']) except git_exception.NoSuchPathError as e: # repo not yet cloned repostatus['status'] = 'NOT_CLONED' except git_exception.InvalidGitRepositoryError as e: # directory exists but, for whatever reason, is not a valid repo repostatus['status'] = 'ERROR' my_error_msg = "directory '" + repostatus['path'] + "' exists, but it's not a valid git sandbox." my_error_msg += "\n\tPlease, review its contents." repostatus['extra_info'] = my_error_msg # if still unprocessed it's because the repo is there # Check remotes if repostatus['status'] == 'UNPROCESSED': if repostatus['repo'] != repo.remotes.origin.url: repostatus['status'] = 'WRONG_REMOTE' my_error_msg = "Requested remote is '" + repostatus['repo'] + "' " my_error_msg += "but current origin is '" + repo.remotes.origin.url + "'." repostatus['extra_info'] = my_error_msg # if still unprocessed it's because the repo is there and the remote is right # Let's try to update its status if repostatus['status'] == 'UNPROCESSED': try: repo.git.fetch(prune=True) except git_exception.GitCommandError as e: if e.status == 128: repostatus['status'] = 'ERROR' repostatus['extra_info'] = e.stderr.replace('stderr: ','').strip('\n').strip() else: raise if repostatus['status'] == 'UNPROCESSED': try: local_commit = repo.commit().hexsha except ValueError as e: if ( "Reference at 'refs/heads/master' does not exist" in str(e) or "Reference at 'refs/heads/main' does not exist" in str(e) ): # Remote repo exists, but it's still "un-initialized" (lacks its first commit) repostatus['status'] = 'EMPTY' else: raise e # if still unprocessed, it's a good repo. # can it be updated? if repostatus['status'] == 'UNPROCESSED': if repo.is_dirty(): repostatus['status'] = 'DIRTY' # is the proper gitref already checked out? if repostatus['status'] == 'UNPROCESSED': # print(str(repo)) if ( 'gitref_type' in repostatus and repostatus['gitref_type'] is not None ): if repostatus['gitref_type'] == 'branch': if repo.head.is_detached: repostatus['status'] = 'PENDING_UPDATE' repostatus['from'] = repo.commit().hexsha repostatus['to'] = repostatus['branch'] elif repo.head.ref.name != repostatus['branch']: repostatus['status'] = 'PENDING_UPDATE' repostatus['from'] = repo.head.ref.name repostatus['to'] = repostatus['branch'] else: # default branch requested # find its remote name (i.e. 'origin/master') remote_head = next(ref for ref in repo.remotes.origin.refs if '/HEAD' in ref.name).ref.name # ...and convert to a proper local name (i.e. 'master') default_branch = remote_head.split('/', 1)[-1] if repo.head.is_detached: repostatus['status'] = 'PENDING_UPDATE' repostatus['from'] = repo.commit().hexsha repostatus['to'] = default_branch elif default_branch != repo.head.ref.name: repostatus['status'] = 'PENDING_UPDATE' repostatus['from'] = repo.head.ref.name repostatus['to'] = default_branch # Let's check its current commit vs the remote one if repostatus['status'] == 'UNPROCESSED': if ( 'gitref_type' in repostatus and repostatus['gitref_type'] is not None ): gitref_type = repostatus['gitref_type'] desired_gitref = repostatus[gitref_type] if gitref_type == 'branch': remote_ref = str('origin/' + repostatus[gitref_type]) else: remote_ref = str(repostatus[gitref_type]) try: desired_commit = str(repo.commit(remote_ref)) except git_exception.BadName as e: if ("Ref '" + remote_ref + "' did not resolve to an object" in str(e)): # The requested gitref doesn't exist at the remote end (for whatever reason) desired_commit = None # ...so no desired commit possible repostatus['status'] = 'WRONG_REMOTE' repostatus['extra_info'] = "It seems you requested a gitref that can't be found on remote.\n" repostatus['extra_info'] += str(e) else: raise e else: desired_commit = repo.remotes.origin.refs.HEAD.commit.hexsha desired_gitref = repo.git.symbolic_ref('refs/remotes/origin/HEAD').replace('refs/remotes/origin/','') if (desired_commit and local_commit != desired_commit): repostatus['status'] = 'PENDING_UPDATE' repostatus['from'] = local_commit repostatus['to'] = desired_commit # if still unprocessed, it's up to date if repostatus['status'] == 'UNPROCESSED': repostatus['status'] = 'UP_TO_DATE' return repostatus
[docs] def update(self, repoconf): ''' Updates a repo entry as per its current state. :param repoconf: the configuration dictionary of the git repository to be processed. :return: returns the same repoconf dictionary provided as parameter with a new 'status' key populated after update process and, optionally, a 'extra_info' key. ''' # First, let's check the repository's current status repostatus = self.status(repoconf) # print(repostatus) # Then, let's operate on the repository depending on its status if ( # all these are statuses we can't deal with here repostatus['status'] == 'ERROR' or repostatus['status'] == 'UP_TO_DATE' or repostatus['status'] == 'EMPTY' or repostatus['status'] == 'DIRTY' or repostatus['status'] == 'WRONG_REMOTE' ): pass elif repostatus['status'] == 'NOT_CLONED': # Let's try to clone it: # you can `git clone` or `git clone --branch` (which can take either branch or tag but **not** a commit) # in case a specific commit is requested, first "bare" clone, then move to the requested commit. try: if ( repostatus['gitref_type'] == 'branch' or repostatus['gitref_type'] == 'tag' ): gitref_type = repostatus['gitref_type'] repo = Repo.clone_from( url = repostatus['repo'], to_path = repostatus['path'], branch = repostatus[gitref_type], ) else: repo = Repo.clone_from( url = repostatus['repo'], to_path = repostatus['path'], ) if repostatus['gitref_type'] == 'commit': repo.git.checkout(repostatus['commit']) repostatus['status'] = 'CLONED' except git_exception.GitCommandError as e: if e.status == 128: repostatus['status'] = 'ERROR' repostatus['extra_info'] = e.stderr.replace('stderr: ','').strip('\n').strip() else: raise elif repostatus['status'] == 'PENDING_UPDATE': repo = Repo(repostatus['path']) repo.git.fetch(prune=True) if ( 'gitref_type' in repostatus and repostatus['gitref_type'] is not None ): gitref_type = repostatus['gitref_type'] desired_gitref = repostatus[gitref_type] if gitref_type == 'branch': remote_ref = str('origin/' + repostatus[gitref_type]) else: remote_ref = str(repostatus[gitref_type]) else: desired_gitref = repo.git.symbolic_ref('refs/remotes/origin/HEAD').replace('refs/remotes/origin/','') try: repo.git.checkout(desired_gitref) except git_exception.GitCommandError as e: repostatus['status'] = 'ERROR' repostatus['extra_info'] = str(e) if not repo.head.is_detached: try: repo.git.pull() except git_exception.GitCommandError as e: repostatus['status'] = 'ERROR' repostatus['extra_info'] = e.stderr.replace('stderr: ','').strip('\n').strip() if repostatus['status'] != 'ERROR': repostatus['status'] = 'UPDATED' return repostatus
if __name__ == '__main__': # execute only if run as a script sys.exit( main() )