Commit d5535964 authored by Eelco van der Wel's avatar Eelco van der Wel :speech_balloon:
Browse files

remove schema, make declarative

parent ce557620
Pipeline #3309 failed with stage
in 3 minutes and 25 seconds
Showing with 425 additions and 6236 deletions
+425 -6236
......@@ -129,6 +129,31 @@
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
......
%% Cell type:code id:6c01afe7 tags:
``` python
#default_exp data.declarative
#default_exp data.schema
%load_ext autoreload
%autoreload 2
```
%% Cell type:code id:80c76cf3 tags:
``` python
# export
# hide
import json
import re
import os
import requests
import urllib
import pathlib
from pathlib import Path
from typing import Any, Dict
import pymemri
from pymemri.data.itembase import Item
```
%% Cell type:code id:dc015f3b tags:
%% Cell type:code id:29cc6147 tags:
``` python
# export
# hide
# Registry of every generated schema class, keyed by class name.
_SCHEMA: Dict[str, type] = dict()


def __getattr__(name):
    """Module-level attribute hook: look up generated schema classes by name.

    Returns the class stored in ``_SCHEMA`` under ``name`` and raises
    ``AttributeError`` when no such schema has been generated.
    """
    if name not in _SCHEMA:
        raise AttributeError(f"Schema for {name} not found.")
    return _SCHEMA[name]
```
%% Cell type:code id:ebe1407f tags:
``` python
# export
# hide
# typing.Dict is used instead of dict[...]: module-level variable annotations
# are evaluated at import time and builtin generics only exist on Python 3.9+.
_generated_schema: Dict[str, type] = dict()


def schema_init(self, **kwargs):
    """Initializer meant to be installed as ``__init__`` on generated schema classes.

    Forwards all keyword arguments to the parent initializer, then sets one
    attribute per name in ``self._property_kwargs`` (default ``None``) and one
    per name in ``self._edge_kwargs`` (default: a fresh empty list).

    NOTE(review): ``super(type(self), self)`` always starts from the most
    derived class, so this function recurses without end if the class that
    owns it is subclassed. It is only safe on leaf classes.
    """
    super(type(self), self).__init__(**kwargs)
    for prop_name in self._property_kwargs:
        setattr(self, prop_name, kwargs.get(prop_name, None))
    for edge_name in self._edge_kwargs:
        # A new list per instance, so edge lists are never shared.
        setattr(self, edge_name, kwargs.get(edge_name, list()))
class SchemaFactory:
    """
    Class factory for schema types. Generates a schema type from a given json file or url
    """

    @staticmethod
    def is_valid_schema_name(name: str) -> bool:
        """Return True when the whole of ``name`` is a capitalised identifier.

        ``fullmatch`` is used so that trailing invalid characters (e.g.
        ``"Foo-bar"``) are rejected, and the result is a real bool.
        """
        return re.fullmatch(r"[A-Z][a-zA-Z0-9_]*", name) is not None

    @staticmethod
    def create_schema(name: str, properties: list, edges: list,
                      base: type=Item, description=None) -> type:
        """
        Create a type for schema from a name, list of properties and list of edges. Optionally set the base schema.

        Properties and edges already declared on ``base`` are not repeated.
        Raises ``ValueError`` when ``name`` is not a valid schema name.
        """
        if not SchemaFactory.is_valid_schema_name(name):
            raise ValueError(f"{name} is not a valid schema name.")

        properties = [p for p in properties if p not in base.properties]
        edges = [e for e in edges if e not in base.edges]

        def generated_init(self, **kwargs):
            # super() is bound to the concrete generated class rather than
            # type(self): with type(self), subclassing a generated class (or
            # using one as `base`) would re-enter this initializer forever.
            super(schema_cls, self).__init__(**kwargs)
            for prop_name in schema_cls._property_kwargs:
                setattr(self, prop_name, kwargs.get(prop_name, None))
            for edge_name in schema_cls._edge_kwargs:
                setattr(self, edge_name, kwargs.get(edge_name, list()))

        # _property_kwargs and _edge_kwargs hold only the names introduced by
        # this class; they are what the initializer sets on each instance.
        cls_dict = {
            "_property_kwargs": properties,
            "_edge_kwargs": edges,
            "properties": base.properties + properties,
            "edges": base.edges + edges,
            "description": description,
            "__init__": generated_init,
        }
        schema_cls = type(name, (base,), cls_dict)
        return schema_cls

    @staticmethod
    def from_dict(name: str, schema_dict: dict) -> type:
        """
        Creates a schema type from a dictionary. See https://gitlab.memri.io/memri/schema/ for format.

        Reads the optional keys "properties" (list), "relations" (mapping whose
        keys become edge names) and "description".

        TODO add edge constraints ("sequenced" and "singular")
        TODO add option for different base class
        """
        return SchemaFactory.create_schema(
            name=name,
            properties=schema_dict.get("properties", []),
            edges=list(schema_dict.get("relations", {}).keys()),
            description=schema_dict.get("description", None),
        )

    @staticmethod
    def from_file(file_name: str) -> type:
        """
        Create a schema class from json file.

        The schema name is the file's base name without its extension.
        """
        schema_name = os.path.splitext(os.path.basename(file_name))[0]
        # Explicit encoding: do not depend on the platform default.
        with open(file_name, "r", encoding="utf-8") as f:
            schema_dict = json.load(f)
        return SchemaFactory.from_dict(schema_name, schema_dict)

    @staticmethod
    def from_url(url: str) -> type:
        """
        Create a schema class from json url.
        For testing, import directly from schema repository.

        The schema name is the last path segment of the url without its
        extension. Raises ``requests.HTTPError`` on an error status.
        """
        # Local import: a bare `import urllib` does not guarantee that the
        # `urllib.parse` submodule has been loaded.
        from urllib.parse import urlparse

        file_name = urlparse(url).path.rpartition('/')[-1]
        schema_name = os.path.splitext(file_name)[0]
        response = requests.get(url)
        # Fail on HTTP errors instead of trying to parse an error page as a schema.
        response.raise_for_status()
        return SchemaFactory.from_dict(schema_name, response.json())
```
%% Cell type:code id:2a855921 tags:
``` python
# export
# hide
def get_available_schema():
    """Return an iterator over all ``*.json`` files found recursively under
    ``schema/TypeHierarchy/Item`` next to the ``pymemri.data`` package."""
    package_dir = Path(pymemri.data.__file__).parent
    item_root = package_dir.joinpath("schema", "TypeHierarchy", "Item")
    return item_root.rglob("*.json")
def generate_schema():
    """Create a schema class for every available schema file and store it in
    ``_SCHEMA`` under its class name (later files overwrite equal names)."""
    for json_path in get_available_schema():
        generated_cls = SchemaFactory.from_file(json_path)
        _SCHEMA[generated_cls.__name__] = generated_cls


# Populate the registry when the module is imported.
generate_schema()
```
%% Cell type:code id:dc015f3b tags:
``` python
# export
# hide
def get_constructor(_type, plugin_class=None, plugin_package=None, extra=None):
    """Resolve the class used to construct an item of type ``_type``.

    Lookup order (later entries win): module globals, the names imported
    below, ``extra`` (a name -> class mapping), and the dynamically imported
    ``plugin_class`` from ``plugin_package``. For a non-Indexer type, a class
    named ``"I" + _type`` takes precedence over ``_type`` itself.

    Raises ``TypeError`` when ``_type`` cannot be found.
    """
    # Imported locally; these names take part in the lookup through locals().
    import pymemri.indexers as models
    from pymemri.data.photo import IPhoto
    from pymemri.indexers.indexer import IndexerBase
    import pymemri.integrator_registry

    if _type == "Indexer" and plugin_class is not None and hasattr(pymemri.integrator_registry, plugin_class):
        return getattr(pymemri.integrator_registry, plugin_class)

    # Always bound: the original left `dynamic` undefined when the import
    # below failed, which crashed a few lines later.
    dynamic = dict()
    if plugin_class is not None and plugin_package is not None:
        try:
            mod = __import__(plugin_package, fromlist=[plugin_class])
            dynamic = {plugin_class: getattr(mod, plugin_class)}
        except Exception as e:
            print(f"Could not import {plugin_package}.{plugin_class}: {e}")

    # `extra` defaults to None, which cannot be unpacked with **.
    if extra is None:
        extra = dict()

    classes = {**globals(), **locals(), **extra, **dynamic}
    if _type not in classes:
        raise TypeError(f"Could not initialize item, type {_type} not registered in PodClient")

    if _type == "Indexer":
        return classes[plugin_class]

    i_class = "I" + _type
    return classes[i_class] if i_class in classes else classes[_type]
```
%% Cell type:code id:a616c11c tags:
``` python
from pymemri.pod.client import PodClient
pod_client = PodClient()
```
%% Cell type:code id:7d4a8d27 tags:
``` python
print("Number of generated Items:", len(_SCHEMA))
assert len(_SCHEMA)
```
%% Output
Number of generated Items: 94
%% Cell type:code id:29050de0 tags:
``` python
from pymemri.data.schema import Account
test_account = Account(handle="Test Account", displayName="Test Account")
assert pod_client.add_to_schema(Account(handle="", displayName=""))
assert pod_client.create(test_account)
```
%% Cell type:code id:3267a5d6 tags:
``` python
account_json = """
{
"description": "An account or subscription, for instance for some online service, or a bank account or wallet.",
"properties": [
"handle",
"displayName",
"service",
"itemType",
"avatarUrl"
],
"relations": {
"belongsTo": {
"sequenced": false,
"singular": false
},
"price": {
"sequenced": false,
"singular": false
},
"location": {
"sequenced": false,
"singular": false
},
"organization": {
"sequenced": false,
"singular": false
},
"contact": {
"sequenced": false,
"singular": false
}
},
"foregroundColor": "#ffffff",
"backgroundColor": "#93c47d"
}
"""
Account = SchemaFactory.from_dict("Account", json.loads(account_json))
test_account = Account(handle="Test Account", displayName="Test Account")
assert pod_client.add_to_schema(Account(handle="", displayName=""))
assert pod_client.create(test_account)
```
%% Cell type:code id:010e95e6 tags:
``` python
url = 'https://gitlab.memri.io/memri/schema/-/raw/dev/TypeHierarchy/Item/Account/Account.json'
Account = SchemaFactory.from_url(url)
test_account = Account(handle="Test Account", displayName="Test Account")
assert pod_client.add_to_schema(Account(handle="", displayName=""))
assert pod_client.create(test_account)
```
%% Cell type:code id:ca1c8794 tags:
``` python
# export
# hide
def load_schema_folder(path: str) -> Dict[str, type]:
    """
    Build a schema class from every ``*.json`` file directly inside ``path``.

    Returns a dict mapping each generated class name to its class. The return
    annotation uses ``typing.Dict`` because ``dict[str, type]`` is evaluated
    when the function is defined and fails before Python 3.9.
    """
    schema_classes = (
        SchemaFactory.from_file(json_file)
        for json_file in pathlib.Path(path).glob("*.json")
    )
    return {schema_cls.__name__: schema_cls for schema_cls in schema_classes}
def register_schema_folder(path: str, overwrite_existing=True) -> None:
"""
Load every schema class from the json files in `path` and bind each one as a
module-level global under its class name.

When `overwrite_existing` is false, a name that is already present in
globals() is left untouched and a notice is printed instead.
"""
# load_schema_folder returns a mapping of class name -> generated class.
for k, v in load_schema_folder(path).items():
if k in globals() and not overwrite_existing:
print(f"Skipping schema {k}: schema already exists.")
else:
# Bind (or rebind) the generated class in this module's namespace.
globals()[k] = v
# NOTE(review): indentation was lost in this view, so it is unclear whether
# this print runs only after a (re)bind or for every name — confirm.
print(globals()[k])
```
%% Cell type:code id:c53c15bc tags:
``` python
# hide
register_schema_folder("./schemas")
assert Account
assert UserAccount
```
%% Cell type:code id:895192da tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.declarative.ipynb.
Converted data.photo.ipynb.
Converted data.schema.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.schema.ipynb.
Converted plugin.stateful.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
%% Cell type:code id:7d529ca5 tags:
%% Cell type:code id:146ca34a tags:
``` python
```
......
......@@ -601,6 +601,31 @@
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
......
......@@ -170,6 +170,31 @@
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.8"
},
"toc": {
"base_numbering": 1,
"nav_menu": {},
"number_sections": true,
"sideBar": true,
"skip_h1_title": false,
"title_cell": "Table of Contents",
"title_sidebar": "Contents",
"toc_cell": false,
"toc_position": {},
"toc_section_display": true,
"toc_window_display": false
}
},
"nbformat": 4,
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.schema import CVUStoredDefinition
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.data.photo import Photo
from pymemri.imports import *
from hashlib import sha256
from pymemri.pod.db import DB
from pymemri.pod.utils import *
from pymemri.plugin.schema import *
```
%% Cell type:code id: tags:
``` python
# export
DEFAULT_POD_ADDRESS = "http://localhost:3030"
POD_VERSION = "v4"
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
    """HTTP client for a pod.

    Wraps the pod's JSON endpoints (``create_item``, ``get_item``, ``search``,
    ``bulk``, ...) and keeps a local cache of seen items in ``self.local_db``.
    Most methods report failures by printing and returning ``False``/``None``.
    """

    def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
                 auth_json=None, verbose=False, register_base_schema=True):
        """Set up keys, urls and the local cache, and optionally register the base schema.

        Missing keys are replaced by freshly generated random keys. When
        ``auth_json`` is given it is sent as plugin authentication, otherwise
        client authentication with the database key is used.
        """
        self.verbose = verbose
        self.url = url
        # Store the version actually requested (the constant was stored before,
        # while base_url already used the argument).
        self.version = version
        self.test_connection(verbose=self.verbose)
        self.database_key = database_key if database_key is not None else self.generate_random_key()
        self.owner_key = owner_key if owner_key is not None else self.generate_random_key()
        self.base_url = f"{url}/{version}/{self.owner_key}"
        if auth_json is None:
            self.auth_json = {"type": "ClientAuth", "databaseKey": self.database_key}
        else:
            self.auth_json = {**{"type": "PluginAuth"}, **auth_json}
        self.local_db = DB()
        self.registered_classes = dict()
        # Honour the flag; it was previously accepted but ignored.
        if register_base_schema:
            self.register_base_schemas()

    @classmethod
    def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
        """Create a client from the stored "database_key" and "owner_key"."""
        # NOTE(review): `path` is accepted but never forwarded to read_pod_key —
        # confirm whether it should be.
        return cls(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"), **kwargs)

    @staticmethod
    def generate_random_key():
        """Return a random key of 64 decimal digits."""
        # Keys act as credentials, so draw them from the OS CSPRNG.
        import secrets
        return "".join(str(secrets.randbelow(10)) for _ in range(64))

    def register_base_schemas(self):
        """Register the PluginRun, PersistentState, CVUStoredDefinition and Account schemas.

        Raises ``ValueError`` when any registration fails or raises.
        """
        try:
            # `and` stops at the first failure, like the asserts it replaces,
            # but keeps working when Python runs with -O.
            registered = (
                self.add_to_schema(PluginRun("", "", "", state="", error="", targetItemId="",
                                             settings=""))
                and self.add_to_schema(PersistentState(pluginName="", state="", account=""))
                and self.add_to_schema(CVUStoredDefinition(name="", definition=""))
                and self.add_to_schema(Account(service="", identifier="", secret="", code="",
                                               refreshToken="", errorMessage=""))
            )
        except Exception as e:
            raise ValueError("Could not add base schema") from e
        if not registered:
            raise ValueError("Could not add base schema")

    def add_to_db(self, node):
        """Cache ``node`` locally unless it has no id or is already cached."""
        existing = self.local_db.get(node.id)
        if existing is None and node.id is not None:
            self.local_db.add(node)

    def test_connection(self, verbose=True):
        """Return True when a GET on the pod url succeeds, False otherwise."""
        try:
            requests.get(self.url)
            if verbose: print("Succesfully connected to pod")
            return True
        except requests.exceptions.RequestException as e:
            print("Could no connect to backend")
            return False

    def create(self, node):
        """Create ``node`` in the pod; on success set ``node.id`` and cache the node.

        For Photo items the attached file is created and uploaded first.
        Returns True on success, False otherwise.
        """
        if isinstance(node, Photo) and not self.create_photo_file(node): return False

        try:
            properties = self.get_properties_json(node)
            properties = {k: v for k, v in properties.items() if v != []}
            body = {"auth": self.auth_json, "payload": properties}

            result = requests.post(f"{self.base_url}/create_item", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return False
            node.id = result.json()
            self.add_to_db(node)
            return True
        except requests.exceptions.RequestException as e:
            print(e)
            return False

    @staticmethod
    def _schema_value_type(value):
        """Map a Python value to a pod schema value type, or None if unsupported."""
        # bool is tested first because it is a subclass of int.
        if isinstance(value, bool):
            return "Bool"
        if isinstance(value, str):
            return "Text"
        if isinstance(value, int):
            return "Integer"
        if isinstance(value, float):
            return "Real"
        return None

    def add_to_schema(self, node):
        """Register the class of ``node`` and one property schema per non-list property.

        The value type of each property is derived from the example value on
        ``node``. Returns False on the first failed request or unsupported
        value type, True otherwise.
        """
        self.registered_classes[node.__class__.__name__] = type(node)
        attributes = self.get_properties_json(node)
        for k, v in attributes.items():
            if isinstance(v, list) or k == "type":
                continue
            # Previously an unsupported type reused the type of the preceding
            # property, or crashed when it came first.
            value_type = self._schema_value_type(v)
            if value_type is None:
                print(f"Could not add property {k} to schema: unsupported value type {type(v).__name__}")
                return False

            payload = {"type": "ItemPropertySchema", "itemType": attributes["type"],
                       "propertyName": k, "valueType": value_type}
            body = {"auth": self.auth_json, "payload": payload}
            try:
                result = requests.post(f"{self.base_url}/create_item", json=body)
                if result.status_code != 200:
                    print(result, result.content)
                    return False
                # NOTE(review): this overwrites node.id with the id returned for
                # the property schema item — confirm that is intended.
                node.id = result.json()
                self.add_to_db(node)
            except requests.exceptions.RequestException as e:
                print(e)
                return False
        return True

    def create_photo_file(self, photo):
        """Create the first file item of ``photo`` and upload the photo data."""
        file = photo.file[0]
        # Do not upload data for a file item that could not be created.
        if not self.create(file):
            return False
        return self._upload_image(photo.data)

    def _upload_image(self, arr):
        """Upload the raw bytes of array ``arr``."""
        return self.upload_file(arr.tobytes())

    def upload_file(self, file):
        """Upload ``file`` (bytes) under its sha256; return True on success."""
        # TODO: currently this only works for numpy images
        try:
            sha = sha256(file).hexdigest()
            result = requests.post(f"{self.base_url}/upload_file/{self.database_key}/{sha}", data=file)
            if result.status_code != 200:
                print(result, result.content)
                return False
            return True
        except requests.exceptions.RequestException as e:
            print(e)
            return False

    def get_file(self, sha):
        """Return the content stored under ``sha``, or None on failure."""
        # TODO: currently this only works for numpy images
        try:
            body = {"auth": self.auth_json,
                    "payload": {"sha256": sha}}
            result = requests.post(f"{self.base_url}/get_file", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return None
            return result.content
        except requests.exceptions.RequestException as e:
            print(e)
            return None

    def get_photo(self, id, size=640):
        """Fetch the photo item ``id`` and load its image data (None if deleted)."""
        photo = self.get(id)
        # get() returns None for deleted items.
        if photo is None:
            return None
        self._load_photo_data(photo, size=size)
        return photo

    def _load_photo_data(self, photo, size=None):
        """Fill ``photo.data`` from its first attached file, optionally resized."""
        if len(photo.file) > 0 and photo.data is None:
            file = self.get_file(photo.file[0].sha256)
            if file is None:
                print(f"Could not load data of {photo} attached file item does not have data in pod")
                return
            data = np.frombuffer(file, dtype=np.uint8)
            c = photo.channels
            shape = (photo.height, photo.width, c) if c is not None and c > 1 else (photo.height, photo.width)
            data = data.reshape(shape)
            if size is not None: data = resize(data, size)
            photo.data = data
            return
        print(f"could not load data of {photo}, no file attached")

    def create_if_external_id_not_exists(self, node):
        """Create ``node`` unless an item with the same externalId exists."""
        if not self.external_id_exists(node):
            self.create(node)

    def external_id_exists(self, node):
        """Return True when a search for ``node.externalId`` finds an item."""
        if node.externalId is None: return False
        existing = self.search({"externalId": node.externalId})
        # search() returns None on failure; treat that as "not found".
        return bool(existing)

    def create_edges(self, edges):
        """Create the given Edge objects in one bulk request.

        Each edge needs ``source`` and ``target`` items with an id. Edges with
        ``reverse`` set also get a reversed edge named ``"~" + name``.
        Returns False when an id is missing or the request fails.
        """
        create_edges = []
        for e in edges:
            src, target = e.source.id, e.target.id

            if src is None or target is None:
                print(f"Could not create edge {e} missing source or target id")
                return False
            data = {"_source": src, "_target": target, "_name": e._type}
            if e.label is not None: data[LABEL] = e.label
            if e.sequence is not None: data[SEQUENCE] = e.sequence

            if e.reverse:
                data2 = copy(data)
                data2["_source"] = target
                data2["_target"] = src
                data2["_name"] = "~" + data2["_name"]
                create_edges.append(data2)

            create_edges.append(data)

        return self.bulk_action(create_items=[], update_items=[], create_edges=create_edges)

    def delete_items(self, items):
        """Delete ``items`` by id in one bulk request."""
        ids = [i.id for i in items]
        return self.bulk_action(delete_items=ids)

    def delete_all(self):
        """Delete every item; depends on get_all_items, which is not implemented."""
        items = self.get_all_items()
        self.delete_items(items)

    def bulk_action(self, create_items=None, update_items=None, create_edges=None, delete_items=None):
        """Post one ``bulk`` request; omitted lists are sent empty. Returns True on success."""
        create_items = create_items if create_items is not None else []
        update_items = update_items if update_items is not None else []
        create_edges = create_edges if create_edges is not None else []
        delete_items = delete_items if delete_items is not None else []
        edges_data = {"auth": self.auth_json, "payload": {
            "createItems": create_items, "updateItems": update_items,
            "createEdges": create_edges, "deleteItems": delete_items}}
        try:
            result = requests.post(f"{self.base_url}/bulk",
                                   json=edges_data)
            if result.status_code != 200:
                if "UNIQUE constraint failed" in str(result.content):
                    print(result.status_code, "Edge already exists")
                else:
                    print(result, result.content)
                return False
            return True
        except requests.exceptions.RequestException as e:
            print(e)
            return False

    def create_edge(self, edge):
        """Create a single edge via ``create_edge``; return True on success."""
        payload = {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
        body = {"auth": self.auth_json,
                "payload": payload}

        try:
            result = requests.post(f"{self.base_url}/create_edge", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return False
            return True
        except requests.exceptions.RequestException as e:
            print(e)
            return False

    def get(self, id, expanded=True):
        """Fetch item ``id``; with ``expanded`` its outgoing edges are attached.

        Raises ``ValueError`` when the item does not exist and returns None
        (after printing a notice) when it is marked deleted.
        """
        if expanded:
            # Goes through get(expanded=False), so the checks below apply too.
            return self._get_item_expanded(id)

        res = self._get_item_with_properties(id)
        if res is None:
            raise ValueError(f"Item with id {id} does not exist")
        if res.deleted == True:
            print(f"Item with id {id} does not exist anymore")
            return None
        return res

    def get_all_items(self):
        """Not implemented; always raises ``NotImplementedError``."""
        raise NotImplementedError()

    def filter_deleted(self, items):
        """Return the items whose ``deleted`` flag is not True."""
        return [i for i in items if not i.deleted == True]

    def _get_item_expanded(self, id):
        """Fetch item ``id`` and attach its outgoing edges (None if deleted)."""
        item = self.get(id, expanded=False)
        if item is None:
            return None
        # get_edges returns None on failure; attach nothing in that case.
        for e in self.get_edges(id) or []:
            item.add_edge(e["name"], e["item"])
        return item

    def get_edges(self, id):
        """Return the outgoing edges of ``id`` as dicts whose "item" is an item object.

        Returns None on failure.
        """
        body = {"payload": {"item": str(id),
                            "direction": "Outgoing",
                            "expandItems": True},
                "auth": self.auth_json}

        try:
            result = requests.post(f"{self.base_url}/get_edges", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return None
            edges = result.json()
            for d in edges:
                d["item"] = self.item_from_json(d["item"])
            return edges
        except requests.exceptions.RequestException as e:
            print(e)
            return None

    def _get_item_with_properties(self, id):
        """Fetch item ``id`` without edges; None when missing or on failure."""
        try:
            body = {"auth": self.auth_json,
                    "payload": str(id)}
            result = requests.post(f"{self.base_url}/get_item", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return None
            found = result.json()
            if found == []:
                return None
            return self.item_from_json(found[0])
        except requests.exceptions.RequestException as e:
            print(e)
            return None

    def get_properties_json(self, node, dates=True):
        """Collect the properties of ``node`` to send to the pod.

        Skips attributes that start with ``_``, the ``private`` attribute and
        the names it lists, list values and None values. With ``dates=False``
        the date properties are skipped too. The "type" key is always set.
        """
        DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
        res = dict()
        private = getattr(node, "private", [])
        for k, v in node.__dict__.items():
            if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
                            and v is not None and (not (dates == False and k in DATE_KEYS)):
                res[k] = v
        res["type"] = self._get_schema_type(node)
        return res

    @staticmethod
    def _get_schema_type(node):
        """Return the name of the first class in the MRO of ``node`` not named "ItemBase"."""
        for cls in node.__class__.mro():
            if cls.__name__ != "ItemBase":
                return cls.__name__

    def update_item(self, node):
        """Send the current properties of ``node`` (without dates, type, deleted) to the pod.

        Raises ``KeyError`` when ``node`` has no id.
        """
        data = self.get_properties_json(node, dates=False)
        if "type" in data:
            del data["type"]
        if "deleted" in data:
            del data["deleted"]
        # Same failure as the original `data["id"]` lookup for id-less nodes.
        if "id" not in data:
            raise KeyError("id")
        body = {"payload": data,
                "auth": self.auth_json}

        try:
            result = requests.post(f"{self.base_url}/update_item",
                                   json=body)
            if result.status_code != 200:
                print(result, result.content)
        except requests.exceptions.RequestException as e:
            print(e)

    def exists(self, id):
        """Return True when the pod returns an item for ``id``, False otherwise."""
        try:
            body = {"auth": self.auth_json,
                    "payload": str(id)}
            result = requests.post(f"{self.base_url}/get_item", json=body)
            if result.status_code != 200:
                print(result, result.content)
                return False
            found = result.json()
            return isinstance(found, list) and len(found) > 0
        except requests.exceptions.RequestException as e:
            print(e)
            return False

    def search(self, fields_data, include_edges: bool = True):
        """Search items matching ``fields_data``; deleted items are filtered out.

        Returns None when the request fails.
        """
        extra_fields = {'[[edges]]': {}} if include_edges else {}
        body = {"payload": {**fields_data, **extra_fields},
                "auth": self.auth_json}
        try:
            result = requests.post(f"{self.base_url}/search", json=body)
            # An error response is not a list of items; do not try to parse it.
            if result.status_code != 200:
                print(result, result.content)
                return None
            res = [self.item_from_json(item) for item in result.json()]
            return self.filter_deleted(res)
        except requests.exceptions.RequestException as e:
            return None

    def search_last_added(self, type=None, with_prop=None, with_val=None):
        """Return the most recently added item matching the filters, or None."""
        query = {"_limit": 1, "_sortOrder": "Desc"}
        if type is not None:
            query["type"] = type
        if with_prop is not None:
            query[f"{with_prop}=="] = with_val
        results = self.search(query)
        # None (failed search) and [] (no match) both mean "nothing found".
        return results[0] if results else None

    def item_from_json(self, json):
        """Build an item from its json dict, merging into the cached item with the same id.

        When a cached item exists, its properties are refreshed, and edges are
        copied over if only the new item is expanded; the cached item is returned.
        """
        plugin_class = json.get("pluginClass", None)
        plugin_package = json.get("pluginPackage", None)

        constructor = get_constructor(json["type"], plugin_class, plugin_package=plugin_package,
                                      extra=self.registered_classes)
        new_item = constructor.from_json(json)
        existing = self.local_db.get(new_item.id)
        # TODO: cleanup
        if existing is None:
            return new_item

        if not existing.is_expanded() and new_item.is_expanded():
            for edge_name in new_item.get_all_edge_names():
                edges = new_item.get_edges(edge_name)
                for e in edges:
                    e.source = existing
                existing.__setattr__(edge_name, edges)

        for prop_name in new_item.get_property_names():
            existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
        return existing

    def get_properties(self, expanded):
        """Return a shallow copy of ``expanded`` without the ALL_EDGES key."""
        properties = copy(expanded)
        if ALL_EDGES in properties: del properties[ALL_EDGES]
        return properties
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys: you can just omit them when initializing the `PodClient`, which creates a new user by generating random keys. *Note that this will create a new database for you every time you create a PodClient; if you want to access the same database with multiple PodClients, you have to set the same keys.* When you are using the app, setting the keys in the pod and passing them when calling an integrator is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
success = client.test_connection()
assert success
```
%% Output
Succesfully connected to pod
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. When initializing an Item, always make sure to use the `from_data` classmethod.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
email_item
```
%% Output
EmailMessage (#None)
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(email_item)
assert succes
```
%% Cell type:markdown id: tags:
We can now create our item. As a side-effect, our item will be assigned an id.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
```
%% Output
True
%% Cell type:code id: tags:
``` python
email_item.id
```
%% Output
'6733820e72c19e1227be9e041f4f0676'
%% Cell type:markdown id: tags:
We can easily define our own types, and use them in the pod.
%% Cell type:code id: tags:
``` python
class Dog(Item):
    # Example of a user-defined item type: an Item with two extra properties.
    properties = Item.properties + ["name", "age"]
    edges = Item.edges

    def __init__(self, name=None, age=None, **kwargs):
        # Let Item consume the remaining keyword arguments first.
        super().__init__(**kwargs)
        self.name, self.age = name, age
```
%% Cell type:code id: tags:
``` python
dog = Dog("max", 2)
client.add_to_schema(dog);
dog2 = Dog("bob", 3)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
dog_from_db = client.get(dog2.id, expanded=False)
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
assert item_succes and edge_succes
```
%% Cell type:code id: tags:
``` python
client.get_edges(email_item.id)
```
%% Output
[{'item': Person (#469eb1ab5cfabdbdff7f8c57859f7c84), 'name': 'sender'}]
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
```
%% Cell type:code id: tags:
``` python
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
assert client.create(person_item)
```
%% Cell type:code id: tags:
``` python
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
```
%% Cell type:markdown id: tags:
Apart from creating items, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db.lastName == "Awesome"
```
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
client.create(person_item2);
all_people = client.search({"type": "Person"})
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
all_people[:3]
```
%% Output
[Person (#18b8646793e51ae434549c42b60d0e2a),
Person (#4c0ed41801f3d4f942677d987675a86a),
Person (#364347a2b50365463cbc5fb03dfb1458)]
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2);
```
%% Cell type:code id: tags:
``` python
assert client.search_last_added(type="Person").firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files, the `PodClient` has a file api. The file api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
%% Cell type:code id: tags:
``` python
from pymemri.data.photo import *
```
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(x)
assert succes
assert succes2
```
%% Cell type:code id: tags:
``` python
data = client.get_file(file.sha256)
arr = np.frombuffer(data, dtype=np.uint8)
assert (arr.reshape(640,640) == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
For photos we do this automatically using `PodClient.create` on a Photo and `PodClient.get_photo`:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
```
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(IPhoto.from_np(x))
```
%% Cell type:code id: tags:
``` python
assert client.create(photo)
```
%% Cell type:code id: tags:
``` python
res = client.get_photo(photo.id, size=640)
```
%% Cell type:code id: tags:
``` python
res
```
%% Output
IPhoto (#415fc96bde3bb6e426ad2a792151e216)
%% Cell type:code id: tags:
``` python
assert (res.data == x).all()
```
%% Cell type:markdown id: tags:
# Check if an item exists -
%% Cell type:code id: tags:
``` python
# hide
# person_item = Person.from_data(firstName="Eve", externalId="gmail_1")
# person_item2 = Person.from_data(firstName="Eve2", externalId="gmail_1")
# client.create_if_external_id_not_exists(person_item)
# client.create_if_external_id_not_exists(person_item2)
# existing = client.search({"externalId": "gmail_1"})
# assert len(existing) == 1
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Resetting the db -
%% Cell type:code id: tags:
``` python
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted data.schema.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.schema.ipynb.
Converted plugin.stateful.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
%% Cell type:code id: tags:
``` python
```
......
......@@ -14,14 +14,16 @@ index = {"read_file": "basic.ipynb",
"HOME_DIR": "basic.ipynb",
"MODEL_DIR": "basic.ipynb",
"MEMRI_S3": "basic.ipynb",
"schema_init": "data.declarative.ipynb",
"SchemaFactory": "data.declarative.ipynb",
"load_schema_folder": "data.declarative.ipynb",
"register_schema_folder": "data.declarative.ipynb",
"show_images": "data.photo.ipynb",
"get_height_width_channels": "data.photo.ipynb",
"Photo": "data.photo.ipynb",
"IPhoto": "data.photo.ipynb",
"__getattr__": "data.schema.ipynb",
"schema_init": "data.schema.ipynb",
"SchemaFactory": "data.schema.ipynb",
"get_available_schema": "data.schema.ipynb",
"generate_schema": "data.schema.ipynb",
"get_constructor": "data.schema.ipynb",
"ImporterBase": "importers.Importer.ipynb",
"batch": "importers.util.ipynb",
"IndexerBase": "indexers.indexer.ipynb",
......@@ -80,8 +82,8 @@ index = {"read_file": "basic.ipynb",
"read_pod_key": "pod.utils.ipynb"}
modules = ["data/basic.py",
"data/declarative.py",
"data/photo.py",
"data/schema.py",
"importers/importer.py",
"importers/util.py",
"indexers/indexer.py",
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/data.declarative.ipynb (unless otherwise specified).
__all__ = ['schema_init', 'SchemaFactory', 'load_schema_folder', 'register_schema_folder']
# Cell
# hide
import json
import re
import os
import requests
import urllib
import pathlib
from .itembase import Item
# Cell
# hide
# Mapping of schema class name -> generated schema class.
# The annotation is quoted so it is never evaluated: subscripting the builtin
# ``dict`` raises a TypeError at import time on Python < 3.9.
_generated_schema: "dict[str, type]" = dict()


def schema_init(self, **kwargs):
    """Shared ``__init__`` for dynamically generated schema classes.

    Forwards ``kwargs`` to the first ``__init__`` found after the generated
    classes in the MRO, then sets every name listed in ``_property_kwargs``
    (default ``None``) and ``_edge_kwargs`` (default: a new empty list) on the
    instance, for every class in the MRO that declares them.

    The base ``__init__`` is located by walking the MRO instead of calling
    ``super(type(self), self)``: the latter always resolves relative to the
    most derived class, so a generated class that inherits from another
    generated class would call this function again with the same arguments
    and recurse forever.

    Limitation: a hand-written ``__init__`` placed *between* two generated
    classes in the hierarchy is not supported.
    """
    mro = type(self).__mro__

    # Delegate to the first __init__ that comes after a class using
    # schema_init, skipping further generated classes on the way.
    passed_schema_init = False
    for klass in mro:
        init = vars(klass).get("__init__")
        if init is schema_init:
            passed_schema_init = True
        elif passed_schema_init and init is not None:
            init(self, **kwargs)
            break

    # Each generated class only lists the properties/edges it adds on top of
    # its base, so gather them from the whole hierarchy (base classes first).
    for klass in reversed(mro):
        declared = vars(klass)
        for prop in declared.get("_property_kwargs", ()):
            setattr(self, prop, kwargs.get(prop))
        for edge in declared.get("_edge_kwargs", ()):
            # A new list per attribute so instances never share edge lists.
            setattr(self, edge, kwargs.get(edge, []))
class SchemaFactory:
    """Factory that builds schema classes at runtime.

    A schema class can be created directly from a name plus property and edge
    lists, from a schema dictionary, from a JSON file, or from a JSON URL.
    Every generated class uses ``schema_init`` as its ``__init__``.
    """

    @staticmethod
    def is_valid_schema_name(name: str) -> bool:
        """Return True if ``name`` is a valid schema name.

        A valid name starts with an uppercase ASCII letter and otherwise
        consists only of ASCII letters, digits and underscores.
        """
        # fullmatch: the whole name has to conform. re.match only anchors at
        # the start, so a name such as "Foo-bar" would slip through.
        return re.fullmatch(r"[A-Z][a-zA-Z0-9_]*", name) is not None

    @staticmethod
    def create_schema(name: str, properties: list, edges: list,
                      base: type=Item, description=None) -> type:
        """Create a schema class from a name, properties and edges.

        Args:
            name: Name of the generated class; must be a valid schema name.
            properties: Property names of the schema.
            edges: Edge names of the schema.
            base: Base schema class; its ``properties`` and ``edges`` are
                inherited by the generated class.
            description: Optional description stored on the generated class.

        Returns:
            The newly created class, a subclass of ``base``.

        Raises:
            ValueError: If ``name`` is not a valid schema name.
        """
        if not SchemaFactory.is_valid_schema_name(name):
            raise ValueError(f"{name} is not a valid schema name.")

        # Keep only what the base does not declare already, so the combined
        # lists below contain no duplicates.
        properties = [p for p in properties if p not in base.properties]
        edges = [e for e in edges if e not in base.edges]

        # _property_kwargs / _edge_kwargs hold the names this class adds on
        # top of its base; schema_init reads them to set instance attributes.
        cls_dict = {
            "_property_kwargs": properties,
            "_edge_kwargs": edges,
            "properties": base.properties + properties,
            "edges": base.edges + edges,
            "description": description,
            "__init__": schema_init,
        }
        return type(name, (base,), cls_dict)

    @staticmethod
    def from_dict(name: str, schema_dict: dict) -> type:
        """Create a schema class from a dictionary.

        Reads the optional keys ``"properties"`` (list of names),
        ``"relations"`` (mapping whose keys become the edge names) and
        ``"description"``. See https://gitlab.memri.io/memri/schema/ for the
        format.

        TODO add edge constraints ("sequenced" and "singular")
        TODO add option for different base class
        """
        return SchemaFactory.create_schema(
            name=name,
            properties=schema_dict.get("properties", []),
            edges=list(schema_dict.get("relations", {})),
            description=schema_dict.get("description", None),
        )

    @staticmethod
    def from_file(file_name: str) -> type:
        """Create a schema class from a JSON file.

        The class name is the file name without directory and extension.
        """
        schema_name = os.path.splitext(os.path.basename(file_name))[0]
        # Explicit encoding: without it the platform default is used, which
        # is not necessarily UTF-8.
        with open(file_name, "r", encoding="utf-8") as f:
            schema_dict = json.load(f)
        return SchemaFactory.from_dict(schema_name, schema_dict)

    @staticmethod
    def from_url(url: str) -> type:
        """Create a schema class from a JSON document served at ``url``.

        The class name is the last path segment of the URL without its
        extension.

        Raises:
            requests.HTTPError: If the server answers with an error status.
        """
        # Imported explicitly: a bare ``import urllib`` does not guarantee
        # that the ``urllib.parse`` submodule is loaded.
        import urllib.parse

        file_name = urllib.parse.urlparse(url).path.rpartition("/")[-1]
        schema_name = os.path.splitext(file_name)[0]
        # A timeout keeps an unresponsive server from blocking forever.
        response = requests.get(url, timeout=30)
        # Fail with the HTTP error instead of trying to parse an error page.
        response.raise_for_status()
        return SchemaFactory.from_dict(schema_name, response.json())
# Cell
# hide
def load_schema_folder(path: str) -> "dict[str, type]":
    """Create a schema class for every ``*.json`` file directly inside ``path``.

    Returns a new dict mapping each generated class name to its class; nothing
    is registered globally. The return annotation is quoted so that defining
    this function does not fail on Python < 3.9, where the builtin ``dict``
    cannot be subscripted.
    """
    generated_schema = {}
    # Sorted so the order of the result does not depend on the file system.
    for file_name in sorted(pathlib.Path(path).glob("*.json")):
        schema_cls = SchemaFactory.from_file(file_name)
        generated_schema[schema_cls.__name__] = schema_cls
    return generated_schema
def register_schema_folder(path: str, overwrite_existing=True) -> None:
    """Load all schema classes from ``path`` into this module's globals.

    Each class returned by ``load_schema_folder`` is bound under its class
    name. When ``overwrite_existing`` is False, names that already exist in
    this module's globals are left untouched and a message is printed.
    """
    module_namespace = globals()
    for schema_name, schema_cls in load_schema_folder(path).items():
        if schema_name in module_namespace and not overwrite_existing:
            print(f"Skipping schema {schema_name}: schema already exists.")
        else:
            module_namespace[schema_name] = schema_cls
\ No newline at end of file
schema @ c6ab7f62
Subproject commit c6ab7f620e96e9459996bfc5149331832f881fdc
This diff is collapsed.
......@@ -4,7 +4,7 @@ __all__ = ['DEFAULT_POD_ADDRESS', 'POD_VERSION', 'PodClient']
# Cell
from ..data.basic import *
from ..data.schema import *
from ..data.schema import CVUStoredDefinition
from ..data.itembase import Edge, ItemBase, Item
from ..data.photo import Photo
from ..imports import *
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment