Commit 26f6b423 authored by Koen van der Veen's avatar Koen van der Veen
Browse files

fix bug with type annotation, add schema, add test

parent 946d6399
Pipeline #9154 failed with stage
in 1 minute and 33 seconds
Showing with 56 additions and 6 deletions
+56 -6
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp data.itembase
```
%% Cell type:markdown id: tags:
# Itembase
%% Cell type:markdown id: tags:
Any data class in pymemri inherits from `Item`. It is a base class for items with some handy functionalities to create new items and edges, retrieve all edges to other items, and sync with the pod.
%% Cell type:code id: tags:
``` python
# export
# hide
from typing import Optional, Dict, List, Generic, TypeVar, Tuple, Union, Iterable, ForwardRef, get_args, get_origin
from pymemri.imports import *
from datetime import datetime
import uuid
ALL_EDGES = "allEdges"
SOURCE, TARGET, TYPE, EDGE_TYPE, LABEL, SEQUENCE = "_source", "_target", "_type", "_type", "label", "sequence"
```
%% Cell type:code id: tags:
``` python
#hide
from nbdev.showdoc import *
```
%% Cell type:code id: tags:
``` python
# export
class Edge():
    """Edges make a link between two `ItemBase` items.

    You won't use this class a lot in practice, as edges are abstracted away for normal users. When items are
    retrieved from the database, the edges are parsed automatically. When you add an edge between two items
    within pymemri, you will often use `ItemBase.add_edge`."""

    def __init__(self, source, target, _type, label=None, sequence=None, created=False, reverse=True):
        self.source = source
        self.target = target
        self._type = _type
        self.label = label
        self.sequence = sequence
        self.created = created
        self.reverse = reverse

    @classmethod
    def from_json(cls, json):
        """Build an edge from its json representation.

        Only the target and the edge type are set here; the source is left as None."""
        from pymemri.data.schema import get_constructor
        # we only set the target here
        _type = json[EDGE_TYPE]
        json_target = json[TARGET]
        target_type = json_target["_type"]
        plugin_class = json_target.get("pluginClass", None)
        target_constructor = get_constructor(target_type, plugin_class)
        target = target_constructor.from_json(json_target)
        return cls(source=None, target=target, _type=_type)

    def __repr__(self):
        return f"{self.source} --{self._type}-> {self.target}"

    def update(self, api):
        """Create this edge through `api`, but only when it is flagged as `created`."""
        if self.created:
            api.create_edges([self])

    def __eq__(self, other):
        """Two edges are equal when they link the very same source and target objects (identity) and agree
        on type, reverse, created and label. `sequence` is not part of the comparison."""
        if not isinstance(other, Edge):
            # Let Python fall back to its default comparison instead of failing on a missing attribute.
            return NotImplemented
        return self.source is other.source and self.target is other.target \
            and self._type == other._type and self.reverse == other.reverse and self.created == other.created \
            and self.label == other.label

    def traverse(self, start):
        """We can traverse an edge starting from the source to the target or vice versa. In practice we often call
        item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function.

        Raises ValueError when `start` is neither the source nor the target of this edge."""
        if start == self.source:
            return self.target
        elif start == self.target:
            return self.source
        else:
            raise ValueError(f"{start} is neither the source nor the target of edge {self}")
```
%% Cell type:code id: tags:
``` python
show_doc(Edge.traverse)
```
%% Output
<h4 id="Edge.traverse" class="doc_header"><code>Edge.traverse</code><a href="__main__.py#L39" class="source_link" style="float:right">[source]</a></h4>
> <code>Edge.traverse</code>(**`start`**)
```
We can traverse an edge starting from the source to the target or vice versa. In practice we often call
item.some_edge_type, which calls item.traverse(edgetype), which in turn calls this function.
```
%% Cell type:code id: tags:
``` python
# export
T = TypeVar('T')


def _validate_edge_target(target_type, edge):
    """Raise TypeError unless the class name of `edge.target` is accepted by `target_type`.

    `target_type` is either a class name (str) or a typing construct such as `Union["A", "B"]` whose
    arguments are classes or forward references.
    """
    target_type_name = type(edge.target).__name__
    if target_type_name == target_type:
        return
    allowed = [t.__forward_arg__ if isinstance(t, ForwardRef) else t.__name__ for t in get_args(target_type)]
    if target_type_name not in allowed:
        raise TypeError("Attempted to insert edge with invalid target type")


def check_target_type(fn):
    """
    Decorator that type checks the target of the first argument, which is either a single `Edge` or an
    iterable of edges, before calling the wrapped method.

    Raises TypeError when the argument is neither an `Edge` nor an iterable, or when a target has a type
    that `self.target_type` does not allow.
    """
    from functools import wraps

    @wraps(fn)
    def _check_type_wrapper(self, arg):
        if isinstance(arg, Iterable):
            if iter(arg) is arg:
                # One-shot iterator (e.g. a generator): materialise it, otherwise the validation loop
                # below would exhaust it and the wrapped method would receive no items.
                arg = list(arg)
            for item in arg:
                _validate_edge_target(self.target_type, item)
        elif isinstance(arg, Edge):
            _validate_edge_target(self.target_type, arg)
        else:
            raise TypeError("Attempted to insert edge with invalid type")
        return fn(self, arg)
    return _check_type_wrapper


class EdgeList(list, Generic[T]):
    """List of edges that only accepts edges whose target has an allowed type.

    `target_type` may be given as a class, a class name, a `ForwardRef` or a typing construct such as
    `Union[...]`; classes and forward references are stored by name.
    """

    def __init__(
        self,
        name: str,
        target_type: Union[type, str, ForwardRef],
        data: Optional[List["Edge"]] = None,
    ) -> None:
        super().__init__()
        self.name = name

        # Normalise classes and forward references to the class name that the checks compare against.
        if isinstance(target_type, type):
            target_type = target_type.__name__
        elif isinstance(target_type, ForwardRef):
            target_type = target_type.__forward_arg__
        self.target_type = target_type

        if data is not None:
            self.extend(data)

    @property
    def targets(self) -> List["Item"]:
        """The target of every edge in this list."""
        return [edge.target for edge in self]

    # Wrap all append, extend and add methods
    @check_target_type
    def append(self, item: "Edge") -> None:
        return super().append(item)

    @check_target_type
    def extend(self, other: Iterable["Edge"]) -> None:
        return super().extend(other)

    @check_target_type
    def __add__(self, other: Iterable["Edge"]) -> "EdgeList":
        # NOTE: list.__add__ produces a plain list, so the result is not itself an EdgeList.
        return super().__add__(other)

    @check_target_type
    def __iadd__(self, other: Iterable["Edge"]) -> "EdgeList":
        return super().__iadd__(other)

    def __setitem__(self, i: int, item: "Edge") -> None:
        if not isinstance(item, Edge):
            raise TypeError("Attempted to insert edge with invalid type")
        _validate_edge_target(self.target_type, item)
        return super().__setitem__(i, item)

    def insert(self, i: int, item: "Edge") -> None:
        if not isinstance(item, Edge):
            raise TypeError("Attempted to insert edge with invalid type")
        _validate_edge_target(self.target_type, item)
        return super().insert(i, item)
