Commit 34c8937b authored by Koen van der Veen's avatar Koen van der Veen
Browse files

add to_json function to item

Showing with 98 additions and 59 deletions
+98 -59
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp data.itembase
```
%% Cell type:markdown id: tags:
# Itembase
%% Cell type:markdown id: tags:
Any data class in pymemri inherits from `Item`. It is a base class for items with some handy functionalities to create new items and edges, retrieve all edges to other items, and sync with the pod.
%% Cell type:code id: tags:
``` python
# export
# hide
from typing import Optional
from pymemri.imports import *
ALL_EDGES = "allEdges"
SOURCE, TARGET, TYPE, EDGE_TYPE, LABEL, SEQUENCE = "_source", "_target", "_type", "_type", "label", "sequence"
```
%% Cell type:code id: tags:
``` python
#hide
from nbdev.showdoc import *
```
%% Cell type:code id: tags:
``` python
# export
# hide
def parse_base_item_json(json):
id = json.get("id", None)
dateAccessed = json.get("dateAccessed", None)
dateCreated = json.get("dateCreated", None)
dateModified = json.get("dateModified", None)
deleted = json.get("deleted", None)
externalId = json.get("externalId", None)
itemDescription = json.get("itemDescription", None)
starred = json.get("starred", None)
version = json.get("version", None)
return id, dateAccessed, dateCreated, dateModified, deleted, externalId, itemDescription, starred, version, None, None
```
%% Cell type:code id: tags:
``` python
# export
class Edge():
"""Edges makes a link between two `ItemBase` Items. You won't use this class a lot in practice, as edges are
abstracted away for normal users. When items are retrieved from the database, the edges are parsed automatically.
When you add an edge between to items within pymemri, you will often use `ItemBase.add_edge`"""
def __init__(self, source, target, _type, label=None, sequence=None, created=False, reverse=True):
self.source = source
self.target = target
self._type = _type
self.label = label
self.sequence = sequence
self.created = created
self.reverse = reverse
@classmethod
def from_json(cls, json):
from pymemri.data.schema import get_constructor
# we only set the target here
_type = json[EDGE_TYPE]
json_target = json[TARGET]
target_type = json_target["_type"]
plugin_class = json_target.get("pluginClass", None)
target_constructor = get_constructor(target_type, plugin_class)
target = target_constructor.from_json(json_target)
return cls(source=None, target=target, _type=_type)
def __repr__(self):
return f"{self.source} --{self._type}-> {self.target}"
def update(self, api):
if self.created:
api.create_edges([self])
def __eq__(self, other):
return self.source is other.source and self.target is other.target \
and self._type == other._type and self.reverse == other.reverse and self.created == other.created \
and self.label == other.label
def traverse(self, start):
"""We can traverse an edge starting from the source to the target or vice versa. In practice we often call
item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function."""
if start == self.source:
return self.target
elif start == self.target:
return self.source
else:
raise ValueError
```
%% Cell type:code id: tags:
``` python
show_doc(Edge.traverse)
```
%% Output
<h4 id="Edge.traverse" class="doc_header"><code>Edge.traverse</code><a href="__main__.py#L39" class="source_link" style="float:right">[source]</a></h4>
> <code>Edge.traverse</code>(**`start`**)
We can traverse an edge starting from the source to the target or vice versa. In practice we often call
item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function.
%% Cell type:code id: tags:
``` python
ITEMBASE_PROPERTIES = ["dateAccessed", "dateCreated", "dateModified", "deleted", "externalId", "itemDescription",
"starred", "version", "id", "importJson", "name", "repository", "icon", "bundleImage",
"runDestination", "pluginClass"]
```
%% Cell type:code id: tags:
``` python
# export
# hide
class ItemBase():
"""Provides a base class for all items.
All items in the schema inherit from this class, and it provides some
basic functionality for consistency and to enable easier usage."""
def __init__(self, id: str = None):
self.id: Optional[str] = id
def __getattribute__(self, name):
val = object.__getattribute__(self, name)
if isinstance(val, Edge):
edge = val
return edge.traverse(start=self)
if isinstance(val, list) and len(val) > 0 and isinstance(val[0], Edge):
edges = val
return [edge.traverse(start=self) for edge in edges]
else:
return val
def add_edge(self, name, val):
"""Creates an edge of type name and makes it point to val"""
val = Edge(self, val, name, created=True)
if name not in self.__dict__:
raise NameError(f"object {self} does not have edge with name {name}")
existing = object.__getattribute__(self, name)
if val not in existing:
res = existing + [val]
self.__setattr__(name, res)
def is_expanded(self):
"""returns whether the node is expanded. An expanded node retrieved nodes that are
*directly* connected to it
from the pod, and stored their values via edges in the object."""
return len(self.get_all_edges()) > 0
def get_edges(self, name):
return object.__getattribute__(self, name)
def get_all_edges(self):
return [e for attr in self.__dict__.values() if self.attr_is_edge(attr) for e in attr]
def get_all_edge_names(self):
return [k for k,v in self.__dict__.items() if self.attr_is_edge(v)]
def get_property_names(self):
return [k for k, v in self.__dict__.items() if not type(v) == list]
@staticmethod
def attr_is_edge(attr):
return isinstance(attr, list) and len(attr)>0 and isinstance(attr[0], Edge)
def update(self, api, edges=True, create_if_not_exists=True, skip_nodes=False):
if not self.exists(api):
print(f"creating {self}")
api.create(self)
else:
print(f"updating {self}")
api.update_item(self)
if edges:
api.create_edges(self.get_all_edges())
def exists(self, api):
return api.exists(self.id) if self.id else None
# def expand(self, api):
# """Expands a node (retrieves all directly connected nodes ands adds to object)."""
# self._expanded = True
# res = api.get(self.id, expanded=True)
# for edge_name in res.get_all_edge_names():
# edges = res.get_edges(edge_name)
# for e in edges:
# e.source = self
# self.__setattr__(edge_name, edges)
# # self.edges = res.edges
# return self
def __repr__(self):
id = self.id
_type = self.__class__.__name__
return f"{_type} (#{id})"
@classmethod
def from_data(cls, *args, **kwargs):
edges = dict()
new_kwargs = dict()
for k, v in kwargs.items():
if isinstance(v, ItemBase):
edge = Edge(None, v, k)
edges[k] = edge
new_kwargs[k] = edge
else:
new_kwargs[k] = v
res = cls(*args, **new_kwargs)
for v in edges.values():
v.source = res
return res
# def inherit_funcs(self, other):
# """This function can be used to inherit new functionalities from a subclass. This is a patch to solve
# the fact that python does provide extensions of classes that are defined in a different file that are
# dynamic enough for our use case."""
# assert issubclass(other, self.__class__)
# self.__class__ = other
```
%% Cell type:code id: tags:
``` python
# export
class Item(ItemBase):
"""Item is the baseclass for all of the data classes."""
properties = [
"dateAccessed",
"dateCreated",
"dateModified",
"deleted",
"externalId",
"itemDescription",
"starred",
"version",
"id",
"importJson",
"pluginClass",
]
edges = ["changelog", "label", "genericAttribute", "measure", "sharedWith"]
def __init__(
self,
dateAccessed: str = None,
dateCreated: str = None,
dateModified: str = None,
deleted: str = None,
externalId: str = None,
itemDescription: str = None,
starred: str = None,
version: str = None,
id: str = None,
importJson: str = None,
pluginClass: str = None,
changelog: list = None,
label: list = None,
genericAttribute: list = None,
measure: list = None,
sharedWith: list = None
):
super().__init__(id)
# Properties
self.dateAccessed: Optional[str] = dateAccessed
self.dateCreated: Optional[str] = dateCreated
self.dateModified: Optional[str] = dateModified
self.deleted: Optional[str] = deleted
self.externalId: Optional[str] = externalId
self.itemDescription: Optional[str] = itemDescription
self.starred: Optional[str] = starred
self.version: Optional[str] = version
self.importJson: Optional[str] = importJson
self.pluginClass: Optional[str] = pluginClass
# Edges
self.changelog: list = changelog if changelog is not None else []
self.label: list = label if label is not None else []
self.genericAttribute: list = genericAttribute if genericAttribute is not None else []
self.measure: list = measure if measure is not None else []
self.sharedWith: list = sharedWith if sharedWith is not None else []
@classmethod
def parse_json(self, cls, json):
property_kwargs = Item.parse_properties(cls, json)
edge_kwargs = Item.parse_edges(cls, json)
return {**property_kwargs, **edge_kwargs}
@classmethod
def parse_properties(self, cls, json):
return {p: json.get(p, None) for p in cls.properties}
@classmethod
def parse_edges(self, cls, json):
all_edges = json.get(ALL_EDGES, None)
edge_kwargs = dict()
reverse_edges = [f"~{e}" for e in cls.edges]
if all_edges is not None:
for edge_json in all_edges:
edge = Edge.from_json(edge_json)
if edge.type in self.edges + reverse_edges:
edge_name = self.remove_prefix(edge.type)
if edge_name in edge_kwargs:
edge_kwargs[edge_name] += [edge]
else:
edge_kwargs[edge_name] = [edge]
return edge_kwargs
@classmethod
def remove_prefix(s, prefix="~"):
return s[1:] if s[0] == "`" else s
@classmethod
def from_json(cls, json):
kwargs = Item.parse_json(cls, json)
res = cls(**kwargs)
for e in res.get_all_edges():
e.source = res
return res
def _get_schema_type(self):
for cls in self.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def to_json(self, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(self, "private", [])
for k, v in self.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type()
return res
```
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.add_edge)
```
%% Output
<h4 id="ItemBase.add_edge" class="doc_header"><code>ItemBase.add_edge</code><a href="__main__.py#L29" class="source_link" style="float:right">[source]</a></h4>
<h4 id="ItemBase.add_edge" class="doc_header"><code>ItemBase.add_edge</code><a href="__main__.py#L22" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.add_edge</code>(**`name`**, **`val`**)
Creates an edge of type name and makes it point to val
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.is_expanded)
```
%% Output
<h4 id="ItemBase.is_expanded" class="doc_header"><code>ItemBase.is_expanded</code><a href="__main__.py#L39" class="source_link" style="float:right">[source]</a></h4>
<h4 id="ItemBase.is_expanded" class="doc_header"><code>ItemBase.is_expanded</code><a href="__main__.py#L32" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.is_expanded</code>()
returns whether the node is expanded. An expanded node retrieved nodes that are
*directly* connected to it
from the pod, and stored their values via edges in the object.
%% Cell type:markdown id: tags:
# Usage
%% Cell type:markdown id: tags:
With the `Item` and `Edge` classes we can create an item and its surrounding graph. The schema is defined in schema.py, in general we want to use the from_data staticmethod to generate new items, because it ensures that edges are linked from both the source and the target object. Let's make a new item and add it to the pod.
%% Cell type:code id: tags:
``` python
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges + ["friend"]
def __init__(self, name=None, age=None,friend=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
self.friend = fried if friend is not None else []
```
%% Cell type:code id: tags:
``` python
from pymemri.pod.client import PodClient
client = PodClient()
```
%% Cell type:code id: tags:
``` python
assert client.add_to_schema(MyItem(name="abc", age=1))
```
%% Cell type:code id: tags:
``` python
x = MyItem(name="me", age=30)
x.add_edge("friend", MyItem(name="my friend", age=31))
```
%% Cell type:markdown id: tags:
We can now create our `MyItem`, as a side-effect of creating it, it will receive an id
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
None
%% Cell type:code id: tags:
``` python
assert client.create(x)
```
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
bfec09ee3c0eee83e4fb4a63674704f5
100f7a6b5f7ec4fd17546c6a2d3c4336
%% Cell type:code id: tags:
``` python
y = client.get(x.id)
```
%% Cell type:code id: tags:
``` python
assert len(y.friend) > 0
```
%% Cell type:code id: tags:
``` python
assert y.friend[0].name == "my friend"
assert y.name == "me"
assert y.age == 30
# One year later
y.age = 31
y.add_edge("friend", MyItem(name="my friend2", age=29))
y.update(client)
assert y.age == 31
assert len(y.friend) == 2
```
%% Output
'my friend'
updating MyItem (#100f7a6b5f7ec4fd17546c6a2d3c4336)
<Response [400]> b'Failure: JSON deserialization error payload.createEdges[0]._target: invalid type: null, expected a string at line 1 column 245'
%% Cell type:code id: tags:
``` python
y.to_json(dates=False)
```
%% Output
{'id': '100f7a6b5f7ec4fd17546c6a2d3c4336',
'deleted': False,
'name': 'me',
'age': 31,
'type': 'MyItem'}
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.schema.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted test_utils.ipynb.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.data.photo import Photo, NUMPY, BYTES
from pymemri.imports import *
from hashlib import sha256
from pymemri.pod.db import DB
from pymemri.pod.utils import *
from pymemri.plugin.schema import *
from multidimensional_urlencode import urlencode
import uuid
import urllib
```
%% Cell type:code id: tags:
``` python
# export
DEFAULT_POD_ADDRESS = "http://localhost:3030"
POD_VERSION = "v4"
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
# Mapping from python type to schema type
# TODO move to data.schema once schema is refactored
TYPE_TO_SCHEMA = {
bool: "Bool",
str: "Text",
int: "Integer",
float: "Real"
}
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
auth_json=None, verbose=False, register_base_schema=True):
self.verbose = verbose
self.url = url
self.version = POD_VERSION
self.test_connection(verbose=self.verbose)
self.database_key=database_key if database_key is not None else self.generate_random_key()
self.owner_key=owner_key if owner_key is not None else self.generate_random_key()
self.base_url = f"{url}/{version}/{self.owner_key}"
self.auth_json = {"type":"ClientAuth","databaseKey":self.database_key} if auth_json is None \
else {**{"type": "PluginAuth"}, **auth_json}
self.local_db = DB()
self.registered_classes=dict()
self.register_base_schemas()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
return cls(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"), **kwargs)
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def register_base_schemas(self):
try:
assert self.add_to_schema(
PluginRun("", "", "", status="", error="", targetItemId="", settings="", authUrl=""),
CVUStoredDefinition(name="", definition="", externalId=""),
Account(service="", identifier="", secret="", code="", accessToken="",
refreshToken="", errorMessage="", handle="", displayName=""),
Photo(width=1,height=1,channels=1,encoding="")
)
except Exception as e:
raise ValueError("Could not add base schema")
def test_connection(self, verbose=True):
try:
res = requests.get(self.url)
if verbose: print("Succesfully connected to pod")
return True
except requests.exceptions.RequestException as e:
print("Could no connect to backend")
return False
def add_to_db(self, node):
existing = self.local_db.get(node.id)
if existing is None and node.id is not None:
self.local_db.add(node)
def reset_local_db(self):
self.local_db = DB()
def get_create_dict(self, node):
properties = self.get_properties_json(node)
properties = node.to_json()
properties = {k:v for k, v in properties.items() if v != []}
return properties
def create(self, node):
create_dict = self.get_create_dict(node)
try:
body = {"auth": self.auth_json, "payload": create_dict}
result = requests.post(f"{self.base_url}/create_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
id = result.json()
node.id = id
self.add_to_db(node)
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def create_photo(self, photo):
# create the file
file_succes = self.create_photo_file(photo)
if file_succes == False:
raise ValueError("Could not create file")
# create the photo
return self.bulk_action(create_items=[photo], create_edges=photo.get_edges("file"))
def add_to_schema(self, *nodes):
# TODO instead of adding nodes: List[Item], add_to_schema should add types: List[type]
create_items = []
for node in nodes:
self.registered_classes[node.__class__.__name__] = type(node)
attributes = self.get_properties_json(node)
attributes = node.to_json()
for k, v in attributes.items():
if type(v) not in self.TYPE_TO_SCHEMA:
raise ValueError(f"Could not add property {k} with type {type(v)}")
value_type = self.TYPE_TO_SCHEMA[type(v)]
create_items.append({
"type": "ItemPropertySchema", "itemType": attributes["type"],
"propertyName": k, "valueType": value_type
})
body = {"auth": self.auth_json, "payload": {
"createItems": create_items
}}
try:
result = requests.post(f"{self.base_url}/bulk", json=body)
if result.status_code != 200:
print(result, result.content)
return False
except requests.exceptions.RequestException as e:
print(e)
return False
return True
def create_photo_file(self, photo):
file = photo.file[0]
self.create(file)
return self._upload_image(photo.data)
def _upload_image(self, img):
if isinstance(img, np.ndarray):
return self.upload_file(img.tobytes())
elif isinstance(img, bytes):
return self.upload_file(img)
else:
raise ValueError(f"Unknown image data type {type(img)}")
def upload_file(self, file):
# TODO: currently this only works for numpy images
if self.auth_json.get("type") == "PluginAuth":
# alternative file upload for plugins, with different authentication
return self.upload_file_b(file)
else:
try:
sha = sha256(file).hexdigest()
result = requests.post(f"{self.base_url}/upload_file/{self.database_key}/{sha}", data=file)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def upload_file_b(self, file):
try:
sha = sha256(file).hexdigest()
auth = urllib.parse.quote(json.dumps(self.auth_json))
result = requests.post(f"{self.base_url}/upload_file_b/{auth}/{sha}", data=file)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def get_file(self, sha):
# TODO: currently this only works for numpy images
try:
body= {"auth": self.auth_json,
"payload": {"sha256": sha}}
result = requests.post(f"{self.base_url}/get_file", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
return result.content
except requests.exceptions.RequestException as e:
print(e)
return None
def get_photo(self, id, size=640):
photo = self.get(id)
self._load_photo_data(photo, size=size)
return photo
def _load_photo_data(self, photo, size=None):
if len(photo.file) > 0 and photo.data is None:
file = self.get_file(photo.file[0].sha256)
if file is None:
print(f"Could not load data of {photo} attached file item does not have data in pod")
return
if photo.encoding == NUMPY:
data = np.frombuffer(file, dtype=np.uint8)
c = photo.channels
shape = (photo.height,photo.width, c) if c is not None and c > 1 else (photo.height, photo.width)
data = data.reshape(shape)
if size is not None: data = resize(data, size)
photo.data = data
return
elif photo.encoding == BYTES:
photo.data = file
return
else:
raise ValueError("Unsupported encoding")
print(f"could not load data of {photo}, no file attached")
def create_if_external_id_not_exists(self, node):
if not self.external_id_exists(node):
self.create(node)
def external_id_exists(self, node):
if node.externalId is None: return False
existing = self.search({"externalId": node.externalId})
return len(existing) > 0
def create_edges(self, edges):
"""Create edges between nodes, edges should be of format:
[{"_type": "friend", "_source": 1, "_target": 2}]"""
return self.bulk_action(create_edges=edges)
# create_edges = []
# for e in edges:
# src, target = e.source.id, e.target.id
# if src is None or target is None:
# print(f"Could not create edge {e} missing source or target id")
# return False
# data = {"_source": src, "_target": target, "_name": e._type}
# if e.label is not None: data[LABEL] = e.label
# if e.sequence is not None: data[SEQUENCE] = e.sequence
# if e.reverse:
# data2 = copy(data)
# data2["_source"] = target
# data2["_target"] = src
# data2["_name"] = "~" + data2["_name"]
# create_edges.append(data2)
# create_edges.append(data)
# return self.bulk_action(create_items=[], update_items=[],create_edges=create_edges)
def delete_items(self, items):
ids = [i.id for i in items]
return self.bulk_action(delete_items=ids)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
def bulk_action(self, create_items=None, update_items=None, create_edges=None, delete_items=None):
# we need to set the id to not lose the reference
if create_items is not None:
for c in create_items:
if c.id is None: c.id = uuid.uuid4().hex
create_items = [self.get_create_dict(i) for i in create_items] if create_items is not None else []
update_items = [self.get_update_dict(i) for i in update_items] if update_items is not None else []
create_edges = [self.get_create_edge_dict(i) for i in create_edges] if create_edges is not None else []
# Note: skip delete_items without id, as items that are not in pod cannot be deleted
delete_items = [item.id for item in delete_items if item.id is not None] if delete_items is not None else []
edges_data = {"auth": self.auth_json, "payload": {
"createItems": create_items, "updateItems": update_items,
"createEdges": create_edges, "deleteItems": delete_items}}
try:
result = requests.post(f"{self.base_url}/bulk",
json=edges_data)
if result.status_code != 200:
if "UNIQUE constraint failed" in str(result.content):
print(result.status_code, "Edge already exists")
else:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def get_create_edge_dict(self, edge):
return {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
def create_edge(self, edge):
payload = self.get_create_edge_dict(edge)
body = {"auth": self.auth_json,
"payload": payload}
try:
result = requests.post(f"{self.base_url}/create_edge", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
return self.create_edges([edge])
def get(self, id, expanded=True):
if not expanded:
res = self._get_item_with_properties(id)
else:
res = self._get_item_expanded(id)
if res is None:
raise ValueError(f"Item with id {id} does not exist")
elif res.deleted == True:
print(f"Item with id {id} does not exist anymore")
return None
else:
return res
def get_all_items(self):
raise NotImplementedError()
try:
body = { "databaseKey": self.database_key, "payload":None}
result = requests.post(f"{self.base_url}/get_all_items", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
res = [self.item_from_json(x) for x in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
print(e)
return None
def filter_deleted(self, items):
return [i for i in items if not i.deleted == True]
def _get_item_expanded(self, id):
item = self.get(id, expanded=False)
edges = self.get_edges(id)
for e in edges:
item.add_edge(e["name"], e["item"])
return item
def get_edges(self, id):
body = {"payload": {"item": str(id),
"direction": "Outgoing",
"expandItems": True},
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/get_edges", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
for d in json:
d["item"] = self.item_from_json(d["item"])
return json
except requests.exceptions.RequestException as e:
print(e)
return None
def _get_item_with_properties(self, id):
try:
body = {"auth": self.auth_json,
"payload": str(id)}
result = requests.post(f"{self.base_url}/get_item", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
if json == []:
return None
else:
res = self.item_from_json(json[0])
return res
except requests.exceptions.RequestException as e:
print(e)
return None
def get_properties_json(self, node, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(node, "private", [])
for k, v in node.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type(node)
return res
@staticmethod
def _get_schema_type(node):
for cls in node.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def get_update_dict(self, node):
properties = self.get_properties_json(node, dates=False)
properties = node.to_json(dates=False)
properties.pop("type", None)
properties.pop("deleted", None)
return properties
def update_item(self, node):
data = self.get_update_dict(node)
body = {"payload": data,
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/update_item",
json=body)
if result.status_code != 200:
print(result, result.content)
except requests.exceptions.RequestException as e:
print(e)
def exists(self, id):
try:
body = {"auth": self.auth_json,
"payload": str(id)}
result = requests.post(f"{self.base_url}/get_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
json = result.json()
if isinstance(json, list) and len(json) > 0:
return True
except requests.exceptions.RequestException as e:
print(e)
return None
def search(self, fields_data, include_edges: bool = True):
extra_fields = {'[[edges]]': {}} if include_edges else {}
body = {"payload": {**fields_data, **extra_fields},
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/search", json=body)
json = result.json()
res = [self._item_from_search(item) for item in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
return None
def _item_from_search(self, item_json: dict):
# search returns different fields w.r.t. edges compared to `get` api,
# different method to keep `self.get` clean.
item = self.item_from_json(item_json)
for edge_json in item_json.get("[[edges]]", []):
edge_name = edge_json["_edge"]
edge_item = self.item_from_json(edge_json["_item"])
item.add_edge(edge_name, edge_item)
return item
def search_last_added(self, type=None, with_prop=None, with_val=None):
query = {"_limit": 1, "_sortOrder": "Desc"}
if type is not None:
query["type"] = type
if with_prop is not None:
query[f"{with_prop}=="] = with_val
return self.search(query)[0]
def item_from_json(self, json):
plugin_class = json.get("pluginClass", None)
plugin_package = json.get("pluginPackage", None)
constructor = get_constructor(json["type"], plugin_class, plugin_package=plugin_package,
extra=self.registered_classes)
new_item = constructor.from_json(json)
existing = self.local_db.get(new_item.id)
# TODO: cleanup
if existing is not None:
if not existing.is_expanded() and new_item.is_expanded():
for edge_name in new_item.get_all_edge_names():
edges = new_item.get_edges(edge_name)
for e in edges:
e.source = existing
existing.__setattr__(edge_name, edges)
for prop_name in new_item.get_property_names():
existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
return existing
else:
return new_item
def get_properties(self, expanded):
properties = copy(expanded)
if ALL_EDGES in properties: del properties[ALL_EDGES]
return properties
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys, you can just omit the keys when initializing the `PodClient`, which creates a new user by defining random keys. *Note that this will create a new database for your every time you create a PodClient, if you want to access the same database with multiple PodClients, you have to set the same keys* When you are using the app, setting the keys in the pod, and passing them when calling an integrator is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
success = client.test_connection()
assert success
```
%% Output
Succesfully connected to pod
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. When Initializing an Item, always make sure to use the from_data classmethod to initialize.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
email_item
```
%% Output
EmailMessage (#None)
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(email_item)
assert succes
```
%% Cell type:markdown id: tags:
We can now create our item. As a side-effect, our item will be assigned an id.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
```
%% Output
True
%% Cell type:code id: tags:
``` python
email_item.id
```
%% Output
'3173989e1917b4f6fde0e02b2300322f'
'58a2c9996b805849f3262e5ca9fbde16'
%% Cell type:markdown id: tags:
We can easily define our own types, and use them in the pod.
%% Cell type:code id: tags:
``` python
# export
class Dog(Item):
properties = Item.properties + ["name", "age", "bites", "weight"]
edges = Item.edges
def __init__(self, name=None, age=None, bites=False, weight=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
self.bites = bites
self.weight = weight
```
%% Cell type:code id: tags:
``` python
dog = Dog("", 0, True, 0.)
client.add_to_schema(dog)
dog2 = Dog(name="bob", age=3, weight=33.2)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
client.reset_local_db()
dog_from_db = client.get(dog2.id)
```
%% Cell type:code id: tags:
``` python
assert dog_from_db.name == "bob"
assert dog_from_db.age == 3
assert dog_from_db.weight == 33.2
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
assert item_succes and edge_succes
```
%% Cell type:code id: tags:
``` python
client.get_edges(email_item.id)
```
%% Output
[{'item': Person (#e2f05d9528cfe6ee9c72f3a3a5fe291d), 'name': 'sender'}]
[{'item': Person (#0a02555567dd1c896c97a5aae0c5ed48), 'name': 'sender'}]
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
```
%% Cell type:code id: tags:
``` python
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
assert client.create(person_item)
```
%% Cell type:code id: tags:
``` python
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
```
%% Cell type:markdown id: tags:
Appart from creating, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db.lastName == "Awesome"
```
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
person_account = Account(service="testService")
client.create(person_item2)
client.create(person_account)
person_item2.add_edge("account", person_account)
client.create_edges(person_item2.get_edges("account"))
```
%% Output
True
%% Cell type:code id: tags:
``` python
# search without edges
# To test search with and without include_edges, make a fresh client (no local_db)
new_client = PodClient(owner_key=client.owner_key, database_key=client.database_key)
all_people = new_client.search({"type": "Person"}, include_edges=False)
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert all([len(p.account)==0 for p in all_people])
```
%% Cell type:code id: tags:
``` python
# Search with edges
all_people = new_client.search({"type": "Person"}, include_edges=True)
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert any([len(p.account) for p in all_people])
```
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2);
```
%% Cell type:code id: tags:
``` python
assert client.search_last_added(type="Person").firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files, the `PodClient` has a file api. The file api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
%% Cell type:code id: tags:
``` python
from pymemri.data.photo import *
```
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(x)
assert succes
assert succes2
```
%% Cell type:code id: tags:
``` python
data = client.get_file(file.sha256)
arr = np.frombuffer(data, dtype=np.uint8)
assert (arr.reshape(640,640) == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
For photos we do this automatically using `PodClient.create` on a Photo and `PodClient.get_photo`:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
```
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(Photo.from_np(x))
```
%% Cell type:code id: tags:
``` python
client.create_photo(photo)
```
%% Output
True
%% Cell type:code id: tags:
``` python
res = client.get_photo(photo.id, size=640)
```
%% Cell type:code id: tags:
``` python
assert (res.data == x).all()
```
%% Cell type:markdown id: tags:
Some photos come as bytes, we can use `Iphoto.from_bytes` to initialize those
%% Cell type:code id: tags:
``` python
_bytes = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xe1\x00\x00\x00\xe1\x08\x03\x00\x00\x00\tm"H\x00\x00\x003PLTE\x04\x02\x04\x00\x00\x00\xa0\xa0\xa0\xa3\xa3\xa3\xaa\xaa\xaa\xb4\xb4\xb4\xbd\xbe\xbd\xbb\xbc\xbb\xde\xde\xde\x9b\x9a\x9b\xfe\xfe\xfe\xf2\xf3\xf2\xe5\xe6\xe5\xd8\xd9\xd8\xd1\xd1\xd1\xc9\xca\xc9\xae\xae\xae\x80k\x98\xfc\x00\x00\x01TIDATx\x9c\xed\xdd;r\xc2P\x00\x04A!\x90\x84\xfc\x01\xee\x7fZ\x138\xb1\x13S\xceF\xaf\xfb\x06\x93o\xd5No\xef\x1f\x9f\xb7\xfb}]\xd7my\xba|;\xff4\xff\xdf\xf9O\x97W<\x96W\xac\xbfm\xd7i9\x1d\xdb\xfe,\x9c\x8e\xec4+\xac{\x16^\x14\xb6)\xecS\xd8\xa7\xb0Oa\x9f\xc2\xbe!\n\xcf\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xce\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87\xc4bHa\x9c\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x86xaQ\x18\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87D\xe1\xe3\xf0\x85\x8b\xc26\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xae\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14n\xa7c\xdb\xa7\xeb>\x1f\xd9~\xfb\x02\xee\x7f\r\xe5\xe1h\x04"\x00\x00\x00\x00IEND\xaeB`\x82'
```
%% Cell type:code id: tags:
``` python
photo = Photo.from_bytes(_bytes)
```
%% Cell type:code id: tags:
``` python
assert client.create_photo(photo)
```
%% Cell type:markdown id: tags:
We create a new client to prevent caching
%% Cell type:code id: tags:
``` python
new_client = PodClient(database_key=client.database_key, owner_key=client.owner_key)
```
%% Cell type:code id: tags:
``` python
res = new_client.get_photo(photo.id, size=225)
```
%% Cell type:code id: tags:
``` python
res.data == photo.data
```
%% Output
True
%% Cell type:code id: tags:
``` python
res.file
```
%% Output
[File (#14a26270fdb987f6cecf29443575a409)]
[File (#b707b8c16d2649eebe19dd690f638e28)]
%% Cell type:markdown id: tags:
## Bulk API
%% Cell type:markdown id: tags:
Currently, an api call takes a lot of time (~0.1 second). If you want to write many data items to the pod, you can use the bulk api for efficiency reasons. Currently, only creating `Items` and `Edges` is supported, support for updating and deletion will follow.
%% Cell type:code id: tags:
``` python
# Test bulk create items and edges
dogs = [Dog(name=f"dog number {i}") for i in range(100)]
person = Person(firstName="Alice")
edge1 = Edge(dogs[0], person, "label")
edge2 = Edge(dogs[1], person, "label")
client.bulk_action(create_items=dogs + [person], create_edges=[edge1,edge2])
dogs_with_edge = [item for item in client.search({"type": "Dog"}) if item.name in {"dog number 0", "dog number 1"}]
assert len(dogs_with_edge) == 2
for d in dogs_with_edge:
assert len(d.label) > 0
```
%% Cell type:code id: tags:
``` python
# test bulk delete and update
# Change person name, delete first dog :(
person.firstName = "Bob"
to_delete = [dogs[0]]
to_update = [person]
client.bulk_action(delete_items=to_delete, update_items=to_update)
dogs_with_edge = [
item for item in client.search({"type": "Dog"}) if item.name in {"dog number 0", "dog number 1"}
]
assert len(dogs_with_edge) == 1
dog = dogs_with_edge[0]
assert dog.label[0].firstName == "Bob"
```
%% Cell type:markdown id: tags:
# Check if an item exists -
%% Cell type:code id: tags:
``` python
# hide
# person_item = Person.from_data(firstName="Eve", externalId="gmail_1")
# person_item2 = Person.from_data(firstName="Eve2", externalId="gmail_1")
# client.create_if_external_id_not_exists(person_item)
# client.create_if_external_id_not_exists(person_item2)
# existing = client.search({"externalId": "gmail_1"})
# assert len(existing) == 1
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Resetting the db -
%% Cell type:code id: tags:
``` python
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.schema.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted test_utils.ipynb.
%% Cell type:code id: tags:
``` python
```
......
......@@ -279,4 +279,20 @@ class Item(ItemBase):
res = cls(**kwargs)
for e in res.get_all_edges():
e.source = res
return res
\ No newline at end of file
return res
def _get_schema_type(self):
for cls in self.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def to_json(self, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(self, "private", [])
for k, v in self.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type()
return res
......@@ -87,7 +87,7 @@ class PodClient:
self.local_db = DB()
def get_create_dict(self, node):
properties = self.get_properties_json(node)
properties = node.to_json()
properties = {k:v for k, v in properties.items() if v != []}
return properties
......@@ -123,7 +123,7 @@ class PodClient:
for node in nodes:
self.registered_classes[node.__class__.__name__] = type(node)
attributes = self.get_properties_json(node)
attributes = node.to_json()
for k, v in attributes.items():
if type(v) not in self.TYPE_TO_SCHEMA:
raise ValueError(f"Could not add property {k} with type {type(v)}")
......@@ -407,25 +407,8 @@ class PodClient:
print(e)
return None
def get_properties_json(self, node, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(node, "private", [])
for k, v in node.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type(node)
return res
@staticmethod
def _get_schema_type(node):
for cls in node.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def get_update_dict(self, node):
properties = self.get_properties_json(node, dates=False)
properties = node.to_json(dates=False)
properties.pop("type", None)
properties.pop("deleted", None)
return properties
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment