Source code for AdvConfigMgr.config_migrate

__author__ = 'dstrohl'

from AdvConfigMgr.config_exceptions import Error
from AdvConfigMgr.utils import VersionRange, slugify, _UNSET, UnSet
import copy
from fnmatch import fnmatchcase

class ConfigMigrationError(Error):
    def __init__(self, version_from, version_to, section, option, msg):
        msg = 'Error converting [{}.{}]: {}, converting from {} to {}'.format(section, option,
                                                                              msg, version_from, version_to)
        super(ConfigMigrationError, self).__init__(msg)

[docs]class ConfigMigrationManager(object): def __init__(self, section, *migration_dictionaries): self.section = section self.manager = section._manager self.migrations = [] self._options_to_remove = [] self._options_to_add = {} self.live_version = self.section.version self.stored_version = _UNSET self.current_migration = None for md in migration_dictionaries: if not isinstance(md, (list, tuple)): md = [md] for m in md: self.migrations.append(self._parse_migration(m)) self._int_key = self.manager._interpolator.key self._int_sep = self.manager._interpolator.sep self._int_enc = self.manager._interpolator.enc self._int_max_depth = self.manager._interpolator.max_depth
[docs] def set_migration(self, version): """ Will set the version manager to use based on the database version. returns True if a version migration is found, False if no migration is found. """ self.stored_version = version self.current_migration = None for m in self.migrations: if self.stored_version is None: if m['blank_stored_version'] and self.live_version in m['live_version_range']: self.current_migration = m return True else: if version in m['stored_version_range'] and self.live_version in m['live_version_range']: self.current_migration = m return True return False
@property def options_to_remove(self): tmp_ret = copy.copy(self._options_to_remove) self._options_to_remove = None return tmp_ret def migrate_section(self, version, section_dict): if not self.set_migration(version): return section_dict option_list = self.current_migration['actions'] actions = self.current_migration['action_class'] keep_only = self.current_migration['keep_only'] if actions is None: actions = BaseMigrationActions actions = actions(self) tmp_dict = {} for option, value in section_dict.items(): new_option = None new_value = None match = False for option_name, option_args in option_list.items(): if fnmatchcase(option_name, option): match = True action = getattr(actions, option_args['action_name']) if isinstance(option_args['args'], dict): new_option, new_value = action(value, option_name, **option_args['args']) elif isinstance(option_args['args'], (list, tuple)): new_option, new_value = action(value, option_name, *option_args['args']) break if not keep_only and not match: new_option = option new_value = value if new_option is not None: tmp_dict[new_option] = new_value tmp_dict.update(self._options_to_add) return tmp_dict def _xf(self, option_name): return self.section._xf(option_name, glob=True) def _xf_this_sec(self, section): return section is _UNSET or section == or section is None def _parse_migration(self, migration_dict): tmp_stored_version = migration_dict.get('stored_version', _UNSET) if tmp_stored_version is None: blank_stored_version = True elif tmp_stored_version is UnSet: blank_stored_version = False tmp_stored_version = None else: blank_stored_version = False tmp_md = {'stored_version_range': VersionRange(version_class=self.manager._version_class, sup_ver=tmp_stored_version, min_ver=migration_dict.get('stored_version_min', None), max_ver=migration_dict.get('stored_version_max', None)), 'live_version_range': VersionRange(version_class=self.manager._version_class, sup_ver=migration_dict.get('live_version', None), min_ver=migration_dict.get('live_version_min', None), max_ver=migration_dict.get('live_version_max', None)), 'action_class': migration_dict.get('action_class', BaseMigrationActions), 'blank_stored_version': blank_stored_version, 'actions': {}, 'keep_only': migration_dict.get('keep_only', False)} if 'actions' not in migration_dict: raise AttributeError('Section Migration must have actions defined') for a in migration_dict['actions']: tmp_action = {} tmp_a = copy.copy(a) if isinstance(a, dict): tmp_action['action_name'] = '_' + tmp_a.pop('action') tmp_action['option_name'] = tmp_a.pop('option_name') tmp_action['args'] = tmp_a elif isinstance(a, (tuple, list)): tmp_a = list(tmp_a) tmp_action['action_name'] = '_' + tmp_a.pop(0) section, option = self._xf(tmp_a.pop(0)) tmp_action['option_name'] = option tmp_action['args'] = tmp_a else: msg = 'Unknown migration action found: {}'.format(a) raise AttributeError(msg) if not hasattr(tmp_md['action_class'], tmp_action['action_name']): msg = 'Action Class Object does not have the {} action defined'.format(tmp_action['action_name']) raise AttributeError(msg) tmp_md['actions'][tmp_action['option_name']] = tmp_action return tmp_md
[docs]class BaseMigrationActions(object): """ To add new migration / conversion actions, subclass this and add new methods, each method MUST accept as the first two args "value" and "option_name", after that, you can use whatever, though make sure to be consistent with how you define things in the migration dictionary. You must also return a tuple of new_option_name, new_value, or None, None. If your migration requires removeing an option, call :py:meth:`BaseMigrationActions._remove` and it it requires adding a new option, call :py:meth:`BaseMigrationActions._new`. """ def __init__(self, migration_manager): self._migration_manager = migration_manager # ************************************************************************************************************* # **** Defined Actions # ************************************************************************************************************* def _new(self, value, option_name): """ This action should not be used by migration dictionaries and is instead an internal action used by other actions :return: None, None """ self._migration_manager._options_to_add[option_name] = value return None, None def _remove(self, value, option_name): """ Adds a the option to the "remove_options" queue and returns None, None :return: The existing option, value """ self._migration_manager._options_to_remove.append(option_name) return None, None def _copy(self, value, option_name, new_option_name, interpolation_str=None): """ Adds a copy of the option to the "new_options" queue. :return: The existing option, value """ new_value = copy.copy(value) if interpolation_str is not None: new_option_name, new_value = self._interpolate(new_option_name, new_value, interpolation_str) self._new(new_value, new_option_name) return option_name, value def _rename(self, value, option_name, new_option_name, interpolation_str=None): """ adds the option to the "remove options queue" :return: The new option name, value """ self._remove(value, option_name) if interpolation_str is not None: return self._interpolate(new_option_name, value, interpolation_str) else: return new_option_name, value def _pass(self, value, option_name, interpolation_str=None): """ Adds passes the option through the system without modification (unless the interpolation string is passed.) :return: The existing option, value """ if interpolation_str is not None: return self._interpolate(option_name, value, interpolation_str) else: return option_name, value def _interpolate(self, value, option_name, interpolation_str): """ Interpolates the string based on the interpolation rules, this can be called directly or through other actions. :return: The existing option, value """ # manager = self._migration_manager.manager interpolation_str = interpolation_str.replace('%(__current_value__)', value) tmp_new_value = self._migration_manager.manager._interpolator.before_get(, interpolation_str) ''' tmp_new_value = interpolate(interpolation_str, manager, max_depth=self._migration_manager.manager._int_max_depth, key=self._migration_manager.manager._int_key, key_sep=self._migration_manager.manager._int_sep, key_enc=self._migration_manager.manager._int_enc, ''' return option_name, tmp_new_value