Skip to content

API Reference

Database

Bases: StandardDatabase

Serves similar to SQLAlchemy's session object with the exception that it also allows creating and dropping collections etc.

Source code in arango_orm/database.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
class Database(ArangoDatabase):
    """
    Serves similar to SQLAlchemy's session object with the exception that it
    also allows creating and dropping collections etc.
    """

    def __init__(self, db: ArangoDatabase):
        """Create database instance."""
        self._db: ArangoDatabase = db
        super(Database, self).__init__(db._conn)

    #         super(Database, self).__init__(
    #             connection=connection,
    #             executor=DefaultExecutor(connection)
    # )

    def _verify_collection(self, col) -> bool:
        """
        Verifies that col is a collection class or object.
        """

        CollectionClass = None

        if isclass(col):
            CollectionClass = col
        else:
            CollectionClass = col.__class__

        if CollectionClass is Collection or issubclass(CollectionClass, Collection):
            return col.__collection__ is not None

        return False

    def _verify_relation(self, col) -> bool:
        """
        Verifies that col is a relation class or object.
        """

        CollectionClass = None

        if isclass(col):
            CollectionClass = col
        else:
            CollectionClass = col.__class__

        if CollectionClass is Relation or issubclass(CollectionClass, Relation):
            return col.__collection__ is not None

        return False

    def _entity_pre_process(self, data: dict) -> None:
        "Clean up data dict before add/update into the db."
        for k in ("_key", "_rev"):
            if k in data and data[k] is None:
                del data[k]

    def _entity_post_process(self, entity: Collection, result: dict) -> None:
        "Update entity after add/update."
        for k in (("key_", "_key"), ("rev_", "_rev")):
            if result.get(k[1], None) is not None:
                setattr(entity, k[0], result[k[1]])
                entity._dirty.remove(k[0])

    def has_collection(self, collection):
        "Confirm that the given collection class or collection name exists in the db"

        collection_name = None

        if isclass(collection) and hasattr(collection, "__collection__"):
            collection_name = collection.__collection__

        elif isinstance(collection, str):
            collection_name = collection

        assert collection_name is not None

        return self._db.has_collection(collection_name)

    def create_collection(self, collection: Collection):
        "Create a collection"

        self._verify_collection(collection)

        col_args = {}
        if "col_args" in collection._collection_config:
            col_args = collection._collection_config["col_args"]

        if self._verify_relation(collection) and "edge" not in col_args:
            col_args["edge"] = True

        col = super(Database, self).create_collection(name=collection.__collection__, **col_args)

        if "indexes" in collection._collection_config:
            for index in collection._collection_config["indexes"]:
                # index_type: Literal["hash", "fulltext", "skiplist", "geo", "persistent", "ttl"]
                # fields: list[str]
                # unique: bool
                # sparse: bool
                index_create_method_name = "add_{}_index".format(index["index_type"])

                d = deepcopy(index)
                del d["index_type"]

                # create the index
                getattr(col, index_create_method_name)(**d)

    def drop_collection(self, collection):
        "Drop a collection"
        self._verify_collection(collection)

        super(Database, self).delete_collection(name=collection.__collection__)

    def has(self, collection, key):
        """Check if the document with key exists in the given collection."""

        return self._db.collection(collection.__collection__).has(key)

    def exists(self, document):
        """
        Check if document exists in database.

        Similar to has but takes in a document object and searches
        using it's _key.
        """

        return self._db.collection(document.__collection__).has(document.key_)

    def add(self, entity: Collection, if_present: Literal["ignore", "update"] = None):
        """
        Add a record to a collection.

        :param if_present: Can be None, 'ignore' or 'update'.
            In case of None, if the document is already present then
            arango.exceptions.DocumentInsertError is raised. 'ignore' ignores
            raising the exception. 'update' updates the document if it already
            exists.
        """
        assert if_present in [None, "ignore", "update"]
        if if_present and getattr(entity, "key_", None):
            # for these cases, first check if document exists
            if self.exists(entity):
                if if_present == "ignore":
                    setattr(entity, "_db", self)
                    return entity

                elif if_present == "update":
                    return self.update(entity)

        data_json = entity.model_dump(mode="json", by_alias=True)

        self._entity_pre_process(data_json)
        dispatch(entity, "pre_add", db=self)

        collection = self._db.collection(entity.__collection__)
        setattr(entity, "_db", self)
        res = collection.insert(data_json)

        self._entity_post_process(entity, res)
        entity._dirty.clear()

        dispatch(entity, "post_add", db=self, result=entity)
        return entity

    def bulk_add(self, entity_list, only_dirty=False, **kwargs):
        """
        Add all provided documents, attaching generated _key to entities if generated
        :param entity_list: List of Collection/Relationship objects
        :return: { collection_name : {
                collection_model: Collection class,
                entity_obj_list: Collection instance,
                entity_dict_list: dict
                }
            }
        """
        collections = {}
        for entity in entity_list:
            collection_model = self._db.collection(entity.__collection__)
            data = entity.model_dump(mode="json", by_alias=True)

            if only_dirty:
                if not entity._dirty:
                    return entity

                data = {k: v for k, v in data.items() if k == "_key" or k in entity._dirty}

            # Clean data dict
            self._entity_pre_process(data)
            dispatch(entity, "pre_update", db=self)

            collection_dict = collections.get(entity.__collection__, dict())
            entity_dict_list = collection_dict.get("entity_dict_list", list())
            entity_obj_list = collection_dict.get("entity_obj_list", list())

            entity_dict_list.append(data)
            entity_obj_list.append(entity)

            collection_dict["entity_dict_list"] = entity_dict_list
            collection_dict["entity_obj_list"] = entity_obj_list
            collection_dict["collection_model"] = collection_model

            collections[entity.__collection__] = collection_dict
            setattr(entity, "_db", self)
            entity._dirty.clear()

        for collection, data in collections.items():
            collection_model = data.get("collection_model")
            entity_dict_list = data.get("entity_dict_list")
            entity_obj_list = data.get("entity_obj_list")

            res = collection_model.insert_many(entity_dict_list, **kwargs)
            for num, entity in enumerate(entity_obj_list, start=0):
                entity._dirty.clear()
                self._entity_post_process(entity, res[num])
                dispatch(entity, "post_add", db=self, result=entity)

        return collections

    def delete(self, entity: Collection, **kwargs):
        """Delete given document."""
        dispatch(entity, "pre_delete", db=self)

        collection = self._db.collection(entity.__collection__)
        collection.delete(entity.key_, **kwargs)

        dispatch(entity, "post_delete", db=self, result=entity)
        return entity

    def bulk_delete(self, entity_list, **kwargs):
        """Bulk delete utility, based on delete method. Return a list of results."""
        res = []
        for entity in entity_list:
            res.append(self.delete(entity, **kwargs))
        return res

    def update(self, entity, only_dirty=False, **kwargs):
        "Update given document"
        collection = self._db.collection(entity.__collection__)
        data = {}

        if only_dirty:
            if not entity._dirty:
                return entity
            dispatch(entity, "pre_update", db=self)  # In case of updates to fields
            data = {
                k: v
                for k, v in entity.model_dump(mode="json", by_alias=True).items()
                if k == "_key" or k in entity._dirty
            }
        else:
            dispatch(entity, "pre_update", db=self)
            data = entity.model_dump(mode="json", by_alias=True)

        setattr(entity, "_db", self)
        self._entity_pre_process(data)

        res = collection.update(data, **kwargs)

        entity._dirty.clear()
        self._entity_post_process(entity, res)
        dispatch(entity, "post_update", db=self, result=entity)

        return entity

    def bulk_update(self, entity_list, only_dirty=False, **kwargs):
        """
        Update all provided documents
        :param entity_list: List of Collection/Relationship objects
        :return: { collection_name : {
                collection_model: Collection class,
                entity_obj_list: Collection instance,
                entity_dict_list: dict
                }
            }
        """

        collections = {}
        for entity in entity_list:
            collection_model = self._db.collection(entity.__collection__)
            data = {}

            if only_dirty:
                if not entity._dirty:
                    return entity

                dispatch(entity, "pre_update", db=self)  # In case of updates to fields
                data = {
                    k: v
                    for k, v in entity.model_dump(mode="json", by_alias=True).items()
                    if k == "_key" or k in entity._dirty
                }
            else:
                dispatch(entity, "pre_update", db=self)
                data = entity.model_dump(mode="json", by_alias=True)

            # dispatch(entity, 'pre_update', db=self)
            collection_dict = collections.get(entity.__collection__, dict())
            entity_dict_list = collection_dict.get("entity_dict_list", list())
            entity_obj_list = collection_dict.get("entity_obj_list", list())

            self._entity_pre_process(data)

            entity_dict_list.append(data)
            entity_obj_list.append(entity)

            collection_dict["entity_dict_list"] = entity_dict_list
            collection_dict["entity_obj_list"] = entity_obj_list
            collection_dict["collection_model"] = collection_model

            collections[entity.__collection__] = collection_dict
            setattr(entity, "_db", self)
            # entity._dirty.clear()

        for _, data in collections.items():
            collection_model = data.get("collection_model")
            entity_dict_list = data.get("entity_dict_list")
            entity_obj_list = data.get("entity_obj_list")

            res = collection_model.update_many(entity_dict_list, **kwargs)
            for num, entity in enumerate(entity_obj_list, start=0):
                entity._dirty.clear()
                self._entity_post_process(entity, res[num])
                dispatch(entity, "post_update", db=self, result=entity)

        return collections

    def query(self, CollectionClass) -> Query:
        "Query given collection"

        return Query(CollectionClass, self)

    def create_graph(self, graph_object: Graph, **kwargs):
        """
        Create a named graph from given graph object
        Optionally can provide a list of collection names as ignore_collections
        so those collections are not created
        """

        graph_edge_definitions = []

        # Create collections manually here so we also create indices
        # defined within the collection class. If we let the create_graph
        # call create the collections, it won't create the indices
        for _, col_obj in graph_object.vertices.items():
            if (
                "ignore_collections" in kwargs
                and col_obj.__collection__ in kwargs["ignore_collections"]
            ):
                continue

            try:
                self.create_collection(col_obj)
            except Exception:
                log.warning(
                    "Error creating collection %s, it probably already exists",
                    col_obj.__collection__,
                )

        for _, rel_obj in graph_object.edges.items():
            if (
                "ignore_collections" in kwargs
                and rel_obj.__collection__ in kwargs["ignore_collections"]
            ):
                continue

            try:
                self.create_collection(rel_obj)
            except Exception:
                log.warning(
                    "Error creating edge collection %s, it probably already exists",
                    rel_obj.__collection__,
                )

        for rel_name, relation_obj in graph_object.edges.items():
            cols_from = graph_object.edge_cols_from[rel_name]
            cols_to = graph_object.edge_cols_to[rel_name]

            from_col_names = [col.__collection__ for col in cols_from]
            to_col_names = [col.__collection__ for col in cols_to]

            graph_edge_definitions.append(
                {
                    "edge_collection": relation_obj.__collection__,
                    "from_vertex_collections": from_col_names,
                    "to_vertex_collections": to_col_names,
                }
            )

        self._db.create_graph(graph_object.__graph__, graph_edge_definitions)

    def drop_graph(self, graph_object, drop_collections=True, **kwargs):
        """
        Drop a graph.

        If drop_collections is True, drop all vertices and edges
        too. Optionally can provide a list of collection names as
        ignore_collections so those collections are not dropped
        """
        self._db.delete_graph(
            graph_object.__graph__,
            ignore_missing=True,
            drop_collections=drop_collections,
        )

    def update_graph(self, graph_object: Graph, graph_info=None):
        """
        Update existing graph object by adding collections and edge collections
        that are present in graph definition but not present within the graph
        in the database.

        Note: We delete edge definitions if they no longer exist in the graph
        class but we don't drop collections
        """

        if graph_info is None:
            graph_info = self._get_graph_info(graph_object)

        # Create collections manually here so we also create indices
        # defined within the collection class. If we let the create_graph
        # call create the collections, it won't create the indices
        existing_collection_names = [c["name"] for c in self.collections()]
        for _, col_obj in graph_object.vertices.items():
            try:
                if col_obj.__collection__ in existing_collection_names:
                    log.debug("Collection %s already exists", col_obj.__collection__)
                    continue

                log.info("+ Creating collection %s", col_obj.__collection__)
                self.create_collection(col_obj)

            except Exception:
                log.warning(
                    "Error creating collection %s, it probably already exists",
                    col_obj.__collection__,
                )

        for _, rel_obj in graph_object.edges.items():
            try:
                if rel_obj.__collection__ in existing_collection_names:
                    log.debug("Collection %s already exists", rel_obj.__collection__)
                    continue

                log.info("+ Creating edge collection %s", rel_obj.__collection__)
                self.create_collection(rel_obj, edge=True)
            except Exception:
                log.warning(
                    "Error creating edge collection %s, it probably already exists",
                    rel_obj.__collection__,
                )

        existing_edges = dict(
            [(e["edge_collection"], e) for e in graph_object._graph.edge_definitions()]
        )

        for rel_name, relation in graph_object.edges.items():
            cols_from = graph_object.edge_cols_from[rel_name]
            cols_to = graph_object.edge_cols_to[rel_name]

            from_col_names = [col.__collection__ for col in cols_from]
            to_col_names = [col.__collection__ for col in cols_to]

            edge_definition = {
                "edge_collection": relation.__collection__,
                "from_vertex_collections": from_col_names,
                "to_vertex_collections": to_col_names,
            }

            # if edge does not already exist, create it
            if edge_definition["edge_collection"] not in existing_edges:
                log.info("  + creating graph edge definition: %r", edge_definition)
                graph_object._graph.create_edge_definition(**edge_definition)
            else:
                # if edge definition exists, see if it needs updating
                # compare edges
                if not self._is_same_edge(
                    edge_definition,
                    existing_edges[edge_definition["edge_collection"]],
                ):
                    # replace_edge_definition
                    log.info(
                        "  graph edge definition modified, updating:\n new: %r\n old: %r",
                        edge_definition,
                        existing_edges[edge_definition["edge_collection"]],
                    )
                    graph_object._graph.replace_edge_definition(**edge_definition)

        # Remove any edge definitions that are present in DB but not in graph definition
        graph_connections = dict(
            [(gc.relation.__collection__, gc) for gc in graph_object.graph_connections]
        )

        for edge_name, ee in existing_edges.items():
            if edge_name not in graph_connections:
                log.warning(
                    "  - dropping edge no longer present in graph definition. "
                    "Please drop the edge and vertex collections manually if you no "
                    "longer need them: \n%s",
                    ee,
                )

                graph_object._graph.delete_edge_definition(edge_name)

    def _is_same_edge(self, e1, e2):
        """
        Compare given edge dicts and return True if both dicts have same keys and values else
        return False
        """

        # {'name': 'dns_info', 'to_collections': ['domains'], 'from_collections': ['dns_records']}
        assert e1["edge_collection"] == e2["edge_collection"]

        if len(e1["to_vertex_collections"]) != len(e2["to_vertex_collections"]) or len(
            e1["from_vertex_collections"]
        ) != len(e2["from_vertex_collections"]):
            return False

        else:
            # if same length compare values
            for cname in e1["to_vertex_collections"]:
                if cname not in e2["to_vertex_collections"]:
                    return False

            for cname in e1["from_vertex_collections"]:
                if cname not in e2["from_vertex_collections"]:
                    return False

        return True

    def _get_graph_info(self, graph_obj):
        graphs_info = self.graphs()
        for gi in graphs_info:
            if gi["name"] == graph_obj.__graph__:
                return gi

        return None

    def create_all(self, db_objects):
        """
        Create all objects (collections, relations and graphs).

        Create all objects present in the db_objects list.
        """
        # Collect all graphs
        graph_objs = [obj for obj in db_objects if hasattr(obj, "__graph__")]

        for graph_obj in graph_objs:
            graph_info = self._get_graph_info(graph_obj)
            if not graph_info:
                # graph does not exist, create it
                log.info("Creating graph %s", graph_obj.__graph__)
                graph_instance = graph_obj(connection=self)
                self.create_graph(graph_instance)
            else:
                # Graph exists, determine changes and update graph accordingly
                log.debug("Graph %s already exists", graph_obj.__graph__)
                graph_instance = graph_obj(connection=self)
                self.update_graph(graph_instance, graph_info)

        exclude_collections = [c["name"] for c in self._db.collections()]

        for obj in db_objects:
            if hasattr(obj, "__bases__") and Collection in obj.__bases__:
                if obj.__collection__ not in exclude_collections:
                    log.info("Creating collection %s", obj.__collection__)
                    self.create_collection(obj)
                else:
                    log.debug("Collection %s already exists", obj.__collection__)

    def drop_all(self, db_objects):
        """
        Drop all objects (collections, relations and graphs).

        Drop all objects present in the db_objects list.
        """
        # Collect all graphs
        graph_objs = [obj for obj in db_objects if hasattr(obj, "__graph__")]

        for graph_obj in graph_objs:
            graph_info = self._get_graph_info(graph_obj)
            if graph_info:
                # graph exists, drop it
                log.info("Dropping graph %s", graph_obj.__graph__)
                graph_instance = graph_obj(connection=self)
                self.drop_graph(graph_instance)
            else:
                # Graph exists, determine changes and update graph accordingly
                log.debug("Graph %s does not exist", graph_obj.__graph__)

        for obj in db_objects:
            if hasattr(obj, "__bases__") and Collection in obj.__bases__:
                try:
                    self.drop_collection(obj)
                except CollectionDeleteError:
                    log.debug(
                        "Not deleting missing collection: %s",
                        obj.__collection__,
                    )

__init__(db)

Create database instance.

Source code in arango_orm/database.py
def __init__(self, db: ArangoDatabase):
    """Create database instance."""
    self._db: ArangoDatabase = db
    super(Database, self).__init__(db._conn)

add(entity, if_present=None)

Add a record to a collection.

:param if_present: Can be None, 'ignore' or 'update'. In case of None, if the document is already present then arango.exceptions.DocumentInsertError is raised. 'ignore' ignores raising the exception. 'update' updates the document if it already exists.

Source code in arango_orm/database.py
def add(self, entity: Collection, if_present: Literal["ignore", "update"] = None):
    """
    Add a record to a collection.

    :param if_present: Can be None, 'ignore' or 'update'.
        In case of None, if the document is already present then
        arango.exceptions.DocumentInsertError is raised. 'ignore' ignores
        raising the exception. 'update' updates the document if it already
        exists.
    """
    assert if_present in [None, "ignore", "update"]
    if if_present and getattr(entity, "key_", None):
        # for these cases, first check if document exists
        if self.exists(entity):
            if if_present == "ignore":
                setattr(entity, "_db", self)
                return entity

            elif if_present == "update":
                return self.update(entity)

    data_json = entity.model_dump(mode="json", by_alias=True)

    self._entity_pre_process(data_json)
    dispatch(entity, "pre_add", db=self)

    collection = self._db.collection(entity.__collection__)
    setattr(entity, "_db", self)
    res = collection.insert(data_json)

    self._entity_post_process(entity, res)
    entity._dirty.clear()

    dispatch(entity, "post_add", db=self, result=entity)
    return entity

bulk_add(entity_list, only_dirty=False, **kwargs)

Add all provided documents, attaching generated _key to entities if generated :param entity_list: List of Collection/Relationship objects :return: { collection_name : { collection_model: Collection class, entity_obj_list: Collection instance, entity_dict_list: dict } }

Source code in arango_orm/database.py
def bulk_add(self, entity_list, only_dirty=False, **kwargs):
    """
    Add all provided documents, attaching generated _key to entities if generated
    :param entity_list: List of Collection/Relationship objects
    :return: { collection_name : {
            collection_model: Collection class,
            entity_obj_list: Collection instance,
            entity_dict_list: dict
            }
        }
    """
    collections = {}
    for entity in entity_list:
        collection_model = self._db.collection(entity.__collection__)
        data = entity.model_dump(mode="json", by_alias=True)

        if only_dirty:
            if not entity._dirty:
                return entity

            data = {k: v for k, v in data.items() if k == "_key" or k in entity._dirty}

        # Clean data dict
        self._entity_pre_process(data)
        dispatch(entity, "pre_update", db=self)

        collection_dict = collections.get(entity.__collection__, dict())
        entity_dict_list = collection_dict.get("entity_dict_list", list())
        entity_obj_list = collection_dict.get("entity_obj_list", list())

        entity_dict_list.append(data)
        entity_obj_list.append(entity)

        collection_dict["entity_dict_list"] = entity_dict_list
        collection_dict["entity_obj_list"] = entity_obj_list
        collection_dict["collection_model"] = collection_model

        collections[entity.__collection__] = collection_dict
        setattr(entity, "_db", self)
        entity._dirty.clear()

    for collection, data in collections.items():
        collection_model = data.get("collection_model")
        entity_dict_list = data.get("entity_dict_list")
        entity_obj_list = data.get("entity_obj_list")

        res = collection_model.insert_many(entity_dict_list, **kwargs)
        for num, entity in enumerate(entity_obj_list, start=0):
            entity._dirty.clear()
            self._entity_post_process(entity, res[num])
            dispatch(entity, "post_add", db=self, result=entity)

    return collections

bulk_delete(entity_list, **kwargs)

Bulk delete utility, based on delete method. Return a list of results.

