import logging
import os

from github import Github, GithubObject, InputGitAuthor
from github.GithubException import UnknownObjectException

from storage.storage import BaseStorage

logger = logging.getLogger(__name__)


# Github Support - https://pypi.org/project/PyGithub/
class GithubStorage(BaseStorage):
    """Storage backend that uploads to and downloads from a GitHub repository.

    The repository name is passed as the fragment of the ``url`` argument,
    e.g. ``https://api.github.com/#RepositoryName``. The connection to GitHub
    is created lazily, on the first operation that needs it.
    """

    TYPE = 'github'

    def __init__(self, url=None, username=None, password=None, source=None, destination=None, encryption_key=None, sender_name=None, sender_email=None):
        """Split the repository name off ``url`` and prepare the committer.

        Args:
            url: API url and repository name joined by a single '#'.
            username: Forwarded unchanged to ``BaseStorage.__init__``.
            password: Forwarded unchanged to ``BaseStorage.__init__``.
            source: Forwarded unchanged to ``BaseStorage.__init__``.
            destination: Destination path; leading and trailing '/' are
                stripped before it is forwarded to ``BaseStorage.__init__``.
            encryption_key: Forwarded unchanged to ``BaseStorage.__init__``.
            sender_name: Committer name used for uploads.
            sender_email: Committer email used for uploads.

        Raises:
            ValueError: If ``url`` does not contain exactly one '#'.
        """
        # The repository is added to the url parameter. Use a '#' as separator. The repository needs to be created first.
        # Ex: https://api.github.com/#RepositoryName
        (url, self.repository) = url.split('#')
        destination = destination.strip('/')

        super().__init__(url, username, password, source, destination, encryption_key, sender_name, sender_email)

        # Create a committer object when the data is uploaded through one of the invited accounts.
        # InputGitAuthor needs BOTH a name and an email; building it with only
        # one of them fails inside PyGithub, so fall back to NotSet instead.
        self.committer = GithubObject.NotSet
        if sender_name is not None and sender_email is not None:
            self.committer = InputGitAuthor(name=sender_name, email=sender_email)
        elif sender_name is not None or sender_email is not None:
            logger.warning('Both sender name and sender email are required to set a committer; using the default committer')

    def __connect(self):
        """Create the repository handle on first use and cache it in ``self.repo``.

        An explicit attribute check is used instead of ``assert``: assertions
        are removed when Python runs with ``-O``, which would skip the
        connection setup entirely.
        """
        if getattr(self, 'repo', None) is not None:
            return

        # NOTE(review): self.password is presumably stored by BaseStorage and
        # holds a GitHub access token - confirm against the base class.
        client = Github(self.password)
        self.repo = client.get_user().get_repo(self.repository)
        logger.info('Created Github.com connection')

    def file_exists(self, filepath):
        """Return True when ``filepath`` can be fetched from the repository.

        Returns False when PyGithub raises ``UnknownObjectException`` for the
        path; any other exception propagates to the caller.
        """
        self.__connect()
        try:
            self.repo.get_contents(filepath)
        except UnknownObjectException:
            return False
        return True

    def directory_exists(self, filepath):
        """Always return True; no lookup is performed for directories."""
        return True

    def _make_folder_action(self, path):
        """Report success without doing anything."""
        # On GitHub you cannot create empty directories. So this action will always succeed
        return True

    def _upload_file_action(self, source, destination):
        """Commit the local file ``source`` to the repository at ``destination``.

        Leading and trailing '/' are stripped from ``destination`` for the
        repository path; the commit message keeps the value as given.

        Returns:
            True once the file has been created.
        """
        self.__connect()
        # Read the file and post to Github. The library will convert to Base64
        with open(source, 'rb') as datafile:
            self.repo.create_file(
                destination.strip('/'),
                f'Upload from VRE DataDropOff\n Added file: {destination}',
                datafile.read(),
                committer=self.committer,
            )

        return True

    def _download_file_action(self, source, destination):
        """Write the repository file ``source`` to the local path ``destination``.

        Returns:
            True once the decoded content has been written.
        """
        self.__connect()
        download = self.repo.get_contents(source)
        with open(destination, 'wb') as destination_file:
            destination_file.write(download.decoded_content)

        return True