arches_extensions.management.commands.indexes
import logging
from django.core.management.base import BaseCommand

from arches.app.models.resource import Resource
from arches.app.models.graph import Graph
from arches.app.search.search_engine_factory import SearchEngineInstance as se

from arches_extensions.utils import ArchesHelpTextFormatter

logger = logging.getLogger(__name__)


class Command(BaseCommand):
    """Some commands for helper operations with the ElasticSearch indexes.

    .. warning::
        This command is a work-in-progress

    Usage:

        python manage.py indexes [operation] [--index-missing]

    Operations:

        - `check`

    """

    def __init__(self, *args, **kwargs):
        # Let BaseCommand set up its own state before customising the help text.
        super().__init__(*args, **kwargs)
        # Reuse the class docstring as the command's help text.
        self.help = self.__doc__

    def add_arguments(self, parser):
        """Register the positional ``operation`` and the ``--index-missing`` flag."""

        parser.formatter_class = ArchesHelpTextFormatter

        parser.add_argument(
            "operation",
            choices=[
                "check",
            ],
            help="""OPERATION
check: Compare the current ElasticSearch resource index against the ORM objects and print a summary of missing resources to the console
""",
        )

        parser.add_argument(
            "--index-missing",
            action="store_true",
            default=False,
            help="Attempt to index resources that are missing from index.",
        )

    def handle(self, *args, **options):
        """Dispatch to the method that implements the requested operation."""

        if options['operation'] == "check":
            # argparse stores "--index-missing" under the dest "index_missing"
            # (dashes in option names become underscores).
            self.check(index_missing=options['index_missing'])

    def check(self, index_missing=False):
        """
        Compare the ES resources index against resources in the ORM (and vice versa).

        For every resource graph except "Arches System Settings", prints the
        number of resources in the database and in the index, followed by up
        to five ids from each side that are absent from the other.

        If ``index_missing`` is true, resources found in the database but not
        in the index are indexed one by one; the first failure is printed and
        stops indexing for that graph.
        """

        es_contents = self.get_es_contents()

        graphs = Graph.objects.filter(isresource=True).exclude(name="Arches System Settings")

        for graph in graphs:

            print(graph.name)

            uuid_resids = Resource.objects.filter(graph=graph).values_list('resourceinstanceid', flat=True)
            # Compare as strings because the ids read from the index are strings.
            db_resourceids = {str(i) for i in uuid_resids}
            es_resourceids = es_contents.get(str(graph.pk), set())

            print(f"- in db: {len(db_resourceids)}")
            print(f"- in index: {len(es_resourceids)}")
            if db_resourceids == es_resourceids:
                continue

            es_diff = list(es_resourceids - db_resourceids)
            if es_diff:
                print(f" {len(es_diff)} indexed resources not in db:")
                for resourceid in es_diff[:5]:
                    print(" " + resourceid)
                if len(es_diff) > 5:
                    print(" ...")

            db_diff = list(db_resourceids - es_resourceids)
            if db_diff:
                print(f" {len(db_diff)} db resources not in index:")
                for resourceid in db_diff[:5]:
                    print(" " + resourceid)
                if len(db_diff) > 5:
                    print(" ...")
                if index_missing:
                    print(" indexing these resources now...")
                    for resourceid in db_diff:
                        try:
                            resource = Resource.objects.get(pk=resourceid)
                            resource.index()
                        except Exception as e:
                            # Report the first failure and stop indexing this graph.
                            print(e)
                            break

    def get_es_contents(self):
        """
        Summarise the 'resources' index as a dict of graph id -> set of resource ids.

        Documents whose graph id is the string 'None' are skipped.
        """

        summary = dict()
        for resid, graphid in self.iterate_all_documents(se, 'resources'):
            if graphid != 'None':
                summary.setdefault(graphid, set()).add(resid)
        return summary

    def iterate_all_documents(self, se, index, pagesize=250, scroll_timeout="1m"):
        """
        Helper to iterate over ALL documents in a single index using the scroll API.
        Yields a (resourceinstanceid, graph_id) tuple for each document.
        https://techoverflow.net/2019/05/07/elasticsearch-how-to-iterate-scroll-through-all-documents-in-index/

        ``pagesize`` is the number of hits requested per page and
        ``scroll_timeout`` is passed as the scroll value on every request.
        """
        # Initialize scroll
        result = se.search(index=index, scroll=scroll_timeout, body={
            "size": pagesize
        })
        while True:
            scroll_id = result["_scroll_id"]
            hits = result["hits"]["hits"]
            # Stop after no more docs
            if not hits:
                break
            # Yield each entry
            yield from ((hit['_source']['resourceinstanceid'], hit['_source']['graph_id']) for hit in hits)
            ## note: need to access the ElasticSearch() instance directly
            ## here, (.es), because the Arches se object doesn't inherit .scroll()
            result = se.es.scroll(body={
                "scroll_id": scroll_id,
                "scroll": scroll_timeout
            })
