Commit ce9f5253 authored by Eelco van der Wel's avatar Eelco van der Wel 💬
Browse files

add edges, rewrite priority strategy

parent 7995978a
Showing with 196 additions and 63 deletions
+196 -63
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp data.itembase
```
%% Cell type:markdown id: tags:
# Itembase
%% Cell type:markdown id: tags:
Any data class in pymemri inherits from `Item`. It is a base class for items with some handy functionalities to create new items and edges, retrieve all edges to other items, and sync with the pod.
%% Cell type:code id: tags:
``` python
# export
# hide
from typing import Optional, Dict, List, Set
from pymemri.imports import *
from datetime import datetime
import uuid
ALL_EDGES = "allEdges"
SOURCE, TARGET, TYPE, EDGE_TYPE, LABEL, SEQUENCE = "_source", "_target", "_type", "_type", "label", "sequence"
```
%% Cell type:code id: tags:
``` python
#hide
from nbdev.showdoc import *
```
%% Cell type:code id: tags:
``` python
# export
class Edge():
"""Edges makes a link between two `ItemBase` Items. You won't use this class a lot in practice, as edges are
abstracted away for normal users. When items are retrieved from the database, the edges are parsed automatically.
When you add an edge between to items within pymemri, you will often use `ItemBase.add_edge`"""
def __init__(self, source, target, _type, label=None, sequence=None, created=False, reverse=True):
self.source = source
self.target = target
self._type = _type
self.label = label
self.sequence = sequence
self.created = created
self.reverse = reverse
@classmethod
def from_json(cls, json):
from pymemri.data.schema import get_constructor
# we only set the target here
_type = json[EDGE_TYPE]
json_target = json[TARGET]
target_type = json_target["_type"]
plugin_class = json_target.get("pluginClass", None)
target_constructor = get_constructor(target_type, plugin_class)
target = target_constructor.from_json(json_target)
return cls(source=None, target=target, _type=_type)
def __repr__(self):
return f"{self.source} --{self._type}-> {self.target}"
def update(self, api):
if self.created:
api.create_edges([self])
def __eq__(self, other):
return self.source is other.source and self.target is other.target \
and self._type == other._type and self.reverse == other.reverse and self.created == other.created \
and self.label == other.label
def traverse(self, start):
"""We can traverse an edge starting from the source to the target or vice versa. In practice we often call
item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function."""
if start == self.source:
return self.target
elif start == self.target:
return self.source
else:
raise ValueError
```
%% Cell type:code id: tags:
``` python
show_doc(Edge.traverse)
```
%% Output
<h4 id="Edge.traverse" class="doc_header"><code>Edge.traverse</code><a href="__main__.py#L39" class="source_link" style="float:right">[source]</a></h4>
> <code>Edge.traverse</code>(**`start`**)
We can traverse an edge starting from the source to the target or vice versa. In practice we often call
item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function.
%% Cell type:code id: tags:
``` python
# export
# hide
class ItemBase:
"""Provides a base class for all items.
All items in the schema inherit from this class, and it provides some
basic functionality for consistency and to enable easier usage."""
properties: List[str] = list()
edges: List[str] = list()
def __init__(self, id: str = None):
self._updated_properties: Set[str] = set()
self.id: Optional[str] = id
self._client: Optional["PodClient"] = None
self._in_pod: bool = False
self._new_edges = list()
self._date_local_modified = dict()
def _set_client(self, client: "PodClient"):
if self._client is not None and self._client != client:
raise ValueError(f"Attempted to overwrite existing client of item {self}")
self._client = client
def __setattr__(self, name, value):
prev_val = getattr(self, name, None)
super(ItemBase, self).__setattr__(name, value)
if name in self.properties and value != prev_val:
self._updated_properties.add(name)
self._date_local_modified[name] = datetime.utcnow()
if name not in self._original_properties:
self._original_properties[name] = prev_val
@property
def _updated_properties(self):
return set(self._original_properties.keys())
def __getattribute__(self, name):
val = object.__getattribute__(self, name)
if isinstance(val, Edge):
edge = val
return edge.traverse(start=self)
if isinstance(val, list) and len(val) > 0 and isinstance(val[0], Edge):
edges = val
return [edge.traverse(start=self) for edge in edges]
else:
return val
def on_sync_to_pod(self, client):
"""
on_sync_to_pod is called when self is created or updated (optionally via bulk) in the PodClient.
"""
self._set_client(client)
self._updated_properties = set()
self._in_pod = True
def add_edge(self, name, val):
"""Creates an edge of type name and makes it point to val"""
val = Edge(self, val, name, created=True)
if name not in self.__dict__:
raise NameError(f"object {self} does not have edge with name {name}")
existing = object.__getattribute__(self, name)
if val not in existing:
res = existing + [val]
self.__setattr__(name, res)
def is_expanded(self):
"""returns whether the node is expanded. An expanded node retrieved nodes that are
*directly* connected to it
from the pod, and stored their values via edges in the object."""
return len(self.get_all_edges()) > 0
def get_edges(self, name):
return object.__getattribute__(self, name)
def get_all_edges(self):
return [e for attr in self.__dict__.values() if self.attr_is_edge(attr) for e in attr]
def get_all_edge_names(self):
return [k for k,v in self.__dict__.items() if self.attr_is_edge(v)]
def get_property_names(self):
return [k for k, v in self.__dict__.items() if not type(v) == list]
@staticmethod
def attr_is_edge(attr):
return isinstance(attr, list) and len(attr)>0 and isinstance(attr[0], Edge)
def update(self, api, edges=True, create_if_not_exists=True, skip_nodes=False):
if not self.exists(api):
print(f"creating {self}")
api.create(self)
else:
print(f"updating {self}")
api.update_item(self)
if edges:
api.create_edges(self.get_all_edges())
def exists(self, api):
return api.exists(self.id) if self.id else None
def create_id(self, overwrite=False):
if not overwrite and self.id is not None:
return
self.id = uuid.uuid4().hex
def store(self, client: "PodClient" = None):
if client:
if client is not None:
self._set_client(client)
self._client.sync_store.append(self)
self._client.add_to_sync(self)
# def expand(self, api):
# """Expands a node (retrieves all directly connected nodes ands adds to object)."""
# self._expanded = True
# res = api.get(self.id, expanded=True)
# for edge_name in res.get_all_edge_names():
# edges = res.get_edges(edge_name)
# for e in edges:
# e.source = self
# self.__setattr__(edge_name, edges)
# # self.edges = res.edges
# return self
def __repr__(self):
id = self.id
_type = self.__class__.__name__
return f"{_type} (#{id})"
@classmethod
def from_data(cls, *args, **kwargs):
edges = dict()
new_kwargs = dict()
for k, v in kwargs.items():
if isinstance(v, ItemBase):
edge = Edge(None, v, k)
edges[k] = edge
new_kwargs[k] = edge
else:
new_kwargs[k] = v
res = cls(*args, **new_kwargs)
for v in edges.values():
v.source = res
return res
# def inherit_funcs(self, other):
# """This function can be used to inherit new functionalities from a subclass. This is a patch to solve
# the fact that python does provide extensions of classes that are defined in a different file that are
# dynamic enough for our use case."""
# assert issubclass(other, self.__class__)
# self.__class__ = other
```
%% Cell type:code id: tags:
``` python
# export
class Item(ItemBase):
"""Item is the baseclass for all of the data classes."""
properties = [
"dateCreated",
"dateModified",
"dateServerModified",
"deleted",
"externalId",
"itemDescription",
"starred",
"version",
"id",
"importJson",
"pluginClass",
]
edges = ["changelog", "label", "genericAttribute", "measure", "sharedWith"]
DATE_PROPERTIES = ['dateCreated', 'dateModified', 'dateServerModified']
def __init__(
self,
dateCreated: datetime = None,
dateModified: datetime = None,
dateServerModified: datetime = None,
deleted: bool = None,
externalId: str = None,
itemDescription: str = None,
starred: bool = None,
version: str = None,
id: str = None,
importJson: str = None,
pluginClass: str = None,
changelog: list = None,
label: list = None,
genericAttribute: list = None,
measure: list = None,
sharedWith: list = None
):
super().__init__(id)
# Properties
self.dateCreated: Optional[str] = dateCreated
self.dateModified: Optional[str] = dateModified
self.dateServerModified: Optional[str] = dateServerModified
self.deleted: Optional[str] = deleted
self.externalId: Optional[str] = externalId
self.itemDescription: Optional[str] = itemDescription
self.starred: Optional[str] = starred
self.version: Optional[str] = version
self.importJson: Optional[str] = importJson
self.pluginClass: Optional[str] = pluginClass
# Edges
self.changelog: list = changelog if changelog is not None else []
self.label: list = label if label is not None else []
self.genericAttribute: list = genericAttribute if genericAttribute is not None else []
self.measure: list = measure if measure is not None else []
self.sharedWith: list = sharedWith if sharedWith is not None else []
@classmethod
def parse_json(self, cls, json):
property_kwargs = Item.parse_properties(cls, json)
edge_kwargs = Item.parse_edges(cls, json)
return {**property_kwargs, **edge_kwargs}
@classmethod
def parse_properties(self, cls, json):
return {p: json.get(p, None) for p in cls.properties}
@classmethod
def parse_edges(self, cls, json):
all_edges = json.get(ALL_EDGES, None)
edge_kwargs = dict()
reverse_edges = [f"~{e}" for e in cls.edges]
if all_edges is not None:
for edge_json in all_edges:
edge = Edge.from_json(edge_json)
if edge.type in self.edges + reverse_edges:
edge_name = self.remove_prefix(edge.type)
if edge_name in edge_kwargs:
edge_kwargs[edge_name] += [edge]
else:
edge_kwargs[edge_name] = [edge]
return edge_kwargs
@classmethod
def get_property_types(cls, dates=False) -> Dict[str, type]:
"""
Infer the property types of all properties in cls.
Raises ValueError if type anotations for properties are missing in the cls init.
"""
mro = cls.mro()
property_types = dict()
for basecls in reversed(mro[:mro.index(ItemBase)]):
property_types.update(basecls.__init__.__annotations__)
property_types = {k: v for k, v in property_types.items() if k in cls.properties}
if not set(property_types.keys()) == set(cls.properties):
raise ValueError(f"Item {cls.__name__} has missing property annotations.")
res = dict()
for k, v in property_types.items():
if k[:1] != '_' and k != "private" and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in cls.DATE_PROPERTIES)):
res[k] = v
return res
@classmethod
def remove_prefix(s, prefix="~"):
return s[1:] if s[0] == "`" else s
@classmethod
def from_json(cls, json):
kwargs = Item.parse_json(cls, json)
property_types = cls.get_property_types(dates=True)
for k, v in kwargs.items():
if v is not None and property_types[k] == datetime:
# Datetime in pod is in milliseconds
kwargs[k] = datetime.fromtimestamp(v / 1000.)
res = cls(**kwargs)
for e in res.get_all_edges():
e.source = res
return res
def _get_schema_type(self):
for cls in self.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def to_json(self, dates=True):
res = dict()
private = getattr(self, "private", [])
for k, v in self.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in self.DATE_PROPERTIES)):
if isinstance(v, datetime):
# Save datetimes in milliseconds
v = int(v.timestamp() * 1000)
res[k] = v
res["type"] = self._get_schema_type()
return res
```
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.add_edge)
```
%% Output
<h4 id="ItemBase.add_edge" class="doc_header"><code>ItemBase.add_edge</code><a href="__main__.py#L47" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.add_edge</code>(**`name`**, **`val`**)
Creates an edge of type name and makes it point to val
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.is_expanded)
```
%% Output
<h4 id="ItemBase.is_expanded" class="doc_header"><code>ItemBase.is_expanded</code><a href="__main__.py#L57" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.is_expanded</code>()
returns whether the node is expanded. An expanded node retrieved nodes that are
*directly* connected to it
from the pod, and stored their values via edges in the object.
%% Cell type:markdown id: tags:
# Usage
%% Cell type:markdown id: tags:
With the `Item` and `Edge` classes we can create an item and its surrounding graph. The schema is defined in schema.py, in general we want to use the from_data staticmethod to generate new items, because it ensures that edges are linked from both the source and the target object. Let's make a new item and add it to the pod.
%% Cell type:code id: tags:
``` python
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges + ["friend"]
def __init__(self, name: str=None, age: int=None,friend: list=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
self.friend = fried if friend is not None else []
```
%% Cell type:code id: tags:
``` python
from pymemri.pod.client import PodClient
client = PodClient()
```
%% Cell type:code id: tags:
``` python
assert client.add_to_schema(MyItem(name="abc", age=1))
```
%% Cell type:code id: tags:
``` python
x = MyItem(name="me", age=30)
target = MyItem(name="my friend", age=31)
client.create(target)
x.add_edge("friend", MyItem(name="my friend", age=31))
```
%% Cell type:markdown id: tags:
We can now create our `MyItem`, as a side-effect of creating it, it will receive an id
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
None
%% Cell type:code id: tags:
``` python
assert client.create(x)
```
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
a17c93d199f4128976d7eac3542b669b
%% Cell type:code id: tags:
``` python
y = client.get(x.id)
```
%% Cell type:code id: tags:
``` python
assert len(y.friend) > 0
```
%% Cell type:code id: tags:
``` python
assert y.friend[0].name == "my friend"
assert y.name == "me"
assert y.age == 30
# One year later
y.age = 31
y.add_edge("friend", MyItem(name="my friend2", age=29))
y.update(client)
assert y.age == 31
assert len(y.friend) == 2
```
%% Output
updating MyItem (#a17c93d199f4128976d7eac3542b669b)
BULK: Writing 2/2 items/edges
400 Failure: JSON deserialization error payload.createEdges[0]._target: invalid type: null, expected a string at line 1 column 245
could not complete bulk action, aborting
%% Cell type:code id: tags:
``` python
y.friend
```
%% Output
[MyItem (#None), MyItem (#None)]
%% Cell type:code id: tags:
``` python
y.to_json(dates=False)
```
%% Output
{'id': 'a17c93d199f4128976d7eac3542b669b',
'deleted': False,
'name': 'me',
'age': 31,
'type': 'MyItem'}
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.imports import *
from hashlib import sha256
from pymemri.pod.db import DB
from pymemri.pod.utils import *
from pymemri.plugin.schema import *
from pymemri.test_utils import get_ci_variables
from pymemri.pod.api import PodAPI, PodError, DEFAULT_POD_ADDRESS, POD_VERSION
from typing import List, Union
import uuid
import urllib
from datetime import datetime
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
# Mapping from python type to schema type
# TODO move to data.schema once schema is refactored
TYPE_TO_SCHEMA = {
bool: "Bool",
str: "Text",
int: "Integer",
float: "Real",
datetime: "DateTime",
}
def __init__(
self,
url=DEFAULT_POD_ADDRESS,
version=POD_VERSION,
database_key=None,
owner_key=None,
auth_json=None,
register_base_schema=True,
verbose=False,
):
self.verbose = verbose
self.database_key = (
database_key if database_key is not None else self.generate_random_key()
)
self.owner_key = (
owner_key if owner_key is not None else self.generate_random_key()
)
self.api = PodAPI(
database_key=self.database_key,
owner_key=self.owner_key,
url=url,
version=version,
auth_json=auth_json,
verbose=verbose,
)
self.api.test_connection()
self.local_db = DB()
self.registered_classes = dict()
self.register_base_schemas()
self.sync_store = list()
self.sync_store = dict()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
return cls(
database_key=read_pod_key("database_key"),
owner_key=read_pod_key("owner_key"),
**kwargs,
)
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def register_base_schemas(self):
assert self.add_to_schema(PluginRun, CVUStoredDefinition, Account, Photo)
def add_to_db(self, node):
existing = self.local_db.get(node.id)
if existing is None and node.id is not None:
self.local_db.add(node)
def reset_local_db(self):
self.local_db = DB()
def get_create_dict(self, node):
properties = node.to_json()
properties = {k: v for k, v in properties.items() if v != []}
return properties
def create(self, node):
create_dict = self.get_create_dict(node)
try:
result = self.api.create_item(create_dict)
node.id = result
self.add_to_db(node)
node.on_sync_to_pod(self)
return True
except Exception as e:
print(e)
return False
def create_photo(self, photo):
file = photo.file[0]
# create the photo
items_edges_success = self.bulk_action(
create_items=[photo, file], create_edges=photo.get_edges("file")
)
if not items_edges_success:
raise ValueError("Could not create file or photo item")
return self._upload_image(photo.data)
def _property_dicts_from_instance(self, node):
create_items = []
attributes = node.to_json()
for k, v in attributes.items():
if type(v) not in self.TYPE_TO_SCHEMA:
raise ValueError(f"Could not add property {k} with type {type(v)}")
value_type = self.TYPE_TO_SCHEMA[type(v)]
create_items.append(
{
"type": "ItemPropertySchema",
"itemType": attributes["type"],
"propertyName": k,
"valueType": value_type,
}
)
return create_items
def _property_dicts_from_type(self, item):
create_items = []
for property, p_type in item.get_property_types().items():
p_type = self.TYPE_TO_SCHEMA[p_type]
create_items.append(
{
"type": "ItemPropertySchema",
"itemType": item.__name__,
"propertyName": property,
"valueType": p_type,
}
)
return create_items
def add_to_schema(self, *items: List[Union[object, type]]):
create_items = []
for item in items:
if isinstance(item, type):
property_dicts = self._property_dicts_from_type(item)
else:
property_dicts = self._property_dicts_from_instance(item)
item = type(item)
create_items.extend(property_dicts)
self.registered_classes[item.__name__] = item
try:
self.api.bulk(create_items=create_items)
return True
except Exception as e:
print(e)
return False
def _upload_image(self, img):
if isinstance(img, np.ndarray):
return self.upload_file(img.tobytes())
elif isinstance(img, bytes):
return self.upload_file(img)
else:
raise ValueError(f"Unknown image data type {type(img)}")
def upload_file(self, file):
try:
self.api.upload_file(file)
return True
except PodError as e:
# 409 = CONFLICT, file already exists
if e.status == 409:
return True
return False
def get_file(self, sha):
return self.api.get_file(sha)
def get_photo(self, id, size=640):
photo = self.get(id)
self._load_photo_data(photo, size=size)
return photo
def _load_photo_data(self, photo, size=None):
if len(photo.file) > 0 and photo.data is None:
file = self.get_file(photo.file[0].sha256)
if file is None:
print(
f"Could not load data of {photo} attached file item does not have data in pod"
)
return
photo.data = file
else:
print(f"could not load data of {photo}, no file attached")
def create_if_external_id_not_exists(self, node):
if not self.external_id_exists(node):
self.create(node)
def external_id_exists(self, node):
if node.externalId is None:
return False
existing = self.search({"externalId": node.externalId})
return len(existing) > 0
def create_edges(self, edges):
return self.bulk_action(create_edges=edges)
def delete_items(self, items):
return self.bulk_action(delete_items=items)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
@staticmethod
def gather_batch(items, start_idx, start_size=0, max_size=5000000):
idx = start_idx
total_size = start_size
batch_items = []
for i, x in enumerate(items):
if i < idx:
continue
elif len(str(x)) > max_size:
idx = i + 1
print("Could not add item: Item exceeds max item size")
elif total_size + len(str(x)) < max_size:
batch_items.append(x)
total_size += len(str(x))
idx = i + 1
else:
break
return batch_items, idx, total_size
def bulk_action(
self, create_items=None, update_items=None, create_edges=None, delete_items=None, partial_update=True
):
all_items = []
if create_items:
all_items += create_items
if update_items:
all_items += update_items
for item in all_items:
item._set_client(self)
# we need to set the id to not lose the reference
if create_items is not None:
for c in create_items:
if c.id is None:
c.id = uuid.uuid4().hex
create_items = (
[self.get_create_dict(i) for i in create_items]
if create_items is not None
else []
)
update_items = (
[self.get_update_dict(i, partial_update=partial_update) for i in update_items]
if update_items is not None
else []
)
create_edges = (
[self.get_create_edge_dict(i) for i in create_edges]
if create_edges is not None
else []
)
# Note: skip delete_items without id, as items that are not in pod cannot be deleted
delete_items = (
[item.id for item in delete_items if item.id is not None]
if delete_items is not None
else []
)
n_total = len(create_items + update_items + create_edges + delete_items)
n = 0
i_ci, i_ui, i_ce, i_di = 0, 0, 0, 0
while not (
i_ci == len(create_items)
and i_ui == len(update_items)
and i_ce == len(create_edges)
and i_di == len(delete_items)
):
batch_size = 0
create_items_batch, i_ci, batch_size = self.gather_batch(
create_items, i_ci, start_size=batch_size
)
update_items_batch, i_ui, batch_size = self.gather_batch(
update_items, i_ui, start_size=batch_size
)
delete_items_batch, i_di, batch_size = self.gather_batch(
delete_items, i_di, start_size=batch_size
)
if i_ci == len(create_items):
create_edges_batch, i_ce, batch_size = self.gather_batch(
create_edges, i_ce, start_size=batch_size
)
else:
create_edges_batch = []
n_batch = len(
create_items_batch
+ update_items_batch
+ create_edges_batch
+ delete_items_batch
)
n += n_batch
print(f"BULK: Writing {n}/{n_total} items/edges")
try:
result = self.api.bulk(
create_items_batch,
update_items_batch,
create_edges_batch,
delete_items_batch,
)
except PodError as e:
print(e)
print("could not complete bulk action, aborting")
return False
print(f"Completed Bulk action, written {n} items/edges")
for item in all_items:
item.on_sync_to_pod(self)
return True
def get_create_edge_dict(self, edge):
return {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
def create_edge(self, edge):
edge_dict = self.get_create_edge_dict(edge)
try:
self.api.create_edge(edge_dict)
return True
except PodError as e:
print(e)
return False
def get(self, id, expanded=True, include_deleted=False):
if not expanded:
res = self._get_item_with_properties(id)
else:
res = self._get_item_expanded(id)
if res is None:
raise ValueError(f"Item with id {id} does not exist")
elif res.deleted and not include_deleted:
print(f"Item with id {id} has been deleted")
return
return res
def get_all_items(self):
raise NotImplementedError()
def filter_deleted(self, items):
return [i for i in items if not i.deleted == True]
def _get_item_expanded(self, id, include_deleted=False):
item = self.get(id, expanded=False, include_deleted=include_deleted)
edges = self.get_edges(id)
for e in edges:
item.add_edge(e["name"], e["item"])
return item
def get_edges(self, id):
try:
result = self.api.get_edges(id)
for d in result:
d["item"] = self.item_from_json(d["item"])
return result
except PodError as e:
print(e)
return
def _get_item_with_properties(self, id):
try:
result = self.api.get_item(str(id))
if not len(result):
return
return self.item_from_json(result[0])
except PodError as e:
print(e)
return
def get_update_dict(self, node, partial_update=True):
properties = node.to_json(dates=False)
properties.pop("type", None)
properties.pop("deleted", None)
properties = {k: v for k, v in properties.items() if k=="id" or k in node._updated_properties}
return properties
def update_item(self, node, partial_update=True):
data = self.get_update_dict(node, partial_update=partial_update)
try:
self.api.update_item(data)
node.on_sync_to_pod(self)
return True
except PodError as e:
print(e)
return False
def exists(self, id):
try:
result = self.api.get_item(str(id))
if isinstance(result, list) and len(result) > 0:
return True
return False
except PodError as e:
print(e)
return False
def search_paginate(self, fields_data, limit=50, include_edges=True, add_to_local_db: bool = True):
if "ids" in fields_data:
raise NotImplementedError("Searching by multiple IDs is not implemented for paginated search.")
extra_fields = {"[[edges]]": {}} if include_edges else {}
query = {**fields_data, **extra_fields}
try:
for page in self.api.search_paginate(query, limit):
result = [self._item_from_search(item, add_to_local_db=add_to_local_db) for item in page]
yield self.filter_deleted(result)
except PodError as e:
print(e)
def search(self, fields_data, include_edges: bool = True, add_to_local_db: bool = True):
extra_fields = {"[[edges]]": {}} if include_edges else {}
query = {**fields_data, **extra_fields}
# Special key "ids" for searching a list of ids.
# Requires /bulk search instead of /search.
if "ids" in query:
ids = query.pop("ids")
bulk_query = [{"id": uid, **query} for uid in ids]
try:
result = self.api.bulk(search=bulk_query)["search"]
except PodError as e:
print(e)
result = [item for sublist in result for item in sublist]
else:
try:
result = self.api.search(query)
except PodError as e:
print(e)
result = [self._item_from_search(item, add_to_local_db=add_to_local_db) for item in result]
return self.filter_deleted(result)
def _item_from_search(self, item_json: dict, add_to_local_db: bool = True):
# search returns different fields w.r.t. edges compared to `get` api,
# different method to keep `self.get` clean.
item = self.item_from_json(item_json, add_to_local_db=add_to_local_db)
for edge_json in item_json.get("[[edges]]", []):
edge_name = edge_json["_edge"]
try:
edge_item = self.item_from_json(edge_json["_item"], add_to_local_db=add_to_local_db)
item.add_edge(edge_name, edge_item)
except Exception as e:
print(f"Could not attach edge {edge_json['_item']} to {item}")
print(e)
continue
return item
def search_last_added(self, type=None, with_prop=None, with_val=None):
query = {"_limit": 1, "_sortOrder": "Desc"}
if type is not None:
query["type"] = type
if with_prop is not None:
query[f"{with_prop}=="] = with_val
return self.search(query)[0]
def item_from_json(self, json, add_client_ref: bool = True, from_pod: bool = True, add_to_local_db: bool = True):
plugin_class = json.get("pluginClass", None)
plugin_package = json.get("pluginPackage", None)
constructor = get_constructor(
json["type"],
plugin_class,
plugin_package=plugin_package,
extra=self.registered_classes,
)
new_item = constructor.from_json(json)
existing = self.local_db.get(new_item.id)
# TODO: cleanup
if existing is not None and add_to_local_db:
if not existing.is_expanded() and new_item.is_expanded():
for edge_name in new_item.get_all_edge_names():
edges = new_item.get_edges(edge_name)
for e in edges:
e.source = existing
existing.__setattr__(edge_name, edges)
for prop_name in new_item.get_property_names():
existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
result = existing
else:
result = new_item
result._set_client(self) if add_client_ref else None
if from_pod:
result._in_pod = True
result._updated_properties = set()
return result
def get_properties(self, expanded):
properties = copy(expanded)
if ALL_EDGES in properties:
del properties[ALL_EDGES]
return properties
def send_email(self, to, subject="", body=""):
try:
self.api.send_email(to, subject, body)
print(f"succesfully sent email to {to}")
return True
except PodError as e:
print(e)
return False
def add_to_sync(self, item):
item.create_id(overwrite=False)
self.sync_store[item.id] = item
def sync(self, priority=None):
all_items = list(self.sync_store.values())
all_ids = set(self.sync_store.keys())
create_items = []
update_items = []
update_ids = set()
create_edges = []
for item in self.sync_store:
if item._in_pod and item not in update_ids:
# Add items from sync store
for item in all_items:
if item._in_pod:
update_items.append(item)
if item.id is None:
raise ValueError("Attempted to sync an existing item without item ID.")
update_ids.add(item.id)
else:
create_items.append(item)
existing_items = self.search({"ids": list(update_ids)}, add_to_local_db=False)
if expanded:
# Add edges and items connected to edges
for edge in item._new_edges:
tgt = edge.target
if tgt.id in all_ids:
continue
elif tgt._in_pod:
update_items.append(tgt)
else:
create_items.append(tgt)
create_edges.extend(item._new_edges)
item._new_edges = list()
update_ids = [item.id for item in update_items]
existing_items = self.search({"ids": update_ids}, add_to_local_db=False)
existing_items = {item.id: item for item in existing_items}
update_items = [
self._resolve_existing_item(item, existing_items[item.id], priority) for item in update_items
]
create_edges = []
for item in self.sync_store:
create_edges.extend(item._new_edges)
item._new_edges = list()
return self.bulk_action(
success = self.bulk_action(
create_items=create_items,
update_items=update_items,
create_edges=create_edges,
)
if success:
self.sync_store = dict()
return success
def _resolve_existing_item(self, local_item, remote_item, priority=None):
if priority == None:
if priority == "error":
for prop in local_item.properties:
local_val = getattr(local_item, prop)
remote_val = getattr(remote_item, prop)
if local_val is not None and remote_val is not None and local_val != remote_val:
raise ValueError("Difference between local and remote item")
elif priority.lower() == "remote":
for prop in local_item.properties:
local_val = getattr(local_item, prop)
remote_val = getattr(remote_item, prop)
if remote_val is not None:
setattr(local_item, prop, remote_val)
elif priority.lower() == "local":
for prop in local_item.properties:
local_val = getattr(local_item, prop)
remote_val = getattr(remote_item, prop)
if local_val is None and remote_val is not None:
setattr(local_item, prop, remote_val)
else:
raise ValueError(f"Unknown overwrite method: {overwrite}")
return local_item
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys, you can just omit the keys when initializing the `PodClient`, which creates a new user by defining random keys.
If you want to use the same keys for different `PodClient` instances, you can store a random key pair locally with the `store_keys` CLI, and create a new client with `PodClient.from_local_keys()`. When you are using the app, setting the keys in the pod, and passing them when calling a plugin is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
client.registered_classes["Photo"]
```
%% Output
pymemri.data.photo.Photo
%% Cell type:code id: tags:
``` python
# hide
success = client.api.test_connection()
assert success
```
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. To create an item in the pod, you have to add the schema first. Schemas can be added as follows
%% Cell type:code id: tags:
``` python
from pymemri.data.schema import EmailMessage, Address, PhoneNumber
succes = client.add_to_schema(EmailMessage, Address, PhoneNumber)
```
%% Cell type:code id: tags:
``` python
item = EmailMessage(content="c1", subject="s1")
client.create(item)
def setup_sync_test(client):
existing = [EmailMessage(content=f"content_{i}") for i in range(3)]
new_items = [EmailMessage(content=f"content_{i+3}") for i in range(3)]
client.bulk_action(create_items=existing)
all_items = existing + new_items
for item in all_items:
print(item)
item.store(client)
# CHange existing property
existing[0].content = "changed_0"
# Add new property
for i, item in enumerate(all_items):
item.title = f"title_{i}"
item.subject="s2"
return all_items
```
%% Cell type:code id: tags:
item2 = client.search({"id": item.id}, add_to_local_db=False)[0]
item2.to_json()
``` python
# Test sync, local priority
all_items = setup_sync_test(client)
client.sync(priority="local")
item = client._resolve_existing_item(item, item2, "remote")
item.to_json()
for i, item in enumerate(all_items):
if i==0:
assert item.content == f"changed_{i}"
else:
assert item.content == f"content_{i}"
assert item.title == f"title_{i}"
assert item._in_pod
```
%% Output
{'id': 'e0795b12b9b39d03efe4ace560130549',
'dateCreated': 1643020747248,
'dateModified': 1643020747248,
'dateServerModified': 1643020747248,
'deleted': False,
'content': 'c1',
'subject': 's1',
'type': 'EmailMessage'}
BULK: Writing 3/3 items/edges
Completed Bulk action, written 3 items/edges
EmailMessage (#3b9b1eadc82a4a57b5aa60846917c14e)
EmailMessage (#6c821e41cde7464e9568d82d22d55dbb)
EmailMessage (#a6edae59a6bf4e9c9b837c277faf50ea)
EmailMessage (#None)
EmailMessage (#None)
EmailMessage (#None)
BULK: Writing 6/6 items/edges
Completed Bulk action, written 6 items/edges
%% Cell type:code id: tags:
``` python
existing = [EmailMessage(content=f"existing_{i}", subject=f"existing_{i}") for i in range(3)]
new_items = [EmailMessage(content=f"new_{i}", subject=f"new_{i}") for i in range(3)]
# Test sync, local priority
all_items = setup_sync_test(client)
client.sync(priority="remote")
client.bulk_action(create_items=existing)
for i, item in enumerate(all_items):
assert item.content == f"content_{i}"
assert item.title == f"title_{i}"
assert item._in_pod
```
%% Output
BULK: Writing 3/3 items/edges
Completed Bulk action, written 3 items/edges
EmailMessage (#f3cd088e54824a278df0f21202f3e6b5)
EmailMessage (#01ff5e51bb234b428a5402c2f68b0d33)
EmailMessage (#cf3b294e5e8847a8bedc1568c452042d)
EmailMessage (#None)
EmailMessage (#None)
EmailMessage (#None)
BULK: Writing 6/6 items/edges
Completed Bulk action, written 6 items/edges
True
%% Cell type:code id: tags:
``` python
# Test sync, no priority
# Note: should throw an error, since there is a sync conflict
all_items = setup_sync_test(client)
try:
client.sync()
except ValueError as e:
assert e.args[0] == "Difference between local and remote item"
```
%% Output
BULK: Writing 3/3 items/edges
Completed Bulk action, written 3 items/edges
EmailMessage (#b505fbf4aa7b4b049b9312d4db512b55)
EmailMessage (#7819cc98b72f4662a8802de99c70480c)
EmailMessage (#93d112c26fd74cc791a3052474ba557b)
EmailMessage (#None)
EmailMessage (#None)
EmailMessage (#None)
%% Cell type:code id: tags:
``` python
existing[0].content = "changed_content"
client.sync(priority="local")
```
%% Output
BULK: Writing 6/6 items/edges
Completed Bulk action, written 6 items/edges
all_items = existing + new_items
for item in all_items:
item.store(client)
True
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
```
%% Cell type:markdown id: tags:
We can now create an item with one of the above item definitions. As a side-effect, our item will be assigned an id.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
print(email_item.id)
```
%% Output
6dd5e9eaeee6d16553778c77e17df61c
%% Cell type:markdown id: tags:
The types of items in the pod are not limited to definitions to the pymemri schema. We can easily define our own types, or overwrite existing item definitions with the same `add_to_schema` method.
Note that all keyword arguments need to be added to the `properties` class variable to let the pod know what the properties of our item are. Additionally, properties in the Pod are statically typed, and have to be inferred from type the annotations of our `__init__` method.
%% Cell type:code id: tags:
``` python
# export
class Dog(Item):
properties = Item.properties + ["name", "age", "bites", "weight"]
def __init__(self, name: str=None, age: int=None, bites: bool=False, weight: float=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
self.bites = bites
self.weight = weight
```
%% Cell type:code id: tags:
``` python
client.add_to_schema(Dog)
dog2 = Dog(name="bob", age=3, weight=33.2)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
# hide
client.reset_local_db()
dog_from_db = client.get(dog2.id)
assert dog_from_db.name == "bob"
assert dog_from_db.age == 3
assert dog_from_db.weight == 33.2
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
print(client.get_edges(email_item.id))
```
%% Output
[{'item': Person (#34ae22888865800caafe6696c92a4958), 'name': 'sender'}]
%% Cell type:code id: tags:
``` python
# hide
assert item_succes
assert edge_succes
```
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
print(email_from_db.sender)
```
%% Output
[Person (#34ae22888865800caafe6696c92a4958)]
%% Cell type:code id: tags:
``` python
# hide
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
client.create(person_item)
# Retrieve person from Pod
person_from_db = client.get(person_item.id, expanded=False)
```
%% Cell type:code id: tags:
``` python
# hide
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
# item._in_pod check
other_person = Person(firstName="test")
other_person._in_pod = False
client.create(other_person)
assert other_person._in_pod
```
%% Cell type:markdown id: tags:
Appart from creating, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
print(person_from_db.lastName)
```
%% Output
Awesome
%% Cell type:code id: tags:
``` python
# hide
assert person_from_db.lastName == "Awesome"
```
%% Cell type:code id: tags:
``` python
# hide
assert person_from_db._in_pod
# Test partial updating
assert len(person_from_db._updated_properties) == 0
# Change property
person_from_db.displayName = "AliceAwesome"
assert "displayName" in person_from_db._updated_properties
client.update_item(person_from_db)
assert len(person_from_db._updated_properties) == 0
# Change non-property
person_from_db.non_property = "test"
assert len(person_from_db._updated_properties) == 0
# Empty update
print(list(client.get_update_dict(person_from_db).keys()) == ["id"])
assert client.update_item(person_from_db)
```
%% Output
True
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:markdown id: tags:
the `PodClient` can search through the pod with the `search` or `search_paginate` methods, which return the results of a search as a list or generator respectively. Search uses the same arguments as the Pod search API, which can be found [here](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#post-v4owner_keysearch).
To display how search works, we first add a few new items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
person_account = Account(service="testService")
client.create(person_item2)
client.create(person_account)
person_item2.add_edge("account", person_account)
client.create_edges(person_item2.get_edges("account"));
```
%% Output
BULK: Writing 1/1 items/edges
Completed Bulk action, written 1 items/edges
%% Cell type:code id: tags:
``` python
# hide
client.reset_local_db()
```
%% Cell type:code id: tags:
``` python
# Search for all Persons in the pod
all_people = client.search({"type": "Person"}, include_edges=True)
print("Number of results:", len(all_people))
```
%% Output
Number of results: 4
%% Cell type:code id: tags:
``` python
# hide
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert any([len(p.account) for p in all_people])
assert all([item._in_pod for item in all_people])
```
%% Cell type:code id: tags:
``` python
# hide
# search without returning edges
all_people = client.search({"type": "Person"}, include_edges=False)
assert len(all_people)
assert([len(person.get_all_edges())==0 for person in all_people])
```
%% Cell type:code id: tags:
``` python
# hide
assert len(all_people)
```
%% Cell type:code id: tags:
``` python
# hide
# Search with edges
all_people = client.search({"type": "Person"}, include_edges=True)
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert any([len(p.account) for p in all_people])
```
%% Cell type:code id: tags:
``` python
# Search by IDs
ids = [person.id for person in all_people]
result = client.search({"ids": ids})
assert [item.id for item in result] == ids
```
%% Cell type:markdown id: tags:
To hande large volumes of Items, the `PodClient.search_paginate` method can search through the pod and return a generator which yields batches of items. This method uses the same search arguments as the `search` method:
%% Cell type:code id: tags:
``` python
# Create 100 accounts to search
client.bulk_action(
create_items=[
Account(identifier=str(i), service="paginate_test") for i in range(100)
]
)
generator = client.search_paginate({"type": "Account", "service": "paginate_test"}, limit=10)
for page in generator:
# process accounts
pass
```
%% Output
BULK: Writing 100/100 items/edges
Completed Bulk action, written 100 items/edges
%% Cell type:code id: tags:
``` python
# hide
# Test pagination
accounts = client.search({"type": "Account", "service": "paginate_test"})
generator = client.search_paginate({"type": "Account", "service": "paginate_test"}, limit=10)
accounts_paginated = []
for page in generator:
accounts_paginated.extend(page)
assert len(accounts_paginated) == 100
assert [a.id for a in accounts] == [a.id for a in accounts_paginated]
assert all(a._in_pod for a in accounts_paginated)
```
%% Cell type:code id: tags:
``` python
# hide
# Search, edge cases
result = client.search({"type": "Account", "service": "NonExistentService"})
assert result == []
paginator = client.search_paginate({"type": "Account", "service": "NonExistentService"})
try:
next(paginator)
except Exception as e:
assert isinstance(e, StopIteration)
```
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2)
last_added = client.search_last_added(type="Person")
```
%% Cell type:code id: tags:
``` python
# hide
assert last_added.firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files like Photos or Videos, the `PodClient` has a separate file api. This api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
For example, we can upload a photo with the file API as follows:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(photo.data)
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
assert succes2
data = client.get_file(file.sha256)
photo.data = data
arr = photo.to_np()
assert (arr == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
The PodClient implements an easier API for photos separately, which uses the same file API under the hood
%% Cell type:code id: tags:
``` python
print(client.registered_classes["Photo"])
# client.add_to_schema(Photo)
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
client.create_photo(photo);
photo.file
```
%% Output
<class 'pymemri.data.photo.Photo'>
BULK: Writing 3/3 items/edges
Completed Bulk action, written 3 items/edges
[File (#f96a025c593348668efc8f8724314921)]
%% Cell type:code id: tags:
``` python
# hide
assert photo._in_pod
res = client.get_photo(photo.id)
print(res.id)
res.file[0].sha256
assert (res.to_np() == x).all()
```
%% Output
06f9e5d9b50f44aab3dcf8da2beceb91
%% Cell type:markdown id: tags:
Some photos come as bytes, for example when downloading them from a third party service. We can use `photo.from_bytes` to initialize these photos:
%% Cell type:code id: tags:
``` python
byte_photo = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xe1\x00\x00\x00\xe1\x08\x03\x00\x00\x00\tm"H\x00\x00\x003PLTE\x04\x02\x04\x00\x00\x00\xa0\xa0\xa0\xa3\xa3\xa3\xaa\xaa\xaa\xb4\xb4\xb4\xbd\xbe\xbd\xbb\xbc\xbb\xde\xde\xde\x9b\x9a\x9b\xfe\xfe\xfe\xf2\xf3\xf2\xe5\xe6\xe5\xd8\xd9\xd8\xd1\xd1\xd1\xc9\xca\xc9\xae\xae\xae\x80k\x98\xfc\x00\x00\x01TIDATx\x9c\xed\xdd;r\xc2P\x00\x04A!\x90\x84\xfc\x01\xee\x7fZ\x138\xb1\x13S\xceF\xaf\xfb\x06\x93o\xd5No\xef\x1f\x9f\xb7\xfb}]\xd7my\xba|;\xff4\xff\xdf\xf9O\x97W<\x96W\xac\xbfm\xd7i9\x1d\xdb\xfe,\x9c\x8e\xec4+\xac{\x16^\x14\xb6)\xecS\xd8\xa7\xb0Oa\x9f\xc2\xbe!\n\xcf\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xce\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87\xc4bHa\x9c\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x86xaQ\x18\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87D\xe1\xe3\xf0\x85\x8b\xc26\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xae\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14n\xa7c\xdb\xa7\xeb>\x1f\xd9~\xfb\x02\xee\x7f\r\xe5\xe1h\x04"\x00\x00\x00\x00IEND\xaeB`\x82'
photo = Photo.from_bytes(byte_photo)
client.create_photo(photo);
```
%% Output
BULK: Writing 3/3 items/edges
Completed Bulk action, written 3 items/edges
%% Cell type:code id: tags:
``` python
# hide
# Test on a new client to prevent caching
new_client = PodClient(database_key=client.database_key, owner_key=client.owner_key)
res = new_client.get_photo(photo.id, size=225)
assert res.data == photo.data
```
%% Cell type:markdown id: tags:
## Bulk API
%% Cell type:markdown id: tags:
Adding each item separately to the pod with the `create` method can take a lot of time. For this reason, using the bulk API is faster and more convenient in most cases. Here we show creating items and edges, updating and deleting is also possible.
%% Cell type:code id: tags:
``` python
# Create 100 Dogs to add to the pod, and two edges to a new person
dogs = [Dog(name=f"dog number {i}") for i in range(100)]
person = Person(firstName="Alice")
edge1 = Edge(dogs[0], person, "label")
edge2 = Edge(dogs[1], person, "label")
# Simultaneously add the dogs, person, and edges with the bulk API
success = client.bulk_action(create_items=dogs + [person], create_edges=[edge1, edge2])
```
%% Output
BULK: Writing 103/103 items/edges
Completed Bulk action, written 103 items/edges
%% Cell type:code id: tags:
``` python
# hide
dogs = client.search({"type": "Dog"})
dogs_with_edge = [item for item in dogs if len(item.get_all_edges())]
print(len(dogs_with_edge))
assert len(dogs_with_edge) == 2
for d in dogs_with_edge:
assert len(d.label) > 0
```
%% Output
2
%% Cell type:code id: tags:
``` python
# hide
# test bulk delete and update
person.firstName = "Bob"
to_delete = [dogs[0]]
to_update = [person]
client.bulk_action(delete_items=to_delete, update_items=to_update)
dogs_with_edge = [
item for item in client.search({"type": "Dog"}) if item.name.startswith("dog number 0") or item.name.startswith("dog number 1 ")
]
assert len(dogs_with_edge) == 1
dog = dogs_with_edge[0]
assert dog.label[0].firstName == "Bob"
```
%% Output
BULK: Writing 2/2 items/edges
Completed Bulk action, written 2 items/edges
%% Cell type:markdown id: tags:
# Sending emails -
%% Cell type:code id: tags:
``` python
# hide
# skip
to = "myemail@gmail.com"
client.send_email(to=to, subject="test", body="test2")
```
%% Output
succesfully sent email to myemail@gmail.com
True
%% Cell type:markdown id: tags:
# Create items that do not exist in the Pod
The `PodClient` can deduplicate items with the same externalId with the `create_if_external_id_not_exists` method.
%% Cell type:code id: tags:
``` python
person_item = Person(firstName="Eve", externalId="gmail_1")
person_item2 = Person(firstName="Eve2", externalId="gmail_1")
client.create_if_external_id_not_exists(person_item)
client.create_if_external_id_not_exists(person_item2)
existing = client.search({"externalId": "gmail_1"})
```
%% Cell type:code id: tags:
``` python
# hide
assert len(existing) == 1
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
......
......@@ -7,6 +7,7 @@ __all__ = ['ALL_EDGES', 'Edge', 'ItemBase', 'Item']
from typing import Optional, Dict, List, Set
from ..imports import *
from datetime import datetime
import uuid
ALL_EDGES = "allEdges"
SOURCE, TARGET, TYPE, EDGE_TYPE, LABEL, SEQUENCE = "_source", "_target", "_type", "_type", "label", "sequence"
......@@ -74,6 +75,7 @@ class ItemBase:
self.id: Optional[str] = id
self._client: Optional["PodClient"] = None
self._in_pod: bool = False
self._new_edges = list()
def _set_client(self, client: "PodClient"):
if self._client is not None and self._client != client:
......@@ -152,10 +154,15 @@ class ItemBase:
def exists(self, api):
return api.exists(self.id) if self.id else None
def create_id(self, overwrite=False):
if not overwrite and self.id is not None:
return
self.id = uuid.uuid4().hex
def store(self, client: "PodClient" = None):
if client:
if client is not None:
self._set_client(client)
self._client.sync_store.append(self)
self._client.add_to_sync(self)
# def expand(self, api):
# """Expands a node (retrieves all directly connected nodes ands adds to object)."""
......
......@@ -60,7 +60,7 @@ class PodClient:
self.local_db = DB()
self.registered_classes = dict()
self.register_base_schemas()
self.sync_store = list()
self.sync_store = dict()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
......@@ -218,6 +218,7 @@ class PodClient:
def delete_items(self, items):
return self.bulk_action(delete_items=items)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
......@@ -527,27 +528,33 @@ class PodClient:
print(e)
return False
def add_to_sync(self, item):
item.create_id(overwrite=False)
self.sync_store[item.id] = item
def sync(self, priority=None):
create_items = []
update_items = []
update_ids = []
update_ids = set()
for item in self.sync_store:
if item._in_pod:
for item in self.sync_store.values():
if item._in_pod and item not in update_ids:
update_items.append(item)
if item.id is None:
raise ValueError("Attempted to sync an existing item without item ID.")
update_ids.append(item.id)
update_ids.add(item.id)
else:
create_items.append(item)
existing_items = self.search({"ids": update_ids}, add_to_local_db=False)
existing_items = self.search({"ids": list(update_ids)}, add_to_local_db=False)
existing_items = {item.id: item for item in existing_items}
update_items = [self._resolve_existing_item(item, existing_items[item.id], priority)]
update_items = [
self._resolve_existing_item(item, existing_items[item.id], priority) for item in update_items
]
create_edges = []
for item in self.sync_store:
for item in self.sync_store.values():
create_edges.extend(item._new_edges)
item._new_edges = list()
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment