arches_extensions.managers
"""Manager for registering, removing, toggling and listing Arches extensions."""
import importlib.util
import json
import logging
import os
import uuid

from django.db import transaction
from django.contrib.gis.db.models import UUIDField

from arches.app.models import models

from arches_extensions.utils import ArchesCLIStyles

logger = logging.getLogger(__name__)


class ExtensionManager():
    """ A unified manager class for handling all Arches "extensions," like Widgets, DDataType, etc.

    An instance is bound to one extension type (a key of ``model_lookup``)
    and operates on the Django model mapped to that type.
    """

    def __init__(self, extension_type=None):
        """Bind the manager to ``extension_type``.

        Args:
            extension_type: One of the ``model_lookup`` keys. When falsy,
                ``self.model`` is left as ``None``.

        Raises:
            Exception: If ``extension_type`` is truthy but not a supported key.
        """
        self.model_lookup = {
            # Card components have their own model; they are not widgets.
            "card-component": models.CardComponent,
            "datatype": models.DDataType,
            "etl-module": models.ETLModule,
            "function": models.Function,
            "plugin": models.Plugin,
            "report": models.ReportTemplate,
            "search-filter": models.SearchComponent,
            "widget": models.Widget,
        }

        self.extension_type = extension_type
        self.model = self._get_model(extension_type) if extension_type else None

    def _get_source_details(self, source_path, ensure_valid_uuid_pk=False):
        """Load the ``details`` object that describes an extension.

        Args:
            source_path: Path to a ``.py`` module exposing a module-level
                ``details`` attribute, or to a ``.json`` file.
            ensure_valid_uuid_pk: When ``True``, the details are passed
                through ``_ensure_valid_uuid_pk`` before being returned.

        Returns:
            The details loaded from the source.

        Raises:
            Exception: If ``source_path`` ends with neither ``.py`` nor ``.json``.
        """
        # load details from a python module (functions, datatypes, etc.)
        if source_path.endswith(".py"):
            # importlib replaces imp.load_source(); the ``imp`` module was
            # removed in Python 3.12. The module is executed but, unlike with
            # imp, not registered in sys.modules.
            spec = importlib.util.spec_from_file_location("", source_path)
            source = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(source)
            details = source.details

        # load details from a json file (widgets, card_components, etc.)
        elif source_path.endswith(".json"):
            with open(source_path) as f:
                details = json.load(f)
        else:
            raise Exception(f"Invalid source path: {source_path}")

        if ensure_valid_uuid_pk is True:
            details = self._ensure_valid_uuid_pk(details)

        return details

    def _ensure_valid_uuid_pk(self, details):
        """Replace a missing or malformed UUID primary key with a fresh one.

        Only acts when the bound model's primary key is a ``UUIDField``;
        otherwise ``details`` is returned untouched. ``details`` is modified
        in place and also returned.
        """
        pk = self.model._meta.pk
        if not isinstance(pk, UUIDField):
            return details

        val = details.get(pk.name)
        if isinstance(val, uuid.UUID):
            # already a usable UUID object
            return details
        try:
            uuid.UUID(val)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string; TypeError: value missing (None);
            # AttributeError: a non-string value such as an int.
            details[pk.name] = str(uuid.uuid4())
        return details

    def _get_model(self, extension_type):
        """Return the model class for ``extension_type``.

        Raises:
            Exception: If ``extension_type`` is not a key of ``model_lookup``.
        """
        try:
            return self.model_lookup[extension_type]
        except KeyError:
            raise Exception(f"Extension type not supported: {extension_type}") from None

    def _get_instance(self, name):
        """Fetch the stored extension called ``name``.

        Datatypes are looked up by their ``datatype`` field, every other
        extension type by ``name``.

        Raises:
            Exception: If no matching row exists.
        """
        lookup_field = "datatype" if self.extension_type == "datatype" else "name"
        try:
            return self.model.objects.get(**{lookup_field: name})
        except self.model.DoesNotExist as exc:
            raise Exception(f"Can't find {self.extension_type}: {name}") from exc

    def register(self, source, overwrite=False):
        """
        Registers a new extension in the database based on the provided source

        Args:
            source: Path to the ``.py`` or ``.json`` file describing the extension.
            overwrite: When ``True`` and a row with the same primary key
                exists, that row is loaded and updated.

        NOTE(review): when ``overwrite`` is not ``True`` and a row with the
        same primary key already exists, a fresh instance carrying that key
        is saved anyway; whether that updates the row or fails depends on
        Django's ``save()`` semantics -- confirm this is intended.
        """
        details = self._get_source_details(source, ensure_valid_uuid_pk=True)

        # pop the pk value so the details can be iterated later without it
        pk_field = self.model._meta.pk.name
        pk_qry = {pk_field: details.pop(pk_field)}

        with transaction.atomic():
            instance = None
            if overwrite is True:
                # single query instead of exists() followed by get()
                instance = self.model.objects.filter(**pk_qry).first()
            if instance is None:
                try:
                    instance = self.model(**pk_qry)
                except Exception as e:
                    logger.error(e)
                    raise

            # some extensions are irregular and the details need to be altered a little bit
            # before they can be saved to the instance. Generally these attribute definitions
            # are taken from existing commands.
            if self.extension_type in ("datatype", "function"):
                details['modulename'] = os.path.basename(source)
            if self.extension_type == "datatype":
                details.setdefault("issearchable", False)
            if self.extension_type == "report":
                details.setdefault("preload_resource_data", True)

            # finally, set the attributes from all of the details values
            for k, v in details.items():
                setattr(instance, k, v)

            try:
                instance.save()
            except Exception as e:
                logger.error(e)
                raise

    def unregister(self, name):
        """
        Removes an extension of the specified type from the database

        Any failure is logged and re-raised.
        """
        try:
            self._get_instance(name).delete()
        except Exception as e:
            logger.error(e)
            raise

    def set_active(self, name, active=True):
        """Enable or disable the extension called ``name``.

        ETL modules and plugins are toggled through ``config['show']``,
        search filters through ``enabled``. For any other extension type a
        warning is logged and nothing is changed.
        """
        if self.extension_type in ["etl-module", "plugin"]:
            instance = self._get_instance(name)
            instance.config['show'] = active
            instance.save()
        elif self.extension_type == "search-filter":
            instance = self._get_instance(name)
            instance.enabled = active
            instance.save()
        else:
            # Logger.warn is a deprecated alias that newer Pythons dropped.
            logger.warning(f"This operation not supported for {self.extension_type}.")

    def print_list(self):
        """Print every registered extension of the bound type, then a count.

        For extension types that can be disabled the name is prefixed with a
        coloured ``active``/``inactive`` marker. Errors are printed and
        re-raised.
        """
        s = ArchesCLIStyles()
        try:
            # status markers are the same for every row, so build them once
            active_str = f"{s.fg.green}active{s.reset}"
            inactive_str = f"{s.fg.red}inactive{s.reset}"

            instances = self.model.objects.all()
            for instance in instances:
                if self.extension_type == "datatype":
                    name = instance.datatype
                else:
                    name = instance.name
                # for certain extension types that can be disabled, prepend name with status
                if self.extension_type in ["etl-module", "plugin"]:
                    # .get(): a config without a 'show' key is listed as inactive
                    shown = instance.config.get('show') is True
                    name = f"{active_str if shown else inactive_str} {name}"
                if self.extension_type in ["search-filter"]:
                    name = f"{active_str if instance.enabled is True else inactive_str} {name}"
                print(name)
            print(f"---\nregistered {self.extension_type} count: {instances.count()}")
        except Exception as e:
            print(s.error(e))
            raise e
