Source code for pyautoupdate.launcher

from __future__ import absolute_import, print_function

from datetime import datetime
from logging import WARNING
import multiprocessing
import os
import pprint
import re
import shutil
from sys import version_info
import tempfile
import warnings

# Module in different place depending on python version
if version_info[0] == 2: # pragma: no branch
    from urlparse import urlparse, urlunparse
else:
    from urllib.parse import urlparse, urlunparse

from pkg_resources import parse_version, PEP440Warning
try:
    from pkg_resources import SetuptoolsVersion as Version
    #from packaging.version import Version
    # Behaves differently than the packaging.version
    # Causing test failures
except ImportError:
    from pkg_resources.extern.packaging.version import Version
from setuptools.archive_util import unpack_archive, UnrecognizedFormat

import requests

from ._file_glob import copy_glob
from .exceptions import ProcessRunningException, CorruptedFileWarning


class Launcher(object):
    """Creates a :class:`Launcher` object.

    This class provides the main functionality of the Pyautoupdate module.

    :param str filepath: Path to file to execute
    :param str url: Base URL from which to download new versions

       .. note::

          This must be an HTTPS url. HTTP urls are silently changed into
          HTTPS. **Use of HTTPS is strictly enforced.**
          Parameters, queries, and fragments will be stripped from the URL.

    :param str newfiles: Name of ``.zip``, ``tar.gz`` or ``.tar.bz2``
       archive with new versions to download from site
    :param int log_level: Logging level for the built in logger
    :param tuple args: ``args`` passed to the launched code
    :param dict kwargs: ``kwargs`` passed to the launched code

    When the code is launched, the following variables are already defined:

    +-------------+-------------------------------------------------+
    |Variable Name|Value Description                                |
    +=============+=================================================+
    |``filepath`` |Path to the file that was initially launched     |
    +-------------+-------------------------------------------------+
    |``url``      |Base url to check and download new versions      |
    +-------------+-------------------------------------------------+
    |``newfiles`` |Name of the archive to download from the server  |
    +-------------+-------------------------------------------------+
    |``check_new``|Method to check for updated code                 |
    +-------------+-------------------------------------------------+
    |``pid``      |PID of parent process that spawns the code       |
    +-------------+-------------------------------------------------+
    |``log``      |Logger for Pyautoupdate and for the executed code|
    +-------------+-------------------------------------------------+
    |``args``     |``args`` tuple for the spawned code              |
    +-------------+-------------------------------------------------+
    |``kwargs``   |``kwargs`` dict for the spawned code             |
    +-------------+-------------------------------------------------+

    .. warning::

       The :class:`Launcher` uses :class:`multiprocessing.Process` to run
       the code. Please ensure that all arguments passed in as ``args`` and
       ``kwargs`` can be pickled. Non-pickleable arguments cannot be passed
       to the child process on Windows, and an error will be raised when
       attempting to run user code.
    """

    # The name of the file containing version numbers
    version_doc = "version.txt"
    # A log file that records the results of checking for updates
    version_check_log = "version_check.log"
    # The file with the paths of the code and resources in an application
    file_list = "filelist.txt"
    # The directory into which the newer versions are unpacked
    updatedir = ".pyautodownloads"
    # A marker file used to indicate that a new version is available
    queue_update = ".queue"

    def __init__(self, filepath, url, newfiles='project.zip',
                 log_level=WARNING, *args, **kwargs):
        # Initialize logger
        self.log = multiprocessing.get_logger()
        self.log.setLevel(log_level)
        # Create handle to self.log only if necessary
        if len(self.log.handlers) == 0:
            # Create handler to sys.stderr
            multiprocessing.log_to_stderr()
        self.log.info("Initializing launcher")

        self.log.debug("Validating files")
        # Check that self.version_doc is valid
        if not self.version_doc_validator():
            self.log.error("{0} does not have a valid version number!\n"
                           "{0} is a reserved file name.\n"
                           "It will be overwritten by this program!\n"
                           "If the {0} is corrupted,\n"
                           "Please use the logfile at {1} to restore it."
                           .format(self.version_doc, self.version_check_log))
            warnings.warn("{0} is corrupted!".format(self.version_doc),
                          CorruptedFileWarning, stacklevel=2)
        # Check that self.version_check_log is valid
        open(self.version_check_log, 'a').close()  # "Touch" the log file
        if not self.version_log_validator():
            self.log.warning("Log file at {0} is corrupted!\n"
                             "{0} is a reserved file name.\n"
                             "Please ensure that your program is "
                             "not using it.".format(self.version_check_log))
            warnings.warn("{0} is corrupted!"
                          .format(self.version_check_log),
                          CorruptedFileWarning, stacklevel=2)

        self.log.debug("Validating arguments")
        # Check that filepath is specified
        if len(filepath) == 0:
            raise ValueError("Filepath must not be empty")
        if ".." in os.path.relpath(filepath, start=os.getcwd()):
            # Filepath must be inside the current directory
            # Enforcing this prevents ../../../etc/passwd style attacks
            raise ValueError("Filepath must be inside of folder "
                             "containing initial script")
        self.filepath = filepath
        # Check that URL is specified
        if len(url) == 0:
            raise ValueError("URL must not be empty")
        self.url = url
        # URL parsing section
        schemaobj = urlparse(self.url)
        # Add https schema if necessary and replace http with https
        if schemaobj.scheme not in ["", "https", "http"]:
            raise ValueError("Url must be http or https")
        if schemaobj.scheme == "":
            self.url = "https://" + self.url
            schemaobj = urlparse(self.url)
        # Intended behavior is to remove parameters, query, and fragment
        self.url = urlunparse(("https", schemaobj.netloc, schemaobj.path,
                               "", "", ""))
        # Append slash to end of URL if it is not present
        if not url.endswith("/"):
            self.url = self.url + "/"
        # Check for valid newfiles
        # Split along both types of path separators
        if len(re.split(r"\\|\/", os.path.normpath(newfiles))) > 1:
            raise ValueError("newfiles should be a single archive name")
        elif not newfiles.endswith((".zip", ".tar.gz", ".tar.bz2")):
            raise ValueError("newfiles must be a zip, gzip, or bzip file")
        else:
            self.newfiles = newfiles
        # Initialize other variables
        self.update = multiprocessing.Lock()
        self.pid = os.getpid()
        self.args = args
        self.kwargs = kwargs
        self.__process = multiprocessing.Process(target=self._call_code,
                                                 args=self.args,
                                                 kwargs=self.kwargs)
        self.past_terminated = False
        self.__process_alive = multiprocessing.Event()
        self.log.info("Launcher initialized successfully")

####################### Filename getters and validators ######################
    def version_doc_validator(self):
        """Validates the file containing the current version number.

        :return: Whether the version_doc is a proper version
        :rtype: bool
        """
        # Version is valid only if it exists
        version_valid = os.path.isfile(self.version_doc)
        if version_valid:
            try:
                # If statement earlier signifies that version file must exist
                with open(self.version_doc, "r") as version_check:
                    # Read and parse version
                    # If vers is empty, SetuptoolsLegacyVersion is created
                    # Empty version cannot be valid
                    vers = version_check.read()
                    vers_obj = parse_version(vers)
                    if not isinstance(vers_obj, Version):
                        raise PEP440Warning
            except PEP440Warning:
                # Thrown if file has invalid version
                version_valid = False
        return version_valid
    def version_log_validator(self):
        """Validates the file containing the version history.

        :return: Whether the version_log is formatted properly
        :rtype: bool
        """
        valid_log = True
        # Match log file against regex
        with open(self.version_check_log, "r") as log_file:
            log_syntax = re.compile(
                r"Old .+?\|(New .+?|Up to date|Server invalid)\|Time .+?")
            version = log_file.read()
            if version != "\n" and len(version) > 0:
                has_match = re.match(log_syntax, version)
                valid_log = bool(has_match)
        return valid_log
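    # For reference, the entries that check_new appends to the version check
    # log are pipe-delimited lines in one of the forms below (the version
    # numbers and timestamps here are illustrative only):
    #
    #   Old 1.0.0|New 1.0.1|Time 2016-01-01 12:00:00.000000
    #   Old 1.0.1|Up to date|Time 2016-01-02 12:00:00.000000
    #   Old 1.0.1|Server Invalid|Time 2016-01-03 12:00:00.000000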
########################### Process manipulation #############################

    @property
    def process_is_alive(self):
        """Property indicating whether the process is alive

        To see if user code is running,
        please use :meth:`Launcher.process_code_running`.
        Note that the process needs to initialize itself
        before it can run user code.
        """
        return self.__process.is_alive()

    @property
    def process_code_running(self):
        """Property indicating whether the user code is running

        To see if the process has started,
        please use :meth:`Launcher.process_is_alive`.
        Note that the process needs to initialize itself
        before it can run user code.
        """
        return self.__process_alive.is_set()

    @property
    def process_pid(self):
        """Property indicating the process PID, if it exists"""
        return self.__process.pid

    @property
    def process_exitcode(self):
        """Property indicating the process exitcode, if it exists"""
        if self.past_terminated:
            # SIGTERM is signal 15 on Linux
            # Preserve compatibility on Windows
            return -15
        else:
            return self.__process.exitcode
    def process_join(self, timeout=None):
        """Joins the process

        .. seealso:: :meth:`multiprocessing.Process.join`
        """
        self.log.info("Joining process")
        self.__process.join(timeout)
    def process_terminate(self):
        """Terminates the process.

        .. warning::

           All the provisos of :meth:`multiprocessing.Process.terminate`
           apply. Attempts are made in the code to ensure that internal
           variables inside the Launcher class are properly cleaned up.
           However, there is little protection for user supplied code
           in case of termination.

        :return: Whether process was terminated
        :rtype: bool
        """
        # TODO: Troubleshoot xfail test
        if self.process_is_alive:
            self.log.warning("Terminating Process")
            self.__process.terminate()
            self.__process_alive.clear()
            # Release lock to avoid update deadlock later
            self.log.debug("Releasing code lock after termination")
            self.update.release()
            # Reinitialize process now because is_alive is not properly reset
            # after a process termination
            self.log.debug("Reinitializing process object after termination")
            self.__process = None
            self.__process = multiprocessing.Process(target=self._call_code,
                                                     args=self.args,
                                                     kwargs=self.kwargs)
            self.past_terminated = True
            return True
        else:
            self.log.warning("Attempted to terminate dead process")
            return False
########################### Code execution methods ###########################
    def _call_code(self, *args, **kwargs):
        """Internal function to execute the user code.

        This is used as target of a :class:`multiprocessing.Process` instance.

        :param tuple args: ``*args`` tuple from self.args
        :param dict kwargs: ``**kwargs`` dict from self.kwargs

        .. warning::

           End users should never call this directly.
           Please use the :meth:`run` method instead.
        """
        # Open code file
        # Acquire lock here to avoid TOCTTOU issues with opened code file
        # multiprocessing.get_logger again since this is not pickleable
        local_log = multiprocessing.get_logger()
        local_log.debug("Acquiring code lock to run code")
        self.update.acquire()
        with open(self.filepath, mode='r') as code_file:
            code = code_file.read()
        # Set up variables visible to child process
        localvar = vars(self).copy()
        # Manipulate __dict__ attribute to add handle to check_new
        localvar["check_new"] = self.check_new
        # Remove handle to process object and lock
        # Neither should be tampered with in child process code
        del localvar["_Launcher__process"]
        del localvar["_Launcher__process_alive"]
        del localvar["update"]
        del localvar["past_terminated"]
        # Pass in args, kwargs, and logger
        localvar["args"] = args
        localvar["kwargs"] = kwargs
        localvar["log"] = local_log
        local_log.debug("Starting process with "
                        "the following local variables:\n" +
                        pprint.pformat(localvar))
        # Execute code in file
        local_log.info("Starting code from file")
        try:
            self.__process_alive.set()
            exec(code, dict(), localvar)
        finally:
            local_log.debug("Releasing code lock after running code")
            self.update.release()
            self.__process_alive.clear()
            # Reset past_terminated to False
            # (if terminated and rerun, past_terminated should be false)
            self.past_terminated = False
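    # A minimal sketch of a user script executed by _call_code; the file name
    # and contents are hypothetical, but the injected names (``log``,
    # ``args``, ``kwargs``, ``check_new``, ``filepath``, ``url``) come from
    # the ``localvar`` dictionary built above:
    #
    #   # contents of the file at ``filepath``
    #   log.info("User code started with args=%s kwargs=%s", args, kwargs)
    #   if check_new():
    #       log.info("A newer version is available at %s", url)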
    def run(self, background=False):
        """Runs the user code.

        If background is ``False``, returns the Process's exitcode.

        :param bool background: Whether to run code in background
        :return: the exit code if background is ``False``
        :rtype: :class:`int` or :class:`None`
        """
        # Find the right error to raise depending on python version
        self.log.info("Checking file existence")
        try:
            error_to_raise = FileNotFoundError
        except NameError:
            error_to_raise = IOError
        if not os.path.isfile(self.filepath):
            raise error_to_raise("No file at {0}".format(self.filepath))
        self.log.info("Checking process status")
        if self.process_is_alive:
            self.log.error("Process is already running")
            raise ProcessRunningException
        elif self.process_pid is None:
            # Process has not run yet
            self.log.info("Process has not run yet")
            self.log.info("Starting process")
            # self.log is not pickleable
            # The variable will be reinstantiated inside _call_code
            # Temporarily remove here and reinstantiate after start
            del self.log
            try:
                self.__process.start()
            finally:
                self.log = multiprocessing.get_logger()
            self.log.info("Process started")
            if not background:
                self.process_join()
                # Exit code can be used by program that calls the launcher
                return self.process_exitcode
        elif self.process_exitcode is not None:
            # Process has already terminated
            # Reinitialize the process instance
            self.log.info("Process has already finished")
            self.log.info("Reinitializing process object")
            self.__process = multiprocessing.Process(target=self._call_code,
                                                     args=self.args,
                                                     kwargs=self.kwargs)
            # Recursion, since this will reset @property properties
            self.run(background)
        else:  # pragma: no cover
            # Should never happen
            self.log.error("Process exitcode exists without PID!")
            self.log.error("The application is probably "
                           "in an unstable state.")
######################### New code retrieval methods #########################
    def check_new(self):
        """Retrieves the latest version number from the remote host.

        :return: Whether a newer version is available
        :rtype: bool

        .. note::

           This function internally uses setuptool's ``parse_version``
           to compare versions.
           Any versioning scheme conforming to :pep:`440` can be used.
           When the server contains an invalid version specification,
           this returns ``False``.

           .. versionchanged:: 1.0.0
              Previously, an invalid server version would cause an exception.
        """
        self.log.info("Checking for updates")
        request_time = datetime.utcnow()
        # If self.queue_update is already present, return false
        # TODO: Check again?
        if os.path.isfile(self.queue_update):
            with open(self.queue_update, 'r') as new_version:
                newver = new_version.read()
            newver_obj = parse_version(newver)
            return isinstance(newver_obj, Version)
        else:
            versionurl = self.url + self.version_doc
            # Get new files
            self.log.debug("Retrieving new version from {0}"
                           .format(versionurl))
            get_new = requests.get(versionurl, allow_redirects=True)
            get_new.raise_for_status()
            newver = get_new.text
            newver_obj = parse_version(newver)
            # Read in old version
            with open(self.version_doc, 'r') as old_version:
                oldver = old_version.read()
            oldver = oldver.rstrip("\n")
            # Compare old version with new version
            invalid = not isinstance(newver_obj, Version)
            # Check if new version is valid
            if invalid:
                self.log.error("Retrieved version number is invalid!\n"
                               "Please contact the software authors.\n"
                               "Please include the generated data dump "
                               "in a bug report.")
                # Write notification into log file
                version_to_add = "Old {0}|Server Invalid|Time {1}\n"\
                                 .format(oldver, request_time)
                with open(self.version_check_log, "a") as log_file:
                    log_file.write(version_to_add)
                newver_dump = None
                # If invalid, dump into dump file
                try:
                    newver_dump = tempfile.NamedTemporaryFile(
                        prefix="newverdump", delete=False,
                        mode="wt", dir=os.getcwd())
                    self.log.error("Writing invalid version into {0}"
                                   .format(newver_dump.name))
                    newver_dump.write(newver)
                    # finally runs after return statement
                    return False
                except Exception:
                    self.log.exception("Unable to write data dump")
                    return False
                finally:
                    if newver_dump is not None:
                        newver_dump.close()
                    # Throw warning after logging
                    # If version is invalid, upgrade cannot succeed
                    warnings.warn("Invalid Server version!",
                                  CorruptedFileWarning)
            # newver_obj will be a proper version by this point
            has_new = (newver_obj > parse_version(oldver))
            # Add entry to the logfile and update version.txt
            if has_new:
                version_to_add = "Old {0}|New {1}|Time {2}\n"\
                                 .format(oldver, newver, request_time)
                with open(self.queue_update, 'w') as new_version:
                    new_version.write(newver)
            else:
                version_to_add = "Old {0}|Up to date|Time {1}\n"\
                                 .format(oldver, request_time)
            with open(self.version_check_log, "a") as log_file:
                log_file.write(version_to_add)
            return has_new
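    # For illustration, check_new and _get_new assume the server exposes the
    # version file and the archive directly under the base URL, for example
    # (the domain and project path below are hypothetical):
    #
    #   https://example.com/myproject/version.txt   <- PEP 440 version string
    #   https://example.com/myproject/project.zip   <- archive named by newfiles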
    def _reset_update_files(self):
        """Resets the update files to their default state.

        It empties the existing update directory
        or creates a new one if it doesn't exist.
        """
        self.log.debug("Resetting update directory")
        if os.path.isdir(self.updatedir):
            # Remove old contents
            shutil.rmtree(self.updatedir)
        # Make new empty directory
        # shutil.rmtree would have deleted the directory
        os.mkdir(self.updatedir)
        # Remove old archive if it is left behind
        if os.path.isfile(self.newfiles):
            os.remove(self.newfiles)
    def _get_new(self, allow_redirects=True, chunk_size=512):
        """Retrieves the new archive and extracts it to self.updatedir."""
        if not os.path.isfile(self.queue_update):
            self.log.info("No need to retrieve new version as "
                          "existing one is up to date")
            return
        self.log.info("Retrieving new version")
        newurl = self.url + self.newfiles
        # Get new files
        http_get = requests.get(newurl, stream=True,
                                allow_redirects=allow_redirects)
        http_get.raise_for_status()
        with open(self.newfiles, 'wb') as filehandle:
            for chunk in http_get.iter_content(chunk_size=chunk_size):
                if chunk:
                    filehandle.write(chunk)
        # Unpack archive and remove it after extraction
        try:
            self.log.info("Unpacking downloaded archive")
            unpack_archive(self.newfiles, self.updatedir)
        except UnrecognizedFormat:
            self.log.error("Retrieved version archive is invalid!\n"
                           "Please contact the software authors.\n"
                           "Please include the invalid archive "
                           "in a bug report.")
            os.rename(self.newfiles, self.newfiles + ".dump")
        else:
            # Remove archive only if unpack operation succeeded
            self.log.info("Removing archive after extraction")
            os.remove(self.newfiles)
    def _replace_files(self):
        """Replaces the existing files with the downloaded files.

        :return: Whether update succeeded
        :rtype: bool
        """
        # Only replace if update and replacement are queued
        is_downloaded = os.path.isdir(self.updatedir) and \
            os.listdir(self.updatedir)
        if not (os.path.isfile(self.queue_update) and is_downloaded):
            return False
        # Attempt to acquire code lock here and exit if unable to
        # The finally block runs after the "return" statement
        # This can cause a double-release under some circumstances
        # Acquiring the lock here prevents this from happening
        else:
            self.log.debug("Acquiring code lock to update files")
            if not self.update.acquire(False):
                self.log.warning("Could not acquire lock to update files")
                return False
        try:
            # TODO: Make this code safer and possibly leave diagnostics
            # if the update operation errors out in the middle
            self.log.debug("Writing new version into {0}"
                           .format(self.version_doc))
            os.rename(self.version_doc, self.version_doc + ".bak")
            os.rename(self.queue_update, self.version_doc)
            os.remove(self.version_doc + ".bak")
            self.log.info("Replacing files")
            # Read in files from filelist and move to tempdir
            # If update fails, it is important to leave tempdir
            # for diagnostics
            # Not cleaned up in a finally statement by design
            tempdir = tempfile.mkdtemp()
            self.log.debug("Created tempdir at {0}".format(tempdir))
            self.log.info("Backing up current filelist")
            filelist_backup = None
            try:
                filelist_backup = tempfile.NamedTemporaryFile(delete=False)
                with open(self.file_list, "r+b") as file_handle:
                    shutil.copyfileobj(file_handle, filelist_backup)
            except Exception:
                self.log.exception("Backup of current filelist failed!")
                raise
            finally:
                if filelist_backup is not None:
                    filelist_backup.close()
            self.log.info("Moving old files to tempdir")
            with open(self.file_list, "r") as file_handle:
                for line in file_handle:
                    file_rm = os.path.normpath(os.path.join(".", line))
                    file_rm = file_rm.rstrip("\n")
                    # Confirm that each file in filelist exists
                    if not os.path.isfile(file_rm):
                        self.log.error("{0} contains the invalid "
                                       "filepath {1}.\n"
                                       "Please check that {0} is not being "
                                       "used!\n"
                                       "Otherwise the {0} is corrupted.\n"
                                       "Updates will fail until this is "
                                       "restored."
                                       .format(self.file_list, file_rm))
                        warnings.warn("{0} is corrupted and contains the "
                                      "invalid path {1}!"
                                      .format(self.file_list, file_rm),
                                      CorruptedFileWarning, stacklevel=2)
                    else:
                        file_rm_temp = os.path.join(tempdir, file_rm)
                        file_rm_temp_dir = os.path.dirname(file_rm_temp)
                        if not os.path.isdir(file_rm_temp_dir):
                            # exist_ok does not exist in Python 2
                            os.makedirs(file_rm_temp_dir)
                        if file_rm.split(os.path.sep)[0] not in \
                                [self.updatedir, self.version_doc,
                                 self.version_check_log]:
                            self.log.debug("Moving {0} to {1}"
                                           .format(file_rm, tempdir))
                            shutil.move(file_rm, file_rm_temp)
                            file_rm_dir = os.path.dirname(file_rm)
                            if os.path.isdir(file_rm_dir):
                                if not os.listdir(file_rm_dir):
                                    os.rmdir(file_rm_dir)
                                    self.log.debug("Removing directory {0}"
                                                   .format(file_rm_dir))
            self.log.info("Removing old filelist")
            os.remove(self.file_list)
            self.log.info("Creating new filelist")
            filelist_new = list()
            relpath_start = os.path.join(self.updatedir)
            for dirpath, _, filenames in os.walk(self.updatedir):
                # _ is dirnames, but it is unused
                for filename in filenames:
                    filepath = os.path.normpath(os.path.join(dirpath,
                                                             filename))
                    filepath = os.path.relpath(filepath, start=relpath_start)
                    filepath += "\n"
                    filelist_new.append(filepath)
            self.log.debug("New filelist is:\n" +
                           pprint.pformat(filelist_new))
            self.log.info("Writing new filelist to {0}"
                          .format(self.file_list))
            with open(self.file_list, "w") as file_handle:
                file_handle.writelines(filelist_new)
            self.log.info("Copying downloaded contents to current directory")
            copy_glob(os.path.join(self.updatedir, "*"), ".")
            self.log.info("Removing backup filelist")
            os.remove(filelist_backup.name)
            self.log.info("Removing tempdir")
            shutil.rmtree(tempdir)
            self._reset_update_files()
        except Exception:
            self.log.exception("An error occurred during the update process.")
            self.log.error("The temporary directory used is left behind "
                           "at {}.".format(tempdir))
            raise
        finally:
            self.log.debug("Releasing lock after updating files")
            self.update.release()
        return True
    def update_code(self):
        """Updates the code if necessary.

        :return: Whether update succeeded
        :rtype: bool
        """
        if self.check_new():
            # self.check_new will create a self.queue_update file
            self.log.info("Beginning update process")
            if (not os.path.isdir(self.updatedir) or
                    (os.path.isdir(self.updatedir) and
                     os.listdir(self.updatedir))):
                self._reset_update_files()
            self._get_new()
            update_successful = self._replace_files()
            if update_successful:
                self._reset_update_files()
                self.log.info("Update successful")
            else:
                self.log.info("Update failed")
        else:
            self.log.info("Already up to date")
            update_successful = False
        return update_successful
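# A minimal usage sketch, assuming a hypothetical entry-point script named
# ``main.py`` and a hypothetical update server; exact paths and URLs will
# differ for a real application:
#
#   from pyautoupdate.launcher import Launcher
#
#   launcher = Launcher("main.py", "https://example.com/myproject/",
#                       newfiles="project.zip")
#   exit_code = launcher.run()      # runs main.py in a child process
#   if launcher.update_code():      # check, download, and swap in new files
#       launcher.run()              # rerun the freshly updated code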