logger =
<Logger arches_extensions.management.commands.indexes (WARNING)>
class
Command(django.core.management.base.BaseCommand):
class Command(BaseCommand):
    """Some commands for helper operations with the ElasticSearch indexes.

    .. warning::
        This command is a work-in-progress

    Usage:

        python manage.py indexes [operation] [--index-missing]

    Operations:

        - `check`

    """

    def __init__(self, *args, **kwargs):
        # Let BaseCommand set up its own state before customising the help text.
        super().__init__(*args, **kwargs)
        # Reuse the class docstring as the command's help text.
        self.help = self.__doc__

    def add_arguments(self, parser):
        """Register the positional ``operation`` and the ``--index-missing`` flag."""

        parser.formatter_class = ArchesHelpTextFormatter

        parser.add_argument(
            "operation",
            choices=[
                "check",
            ],
            help="""OPERATION
check: Compare the current ElasticSearch resource index against the ORM objects and print a summary of missing resources to the console
""",
        )

        parser.add_argument(
            "--index-missing",
            action="store_true",
            default=False,
            help="Attempt to index resources that are missing from index.",
        )

    def handle(self, *args, **options):
        """Dispatch to the method that implements the requested operation."""

        if options['operation'] == "check":
            # argparse stores "--index-missing" under the dest "index_missing"
            # (dashes in option names become underscores).
            self.check(index_missing=options['index_missing'])

    def check(self, index_missing=False):
        """
        Compare the ES resources index against resources in the ORM (and vice versa).

        For every resource graph except "Arches System Settings", prints the
        number of resources in the database and in the index, followed by up
        to five ids from each side that are absent from the other.

        If ``index_missing`` is true, resources found in the database but not
        in the index are indexed one by one; the first failure is printed and
        stops indexing for that graph.
        """

        es_contents = self.get_es_contents()

        graphs = Graph.objects.filter(isresource=True).exclude(name="Arches System Settings")

        for graph in graphs:

            print(graph.name)

            uuid_resids = Resource.objects.filter(graph=graph).values_list('resourceinstanceid', flat=True)
            # Compare as strings because the ids read from the index are strings.
            db_resourceids = {str(i) for i in uuid_resids}
            es_resourceids = es_contents.get(str(graph.pk), set())

            print(f"- in db: {len(db_resourceids)}")
            print(f"- in index: {len(es_resourceids)}")
            if db_resourceids == es_resourceids:
                continue

            es_diff = list(es_resourceids - db_resourceids)
            if es_diff:
                print(f" {len(es_diff)} indexed resources not in db:")
                for resourceid in es_diff[:5]:
                    print(" " + resourceid)
                if len(es_diff) > 5:
                    print(" ...")

            db_diff = list(db_resourceids - es_resourceids)
            if db_diff:
                print(f" {len(db_diff)} db resources not in index:")
                for resourceid in db_diff[:5]:
                    print(" " + resourceid)
                if len(db_diff) > 5:
                    print(" ...")
                if index_missing:
                    print(" indexing these resources now...")
                    for resourceid in db_diff:
                        try:
                            resource = Resource.objects.get(pk=resourceid)
                            resource.index()
                        except Exception as e:
                            # Report the first failure and stop indexing this graph.
                            print(e)
                            break

    def get_es_contents(self):
        """
        Summarise the 'resources' index as a dict of graph id -> set of resource ids.

        Documents whose graph id is the string 'None' are skipped.
        """

        summary = dict()
        for resid, graphid in self.iterate_all_documents(se, 'resources'):
            if graphid != 'None':
                summary.setdefault(graphid, set()).add(resid)
        return summary

    def iterate_all_documents(self, se, index, pagesize=250, scroll_timeout="1m"):
        """
        Helper to iterate over ALL documents in a single index using the scroll API.
        Yields a (resourceinstanceid, graph_id) tuple for each document.
        https://techoverflow.net/2019/05/07/elasticsearch-how-to-iterate-scroll-through-all-documents-in-index/

        ``pagesize`` is the number of hits requested per page and
        ``scroll_timeout`` is passed as the scroll value on every request.
        """
        # Initialize scroll
        result = se.search(index=index, scroll=scroll_timeout, body={
            "size": pagesize
        })
        while True:
            scroll_id = result["_scroll_id"]
            hits = result["hits"]["hits"]
            # Stop after no more docs
            if not hits:
                break
            # Yield each entry
            yield from ((hit['_source']['resourceinstanceid'], hit['_source']['graph_id']) for hit in hits)
            ## note: need to access the ElasticSearch() instance directly
            ## here, (.es), because the Arches se object doesn't inherit .scroll()
            result = se.es.scroll(body={
                "scroll_id": scroll_id,
                "scroll": scroll_timeout
            })