logger =
<Logger arches_extensions.managers (WARNING)>
class
ExtensionManager:
class ExtensionManager():
    """ A unified manager class for handling all Arches "extensions," like Widgets, DDataType, etc.

    An instance is bound to one extension type (a key of ``model_lookup``)
    and operates on the Django model mapped to that type.
    """

    def __init__(self, extension_type=None):
        """Bind the manager to ``extension_type``.

        Args:
            extension_type: One of the ``model_lookup`` keys. When falsy,
                ``self.model`` is left as ``None``.

        Raises:
            Exception: If ``extension_type`` is truthy but not a supported key.
        """
        self.model_lookup = {
            # Card components have their own model; they are not widgets.
            "card-component": models.CardComponent,
            "datatype": models.DDataType,
            "etl-module": models.ETLModule,
            "function": models.Function,
            "plugin": models.Plugin,
            "report": models.ReportTemplate,
            "search-filter": models.SearchComponent,
            "widget": models.Widget,
        }

        self.extension_type = extension_type
        self.model = self._get_model(extension_type) if extension_type else None

    def _get_source_details(self, source_path, ensure_valid_uuid_pk=False):
        """Load the ``details`` object that describes an extension.

        Args:
            source_path: Path to a ``.py`` module exposing a module-level
                ``details`` attribute, or to a ``.json`` file.
            ensure_valid_uuid_pk: When ``True``, the details are passed
                through ``_ensure_valid_uuid_pk`` before being returned.

        Returns:
            The details loaded from the source.

        Raises:
            Exception: If ``source_path`` ends with neither ``.py`` nor ``.json``.
        """
        # load details from a python module (functions, datatypes, etc.)
        if source_path.endswith(".py"):
            # importlib replaces imp.load_source(); the ``imp`` module was
            # removed in Python 3.12. Imported locally so this method does
            # not depend on the file's import block. The module is executed
            # but, unlike with imp, not registered in sys.modules.
            import importlib.util

            spec = importlib.util.spec_from_file_location("", source_path)
            source = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(source)
            details = source.details

        # load details from a json file (widgets, card_components, etc.)
        elif source_path.endswith(".json"):
            with open(source_path) as f:
                details = json.load(f)
        else:
            raise Exception(f"Invalid source path: {source_path}")

        if ensure_valid_uuid_pk is True:
            details = self._ensure_valid_uuid_pk(details)

        return details

    def _ensure_valid_uuid_pk(self, details):
        """Replace a missing or malformed UUID primary key with a fresh one.

        Only acts when the bound model's primary key is a ``UUIDField``;
        otherwise ``details`` is returned untouched. ``details`` is modified
        in place and also returned.
        """
        pk = self.model._meta.pk
        if not isinstance(pk, UUIDField):
            return details

        val = details.get(pk.name)
        if isinstance(val, uuid.UUID):
            # already a usable UUID object
            return details
        try:
            uuid.UUID(val)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string; TypeError: value missing (None);
            # AttributeError: a non-string value such as an int.
            details[pk.name] = str(uuid.uuid4())
        return details

    def _get_model(self, extension_type):
        """Return the model class for ``extension_type``.

        Raises:
            Exception: If ``extension_type`` is not a key of ``model_lookup``.
        """
        try:
            return self.model_lookup[extension_type]
        except KeyError:
            raise Exception(f"Extension type not supported: {extension_type}") from None

    def _get_instance(self, name):
        """Fetch the stored extension called ``name``.

        Datatypes are looked up by their ``datatype`` field, every other
        extension type by ``name``.

        Raises:
            Exception: If no matching row exists.
        """
        lookup_field = "datatype" if self.extension_type == "datatype" else "name"
        try:
            return self.model.objects.get(**{lookup_field: name})
        except self.model.DoesNotExist as exc:
            raise Exception(f"Can't find {self.extension_type}: {name}") from exc

    def register(self, source, overwrite=False):
        """
        Registers a new extension in the database based on the provided source

        Args:
            source: Path to the ``.py`` or ``.json`` file describing the extension.
            overwrite: When ``True`` and a row with the same primary key
                exists, that row is loaded and updated.

        NOTE(review): when ``overwrite`` is not ``True`` and a row with the
        same primary key already exists, a fresh instance carrying that key
        is saved anyway; whether that updates the row or fails depends on
        Django's ``save()`` semantics -- confirm this is intended.
        """
        details = self._get_source_details(source, ensure_valid_uuid_pk=True)

        # pop the pk value so the details can be iterated later without it
        pk_field = self.model._meta.pk.name
        pk_qry = {pk_field: details.pop(pk_field)}

        with transaction.atomic():
            instance = None
            if overwrite is True:
                # single query instead of exists() followed by get()
                instance = self.model.objects.filter(**pk_qry).first()
            if instance is None:
                try:
                    instance = self.model(**pk_qry)
                except Exception as e:
                    logger.error(e)
                    raise

            # some extensions are irregular and the details need to be altered a little bit
            # before they can be saved to the instance. Generally these attribute definitions
            # are taken from existing commands.
            if self.extension_type in ("datatype", "function"):
                details['modulename'] = os.path.basename(source)
            if self.extension_type == "datatype":
                details.setdefault("issearchable", False)
            if self.extension_type == "report":
                details.setdefault("preload_resource_data", True)

            # finally, set the attributes from all of the details values
            for k, v in details.items():
                setattr(instance, k, v)

            try:
                instance.save()
            except Exception as e:
                logger.error(e)
                raise

    def unregister(self, name):
        """
        Removes an extension of the specified type from the database

        Any failure is logged and re-raised.
        """
        try:
            self._get_instance(name).delete()
        except Exception as e:
            logger.error(e)
            raise

    def set_active(self, name, active=True):
        """Enable or disable the extension called ``name``.

        ETL modules and plugins are toggled through ``config['show']``,
        search filters through ``enabled``. For any other extension type a
        warning is logged and nothing is changed.
        """
        if self.extension_type in ["etl-module", "plugin"]:
            instance = self._get_instance(name)
            instance.config['show'] = active
            instance.save()
        elif self.extension_type == "search-filter":
            instance = self._get_instance(name)
            instance.enabled = active
            instance.save()
        else:
            # Logger.warn is a deprecated alias that newer Pythons dropped.
            logger.warning(f"This operation not supported for {self.extension_type}.")

    def print_list(self):
        """Print every registered extension of the bound type, then a count.

        For extension types that can be disabled the name is prefixed with a
        coloured ``active``/``inactive`` marker. Errors are printed and
        re-raised.
        """
        s = ArchesCLIStyles()
        try:
            # status markers are the same for every row, so build them once
            active_str = f"{s.fg.green}active{s.reset}"
            inactive_str = f"{s.fg.red}inactive{s.reset}"

            instances = self.model.objects.all()
            for instance in instances:
                if self.extension_type == "datatype":
                    name = instance.datatype
                else:
                    name = instance.name
                # for certain extension types that can be disabled, prepend name with status
                if self.extension_type in ["etl-module", "plugin"]:
                    # .get(): a config without a 'show' key is listed as inactive
                    shown = instance.config.get('show') is True
                    name = f"{active_str if shown else inactive_str} {name}"
                if self.extension_type in ["search-filter"]:
                    name = f"{active_str if instance.enabled is True else inactive_str} {name}"
                print(name)
            print(f"---\nregistered {self.extension_type} count: {instances.count()}")
        except Exception as e:
            print(s.error(e))
            raise e
A unified manager class for handling all Arches "extensions," like Widgets, DDataType, etc.
ExtensionManager(extension_type=None)
def __init__(self, extension_type=None):
    """Bind the manager to ``extension_type``.

    Args:
        extension_type: One of the ``model_lookup`` keys. When falsy,
            ``self.model`` is left as ``None``.

    Raises:
        Exception: If ``extension_type`` is truthy but not a supported key
            (raised by ``_get_model``).
    """
    self.model_lookup = {
        # Card components have their own model; they are not widgets.
        "card-component": models.CardComponent,
        "datatype": models.DDataType,
        "etl-module": models.ETLModule,
        "function": models.Function,
        "plugin": models.Plugin,
        "report": models.ReportTemplate,
        "search-filter": models.SearchComponent,
        "widget": models.Widget,
    }

    self.extension_type = extension_type
    self.model = self._get_model(extension_type) if extension_type else None
def
register(self, source, overwrite=False):
def register(self, source, overwrite=False):
    """
    Registers a new extension in the database based on the provided source

    Args:
        source: Path to the ``.py`` or ``.json`` file describing the extension.
        overwrite: When ``True`` and a row with the same primary key
            exists, that row is loaded and updated.

    NOTE(review): when ``overwrite`` is not ``True`` and a row with the
    same primary key already exists, a fresh instance carrying that key is
    saved anyway; whether that updates the row or fails depends on
    Django's ``save()`` semantics -- confirm this is intended.
    """
    details = self._get_source_details(source, ensure_valid_uuid_pk=True)

    # pop the pk value so the details can be iterated later without it
    pk_field = self.model._meta.pk.name
    pk_qry = {pk_field: details.pop(pk_field)}

    with transaction.atomic():
        instance = None
        if overwrite is True:
            # single query instead of exists() followed by get()
            instance = self.model.objects.filter(**pk_qry).first()
        if instance is None:
            try:
                instance = self.model(**pk_qry)
            except Exception as e:
                # log through the module logger, as the save() failure path does
                logger.error(e)
                raise

        # some extensions are irregular and the details need to be altered a little bit
        # before they can be saved to the instance. Generally these attribute definitions
        # are taken from existing commands.
        if self.extension_type in ("datatype", "function"):
            details['modulename'] = os.path.basename(source)
        if self.extension_type == "datatype":
            details.setdefault("issearchable", False)
        if self.extension_type == "report":
            details.setdefault("preload_resource_data", True)

        # finally, set the attributes from all of the details values
        for k, v in details.items():
            setattr(instance, k, v)

        try:
            instance.save()
        except Exception as e:
            logger.error(e)
            raise