Source code in arango_orm/database.py
def bulk_delete(self, entity_list, **kwargs):
    """Bulk delete utility, based on delete method. Return a list of results."""
    res = []
    for entity in entity_list:
        res.append(self.delete(entity, **kwargs))
    return res

bulk_update(entity_list, only_dirty=False, **kwargs)

Update all provided documents :param entity_list: List of Collection/Relationship objects :return: { collection_name : { collection_model: Collection class, entity_obj_list: Collection instance, entity_dict_list: dict } }

Source code in arango_orm/database.py
def bulk_update(self, entity_list, only_dirty=False, **kwargs):
    """
    Update all provided documents
    :param entity_list: List of Collection/Relationship objects
    :return: { collection_name : {
            collection_model: Collection class,
            entity_obj_list: Collection instance,
            entity_dict_list: dict
            }
        }
    """

    collections = {}
    for entity in entity_list:
        collection_model = self._db.collection(entity.__collection__)
        data = {}

        if only_dirty:
            if not entity._dirty:
                return entity

            dispatch(entity, "pre_update", db=self)  # In case of updates to fields
            data = {
                k: v
                for k, v in entity.model_dump(mode="json", by_alias=True).items()
                if k == "_key" or k in entity._dirty
            }
        else:
            dispatch(entity, "pre_update", db=self)
            data = entity.model_dump(mode="json", by_alias=True)

        # dispatch(entity, 'pre_update', db=self)
        collection_dict = collections.get(entity.__collection__, dict())
        entity_dict_list = collection_dict.get("entity_dict_list", list())
        entity_obj_list = collection_dict.get("entity_obj_list", list())

        self._entity_pre_process(data)

        entity_dict_list.append(data)
        entity_obj_list.append(entity)

        collection_dict["entity_dict_list"] = entity_dict_list
        collection_dict["entity_obj_list"] = entity_obj_list
        collection_dict["collection_model"] = collection_model

        collections[entity.__collection__] = collection_dict
        setattr(entity, "_db", self)
        # entity._dirty.clear()

    for _, data in collections.items():
        collection_model = data.get("collection_model")
        entity_dict_list = data.get("entity_dict_list")
        entity_obj_list = data.get("entity_obj_list")

        res = collection_model.update_many(entity_dict_list, **kwargs)
        for num, entity in enumerate(entity_obj_list, start=0):
            entity._dirty.clear()
            self._entity_post_process(entity, res[num])
            dispatch(entity, "post_update", db=self, result=entity)

    return collections

create_all(db_objects)

Create all objects (collections, relations and graphs).

Create all objects present in the db_objects list.

Source code in arango_orm/database.py
def create_all(self, db_objects):
    """
    Create all objects (collections, relations and graphs).

    Create all objects present in the db_objects list.
    """
    # Collect all graphs
    graph_objs = [obj for obj in db_objects if hasattr(obj, "__graph__")]

    for graph_obj in graph_objs:
        graph_info = self._get_graph_info(graph_obj)
        if not graph_info:
            # graph does not exist, create it
            log.info("Creating graph %s", graph_obj.__graph__)
            graph_instance = graph_obj(connection=self)
            self.create_graph(graph_instance)
        else:
            # Graph exists, determine changes and update graph accordingly
            log.debug("Graph %s already exists", graph_obj.__graph__)
            graph_instance = graph_obj(connection=self)
            self.update_graph(graph_instance, graph_info)

    exclude_collections = [c["name"] for c in self._db.collections()]

    for obj in db_objects:
        if hasattr(obj, "__bases__") and Collection in obj.__bases__:
            if obj.__collection__ not in exclude_collections:
                log.info("Creating collection %s", obj.__collection__)
                self.create_collection(obj)
            else:
                log.debug("Collection %s already exists", obj.__collection__)

create_collection(collection)

Create a collection

Source code in arango_orm/database.py
def create_collection(self, collection: Collection):
    "Create a collection"

    self._verify_collection(collection)

    col_args = {}
    if "col_args" in collection._collection_config:
        col_args = collection._collection_config["col_args"]

    if self._verify_relation(collection) and "edge" not in col_args:
        col_args["edge"] = True

    col = super(Database, self).create_collection(name=collection.__collection__, **col_args)

    if "indexes" in collection._collection_config:
        for index in collection._collection_config["indexes"]:
            # index_type: Literal["hash", "fulltext", "skiplist", "geo", "persistent", "ttl"]
            # fields: list[str]
            # unique: bool
            # sparse: bool
            index_create_method_name = "add_{}_index".format(index["index_type"])

            d = deepcopy(index)
            del d["index_type"]

            # create the index
            getattr(col, index_create_method_name)(**d)

create_graph(graph_object, **kwargs)

Create a named graph from given graph object Optionally can provide a list of collection names as ignore_collections so those collections are not created

Source code in arango_orm/database.py
def create_graph(self, graph_object: Graph, **kwargs):
    """
    Create a named graph from given graph object
    Optionally can provide a list of collection names as ignore_collections
    so those collections are not created
    """

    graph_edge_definitions = []

    # Create collections manually here so we also create indices
    # defined within the collection class. If we let the create_graph
    # call create the collections, it won't create the indices
    for _, col_obj in graph_object.vertices.items():
        if (
            "ignore_collections" in kwargs
            and col_obj.__collection__ in kwargs["ignore_collections"]
        ):
            continue

        try:
            self.create_collection(col_obj)
        except Exception:
            log.warning(
                "Error creating collection %s, it probably already exists",
                col_obj.__collection__,
            )

    for _, rel_obj in graph_object.edges.items():
        if (
            "ignore_collections" in kwargs
            and rel_obj.__collection__ in kwargs["ignore_collections"]
        ):
            continue

        try:
            self.create_collection(rel_obj)
        except Exception:
            log.warning(
                "Error creating edge collection %s, it probably already exists",
                rel_obj.__collection__,
            )

    for rel_name, relation_obj in graph_object.edges.items():
        cols_from = graph_object.edge_cols_from[rel_name]
        cols_to = graph_object.edge_cols_to[rel_name]

        from_col_names = [col.__collection__ for col in cols_from]
        to_col_names = [col.__collection__ for col in cols_to]

        graph_edge_definitions.append(
            {
                "edge_collection": relation_obj.__collection__,
                "from_vertex_collections": from_col_names,
                "to_vertex_collections": to_col_names,
            }
        )

    self._db.create_graph(graph_object.__graph__, graph_edge_definitions)

delete(entity, **kwargs)

Delete given document.

Source code in arango_orm/database.py
def delete(self, entity: Collection, **kwargs):
    """Delete given document."""
    dispatch(entity, "pre_delete", db=self)

    collection = self._db.collection(entity.__collection__)
    collection.delete(entity.key_, **kwargs)

    dispatch(entity, "post_delete", db=self, result=entity)
    return entity

drop_all(db_objects)

Drop all objects (collections, relations and graphs).

Drop all objects present in the db_objects list.