Some commands for helper operations with the ElasticSearch indexes.
This command is a work-in-progress
Usage:
python manage.py indexes [operation] [--index-missing]
Operations:
- `check`
def
add_arguments(self, parser):
def add_arguments(self, parser):
    """Register the positional ``operation`` and the ``--index-missing`` flag."""

    parser.formatter_class = ArchesHelpTextFormatter

    parser.add_argument(
        "operation",
        choices=[
            "check",
        ],
        help="""OPERATION
check: Compare the current ElasticSearch resource index against the ORM objects and print a summary of missing resources to the console
""",
    )

    parser.add_argument(
        "--index-missing",
        action="store_true",
        default=False,
        help="Attempt to index resources that are missing from index.",
    )
Entry point for subclassed commands to add custom arguments.
def
handle(self, *args, **options):
def handle(self, *args, **options):
    """Dispatch to the method that implements the requested operation."""

    if options['operation'] == "check":
        # argparse stores "--index-missing" under the dest "index_missing"
        # (dashes in option names become underscores).
        self.check(index_missing=options['index_missing'])
The actual logic of the command. Subclasses must implement this method.
def
check(self, index_missing=False):
def check(self, index_missing=False):
    """
    Compare the ES resources index against resources in the ORM (and vice versa).

    For every resource graph except "Arches System Settings", prints the
    number of resources in the database and in the index, followed by up
    to five ids from each side that are absent from the other.

    If ``index_missing`` is true, resources found in the database but not
    in the index are indexed one by one; the first failure is printed and
    stops indexing for that graph.
    """

    es_contents = self.get_es_contents()

    graphs = Graph.objects.filter(isresource=True).exclude(name="Arches System Settings")

    for graph in graphs:

        print(graph.name)

        uuid_resids = Resource.objects.filter(graph=graph).values_list('resourceinstanceid', flat=True)
        # Compare as strings because the ids read from the index are strings.
        db_resourceids = {str(i) for i in uuid_resids}
        es_resourceids = es_contents.get(str(graph.pk), set())

        print(f"- in db: {len(db_resourceids)}")
        print(f"- in index: {len(es_resourceids)}")
        if db_resourceids == es_resourceids:
            continue

        es_diff = list(es_resourceids - db_resourceids)
        if es_diff:
            print(f" {len(es_diff)} indexed resources not in db:")
            for resourceid in es_diff[:5]:
                print(" " + resourceid)
            if len(es_diff) > 5:
                print(" ...")

        db_diff = list(db_resourceids - es_resourceids)
        if db_diff:
            print(f" {len(db_diff)} db resources not in index:")
            for resourceid in db_diff[:5]:
                print(" " + resourceid)
            if len(db_diff) > 5:
                print(" ...")
            if index_missing:
                print(" indexing these resources now...")
                for resourceid in db_diff:
                    try:
                        resource = Resource.objects.get(pk=resourceid)
                        resource.index()
                    except Exception as e:
                        # Report the first failure and stop indexing this graph.
                        print(e)
                        break
Compare the ES resources index against resources in the ORM (and vice versa), one resource graph at a time.
def
iterate_all_documents(self, se, index, pagesize=250, scroll_timeout='1m'):
def iterate_all_documents(self, se, index, pagesize=250, scroll_timeout="1m"):
    """
    Helper to iterate over ALL documents in a single index using the scroll API.
    Yields a (resourceinstanceid, graph_id) tuple for each document.
    https://techoverflow.net/2019/05/07/elasticsearch-how-to-iterate-scroll-through-all-documents-in-index/

    ``pagesize`` is the number of hits requested per page and
    ``scroll_timeout`` is passed as the scroll value on every request.
    """
    # Initialize scroll
    result = se.search(index=index, scroll=scroll_timeout, body={
        "size": pagesize
    })
    while True:
        scroll_id = result["_scroll_id"]
        hits = result["hits"]["hits"]
        # Stop after no more docs
        if not hits:
            break
        # Yield each entry
        yield from ((hit['_source']['resourceinstanceid'], hit['_source']['graph_id']) for hit in hits)
        ## note: need to access the ElasticSearch() instance directly
        ## here, (.es), because the Arches se object doesn't inherit .scroll()
        result = se.es.scroll(body={
            "scroll_id": scroll_id,
            "scroll": scroll_timeout
        })
Helper to iterate over ALL documents in a single index. Yields a (resourceinstanceid, graph_id) tuple for each document. https://techoverflow.net/2019/05/07/elasticsearch-how-to-iterate-scroll-through-all-documents-in-index/