Registers a new extension in the database based on the provided source
def
unregister(self, name):
def unregister(self, name):
    """
    Delete the stored extension called ``name`` for this manager's type.

    The row is resolved with ``_get_instance``; any error raised while
    looking it up or deleting it is logged and then re-raised.
    """
    try:
        self._get_instance(name).delete()
    except Exception as err:
        logger.error(err)
        raise err
Removes an extension of the specified type from the database
def
set_active(self, name, active=True):
def set_active(self, name, active=True):
    """Enable or disable the extension called ``name``.

    ETL modules and plugins are toggled through ``config['show']``, search
    filters through ``enabled``; the instance is saved afterwards. For any
    other extension type a warning is logged and nothing is changed.

    Args:
        name: Name passed to ``_get_instance`` to find the extension.
        active: The value written to the toggle.
    """
    if self.extension_type in ["etl-module", "plugin"]:
        instance = self._get_instance(name)
        instance.config['show'] = active
        instance.save()
    elif self.extension_type == "search-filter":
        instance = self._get_instance(name)
        instance.enabled = active
        instance.save()
    else:
        # Logger.warn is a deprecated alias that newer Pythons dropped.
        logger.warning(f"This operation not supported for {self.extension_type}.")
def
print_list(self):
def print_list(self):
    """Print every registered extension of the bound type, then a count.

    Datatypes are listed by their ``datatype`` field, other types by
    ``name``. For extension types that can be disabled the name is prefixed
    with a coloured ``active``/``inactive`` marker. Errors are printed and
    re-raised.
    """
    s = ArchesCLIStyles()
    try:
        # status markers are the same for every row, so build them once
        active_str = f"{s.fg.green}active{s.reset}"
        inactive_str = f"{s.fg.red}inactive{s.reset}"

        instances = self.model.objects.all()
        for instance in instances:
            if self.extension_type == "datatype":
                name = instance.datatype
            else:
                name = instance.name
            # for certain extension types that can be disabled, prepend name with status
            if self.extension_type in ["etl-module", "plugin"]:
                # .get(): a config without a 'show' key is listed as inactive
                shown = instance.config.get('show') is True
                name = f"{active_str if shown else inactive_str} {name}"
            if self.extension_type in ["search-filter"]:
                name = f"{active_str if instance.enabled is True else inactive_str} {name}"
            print(name)
        print(f"---\nregistered {self.extension_type} count: {instances.count()}")
    except Exception as e:
        print(s.error(e))
        raise e