Source code in arango_orm/database.py
def drop_all(self, db_objects):
    """
    Drop all objects (collections, relations and graphs).

    Drop all objects present in the db_objects list.
    """
    # Collect all graphs
    graph_objs = [obj for obj in db_objects if hasattr(obj, "__graph__")]

    for graph_obj in graph_objs:
        graph_info = self._get_graph_info(graph_obj)
        if graph_info:
            # graph exists, drop it
            log.info("Dropping graph %s", graph_obj.__graph__)
            graph_instance = graph_obj(connection=self)
            self.drop_graph(graph_instance)
        else:
            # Graph exists, determine changes and update graph accordingly
            log.debug("Graph %s does not exist", graph_obj.__graph__)

    for obj in db_objects:
        if hasattr(obj, "__bases__") and Collection in obj.__bases__:
            try:
                self.drop_collection(obj)
            except CollectionDeleteError:
                log.debug(
                    "Not deleting missing collection: %s",
                    obj.__collection__,
                )

drop_collection(collection)

Drop a collection

Source code in arango_orm/database.py
def drop_collection(self, collection):
    "Drop a collection"
    self._verify_collection(collection)

    super(Database, self).delete_collection(name=collection.__collection__)

drop_graph(graph_object, drop_collections=True, **kwargs)

Drop a graph.

If drop_collections is True, drop all vertices and edges too. Optionally can provide a list of collection names as ignore_collections so those collections are not dropped

Source code in arango_orm/database.py
def drop_graph(self, graph_object, drop_collections=True, **kwargs):
    """
    Drop a graph.

    If drop_collections is True, drop all vertices and edges
    too. Optionally can provide a list of collection names as
    ignore_collections so those collections are not dropped
    """
    self._db.delete_graph(
        graph_object.__graph__,
        ignore_missing=True,
        drop_collections=drop_collections,
    )

exists(document)

Check if document exists in database.

Similar to has but takes in a document object and searches using it's _key.

Source code in arango_orm/database.py
def exists(self, document):
    """
    Check if document exists in database.

    Similar to has but takes in a document object and searches
    using it's _key.
    """

    return self._db.collection(document.__collection__).has(document.key_)

has(collection, key)

Check if the document with key exists in the given collection.

Source code in arango_orm/database.py
def has(self, collection, key):
    """Check if the document with key exists in the given collection."""

    return self._db.collection(collection.__collection__).has(key)

has_collection(collection)

Confirm that the given collection class or collection name exists in the db

Source code in arango_orm/database.py
def has_collection(self, collection):
    "Confirm that the given collection class or collection name exists in the db"

    collection_name = None

    if isclass(collection) and hasattr(collection, "__collection__"):
        collection_name = collection.__collection__

    elif isinstance(collection, str):
        collection_name = collection

    assert collection_name is not None

    return self._db.has_collection(collection_name)

query(CollectionClass)

Query given collection

Source code in arango_orm/database.py
def query(self, CollectionClass) -> Query:
    "Query given collection"

    return Query(CollectionClass, self)

update(entity, only_dirty=False, **kwargs)

Update given document

Source code in arango_orm/database.py
def update(self, entity, only_dirty=False, **kwargs):
    "Update given document"
    collection = self._db.collection(entity.__collection__)
    data = {}

    if only_dirty:
        if not entity._dirty:
            return entity
        dispatch(entity, "pre_update", db=self)  # In case of updates to fields
        data = {
            k: v
            for k, v in entity.model_dump(mode="json", by_alias=True).items()
            if k == "_key" or k in entity._dirty
        }
    else:
        dispatch(entity, "pre_update", db=self)
        data = entity.model_dump(mode="json", by_alias=True)

    setattr(entity, "_db", self)
    self._entity_pre_process(data)

    res = collection.update(data, **kwargs)

    entity._dirty.clear()
    self._entity_post_process(entity, res)
    dispatch(entity, "post_update", db=self, result=entity)

    return entity

update_graph(graph_object, graph_info=None)

Update existing graph object by adding collections and edge collections that are present in graph definition but not present within the graph in the database.

Note: We delete edge definitions if they no longer exist in the graph class but we don't drop collections

Source code in arango_orm/database.py
def update_graph(self, graph_object: Graph, graph_info=None):
    """
    Update existing graph object by adding collections and edge collections
    that are present in graph definition but not present within the graph
    in the database.

    Note: We delete edge definitions if they no longer exist in the graph
    class but we don't drop collections
    """

    if graph_info is None:
        graph_info = self._get_graph_info(graph_object)

    # Create collections manually here so we also create indices
    # defined within the collection class. If we let the create_graph
    # call create the collections, it won't create the indices
    existing_collection_names = [c["name"] for c in self.collections()]
    for _, col_obj in graph_object.vertices.items():
        try:
            if col_obj.__collection__ in existing_collection_names:
                log.debug("Collection %s already exists", col_obj.__collection__)
                continue

            log.info("+ Creating collection %s", col_obj.__collection__)
            self.create_collection(col_obj)

        except Exception:
            log.warning(
                "Error creating collection %s, it probably already exists",
                col_obj.__collection__,
            )

    for _, rel_obj in graph_object.edges.items():
        try:
            if rel_obj.__collection__ in existing_collection_names:
                log.debug("Collection %s already exists", rel_obj.__collection__)
                continue

            log.info("+ Creating edge collection %s", rel_obj.__collection__)
            self.create_collection(rel_obj, edge=True)
        except Exception:
            log.warning(
                "Error creating edge collection %s, it probably already exists",
                rel_obj.__collection__,
            )

    existing_edges = dict(
        [(e["edge_collection"], e) for e in graph_object._graph.edge_definitions()]
    )

    for rel_name, relation in graph_object.edges.items():
        cols_from = graph_object.edge_cols_from[rel_name]
        cols_to = graph_object.edge_cols_to[rel_name]

        from_col_names = [col.__collection__ for col in cols_from]
        to_col_names = [col.__collection__ for col in cols_to]

        edge_definition = {
            "edge_collection": relation.__collection__,
            "from_vertex_collections": from_col_names,
            "to_vertex_collections": to_col_names,
        }

        # if edge does not already exist, create it
        if edge_definition["edge_collection"] not in existing_edges:
            log.info("  + creating graph edge definition: %r", edge_definition)
            graph_object._graph.create_edge_definition(**edge_definition)
        else:
            # if edge definition exists, see if it needs updating
            # compare edges
            if not self._is_same_edge(
                edge_definition,
                existing_edges[edge_definition["edge_collection"]],
            ):
                # replace_edge_definition
                log.info(
                    "  graph edge definition modified, updating:\n new: %r\n old: %r",
                    edge_definition,
                    existing_edges[edge_definition["edge_collection"]],
                )
                graph_object._graph.replace_edge_definition(**edge_definition)

    # Remove any edge definitions that are present in DB but not in graph definition
    graph_connections = dict(
        [(gc.relation.__collection__, gc) for gc in graph_object.graph_connections]
    )

    for edge_name, ee in existing_edges.items():
        if edge_name not in graph_connections:
            log.warning(
                "  - dropping edge no longer present in graph definition. "
                "Please drop the edge and vertex collections manually if you no "
                "longer need them: \n%s",
                ee,
            )

            graph_object._graph.delete_edge_definition(edge_name)

Collection

Bases: BaseModel

Source code in arango_orm/collections.py
class Collection(BaseModel):
    __collection__ = None

    _safe_list: ClassVar[list[str]] = [
        "__collection__",
        "_safe_list",
        "_annotated_fields"
        "_relations",
        "_index",
        "config_",
        "_collections_from",
        "_collections_to",
        "_object_from",
        "_object_to",
        "_post_process",
        "_pre_process",
        "_fields_info",
        "_fields",
        "_db",
        "_refs",
        "_refs_vals",
        "model_config",  # pydantic configuration attribute
    ]

    _annotated_fields: ClassVar[list[str]] = []

    model_config: ClassVar[dict] = ConfigDict(
        populate_by_name=True,
        arbitrary_types_allowed=True,
        ignored_types=(Relationship,),
    )
    _collection_config: ClassVar[dict] = CollectionConfig()
    _dirty: set
    _db: Database | None
    _fields: dict
    _refs: dict
    _refs_vals: dict

    key_: str = Field(None, alias="_key")
    rev_: str = Field(None, alias="_rev")

    def __init__(self, collection_name=None, **kwargs):
        if "_key" in kwargs:
            kwargs["key_"] = kwargs["_key"]
            del kwargs["_key"]

        if "_id" in kwargs:
            del kwargs["_id"]
        # if self._collection_config.get("key_field", None) and 'key_' not in kwargs:
        #     kwargs["key_"] = kwargs[self._collection_config["key_field"]]

        super().__init__(**kwargs)
        if collection_name is not None:
            self.__collection__ = collection_name

        # Determine actual data fields.
        self._fields = {}
        self._refs_vals: dict = {}  # Storage for referenced objects
        self._refs: dict[str, FieldInfo] = {}

        for fname, f in self.model_fields.items():
            if f.default.__class__ in self.model_config.get("ignored_types", []):
                if f.default.__class__ is Relationship:
                    self._refs[fname] = f

                continue

            if fname in self._annotated_fields:
                continue

            self._fields[fname] = f

        self._dirty = set(self._fields.keys())
        self._db = kwargs.get("_db", None)
        if self._db is not None:
            self._dirty = set()

    def __str__(self):
        return f"{self.__class__.__name__}({super().__str__()})"

    @property
    def id_(self) -> str | None:
        if self._key is not None and self.__collection__ is not None:
            return f"{self.__collection__}/{self._key}"

        # if hasattr(self, "_key") and getattr(self, "_key") is not None:
        #     return self.__collection__ + "/" + getattr(self, "_key")

        return None

    @property
    def _id(self) -> str | None:
        return self.id_

    @property
    def _key(self) -> str | None:
        return self.key_

    @_key.setter
    def _key(self, value):
        self.key_ = value

    @property
    def _rev(self) -> str | None:
        return self.rev_

    @_rev.setter
    def _rev(self, value):
        self.rev_ = value

    def __setattr__(self, attr, value):
        a_real = attr

        # if attr == self.config_.get("key_field", None):
        #     a_real = "key_"

        if attr == "_id":
            return

        if "_key" == attr:
            a_real = "key_"

        super().__setattr__(a_real, value)

        if a_real not in self.model_fields_set:
            return

        self._dirty.add(a_real)

    def __getattribute__(self, item: str):
        if item in ["_id", "id_"]:
            return super().__getattribute__("id_")

        if item == "_key":
            return super().__getattribute__("key_")

        if item.startswith(("_", "model_") or item in self._annotated_fields):
            return super().__getattribute__(item)

        if item in self._safe_list:
            return super().__getattr__(item)

        if item not in self.model_fields:
            raise AttributeError(name=item, obj=self)

        if item not in self._refs:
            return super().__getattribute__(item)

        # Item is a relationship so we need to lookit up and return proper value.
        if item in self._refs_vals:
            return self._refs_vals[item]

        if self._db is None:
            raise DetachedInstanceError()

        relationship: Relationship = self._refs[item].default
        ReferencedClass = relationship.col_class

        r_val = None
        if "key_" == relationship.target_field:
            r_val = self._db.query(ReferencedClass).by_key(
                getattr(self, relationship.field)
                # super(Collection, self).__getattribute__(ref_class.field)
            )

            if relationship.uselist is True:
                r_val = [
                    r_val,
                ]

        else:
            query = self._db.query(ReferencedClass).filter(
                relationship.target_field + "==@val",
                val=getattr(self, relationship.field),
            )

            if relationship.uselist is False:
                r_val = query.first()

            else:
                # TODO: Handle ref_class.order_by if present
                r_val = query.all()

        if relationship.cache is True:
            self._refs_vals[item] = r_val

        return r_val

    def model_dump(
        self,
        *,
        mode: Literal["json", "python"] | str = "python",
        include: IncEx = None,
        exclude: IncEx = None,
        by_alias: bool = False,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        round_trip: bool = False,
        warnings: bool = True,
    ) -> dict[str, Any]:
        exclude = exclude or set()
        exclude_fields = {"_db", "_refs"}
        if exclude:
            exclude = set(exclude)
            exclude.update(exclude_fields)
        else:
            exclude = exclude_fields

        for fname in self.model_fields:
            if fname not in self._fields:
                exclude.add(fname)

        return super().model_dump(
            mode=mode,
            include=include,
            exclude=exclude,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
            round_trip=round_trip,
            warnings=warnings,
        )

Relation

Bases: Collection

Source code in arango_orm/collections.py
class Relation(Collection):
    from_: str | None = Field(None, alias="_from")
    to_: str | None = Field(None, alias="_to")
    _collections_from: Collection | list[Collection] | None
    _collections_to: Collection | list[Collection] | None

    def __init__(self, collection_name=None, **kwargs):
        if "_from" in kwargs:
            kwargs["from_"] = kwargs["_from"]
            del kwargs["_from"]

        if "_to" in kwargs:
            kwargs["to_"] = kwargs["_to"]
            del kwargs["_to"]

        super().__init__(collection_name=collection_name, **kwargs)

        if "_collections_from" in kwargs:
            self._collections_from = kwargs["_collections_from"]
            del kwargs["_collections_from"]
        else:
            self._collections_from = None

        if "_collections_to" in kwargs:
            self._collections_to = kwargs["_collections_to"]
            del kwargs["_collections_to"]
        else:
            self._collections_to = None

        self._object_from = None
        self._object_to = None

    @property
    def _from(self) -> str | None:
        return self.from_

    @_from.setter
    def _from(self, value):
        self.from_ = value

    @property
    def _to(self) -> str | None:
        return self.to_

    @_to.setter
    def _to(self, value):
        self.to_ = value

Graph

Bases: object

Source code in arango_orm/graph.py
class Graph(object):
    __graph__ = None
    graph_connections = None

    def __init__(self, graph_name=None, graph_connections=None, connection=None):
        self.vertices = {}
        self.edges: dict[str, Relation] = {}
        self.edge_cols_from: dict[str, list[Collection]] = {}
        self.edge_cols_to: dict[str, list[Collection]] = {}
        self._db = connection
        self._graph = None
        self.inheritances = {}

        if graph_name is not None:
            self.__graph__ = graph_name

        if self._db is not None:
            self._graph = self._db.graph(self.__graph__)

        if graph_connections:
            self.graph_connections = graph_connections

        if self.graph_connections:
            for gc in self.graph_connections:
                froms = gc.collections_from
                if not isinstance(froms, (list, tuple)):
                    froms = [
                        froms,
                    ]

                tos = gc.collections_to
                if not isinstance(tos, (list, tuple)):
                    tos = [
                        tos,
                    ]

                # Note: self.vertices stores collection classes while self.edges stores
                # relation objects (not classes)
                for col in froms + tos:
                    if col.__collection__ not in self.vertices:
                        self.vertices[col.__collection__] = col

                if gc.relation.__collection__ not in self.edges:
                    self.edges[gc.relation.__collection__] = gc.relation
                    self.edge_cols_from[gc.relation.__collection__] = froms
                    self.edge_cols_to[gc.relation.__collection__] = tos

        # for col_name, col in [
        #     (col_name, col)
        #     for col_name, col in self.vertices.items()
        #     if len(col._inheritance_mapping) > 0
        # ]:
        #     subclasses = self._get_recursive_subclasses(col).union([col])

        #     if len(subclasses) > 1:
        #         _inheritances = {}
        #         for subclass in [
        #             subclass
        #             for subclass in subclasses
        #             if subclass.__name__ in col._inheritance_mapping
        #         ]:
        #             _inheritances[col._inheritance_mapping[subclass.__name__]] = subclass
        #         if len(_inheritances):
        #             self.inheritances[col_name] = _inheritances

    # def _get_recursive_subclasses(self, cls):
    #     return set(cls.__subclasses__()).union(
    #         [s for c in cls.__subclasses__() for s in self._get_recursive_subclasses(c)]
    #     )

    def relation(self, relation_from, relation, relation_to):
        """
        Return relation (edge) object from given collection (relation_from and
        relation_to) and edge/relation (relation) objects
        """

        # relation._from = relation_from.__collection__ + '/' + relation_from._key
        # relation._to = relation_to.__collection__ + '/' + relation_to._key
        relation.from_ = relation_from.id_
        relation.to_ = relation_to.id_

        return relation

    def _inheritance_mapping_inspector(self, collection_class: Collection, doc_dict: dict):
        field = collection_class._inheritance_field
        mapping = self.inheritances[collection_class.__collection__]

        if doc_dict[field] not in mapping or not issubclass(mapping[doc_dict[field]], Collection):
            return False

        return mapping[doc_dict[field]]

    def inheritance_mapping_resolver(self, col_name: str, doc_dict) -> Type[Collection]:
        """
        Custom method to resolve inheritance mapping.

        It allows the user to resolve the class of the current object based on any condition (discriminator field a/o
        inference).

        :param col_name: The collection name retrieved from the object _id property
        :param doc_dict: The object as dict
        :return Type[Collection]
        """
        return self.vertices[col_name]

    def _doc_from_dict(self, doc_dict):
        "Given a result dictionary, creates and returns a document object"

        col_name = doc_dict["_id"].split("/")[0]
        CollectionClass = self.vertices[col_name]

        if CollectionClass.__collection__ in self.inheritances:
            found_class = self._inheritance_mapping_inspector(CollectionClass, doc_dict)
            if issubclass(found_class, Collection):
                CollectionClass = found_class

        elif callable(self.inheritance_mapping_resolver):
            resolved_class = self.inheritance_mapping_resolver(col_name, doc_dict)
            if issubclass(resolved_class, Collection):
                CollectionClass = resolved_class

        # remove empty values
        keys_to_del = []
        for k, v in doc_dict.items():
            if doc_dict[k] is None:
                keys_to_del.append(k)

        if keys_to_del:
            for k in keys_to_del:
                del doc_dict[k]

        return CollectionClass(**doc_dict)

    def _objectify_results(self, results, doc_obj=None):
        """
        Make traversal results object oriented by adding all links to the first
        object's _relations attribute. If doc_obj is not provided, the first
        vertex of the first path is used.
        """

        # Create objects from vertices dicts
        documents = {}
        if doc_obj:
            documents[doc_obj.id_] = doc_obj

        relations_added = {}

        for p_dict in results:
            for v_dict in p_dict["vertices"]:
                if doc_obj is None:
                    # Get the first vertex of the first result, it's the parent object
                    doc_obj = self._doc_from_dict(v_dict)
                    documents[doc_obj.id_] = doc_obj

                if v_dict["_id"] in documents:
                    continue

                # Get ORM class for the collection
                documents[v_dict["_id"]] = self._doc_from_dict(v_dict)

            # Process each path as a unit
            # First edge's _from always points to our parent document
            parent_id = doc_obj.id_

            for e_dict in p_dict["edges"]:
                col_name = e_dict["_id"].split("/")[0]
                rel_identifier = parent_id + "->" + e_dict["_id"]

                if rel_identifier in relations_added:
                    rel = relations_added[rel_identifier]

                else:
                    RelationClass = self.edges[col_name]

                    if not isclass(self.edges[col_name]):
                        RelationClass = RelationClass.__class__

                    rel = RelationClass(**e_dict)
                    rel._object_from = documents[rel.from_]
                    rel._object_to = documents[rel.to_]

                    parent_object = None
                    if rel.from_ == parent_id:
                        parent_object = documents[rel.from_]
                        rel._next = rel._object_to

                    elif rel.to_ == parent_id:
                        parent_object = documents[rel.to_]
                        rel._next = rel._object_from

                    assert parent_object is not None

                    if not hasattr(parent_object, "_relations"):
                        setattr(parent_object, "_relations", {})

                    if col_name not in parent_object._relations:
                        parent_object._relations[col_name] = []

                    if rel not in parent_object._relations[col_name]:
                        parent_object._relations[col_name].append(rel)

                    if rel.id_ not in relations_added:
                        relations_added[rel_identifier] = rel

                # Set parent ID
                if rel.from_ == parent_id:
                    parent_id = rel.to_

                elif rel.to_ == parent_id:
                    parent_id = rel.from_

        return doc_obj

    def expand(self, doc_obj, direction="any", depth=1, only=None, condition:str = None):
        """
        Graph traversal.

        Expand all links of given direction (outbound, inbound, any) upto given
        length for the given document object and update the object with the
        found relations.

        :param only: If given should be a string, Collection class or list of
        strings or collection classes containing target collection names of
        documents (vertices) that should be fetched.
        Any vertices found in traversal that don't belong to the specified
        collection names given in this parameter will be ignored.

        :param condition: String containing conditions in JS format. If `only` is provided
        then these conditions are merged with only using logical AND. Within the condition
        3 objects (config, vertex, path) are available for use within the traversal context.
        """
        assert direction in ("any", "inbound", "outbound")

        graph = self._db.graph(self.__graph__)
        doc_id = doc_obj.id_
        doc_obj._relations = {}  # clear any previous relations
        filter_func = None
        c_str = ""

        aql = f"FOR vertex, edge, path IN 1..{depth} {direction} '{doc_id}' GRAPH {self.__graph__}\n"

        if only:
            if not isinstance(only, (list, tuple)):
                only = [
                    only,
                ]

            for c in only:
                if not isinstance(c, str) and hasattr(c, "__collection__"):
                    c = c.__collection__

                c_str += "vertex._id LIKE '" + c + "/%' ||"

            if c_str:
                c_str = c_str[:-3]

        if condition:
            if c_str:
                c_str = c_str + ' AND ' + condition
            else:
                c_str = condition

        if c_str:
            aql += "FILTER " + c_str + "\n"


        aql += "RETURN path"

        # print(aql)

        new_doc = self.aql(aql)
        if new_doc:
            doc_obj._relations = new_doc._relations

        # results = graph.traverse(
        #     start_vertex=doc_id,
        #     direction=direction,
        #     vertex_uniqueness="path",
        #     min_depth=1,
        #     max_depth=depth,
        #     filter_func=filter_func,
        # )


        # self._objectify_results(results["paths"], doc_obj)

    def aql(self, query, **kwargs):
        """Run AQL graph traversal query."""
        results = self._db.aql.execute(query, **kwargs)

        doc_obj = self._objectify_results(results)

        return doc_obj

    def delete_tree(self, doc_obj):
        """
        Remove node and all nodes linked to it based on traversal criteria.

        Only nodes present in doc_obj._relations dict are removed.
        """
        objs_to_delete = [doc_obj]

        def get_linked_objects(obj):
            ret = []
            for _, e_objs in getattr(obj, "_relations", {}).items():
                for e_obj in e_objs:
                    ret.append(e_obj)
                    v_obj = e_obj._next
                    ret.append(v_obj)

                    if hasattr(v_obj, "_relations"):
                        _ = get_linked_objects(v_obj)

            return ret

        if hasattr(doc_obj, "_relations"):
            objs_to_delete.extend(get_linked_objects(doc_obj))

        for obj in reversed(objs_to_delete):
            self._db.delete(obj)

        doc_obj._relations = {}

aql(query, **kwargs)

Run AQL graph traversal query.

Source code in arango_orm/graph.py
def aql(self, query, **kwargs):
    """Run AQL graph traversal query."""
    results = self._db.aql.execute(query, **kwargs)

    doc_obj = self._objectify_results(results)

    return doc_obj

delete_tree(doc_obj)

Remove node and all nodes linked to it based on traversal criteria.

Only nodes present in doc_obj._relations dict are removed.

Source code in arango_orm/graph.py
def delete_tree(self, doc_obj):
    """
    Remove node and all nodes linked to it based on traversal criteria.

    Only nodes present in doc_obj._relations dict are removed.
    """
    objs_to_delete = [doc_obj]

    def get_linked_objects(obj):
        ret = []
        for _, e_objs in getattr(obj, "_relations", {}).items():
            for e_obj in e_objs:
                ret.append(e_obj)
                v_obj = e_obj._next
                ret.append(v_obj)

                if hasattr(v_obj, "_relations"):
                    _ = get_linked_objects(v_obj)

        return ret

    if hasattr(doc_obj, "_relations"):
        objs_to_delete.extend(get_linked_objects(doc_obj))

    for obj in reversed(objs_to_delete):
        self._db.delete(obj)

    doc_obj._relations = {}

expand(doc_obj, direction='any', depth=1, only=None, condition=None)

Graph traversal.

Expand all links of given direction (outbound, inbound, any) upto given length for the given document object and update the object with the found relations.

:param only: If given should be a string, Collection class or list of strings or collection classes containing target collection names of documents (vertices) that should be fetched. Any vertices found in traversal that don't belong to the specified collection names given in this parameter will be ignored.

:param condition: String containing conditions in JS format. If only is provided then these conditions are merged with only using logical AND. Within the condition 3 objects (config, vertex, path) are available for use within the traversal context.

Source code in arango_orm/graph.py
def expand(self, doc_obj, direction="any", depth=1, only=None, condition:str = None):
    """
    Graph traversal.

    Expand all links of given direction (outbound, inbound, any) upto given
    length for the given document object and update the object with the
    found relations.

    :param only: If given should be a string, Collection class or list of
    strings or collection classes containing target collection names of
    documents (vertices) that should be fetched.
    Any vertices found in traversal that don't belong to the specified
    collection names given in this parameter will be ignored.

    :param condition: String containing conditions in JS format. If `only` is provided
    then these conditions are merged with only using logical AND. Within the condition
    3 objects (config, vertex, path) are available for use within the traversal context.
    """
    assert direction in ("any", "inbound", "outbound")

    graph = self._db.graph(self.__graph__)
    doc_id = doc_obj.id_
    doc_obj._relations = {}  # clear any previous relations
    filter_func = None
    c_str = ""

    aql = f"FOR vertex, edge, path IN 1..{depth} {direction} '{doc_id}' GRAPH {self.__graph__}\n"

    if only:
        if not isinstance(only, (list, tuple)):
            only = [
                only,
            ]

        for c in only:
            if not isinstance(c, str) and hasattr(c, "__collection__"):
                c = c.__collection__

            c_str += "vertex._id LIKE '" + c + "/%' ||"

        if c_str:
            c_str = c_str[:-3]

    if condition:
        if c_str:
            c_str = c_str + ' AND ' + condition
        else:
            c_str = condition

    if c_str:
        aql += "FILTER " + c_str + "\n"


    aql += "RETURN path"

    # print(aql)

    new_doc = self.aql(aql)
    if new_doc:
        doc_obj._relations = new_doc._relations

inheritance_mapping_resolver(col_name, doc_dict)

Custom method to resolve inheritance mapping.

It allows the user to resolve the class of the current object based on any condition (discriminator field a/o inference).

:param col_name: The collection name retrieved from the object _id property :param doc_dict: The object as dict :return Type[Collection]

Source code in arango_orm/graph.py
def inheritance_mapping_resolver(self, col_name: str, doc_dict) -> Type[Collection]:
    """
    Custom method to resolve inheritance mapping.

    It allows the user to resolve the class of the current object based on any condition (discriminator field a/o
    inference).

    :param col_name: The collection name retrieved from the object _id property
    :param doc_dict: The object as dict
    :return Type[Collection]
    """
    return self.vertices[col_name]

relation(relation_from, relation, relation_to)

Return relation (edge) object from given collection (relation_from and relation_to) and edge/relation (relation) objects

Source code in arango_orm/graph.py
def relation(self, relation_from, relation, relation_to):
    """
    Return relation (edge) object from given collection (relation_from and
    relation_to) and edge/relation (relation) objects
    """

    # relation._from = relation_from.__collection__ + '/' + relation_from._key
    # relation._to = relation_to.__collection__ + '/' + relation_to._key
    relation.from_ = relation_from.id_
    relation.to_ = relation_to.id_

    return relation

Events

dispatch(target, event, *args, **kwargs)

Fire given event

Source code in arango_orm/event.py
def dispatch(target, event, *args, **kwargs):
    "Fire given event"
    by_event = _registrars[event]
    for t in by_event.keys():
        if isinstance(target, t):
            for fn in by_event[t]:
                fn(target, event, *args, **kwargs)

listen(target, event, fn, *args, **kwargs)

Register fn to listen for event

Source code in arango_orm/event.py
def listen(target, event, fn, *args, **kwargs):
    "Register fn to listen for event"
    events = [event] if isinstance(event, six.text_type) else event

    for event in events:
        _registrars[event][target].append(fn)

listens_for(target, event, *args, **kwargs)

Decorator to register fn to listen for event

Source code in arango_orm/event.py
def listens_for(target, event, *args, **kwargs):
    "Decorator to register fn to listen for event"
    def decorator(fn):
        listen(target, event, fn, *args, **kwargs)
        return fn
    return decorator