# Source code for AdvConfigMgr.config_storage

__author__ = 'dstrohl'

from AdvConfigMgr.config_exceptions import *
from AdvConfigMgr.utils import make_list, merge_dictionaries
from AdvConfigMgr.utils.filehandler import PathHandler
from argparse import ArgumentParser
import copy
import re
import sys
import shutil
from pathlib import Path
from datetime import datetime

__all__ = ['BaseConfigStorageManager', 'StorageManagerManager', 'ConfigCLIStorage', 'ConfigSimpleDictStorage',
           'ConfigFileStorage', 'ConfigStringStorage', 'BaseConfigRecordBasedStorageManager']


class BaseConfigStorageManager(object):
    """
    Base class for storage managers; defines an expandable storage subsystem for configs.

    The two methods :py:meth:`BaseConfigStorageManager.read` and
    :py:meth:`BaseConfigStorageManager.write` need to be overridden to read and write the data
    in the format needed.

    If the manager is intended to be a 'standard' one, in other words, if it will be used for
    automatic read-all/write-all processes, it must be able to run without passing any data or
    arguments; all configuration must be done during initialization.

    If it will only be used standalone or on-demand, you can allow any information to be passed.
    """

    #: :param str storage_type_name: This is the name of the manager class, it is used in log
    #: entries and potentially UI's.
    storage_type_name = 'Base'

    #: :param str storage_name: The internal name of the storage manager, must be unique
    storage_name = None

    #: :param bool force_strings: If True, the system will convert all options to strings before
    #: writing to the manager, and from strings when reading from it.
    force_strings = False

    #: :param bool standard: True if this should be used for read_all or write_all ops
    standard = True

    #: :param bool allow_create: True if this can create options in the system, even if they are
    #: not pre-configured.
    allow_create = True

    #: :param bool force: True if this will set options even if they are locked
    force = False

    #: :param bool overwrite: True if this will overwrite options that have existing values
    overwrite = True

    #: :param bool lock_after_read: True if this will lock the option after reading
    lock_after_read = False

    #: :param int priority: the priority of this manager, with smallest being run earlier than larger.
    priority = 100

    # Names that config() copies from the configuration dictionary onto the instance.
    _CONFIG_KEYS = ('priority', 'storage_type_name', 'storage_name', 'force_strings', 'standard',
                    'allow_create', 'force', 'overwrite', 'lock_after_read')

    def __init__(self):
        # :type self.manager: AdvConfigMgr.advconfigmgr.ConfigManager
        self.manager = None  # this is set during registration.
        self._flat_dict = None
        self.last_section_count = 0
        self.last_option_count = 0
        # Filled by _save_dict(); initialised here so it always exists on the instance.
        self.processed_sections = []
        self.data = None
        ip.info('Loading storage manager: ', self.storage_name)

    def config(self, config_dict):
        """
        Apply storage specific configuration options; called after the storage manager is loaded.

        :param config_dict: a dictionary with storage specific configuration options. Keys that
            are absent leave the current value untouched.
        """
        for key in self._CONFIG_KEYS:
            setattr(self, key, config_dict.get(key, getattr(self, key)))

    def read(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        Read from storage and save to the system

        :param section_name: A string or list of sections to read from in the config.
        :type section_name: str or list
        :param str storage_name: A string name of the storage manager, this can be used to override
            the configured name.
        :param kwargs: each storage manager may define its own additional args, but must also
            implement the final kwargs parameter so that if it is called with other arguments, it
            won't cause an error.
        :return: the number of sections / options added

        The recommended implementation method is to read from your storage method (database,
        special file, etc) and store the arguments in a dictionary or dictionary of dictionaries,
        then pass that dict to :py:meth:`BaseConfigStorageManager._save_dict`. That method will
        take care of writing the data, converting it if needed, making sure that it is allowed to
        write, handling locked sections and options, etc...

        If the implementation tries to pass data directly to the file manager for importing, it
        will save the data in :py:attr:`BaseConfigStorageManager.data` where you can read it, so
        you should check this before processing.

        You should keep track of the number of sections and options written/read and return these
        at the end::

            return self.last_section_count, self.last_option_count
        """
        raise NotImplementedError

    def write(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        Write data from the system and save to your storage

        :param section_name: A string or list of sections to write to.
        :type section_name: str or list
        :param str storage_name: A string name of the storage manager, this can be used to override
            the configured name.
        :param kwargs: each storage manager may define its own additional args, but must also
            implement the final kwargs parameter so that if it is called with other arguments, it
            won't cause an error.
        :return: the number of sections / options written

        The recommended implementation method is to call
        :py:meth:`BaseConfigStorageManager._get_dict` which will return a dictionary of the options
        or dictionary of sections (which are dicts of options) to be saved. You can then iterate
        through these and save them in your storage system.

        If you want to return data direct from the write method, you should copy it to
        :py:attr:`BaseConfigStorageManager.data` after processing.

        You should keep track of the number of sections and options written/read and return these
        at the end::

            return self.last_section_count, self.last_option_count
        """
        raise NotImplementedError

    @property
    def is_default(self):
        """True when this manager's ``storage_name`` matches the manager's default storage."""
        return self.manager.storage.default_manager.storage_name == self.storage_name

    def _ok_to_read_section(self, section_name, storage_name=storage_name):
        """
        Decide whether data for ``section_name`` may be read in from this storage.

        A section that does not exist yet is created (and accepted) when both this storage and
        the config manager allow creation; otherwise it is rejected.

        :param str section_name: the section being read.
        :param str storage_name: storage name to check against; ``None`` falls back to
            ``self.storage_name`` and ``'*'`` always matches.
        :return: True if the section may be read.
        :rtype: bool
        """
        ip.debug('checking for OK to READ section, ', section_name, ' with storage ', storage_name)

        # Same fallback as _ok_to_write_section: the signature default is None.
        if storage_name is None:
            storage_name = self.storage_name

        if section_name not in self.manager:
            if self.allow_create and self.manager.allow_create_from_storage:
                self.manager.add_section(dict(name=section_name, storage_write_to=self.storage_name))
                ip.a().debug('YES').s()
                return True
            else:
                ip.a().debug('NO: section name not valid').s()
                return False

        if storage_name == '*':
            ip.a().debug('YES').s()
            return True

        tmp_section_tag = self.manager[section_name].storage_read_from_only

        if tmp_section_tag is None:
            ip.a().debug('YES').s()
            return True

        # A bare string tag must be treated as one name; list('abc') would split it
        # into characters and never match.
        if isinstance(tmp_section_tag, str):
            allowed_storages = [tmp_section_tag]
        else:
            allowed_storages = list(tmp_section_tag)

        if storage_name in allowed_storages:
            ip.a().debug('YES').s()
            return True

        ip.a().debug('NO: fell through checks.').s()
        return False

    def _ok_to_write_section(self, section_name, storage_name=None):
        """
        Decide whether ``section_name`` may be written out to this storage.

        :param str section_name: the section being written.
        :param str storage_name: storage name to check against; ``None`` falls back to
            ``self.storage_name`` and ``'*'`` always matches.
        :return: True if the section may be written.
        :rtype: bool
        """
        ip.debug('checking for OK to WRITE section, ', section_name, ' with storage ', storage_name)

        if storage_name is None:
            storage_name = self.storage_name

        if storage_name == '*':
            ip.a().debug('YES').s()
            return True

        tmp_section_tag = self.manager[section_name].storage_write_to

        if tmp_section_tag is None:
            # Untagged sections are only written by storages registered in the manager list.
            if self in self.manager.storage.manager_list:
                ip.a().debug('YES').s()
                return True
            else:
                ip.a().debug('NO: section name does not have storage name defined, and storage is not in default.')
                ip.debug('current storage: ', self)
                ip.debug('default list: ', self.manager.storage.manager_list).s()
                return False

        if tmp_section_tag == '*' or tmp_section_tag == storage_name:
            ip.a().debug('YES').s()
            return True

        ip.a().debug('NO: fell through checks.').s()
        return False

    def _get_dict(self, section_name=None, storage_name=storage_name):
        """
        Returns a dictionary of options.

        :param section_name: a string name of a section or list of section names to get.
            If this is a single string, it is assumed it is the base of the dictionary and all
            keys are options. If this is None or a tuple/list, it is assumed that the keys of the
            dictionary are sections, containing dictionaries of options. If this is None, all
            sections will be queried based on their storage name. This does NOT override the
            storage_name.
        :type section_name: str or list or None
        :param str storage_name: allows overriding the storage name
        :return: A dictionary of the options matching the sections and storage names passed.
        :rtype: dict
        """
        ip.debug('storage [', self.storage_name, '] creating a dictionary of options.').push()

        self.last_section_count = 0
        self.last_option_count = 0
        tmp_ret = {}

        # A single section name (or a section-less config) yields a flat option dict.
        self._flat_dict = bool(isinstance(section_name, str) or self.manager._no_sections)

        if section_name is None:
            section_name = self.manager.sections
        else:
            section_name = make_list(section_name)

        ip.debug('section name parameter: ', section_name)

        for section in self.manager:
            ip.debug('storage [', self.storage_name, '] checking section ', section.name)
            ok_2_get = False
            if section_name == [None] or section.name in section_name:
                ip.debug('storage [', self.storage_name, '] section selectable', section.name)
                if self._ok_to_write_section(section.name, storage_name):
                    ip.debug('storage [', self.storage_name, '] checking allowed ', section.name)
                    if len(section) > 0:
                        ip.debug('storage [', self.storage_name, '] checking has content ', section.name)
                        ok_2_get = True

            if ok_2_get:
                ip.debug('storage [', self.storage_name, '] getting section ', section.name)
                tmp_sec = {}
                self.last_section_count += 1
                for option in section:
                    opt_success, opt_value = self._get_option(section, option)
                    if opt_success:
                        self.last_option_count += 1
                        tmp_sec[option.name] = opt_value

                if self._flat_dict:
                    tmp_ret = tmp_sec
                else:
                    tmp_ret[section.name] = tmp_sec

        ip.debug('returning ', tmp_ret)
        ip.pop()
        return tmp_ret

    def _get_option(self, section, option):
        """
        Assumes that the section is already checked for tag permissions.
        Assumes that the section and option do exist.

        :param section: the section object to get
        :param option: the option object to get
        :return: success, value

            success: [True/False] True if the data was successfully returned
            value: the value to store
        """
        ip.debug('getting option [', option, '] for storage ', self.storage_name).a()
        get_rec = True
        tmp_ret = None

        # An option without an explicit value is only stored when it has a default
        # and the section asks for defaults to be stored.
        if not option.has_set_value:
            if not option.has_default_value:
                get_rec = False
            elif not section.store_default:
                get_rec = False

        if get_rec:
            tmp_ret = option.to_write(as_string=self.force_strings)

        ip.s()
        return get_rec, tmp_ret

    def _save_dict(self, dict_in, section_name=None, storage_name=None):
        """
        Takes a dictionary and saves it to the system

        :param dict dict_in: the dictionary to save. If a single section name is passed, OR if the
            config manager is set to simple config (no sections), this should be a dictionary of
            options. Otherwise this should be a dictionary of sections, each a dictionary of
            options. ``None`` is treated as "nothing to save".
        :param section_name: a string name of a section or list of section names to save to.
            If this is a single string, it is assumed it is the base of the dictionary and all
            keys are options. If this is None, all of the sections in the dict will be processed.
            If it is a tuple/list, then only the sections in the dict that match items in the
            section_name will be processed. This does not override the storage names.
        :type section_name: str or list or None
        :param str storage_name: allows overriding the storage name
        """
        if storage_name is None:
            storage_name = self.storage_name

        self.last_section_count = 0
        self.last_option_count = 0
        self.processed_sections = []  # used by the record type storage manager.

        if dict_in is None:
            return

        if self.manager._no_sections:
            section_name = self.manager._DEFAULT_SECT_NAME

        if isinstance(section_name, str):
            dict_in = {section_name: dict_in}

        # None means "every section found in dict_in". Filtering on the manager's existing
        # sections here would make section creation in _ok_to_read_section unreachable.
        wanted_sections = None if section_name is None else make_list(section_name)

        for section, options in dict_in.items():
            section, _ = self.manager._xf(section)
            if wanted_sections is not None and section not in wanted_sections:
                continue
            if not self._ok_to_read_section(section, storage_name):
                continue

            self.processed_sections.append(section)
            self.last_section_count += 1

            # Let the section migrate stored data written by an older version.
            storage_version = options.get(self.manager[section].version_option_name, None)
            options = self.manager[section].migrate_dict(storage_version, options)

            for option, value in options.items():
                if self._set_option(section, option, value):
                    self.last_option_count += 1

    def _set_option(self, section_name, option_name, value):
        """
        Store one value read from storage into the system.

        Assumes that the section is already checked for the tag.

        .. note:: if the storage method only stores strings, and this has to create an option,
            that option will be created as a string.

        :param str section_name: name of an existing section.
        :param str option_name: the option to set (created when missing and allowed).
        :param value: the value read from storage.
        :return: True if the option was created or updated.
        :rtype: bool
        :raises NoOptionError: if the option does not exist and cannot be created.
        """
        saved = False
        ip.debug('reading option [', option_name, '] from storage ', self.storage_name).a()
        try:
            save_option = True
            section = self.manager[section_name]

            if option_name not in section:
                if not self.allow_create:
                    ip.error('option [', option_name, '] does not exist and allow_create is False')
                    raise NoOptionError(option_name, section)
                elif section.locked and not self.force:
                    ip.error('option [', option_name, '] does not exist and section is locked')
                    raise NoOptionError(option_name, section)

                section.add(dict(name=option_name, default_value=value, do_not_change=self.lock_after_read))
                saved = True
            else:
                option_rec = section.item(option_name)
                if option_rec.has_set_value and not self.overwrite:
                    ip.warning('option [', option_name, '] has a value and overwrite is False')
                    save_option = False
                elif option_rec.do_not_change and not self.force:
                    ip.warning('option [', option_name, '] has a is locked and force is False')
                    save_option = False

                if save_option:
                    option_rec.from_read(value, from_string=self.force_strings)
                    option_rec.do_not_change = self.lock_after_read
                    saved = True
                    ip.debug('option [', option_rec.path, '] updated with: ', option_rec)
        finally:
            # NOTE(review): .a()/.s() appear to push/pop the log indent; pop it even when raising.
            ip.s()
        return saved

    def __repr__(self):
        # format() rather than '+' so an unset (None) storage_name cannot raise TypeError.
        return '{} [{}]'.format(self.storage_type_name, self.storage_name)
class BaseConfigRecordBasedStorageManager(BaseConfigStorageManager):
    """
    Base class for record based storage managers.

    After options are saved to the system, the removed-options list of every processed section
    is walked and :py:meth:`delete_record` is called for each entry.
    """

    def _save_dict(self, dict_in, section_name=None, storage_name=None):
        """
        Save ``dict_in`` through the parent implementation, then delete stale records.

        :param dict dict_in: the dictionary to save.
        :param section_name: section name(s) forwarded to the parent implementation.
        :param str storage_name: storage name forwarded to the parent implementation.
        """
        self.processed_sections = []
        super()._save_dict(dict_in=dict_in, section_name=section_name, storage_name=storage_name)

        for saved_section in self.processed_sections:
            stale_options = self.manager[saved_section].migration.options_to_remove
            for stale_option in stale_options:
                self.delete_record(saved_section, stale_option)

    def delete_record(self, section, option):
        """
        Delete the record for ``option`` in ``section`` from the backing store.

        This must be implemented by record based storage managers.
        """
        raise NotImplementedError
class ConfigCLIStorage(BaseConfigStorageManager):
    """
    Read configuration from the CLI
    """
    storage_type_name = 'CLI Manager'
    storage_name = 'cli'
    standard = True  #: True if this should be used for read_all/write_all ops
    force_strings = False  #: True if the storage only accepts strings
    force = True  #: True if this will set options even if they are locked
    overwrite = True  #: True if this will overwrite options that have existing values
    lock_after_read = True  #: True if this will lock the option after reading
    priority = 1

    def __init__(self):
        self._reset_config_cache = True
        self._cli_parser = None
        super(ConfigCLIStorage, self).__init__()

    def read(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        Parse command line arguments and store them in the system.

        The arguments parsed are taken from ``self.data`` when it is set, otherwise argparse
        falls back to the process arguments. ``self.data`` is cleared afterwards.

        :param section_name: accepted for interface compatibility; not used.
        :param storage_name: accepted for interface compatibility; not used.
        :return: the number of sections / options read
        """
        # Reset first so repeated reads do not accumulate counts from earlier calls.
        self.last_section_count = 0
        self.last_option_count = 0
        self._parse_cli(self.data)
        self.data = None
        return self.last_section_count, self.last_option_count

    def write(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        cli does not accept writing options -- disabled

        :return: always ``(0, 0)``
        """
        self.last_section_count = 0
        self.last_option_count = 0
        return self.last_section_count, self.last_option_count

    def reset_cache(self):
        """
        Marks the cached cli_parser as stale so it is rebuilt from the config on next access.
        """
        self._reset_config_cache = True

    @property
    def cli_parser(self):
        """
        The :py:class:`argparse.ArgumentParser` built from the CLI arguments of all sections.

        The parser is built on first access and rebuilt after :py:meth:`reset_cache`.
        """
        if self._cli_parser is None or self._reset_config_cache:
            ip.debug('Creating CLI Parser').a()
            self._cli_parser = ArgumentParser(**self.manager._cli_parser_args)
            cli_sect = self._cli_parser

            for s in self.manager:
                ip.debug('checking for cli options in sections: ', s.name)
                if self.manager._cli_group_by_section and s._cli_args:
                    ip.debug('creating CLI section: ', s._cli_section_options['title'])
                    cli_sect = self._cli_parser.add_argument_group(**s._cli_section_options)

                for o in s._cli_args.values():
                    # Copy so popping 'flags' does not alter the section's stored definition.
                    tmp_args = copy.copy(o)
                    tmp_flags = tmp_args.pop('flags')
                    ip.debug('creating CLI argument "', tmp_flags, '" with options ', tmp_args)
                    cli_sect.add_argument(*tmp_flags, **tmp_args)

            self._reset_config_cache = False
            # NOTE(review): .a()/.s() appear to push/pop the log indent; pop only on the
            # branch that pushed so the cached path stays balanced.
            ip.s()
        else:
            ip.debug('CLI PARSER FOUND')

        return self._cli_parser

    def _parse_cli(self, args=None):
        """
        will parse any cli arguments based on the configuration settings

        :param args: a list of arguments can be passed in which case the method will parse the
            list instead of the process command line.
        """
        ip.debug('Parsing CLI arguments: ', args)
        tmp_args = vars(self.cli_parser.parse_args(args))
        for dest, value in tmp_args.items():
            self.last_option_count += 1
            self.manager._cli_args[dest].from_read(value, from_string=True)
class ConfigSimpleDictStorage(BaseConfigStorageManager):
    """
    Read configuration from, and write it to, a plain dictionary held in ``self.data``.

    Keys are section names, values are dictionaries with keys and values
    that should be present in the section.
    """
    storage_type_name = 'Simple Dictionary Storage'
    storage_name = 'dict'
    standard = False  #: True if this should be used for read_all/write_all ops

    def read(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        Save the dictionary currently held in ``self.data`` to the system.

        :param section_name: section name(s) passed on to ``_save_dict``.
        :param storage_name: storage name passed on to ``_save_dict``.
        :return: the number of sections / options read
        """
        source = self.data
        self._save_dict(source, section_name, storage_name)
        counts = (self.last_section_count, self.last_option_count)
        return counts

    def write(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        Fetch a dictionary from the system and leave it in ``self.data``.

        :param section_name: section name(s) passed on to ``_get_dict``.
        :param storage_name: storage name passed on to ``_get_dict``.
        :return: the number of sections / options written
        """
        exported = self._get_dict(section_name, storage_name)
        self.data = exported
        counts = (self.last_section_count, self.last_option_count)
        return counts
class ConfigStringStorage(BaseConfigStorageManager):
    """
    A manager that returns or saves configuration in the format of a string or list of strings.

    This manager handles strings formatted as a standard INI file, or lists of strings
    formatted that way. Data to read is taken from ``self.data``; written data is left there.

    The parsing behaviour is controlled by these class attributes:

    :param tuple delimiters: the delimiter between the key and the value
    :param tuple comment_prefixes: this is a tuple of characters that if they occur as the first
        non-whitespace character of a line, the line is a comment
    :param tuple inline_comment_prefixes: this is a tuple of characters that if they occur
        elsewhere in the line after a whitespace char, the rest of the line is a comment.
    :param bool space_around_delimiters: True if space should be added around the delimiters.
    :param bool strict: if False, duplicate sections will be merged, if True, duplicate sections
        will raise an error
    """
    _delimiters = ('=', ':')
    _comment_prefixes = ('#', ';')
    _inline_comment_prefixes = set()
    _space_around_delimiters = True
    _strict = True

    storage_type_name = 'INI String'
    storage_name = 'string'  #: the internal name of the storage manager, must be unique
    force_strings = True  #: True if the storage only accepts strings

    # Regular expressions for parsing section headers and options.
    # These are compiled with re.VERBOSE, so each '#' comment MUST end at a newline;
    # on a single line the first '#' would comment out the rest of the pattern.
    _SECT_TMPL = r"""
        \[                                 # [
        (?P<header>[^]]+)                  # very permissive!
        \]                                 # ]
        """
    _OPT_TMPL = r"""
        (?P<option>.*?)                    # very permissive!
        \s*(?P<vi>{delim})\s*              # any number of space/tab,
                                           # followed by any of the
                                           # allowed delimiters,
                                           # followed by any space/tab
        (?P<value>.*)$                     # everything up to eol
        """
    _OPT_NV_TMPL = r"""
        (?P<option>.*?)                    # very permissive!
        \s*(?:                             # any number of space/tab,
        (?P<vi>{delim})\s*                 # optionally followed by
                                           # any of the allowed
                                           # delimiters, followed by any
                                           # space/tab
        (?P<value>.*))?$                   # everything up to eol
        """

    # Compiled regular expression for matching sections
    SECTCRE = re.compile(_SECT_TMPL, re.VERBOSE)
    # Compiled regular expression for matching options with typical separators
    OPTCRE = re.compile(_OPT_TMPL.format(delim="=|:"), re.VERBOSE)
    # Compiled regular expression for matching options with optional values
    # delimited using typical separators
    OPTCRE_NV = re.compile(_OPT_NV_TMPL.format(delim="=|:"), re.VERBOSE)
    # Compiled regular expression for matching leading whitespace in a line
    NONSPACECRE = re.compile(r"\S")
    # Possible boolean values in the configuration.
    BOOLEAN_STATES = {'1': True, 'yes': True, 'true': True, 'on': True,
                      '0': False, 'no': False, 'false': False, 'off': False}

    def __init__(self):
        # Reuse the precompiled pattern for the default delimiters, otherwise build one.
        if self._delimiters == ('=', ':'):
            self._optcre = self.OPTCRE_NV
        else:
            d = "|".join(re.escape(d) for d in self._delimiters)
            self._optcre = re.compile(self._OPT_NV_TMPL.format(delim=d), re.VERBOSE)

        super(ConfigStringStorage, self).__init__()

    def read(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        will read INI formatted text from ``self.data`` and save it to the system

        ``self.data`` may be a single string (split into lines), an iterable of lines, or
        ``None`` (treated as no lines). It is cleared afterwards.

        :param section_name: section name(s) passed on to ``_save_dict``.
        :type section_name: str or list
        :param str storage_name: storage name passed on to ``_save_dict``.
        :return: the number of sections / options read
        :rtype: tuple
        """
        if self.data is None:
            self.data = []
        elif isinstance(self.data, str):
            self.data = self.data.splitlines()

        out_dict = self._parse_list(self.data, 'passed_string')
        self._save_dict(out_dict, section_name, storage_name)
        self.data = None
        return self.last_section_count, self.last_option_count

    def _strip_comment(self, line):
        """Return ``line`` stripped of comments and whitespace, or None for a full-line comment."""
        if any(line.strip().startswith(prefix) for prefix in self._comment_prefixes):
            return None

        # Earliest inline prefix that starts the line or follows whitespace.
        comment_start = sys.maxsize
        for prefix in self._inline_comment_prefixes:
            index = line.find(prefix)
            if index == -1:
                continue
            if index == 0 or line[index - 1].isspace():
                comment_start = min(comment_start, index)

        if comment_start == sys.maxsize:
            return line.strip()
        if comment_start == 0:
            return None
        return line[:comment_start].strip()

    def _parse_list(self, list_in, filename):
        """
        Parse a sectioned list from a config file.

        Each section in a configuration file contains a header, indicated by
        a name in square brackets (`[]'), plus key/value options, indicated by
        `name' and `value' delimited with a specific substring (`=' or `:' by
        default).

        Values can span multiple lines, as long as they are indented deeper
        than the first line of the value. A blank line ends the current value.

        Configuration files may include comments, prefixed by specific
        characters (`#' and `;' by default). Comments may appear on their own
        in an otherwise empty line or may be entered in lines holding values or
        section names.

        :param list_in: an iterable of text lines.
        :param str filename: name used in error messages.
        :return: dictionary of sections, each a dictionary of option -> value.
        :rtype: dict
        :raises DuplicateSectionError: in strict mode, when a section header repeats.
        :raises DuplicateOptionError: in strict mode, when an option repeats in a section.
        :raises MissingSectionHeaderError: when an option precedes any section header.
        :raises ParsingError: after the whole input was read, listing every unparsable line.
        """
        out_dict = {}
        cursect = None  # None, or a dictionary
        sectname = None
        optname = None
        indent_level = 0
        e = None  # None, or an exception

        for lineno, line in enumerate(list_in, start=1):
            value = self._strip_comment(line)
            if not value:
                # empty line marks end of value
                indent_level = sys.maxsize
                continue

            # continuation line?
            first_nonspace = self.NONSPACECRE.search(line)
            cur_indent_level = first_nonspace.start() if first_nonspace else 0
            if cursect is not None and optname and cur_indent_level > indent_level:
                if cursect[optname] is None:
                    # A valueless option has nothing to continue; report instead of
                    # failing with AttributeError on None.append.
                    e = self._handle_error(e, filename, lineno, line)
                else:
                    cursect[optname].append(value)
                continue

            # a section header or option header?
            indent_level = cur_indent_level
            mo = self.SECTCRE.match(value)
            if mo:
                sectname = mo.group('header')
                if sectname in out_dict:
                    if self._strict:
                        raise DuplicateSectionError(sectname, filename, lineno)
                    cursect = out_dict[sectname]
                else:
                    cursect = {}
                    out_dict[sectname] = cursect
                # So sections can't start with a continuation line
                optname = None
            # no section header in the file?
            elif cursect is None:
                raise MissingSectionHeaderError(filename, lineno, line)
            # an option line?
            else:
                mo = self._optcre.match(value)
                if mo:
                    optname, _vi, optval = mo.group('option', 'vi', 'value')
                    if not optname:
                        e = self._handle_error(e, filename, lineno, line)
                    optname = optname.strip()
                    if self._strict and optname in cursect:
                        raise DuplicateOptionError(sectname, optname, filename, lineno)
                    if optval is not None:
                        # Values are kept as lists so continuation lines can be appended.
                        cursect[optname] = [optval.strip()]
                    else:
                        # valueless option handling
                        cursect[optname] = None
                else:
                    # a non-fatal parsing error occurred. set up the
                    # exception but keep going. the exception will be
                    # raised at the end of the file and will contain a
                    # list of all bogus lines
                    e = self._handle_error(e, filename, lineno, line)

        # if any parsing errors occurred, raise an exception
        if e:
            raise e

        # Join multi-line values back into single strings.
        # NOTE(review): the pairing of this if/else was reconstructed from flattened source;
        # confirm the section-less branch against the upstream file.
        for key, item in out_dict.items():
            if self.manager._no_sections:
                if isinstance(item, list):
                    out_dict[key] = '\n'.join(item).rstrip()
            else:
                for option, value in item.items():
                    if isinstance(value, list):
                        out_dict[key][option] = '\n'.join(value).rstrip()

        return out_dict

    # ****************************************************************************************************
    # *** Write files section
    # ****************************************************************************************************

    def _dict_to_list(self, dict_in, new_line=True):
        """
        Convert an option dictionary into a list of INI formatted lines.

        :param dict dict_in: flat option dict (when ``self._flat_dict`` is set) or dict of sections.
        :param bool new_line: True to terminate every line with a newline character.
        :return: list of strings.
        :rtype: list
        """
        tmp_list = []
        if self._flat_dict:
            tmp_list.extend(self._format_section(dict_in, 'DEFAULT', new_line=new_line))
        else:
            for k, s in dict_in.items():
                tmp_list.extend(self._format_section(s, k, new_line))
        return tmp_list

    def _format_section(self, option_dict, section_name, new_line):
        """
        Format one section as a header line followed by one line per option.

        :param dict option_dict: option name -> value; a ``None`` value writes the bare name.
        :param str section_name: name placed in the ``[header]``.
        :param bool new_line: True to terminate lines with a newline and add a trailing blank line.
        :return: list of strings.
        :rtype: list
        """
        line_end = '\n' if new_line else ''

        if self._space_around_delimiters:
            delimiter = " {} ".format(self._delimiters[0])
        else:
            delimiter = self._delimiters[0]

        tmp_ret = ["[{}]{}".format(section_name, line_end)]

        for key, value in option_dict.items():
            if value is not None:
                # Indent continuation lines so they parse back as part of the same value.
                value = delimiter + str(value).replace('\n', '\n\t')
            else:
                value = ""
            tmp_ret.append("{}{}{}".format(key, value, line_end))

        if new_line:
            tmp_ret.append(line_end)

        return tmp_ret

    def write(self, section_name=None, storage_name=storage_name, **kwargs):
        """
        will write the configuration as a list of INI formatted strings into ``self.data``.

        :param section_name: section name(s) passed on to ``_get_dict``.
        :param str storage_name: storage name passed on to ``_get_dict``.
        :return: the number of sections / options written
        """
        self.data = None
        tmp_dict_to_save = self._get_dict(section_name=section_name, storage_name=storage_name)
        self.data = self._dict_to_list(tmp_dict_to_save, False)
        return self.last_section_count, self.last_option_count

    def _handle_error(self, exc, fpname, lineno, line):
        """Record a bad line on ``exc``, creating the ParsingError on first use, and return it."""
        if not exc:
            exc = ParsingError(fpname)
        exc.append(lineno, repr(line))
        return exc
[docs]class ConfigFileStorage(ConfigStringStorage): """ A file manager that stores config files in a text file this manager can handle multiple files, as well as a string or list of data, as long as the data is in the format of an ini file. it can also handle scanning a directory or list of directories for all files matching a filter pattern. if multiple files or filenames are passed, the files read will be processed in the order they are listed, with sections being merged and options overwriting older ones. if a directory path is passed, the files will be sorted based on the "read_path_order" option and processed in that order. :param tuple delimiters: the delimiter between the key and the value :param tuple comment_prefixes: this is a tuple of characters that if they occur as the first non-whitespace character of a line, the line is a comment :param tuple inline_comment_prefixes: this is a tuple of characters that if they occur elsewhere in the line after a whitespace char, the rest of the line is a comment. :param bool space_around_delimiters: True if space should be added around the delimeters. :param bool strict: if False, duplicate sections will be merged, if True, duplicate sections will raise an error :param read_filenames: a filename or list of file names, assumed to be in the current directory if not otherwise specified for reading. These can also be path/globs and the system will attempt to read all files matching that glob filter. for example, the following are all exampels of valid parameters:: 'myfile.ini' 'dir/myfile.ini' 'dir/\*.ini' ['myfile.ini', 'myotherfile.ini', 'backup_files/myfile_??.ini'] The filename to read from can also be passed during the read operation. :type read_filenames: str or list :param read_path_order: 'alpha' (default) or 'date', the order files will be processed if a path is passed. :param str filename: If used, the single file to read and write to. (cannot be used with read_filenames write_filename.) 
:param write_filename: the filename to write files to. if None and read_filenames is passed, this will take the first name in the list. if None and read_paths is passed, AND if there is ONLY ONE file in the path that matches the filter, this will use that file. the filename to write to can also be passed during the write operation. :type write_filename: str or None :param bool leave_open: if True, the file objects will be left open while the config manager is loaded. this can speed up file access, but it also uses up file handles, buffers, memory, and has the possibility of corrupted files. :param bool create_files: if False, will not create any files it does not find. :param bool fail_if_no_file: if False, will fail and raise an error if the specified filename is not found. :param bool make_backup_before_writing: if True, the system will make a backup file before writing the configuration. :param str backup_filename: the filename of the backup file. this can have the following formatting keys: '{NUM}' for an incremental number (uses the next available number) '{DATE}' for a date string ('YYYYMMDD') '{STIME}' for a 1 second resolution time string ('HHMMSS') '{MTIME}' for a 1 minute resolution time string {'HHMM') '{NAME}' for the old config file name (without extension) :param str backup_path: if not None (the default) this allows the backup file to be in a different location. :param int max_backup_number: the max number (assuming a backup file and NUM in the filename) :param str encoding: :return: """ storage_type_name = 'INI File' storage_name = 'file' #: the internal name of the storage manager, must be unique _create_files = True _fail_if_no_file = False _make_backup_before_writing = False _backup_filename = '{NAME}_{DATE}_{STIME}.bak' _backup_path = None _max_backup_number = 999 _encoding = None # you should have EITHER a single filename _filename = None # OR file names and sets. 
_read_filenames = None _read_path_order = 'alpha' _read_path_order_dir = 'asc' _write_filename = None
[docs] def config(self, config_dict): """ :param dict config_dict: a dictionary with storage specific configuration options., This is called after the storage manager is loaded. """ super(ConfigFileStorage, self).config(config_dict=config_dict) self._create_files = config_dict.get('create_files', self._create_files) self._fail_if_no_file = config_dict.get('fail_if_no_file', self._fail_if_no_file) self._make_backup_before_writing = config_dict.get('make_backup_before_writing', self._make_backup_before_writing) self._backup_filename = config_dict.get('backup_filename', self._backup_filename) self._backup_path = config_dict.get('backup_path', self._backup_path) self._max_backup_number = config_dict.get('max_backup_number', self._max_backup_number) self._encoding = config_dict.get('encoding', self._encoding) self._read_path_order = config_dict.get('read_path_order', self._read_path_order) self._read_path_order_dir = config_dict.get('read_path_order_dir', self._read_path_order_dir) self._filename = config_dict.get('filename', self._filename) if self._filename is None: self._read_filenames = config_dict.get('read_filenames', self._read_filenames) self._write_filename = config_dict.get('write_filename', self._write_filename) else: self._read_filenames = [self._filename] self._write_filename = self._filename
@property def get_default_filename(self): tmp_fn = Path(sys.argv[0]) return tmp_fn.with_suffix('.ini')
[docs] def read(self, section_name=None, storage_name=storage_name, files=None, encoding=None, **kwargs): """ will read an ini file and save it to the system :param section_name: :type section_name: str or list :param str storage_name: :param file: :type file: str or FileObject :param str encoding: :return: :rtype: int """ dicts_list = [] if self.data is None: if self._fail_if_no_file: on_does_not_exist = 'raise' else: on_does_not_exist = 'ignore' if encoding is None: encoding = self._encoding if files is None: files = self._read_filenames if files is None: files = self.get_default_filename files = make_list(files) path_list = PathHandler(files, glob_sort_order=self._read_path_order, glob_sort_dir=self._read_path_order_dir, return_type='handle', verify='call', on_does_not_exist=on_does_not_exist, default_open_encoding=encoding) for file in path_list.readable: with file: dicts_list.append(self._parse_list(file, file.name)) else: if isinstance(self.data, str): self.data = self.data.splitlines() dicts_list.append(self._parse_list(self.data, 'passed_file')) out_dict = merge_dictionaries(dicts_list) self._save_dict(out_dict, section_name, storage_name) self.data = None return self.last_section_count, self.last_option_count
def _make_backup(self, filename):
    """
    Copy ``filename`` to a backup file named from the ``self._backup_filename`` template.

    The template is expanded with ``str.format`` and may use these fields:

    * ``name``: base name of ``filename``
    * ``date``: current date as ``YYYYMMDD``
    * ``stime``: current time as ``HHMMSS``
    * ``mtime``: current time as ``HHMM``
    * ``num``: zero padded counter, the first value below
      ``self._max_backup_number`` whose file does not exist yet is used

    The backup is placed in ``self._backup_path`` when that is set.  If every
    numbered name is already taken nothing is copied and a warning is logged.

    :param filename: path of the file to back up
    :type filename: str or Path
    """
    # take one timestamp so the date/time fields always agree with each other
    now = datetime.now()
    fields = dict(name=Path(filename).name,
                  date=now.strftime('%Y%m%d'),
                  stime=now.strftime('%H%M%S'),
                  mtime=now.strftime('%H%M'),
                  num='*')

    # keyword expansion makes '{name}' style templates work; the dict is also
    # passed positionally so a '{0[name]}' style template keeps working.
    expanded = self._backup_filename.format(fields, **fields)

    def in_backup_dir(name):
        # both the numbered and the plain branch honour the backup path
        if self._backup_path is None:
            return Path(name)
        return Path(self._backup_path, name)

    dest_fn = None

    # '*' marks the spot where the backup number has to go
    if '*' in expanded:
        width = len(str(self._max_backup_number))
        for n in range(self._max_backup_number):
            candidate = in_backup_dir(expanded.replace('*', str(n).zfill(width)))
            if not candidate.exists():
                dest_fn = str(candidate)
                break
    else:
        dest_fn = str(in_backup_dir(expanded))

    if dest_fn is not None:
        shutil.copy(str(filename), dest_fn)
    else:
        ip.warning('Destination filename could not be created')
def write(self, section_name=None, storage_name=storage_name, file=None, encoding=None, **kwargs):
    """
    Write the selected options to an INI file.

    The lines to write come from ``_get_dict`` / ``_dict_to_list`` and are also
    kept on ``self.data``.

    :param section_name: passed through to ``_get_dict``
    :param str storage_name: passed through to ``_get_dict``
    :param file: target of the write.  None uses ``self._write_filename`` or,
        if that is None too, ``self.get_default_filename``.  A str or Path is
        opened for writing.  Anything else is treated as an already open file
        object, which is written to and then closed.
    :param str encoding: encoding used when this method opens the file,
        defaults to ``self._encoding``
    :return: ``(self.last_section_count, self.last_option_count)``
    :rtype: tuple
    :raises FileNotFoundError: if the target path does not exist and
        ``self._create_files`` is false
    """
    self.data = None
    tmp_dict_to_save = self._get_dict(section_name=section_name, storage_name=storage_name)
    self.data = self._dict_to_list(tmp_dict_to_save)

    if encoding is None:
        encoding = self._encoding

    # work out the target path; it stays None for a caller supplied file object
    if file is None:
        if self._write_filename is None:
            filename = self.get_default_filename
        else:
            filename = Path(self._write_filename)
    elif isinstance(file, (str, Path)):
        filename = Path(file)
    else:
        filename = None

    if filename is not None:
        # the existence check applies to every path, configured or passed in
        exists = filename.exists()
        if not exists and not self._create_files:
            raise FileNotFoundError('File does not exist and file creation is disabled: ' + str(filename))
        if exists and self._make_backup_before_writing:
            # hand over the full path so the copy does not depend on the current directory
            self._make_backup(str(filename))
        file = filename.open(mode='w', encoding=encoding)

    try:
        for line in self.data:
            file.write(line)
    finally:
        file.close()

    return self.last_section_count, self.last_option_count
def _handle_error(self, exc, fpname, lineno, line):
    """
    Add a line to a parsing error, creating the error when none was passed.

    :param exc: the error collected so far; a falsy value causes a new
        ``ParsingError(fpname)`` to be created
    :param fpname: name handed to ``ParsingError`` when one is created
    :param lineno: line number recorded on the error
    :param line: the offending line, recorded as ``repr(line)``
    :return: the error object the line was appended to
    """
    error = exc or ParsingError(fpname)
    error.append(lineno, repr(line))
    return error
class StorageManagerManager(object):
    """
    A class to handle storage managers: it instantiates them, keeps them by
    name and maintains the priority sorted list used for default operations.
    """

    def __init__(self, config_manager, managers=None, cli_parser_name='cli', cli_manager=None, storage_config=None,
                 default_storage_managers=None):
        """
        :param config_manager: a link to the ConfigurationManager object, it is handed to every registered manager
        :param managers: the manager class, or list/tuple of manager classes, to be registered.
        :param cli_parser_name: the name of the cli parser if not 'cli', if None, this will disable CLI parsing.
        :param cli_manager: None uses the standard CLI Parser, this allows replacement of the default cli manager
        :param storage_config: dictionary keyed by storage name, the matching value is passed to that
            manager's ``config()`` during registration.
        :param default_storage_managers: storage names that make up the default manager list, if empty or None
            every manager whose ``standard`` flag is set is used instead.
        """
        self.config_manager = config_manager
        self.storage_managers = {}
        self.manager_list = []
        self.default_managers = [] if default_storage_managers is None else default_storage_managers
        self._storage_config = {} if storage_config is None else storage_config

        if managers is not None:
            if not isinstance(managers, (list, tuple)):
                managers = [managers]
            for manager_class in managers:
                self.register_storage(manager_class)

        if cli_parser_name is not None and cli_parser_name not in self:
            if cli_manager is None:
                cli_manager = ConfigCLIStorage
            # the name is set on the class itself, before it is instantiated
            cli_manager.storage_name = cli_parser_name
            self.register_storage(cli_manager)

    def register_storage(self, storage_manager):
        """
        Instantiate, configure and store a storage manager.

        :param storage_manager: the storage manager class (it is called without arguments)
        """
        ip.debug('registering storage manager')

        instance = storage_manager()
        instance.manager = self.config_manager
        instance.config(self._storage_config.get(instance.storage_name, dict()))
        self.storage_managers[instance.storage_name] = instance

        # decide if the manager takes part in the default (read all / write all) list
        if self.default_managers is not None and instance.storage_name in self.default_managers:
            ip.a().debug('is lilsted as a default manager').s()
            self.manager_list.append(instance)
        elif not self.default_managers and instance.standard:
            ip.a().debug('is a standard manager (and no listings present').s()
            self.manager_list.append(instance)
        else:
            ip.a().debug('is not going to be a default manager')
            ip.a().debug('default managers: ', self.default_managers)
            ip.debug('standard state:', instance.standard).s(2)

        self._sort_list()

        ip.info('Storage Manager [', instance.storage_name, '] registered')

    def _sort_list(self):
        """Sort ``manager_list`` in place by ascending ``priority``."""
        self.manager_list.sort(key=lambda mgr: mgr.priority)

    def get(self, tag=None):
        """
        Return a registered storage manager.

        :param tag: the storage name, None returns ``self.default_manager``
        :raises NoStorageManagerError: if nothing is registered under ``tag``
        """
        if tag is None:
            ip.debug('fetching default storage:')
            # NOTE(review): default_manager is not assigned in the visible part of this class - confirm
            return self.default_manager

        try:
            found = self.storage_managers[tag]
            ip.debug('fetching storage for: ', found)
            return found
        except KeyError:
            ip.debug('storage not found for: ', tag)
            raise NoStorageManagerError(tag)

    def get_data(self, tag=None):
        """Return the ``data`` attribute of the manager selected by ``tag`` (see :meth:`get`)."""
        return self.get(tag).data

    def set_data(self, data, tag=None):
        """Set the ``data`` attribute of the manager selected by ``tag`` (see :meth:`get`)."""
        self.get(tag).data = data
def read(self, sections=None, storage_names=None, override_tags=False, data=None):
    """
    runs the read from storage process for the selected or configured managers

    :param sections: passed to each manager's ``read()``; None is meant to select all sections, a string
        or list the named ones.
    :param storage_names: If None, will read from every manager in ``manager_list``, if a string or list,
        will read from the managers registered under those names.
    :param override_tags: if True, ``'*'`` is passed to the managers instead of their own storage name,
        allowing things like importing the full config etc.
    :param data: if a single storage manager is selected, this is set as that manager's ``data`` before
        reading.
    :raises AttributeError: if data is not None and more than one storage manager is selected.
    """
    if storage_names is None:
        targets = list(self.manager_list)
    else:
        targets = [self.get(name) for name in make_list(storage_names)]

    if data is not None and targets:
        if len(targets) != 1:
            raise AttributeError('Data cannot be passed when reading from multiple storage managers')
        targets[0].data = data

    section_total = 0
    option_total = 0
    for storage in targets:
        use_tag = '*' if override_tags else storage.storage_name
        read_sections, read_options = storage.read(sections, use_tag)
        section_total += read_sections
        option_total += read_options

    ip.info('read from storage managers').a()
    ip.info('sections: ', section_total)
    ip.info('options: ', option_total)
    ip.info('managers: ', len(targets)).s()
def write(self, sections=None, storage_names=None, override_tags=False, **kwargs):
    """
    runs the write to storage process for the selected or configured managers

    :param sections: passed to each manager's ``write()``; None is meant to select all sections, a string
        or list the named ones.
    :param storage_names: If None, will write to every manager in ``manager_list``, if a string or list,
        will write to the managers registered under those names.
    :param override_tags: if True, ``'*'`` is passed to the managers instead of their own storage name,
        allowing things like exporting the full config etc.
    :param kwargs: forwarded unchanged to each manager's ``write()``.
    :return: if exactly one storage manager was written to, that manager's ``data``, otherwise None.
    """
    ip.info('writing data to storage locations').a()

    targets = []
    if storage_names is None:
        targets.extend(self.manager_list)
        ip.debug('using all registered standard managers')
    else:
        storage_names = make_list(storage_names)
        ip.debug('making a list...').a()
        ip.debug('registered storages: ', self.storage_managers)
        for name in storage_names:
            ip.debug('adding: ', self.storage_managers[name].storage_name)
            fetched = self.get(name)
            ip.debug('test:', fetched, ' tag ', name)
            targets.append(self[name])
        ip.s().debug('Storages to write to: ', targets)

    section_total = 0
    option_total = 0
    manager_total = 0
    for storage in targets:
        manager_total += 1
        use_tag = '*' if override_tags else storage.storage_name

        ip.debug('writing to : ', storage).a()
        written_sections, written_options = storage.write(sections, use_tag, **kwargs)
        ip.s()

        section_total += written_sections
        option_total += written_options

    ip.info('sections: ', section_total)
    ip.info('options: ', option_total)
    ip.info('managers: ', manager_total).s()

    return targets[0].data if len(targets) == 1 else None
def __call__(self):
    """Calling the object returns ``self.default_manager``."""
    return self.default_manager


def __getitem__(self, item):
    """Index access delegates to ``self.get(item)``."""
    return self.get(item)


def __iter__(self):
    """Iterate over the managers in ``manager_list``, in list order."""
    yield from self.manager_list