```
%% Cell type:code id: tags:
``` python
# export
# hide
class ItemBase:
    """Provides a base class for all items.

    All items in the schema inherit from this class, and it provides some
    basic functionality for consistency and to enable easier usage."""

    # Names of the attributes that are treated as properties / edges; subclasses extend these lists.
    properties: List[str] = list()
    edges: List[str] = list()

    def __init__(self, id: str = None):
        self._date_local_modified = dict()
        self._in_pod: bool = False
        self._new_edges = list()
        self._original_properties = dict()
        self.id: Optional[str] = id

    def __setattr__(self, name, value):
        """Set the attribute and, for properties whose value changed, record the modification time and
        the value the property had before its first change."""
        prev_val = getattr(self, name, None)
        super(ItemBase, self).__setattr__(name, value)

        if name in self.properties and value != prev_val:
            self._date_local_modified[name] = datetime.utcnow()
            if name not in self._original_properties:
                self._original_properties[name] = prev_val

    @property
    def _updated_properties(self):
        """Names of the properties that changed since the sync state was last reset."""
        return set(self._original_properties.keys())

    def __getattribute__(self, name):
        """Return the attribute; edge attributes are traversed, so they yield the linked item(s)
        instead of the `Edge` / `EdgeList` that is stored."""
        val = object.__getattribute__(self, name)
        if name in object.__getattribute__(self, "edges"):
            if isinstance(val, Edge):
                return val.traverse(start=self)
            if isinstance(val, EdgeList):
                return [edge.traverse(start=self) for edge in val]
        return val

    def reset_local_sync_state(self):
        """
        reset_local_sync_state is called when self is created or updated (optionally via bulk) in the PodClient.
        """
        self._original_properties = dict()
        self._date_local_modified = dict()
        self._in_pod = True

    def add_edge(self, name, val):
        """Creates an edge of type name and makes it point to val"""
        if name not in self.edges:
            raise NameError(f"object {self} does not have edge with name {name}")

        # Bypass __getattribute__, which would return the traversed items rather than the edges.
        existing = object.__getattribute__(self, name)
        edge = Edge(self, val, name, created=True)
        if edge not in existing:
            existing.append(edge)
            self._new_edges.append(edge)

    def is_expanded(self):
        """returns whether the node is expanded. An expanded node retrieved nodes that are
        *directly* connected to it
        from the pod, and stored their values via edges in the object."""
        return len(self.get_all_edges()) > 0

    def get_edges(self, name):
        """Return the stored edges for `name`, without traversing them."""
        return object.__getattribute__(self, name)

    def get_all_edges(self):
        """Return the edges of every attribute that holds a non-empty list of edges."""
        return [
            e
            for attr in self.__dict__.values()
            if self.attr_is_edge(attr)
            for e in attr
        ]

    def get_all_edge_names(self):
        """Return the names of the attributes that hold a non-empty list of edges."""
        return [k for k, v in self.__dict__.items() if self.attr_is_edge(v)]

    def get_property_names(self):
        """Return the names of all instance attributes that do not hold a list."""
        # isinstance, so that list subclasses such as EdgeList are excluded as well.
        return [k for k, v in self.__dict__.items() if not isinstance(v, list)]

    @staticmethod
    def attr_is_edge(attr):
        """Whether `attr` is a non-empty list whose first element is an `Edge`."""
        return isinstance(attr, list) and len(attr) > 0 and isinstance(attr[0], Edge)

    def update(self, api, edges=True, create_if_not_exists=True, skip_nodes=False):
        """Create the item through `api` when it does not exist yet, update it otherwise, and
        optionally create all its edges.

        `create_if_not_exists` and `skip_nodes` are accepted but not used by this implementation."""
        if not self.exists(api):
            print(f"creating {self}")
            api.create(self)
        else:
            print(f"updating {self}")
            api.update_item(self)

        if edges:
            api.create_edges(self.get_all_edges())

    def exists(self, api):
        """Return `api.exists(self.id)`, or None when the item has no id."""
        return api.exists(self.id) if self.id else None

    def create_id_if_not_exists(self):
        """Assign a random uuid4 hex string as id when the item has none."""
        if self.id is None:
            self.id = uuid.uuid4().hex

    def store(self, client: "PodClient"):
        return client.add_to_store(self)

    def __repr__(self):
        id = self.id
        _type = self.__class__.__name__
        return f"{_type} (#{id})"

    @classmethod
    def from_data(cls, *args, **kwargs):
        """Construct an item from keyword arguments, wrapping every `ItemBase` value in an `Edge`
        whose source is set to the new item once it has been constructed."""
        edges = dict()
        new_kwargs = dict()
        for k, v in kwargs.items():
            if isinstance(v, ItemBase):
                edge = Edge(None, v, k)
                edges[k] = edge
                new_kwargs[k] = edge
            else:
                new_kwargs[k] = v

        res = cls(*args, **new_kwargs)

        for v in edges.values():
            v.source = res
        return res
```
%% Cell type:code id: tags:
``` python
# export
class Item(ItemBase):
    """Item is the baseclass for all of the data classes."""

    properties = [
        "dateCreated",
        "dateModified",
        "dateServerModified",
        "deleted",
        "externalId",
        "itemDescription",
        "starred",
        "version",
        "id",
        "importJson",
        "pluginClass",
        "isMock",
    ]
    edges = ["label"]

    DATE_PROPERTIES = ['dateCreated', 'dateModified', 'dateServerModified']

    def __init__(
        self,
        dateCreated: datetime = None,
        dateModified: datetime = None,
        dateServerModified: datetime = None,
        deleted: bool = None,
        externalId: str = None,
        itemDescription: str = None,
        starred: bool = None,
        version: str = None,
        id: str = None,
        importJson: str = None,
        pluginClass: str = None,
        isMock: bool = None,
        label: EdgeList["CategoricalLabel"] = None,
    ):
        super().__init__(id)

        # Properties
        self.dateCreated: Optional[datetime] = dateCreated
        self.dateModified: Optional[datetime] = dateModified
        self.dateServerModified: Optional[datetime] = dateServerModified
        self.deleted: Optional[bool] = deleted
        self.externalId: Optional[str] = externalId
        self.itemDescription: Optional[str] = itemDescription
        self.starred: Optional[bool] = starred
        self.version: Optional[str] = version
        self.importJson: Optional[str] = importJson
        self.pluginClass: Optional[str] = pluginClass
        self.isMock: Optional[bool] = isMock

        # Edges
        self.label: EdgeList["CategoricalLabel"] = EdgeList(
            "label", "CategoricalLabel", label
        )

    @classmethod
    def parse_json(self, cls, json):
        """Return the constructor kwargs (properties and edges) for `cls` found in `json`.

        This is a classmethod, so the first parameter is the class it is invoked on; the class to
        parse for is passed explicitly as `cls`."""
        property_kwargs = Item.parse_properties(cls, json)
        edge_kwargs = Item.parse_edges(cls, json)
        return {**property_kwargs, **edge_kwargs}

    @classmethod
    def parse_properties(self, cls, json):
        """Return a dict with every property of `cls`, None for properties missing from `json`."""
        return {p: json.get(p, None) for p in cls.properties}

    @classmethod
    def parse_edges(self, cls, json):
        """Return a dict mapping edge names of `cls` to the list of edges found under `allEdges` in `json`.

        Edge types prefixed with `~` (reverse edges) are stored under the name without the prefix."""
        all_edges = json.get(ALL_EDGES, None)
        edge_kwargs = dict()
        # Use the edges of the class being parsed, not of the class this method was invoked on.
        accepted_types = cls.edges + [f"~{e}" for e in cls.edges]
        if all_edges is not None:
            for edge_json in all_edges:
                edge = Edge.from_json(edge_json)
                if edge._type in accepted_types:
                    edge_name = Item.remove_prefix(edge._type)
                    edge_kwargs.setdefault(edge_name, []).append(edge)
        return edge_kwargs

    @classmethod
    def get_property_types(cls, dates=False) -> Dict[str, type]:
        """
        Infer the property types of all properties in cls.

        The types are read from the `__init__` annotations of cls and its bases (up to, not including,
        `ItemBase`). Date properties are only included when `dates` is true.

        Raises ValueError if type annotations for properties are missing in the cls init.
        """
        mro = cls.mro()
        property_types = dict()
        # Walk from the most generic base to cls, so that subclasses override their bases.
        for basecls in reversed(mro[:mro.index(ItemBase)]):
            property_types.update(basecls.__init__.__annotations__)
        property_types = {k: v for k, v in property_types.items() if k in cls.properties}

        if not set(property_types.keys()) == set(cls.properties):
            raise ValueError(f"Item {cls.__name__} has missing property annotations.")

        res = dict()
        for k, v in property_types.items():
            if k[:1] != '_' and k != "private" and not (isinstance(v, list)) \
                    and v is not None and (not (not dates and k in cls.DATE_PROPERTIES)):
                res[k] = v
        return res

    @classmethod
    def get_edge_types(cls) -> List[Tuple[str, str, str]]:
        """
        Infer the types of all edges in cls as tuples (edge_name, source_type, target_type).

        An edge annotated with a Union of target types yields one tuple per target type; an edge
        whose annotation has no type arguments gets the target type "Any".
        """
        mro = cls.mro()
        tgt_types = dict()
        for basecls in reversed(mro[:mro.index(ItemBase)]):
            tgt_types.update(basecls.__init__.__annotations__)
        tgt_types = {k: v for k, v in tgt_types.items() if k in cls.edges}

        res: List[Tuple[str, str, str]] = []
        for k, v in tgt_types.items():
            if hasattr(v, "__args__") and len(v.__args__):
                v = v.__args__[0]
                if get_origin(v) == Union:
                    for arg in get_args(v):
                        if isinstance(arg, ForwardRef):
                            res.append((k, cls.__name__, arg.__forward_arg__))
                        else:
                            res.append((k, cls.__name__, arg.__name__))
                    # All targets of this edge are recorded; move on to the next edge. (A `break`
                    # here would drop every edge that follows a Union-typed edge.)
                    continue
                elif isinstance(v, type):
                    v = v.__name__
                elif isinstance(v, ForwardRef):
                    v = v.__forward_arg__
            else:
                v = "Any"
            res.append((k, cls.__name__, v))
        return res

    @staticmethod
    def remove_prefix(s, prefix="~"):
        """Return `s` without the leading `prefix`, or `s` unchanged when it does not start with it."""
        return s[len(prefix):] if s.startswith(prefix) else s

    @classmethod
    def from_json(cls, json):
        """Construct an instance of cls from `json`, converting millisecond timestamps of datetime
        properties to `datetime` and setting the new item as source of all parsed edges."""
        kwargs = Item.parse_json(cls, json)

        property_types = cls.get_property_types(dates=True)
        for k, v in kwargs.items():
            # .get: kwargs also contains edge names, which have no property type.
            if v is not None and property_types.get(k) == datetime:
                # Datetime in pod is in milliseconds
                kwargs[k] = datetime.fromtimestamp(v / 1000.)

        res = cls(**kwargs)
        for e in res.get_all_edges():
            e.source = res
        return res

    def _get_schema_type(self):
        """Return the name of the first class in the MRO that is not named "ItemBase"."""
        for cls in self.__class__.mro():
            if cls.__name__ != "ItemBase":
                return cls.__name__

    def to_json(self, dates=True):
        """Serialise the public, non-list, non-None attributes to a dict and add the schema type
        under "type". Date properties are left out when `dates` is false; attributes listed in
        `self.private` are always left out."""
        res = dict()
        private = getattr(self, "private", [])
        for k, v in self.__dict__.items():
            if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
                    and v is not None and (not (not dates and k in self.DATE_PROPERTIES)):
                if isinstance(v, datetime):
                    # Save datetimes in milliseconds
                    v = int(v.timestamp() * 1000)
                res[k] = v
        res["type"] = self._get_schema_type()
        return res
```
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.add_edge)
```
%% Output
<h4 id="ItemBase.add_edge" class="doc_header"><code>ItemBase.add_edge</code><a href="__main__.py#L47" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.add_edge</code>(**`name`**, **`val`**)
Creates an edge of type name and makes it point to val
%% Cell type:code id: tags:
``` python
show_doc(ItemBase.is_expanded)
```
%% Output
<h4 id="ItemBase.is_expanded" class="doc_header"><code>ItemBase.is_expanded</code><a href="__main__.py#L57" class="source_link" style="float:right">[source]</a></h4>
> <code>ItemBase.is_expanded</code>()
returns whether the node is expanded. An expanded node retrieved nodes that are
*directly* connected to it
from the pod, and stored their values via edges in the object.
%% Cell type:markdown id: tags:
# Usage
%% Cell type:markdown id: tags:
With the `Item` and `Edge` classes we can create an item and its surrounding graph. The schema is defined in schema.py. In general, we want to use the `from_data` classmethod to generate new items, because it ensures that edges are linked from both the source and the target object. Let's make a new item and add it to the pod.
%% Cell type:code id: tags:
``` python
class MyItem(Item):
    """Example item with two properties (`name`, `age`) and one edge (`friend`)."""
    properties = Item.properties + ["name", "age"]
    edges = Item.edges + ["friend"]

    def __init__(self, name: str=None, age: int=None, friend: list=None, **kwargs):
        super().__init__(**kwargs)
        self.name = name
        self.age = age
        # Store the edges in an EdgeList: attribute access on an item only traverses `Edge` and
        # `EdgeList` values, so with a plain list `item.friend` would yield edges instead of items.
        self.friend = EdgeList("friend", "MyItem", friend)
```
%% Cell type:code id: tags:
``` python
from pymemri.pod.client import PodClient
client = PodClient()
```
%% Cell type:code id: tags:
``` python
assert client.add_to_schema(MyItem(name="abc", age=1))
```
%% Cell type:code id: tags:
``` python
x = MyItem(name="me", age=30)
target = MyItem(name="my friend", age=31)
client.create(target)
# Point the edge at the item that was just created, rather than at a new item that was never created.
x.add_edge("friend", target)
```
%% Cell type:markdown id: tags:
We can now create our `MyItem`. As a side effect of creating it, it will receive an id.
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
None
%% Cell type:code id: tags:
``` python
assert client.create(x)
```
%% Cell type:code id: tags:
``` python
print(x.id)
```
%% Output
a17c93d199f4128976d7eac3542b669b
%% Cell type:code id: tags:
``` python
y = client.get(x.id)
```
%% Cell type:code id: tags:
``` python
assert len(y.friend) > 0
```
%% Cell type:code id: tags:
``` python
assert y.friend[0].name == "my friend"
assert y.name == "me"
assert y.age == 30

# One year later
y.age = 31
# Create the new friend first, so that the edge added below points at an item that has an id.
friend2 = MyItem(name="my friend2", age=29)
assert client.create(friend2)
y.add_edge("friend", friend2)
y.update(client)
assert y.age == 31
assert len(y.friend) == 2
```
%% Output
updating MyItem (#a17c93d199f4128976d7eac3542b669b)
BULK: Writing 2/2 items/edges
400 Failure: JSON deserialization error payload.createEdges[0]._target: invalid type: null, expected a string at line 1 column 245
could not complete bulk action, aborting
%% Cell type:code id: tags:
``` python
y.friend
```
%% Output
[MyItem (#None), MyItem (#None)]
%% Cell type:code id: tags:
``` python
y.to_json(dates=False)
```
%% Output
{'id': 'a17c93d199f4128976d7eac3542b669b',
'deleted': False,
'name': 'me',
'age': 31,
'type': 'MyItem'}
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted Untitled1.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
......
......@@ -398,17 +398,20 @@ class Dataset(Item):
class DatasetEntry(Item):
    description = """Entry item of dataset."""
    properties = Item.properties + []
    # `data` links to the entry's content, `annotation` to its labels.
    edges = Item.edges + ["data", "annotation"]

    def __init__(
        self,
        data: EdgeList[Union["Message", "Tweet"]] = None,
        annotation: EdgeList["CategoricalLabel"] = None,
        **kwargs
    ):
        super().__init__(**kwargs)

        # Edges
        self.data: EdgeList[Union["Message", "Tweet"]] = EdgeList("data", Union["Message", "Tweet"], data)
        self.annotation: EdgeList["CategoricalLabel"] = EdgeList("annotation", "CategoricalLabel", annotation)
class DatasetType(Item):
description = """Fixed dictionary for datasets."""
......@@ -1475,3 +1478,23 @@ class EmailMessage(Message):
"message", "EmailMessage", message
)
self.replyTo: EdgeList["Account"] = EdgeList("replyTo", "Account", replyTo)
class Tweet(Post):
    # Schema item for a tweet: a `Post` extended with a `service` property and `mention` edges.
    description = """Tweet of Twitter"""
    properties = Post.properties + ["service"]
    edges = Post.edges + ["mention"]

    def __init__(self, service: str = None, mention: EdgeList["Account"] = None, **kwargs):
        super().__init__(**kwargs)

        # Properties
        self.service: Optional[str] = service

        # Edges
        mention_edges: EdgeList["Account"] = EdgeList("mention", "Account", mention)
        self.mention = mention_edges
\ No newline at end of file
......@@ -383,7 +383,7 @@ class Item(ItemBase):
return res
@classmethod
def get_edge_types(cls) -> list[Tuple[str, str, str]]:
def get_edge_types(cls) -> List[Tuple[str, str, str]]:
"""
Infer the types of all edges in cls as tuple (source_type, target_type)
"""
......
import pytest
from pymemri.pod.client import PodClient
from pymemri.data.schema import Account, Person
from pymemri.data.itembase import Edge
from pymemri.data.itembase import Edge, EdgeList, Item
@pytest.fixture
def client():
......@@ -55,3 +55,30 @@ def test_edge_insert_wrong_target():
# Length 1 from first add_edge
assert len(account.owner)==1
def test_create_and_read_item(client):
    """Create an item with an edge to another item in the pod and read it back."""

    class MyItem(Item):
        properties = Item.properties + ["name", "age"]
        edges = Item.edges + ["friend"]

        def __init__(self, name: str=None, age: int=None, friend: list=None, **kwargs):
            super().__init__(**kwargs)
            self.name = name
            self.age = age
            self.friend = EdgeList("friend", "MyItem", friend)

    # Register the item type with the pod before creating instances of it.
    assert client.add_to_schema(MyItem(name="abc", age=1))

    x = MyItem(name="me", age=30)
    target = MyItem(name="my friend", age=31)
    client.create(target)
    # Link to the item that was created above, not to a fresh item that was never created.
    x.add_edge("friend", target)

    assert client.create(x)

    y = client.get(x.id)
    assert len(y.friend) > 0
    assert y.friend[0].name == "my friend"
    assert y.name == "me"
    assert y.age == 30
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment