Commit 48f82406 authored by Eelco van der Wel's avatar Eelco van der Wel :speech_balloon:
Browse files

Merge branch 'dataset' into 'dev'

Dataset

See merge request !157
parents b864597a bcebbf84
Pipeline #5597 passed with stages
in 3 minutes and 22 seconds
Showing with 473 additions and 336 deletions
+473 -336
%% Cell type:code id:f328cc00 tags:
``` python
# default_exp data.dataset
%load_ext autoreload
%autoreload 2
```
%% Cell type:code id:27a2575d tags:
``` python
# export
# hide
from typing import List, Union
from pathlib import Path
from pymemri.data.itembase import Item
from pymemri.exporters.exporters import Query
```
%% Cell type:code id:5c2f01e8 tags:
``` python
# export
def filter_rows(dataset: dict, filter_val=None) -> dict:
missing_idx = set()
for column in dataset.values():
missing_idx.update([i for i, val in enumerate(column) if val == filter_val])
return {
k: [item for i, item in enumerate(v) if i not in missing_idx] for k, v in dataset.items()
}
```
%% Cell type:code id:32e9a1f8 tags:
``` python
# export
class Dataset(Item):
"""
Temporary dataset schema, needs update when MVP2 is done.
"""
properties= Item.properties + ["name", "queryStr"]
edges = Item.edges + ["item"]
def __init__(self, name: str = None, queryStr: str = None, item: list = None, **kwargs):
super().__init__(**kwargs)
self.queryStr = queryStr
self.name = name
self.item: list = item if item is not None else []
def _get_items(self):
if self._client is None:
raise ValueError("Dataset does not have associated PodClient.")
if not len(self.item):
edges = self._client.get_edges(self.id)
for e in self._client.get_edges(self.id):
self.add_edge(e["name"], e["item"])
return self.item
def _get_data(self, dtype: str, columns: List[str], filter_missing: bool = True):
if self._client is None:
raise ValueError("Dataset does not have associated PodClient.")
items = self._get_items()
query = Query("id", *columns)
result = query.execute(self._client, items)
if filter_missing:
result = filter_rows(result, filter_val=None)
return query.convert_dtype(result, dtype)
def to(self, dtype: str, columns: List[str], filter_missing: bool = True):
return self._get_data(dtype, columns, filter_missing)
def save(self, path: Union[Path, str], columns: List[str], filter_missing: bool = True):
result = self._get_data("pandas", columns, filter_missing)
result.to_csv(path, index=False)
```
%% Cell type:code id:5e11166c tags:
``` python
# hide
from pymemri.pod.client import PodClient
from pymemri.data.schema import Account, Person, Message, Label
from pymemri.data.itembase import Edge
import random
import tempfile
import pandas as pd
```
%% Cell type:code id:d0b0d8dd tags:
``` python
# hide
client = PodClient()
client.add_to_schema(Account, Person, Message, Dataset)
dataset = Dataset()
num_items = 10
messages = []
items = [dataset]
edges = []
for i in range(num_items):
msg = Message(content=f"content_{i}", service="my_service")
account = Account(handle=f"account_{i}")
person = Person(firstName=f"firstname_{i}")
label = Label(name=f"label_{i}")
items.extend([msg, account, person, label])
edges.extend([
Edge(dataset, msg, "item"),
Edge(msg, account, "sender"),
Edge(msg, label, "label"),
Edge(account, person, "owner")
])
messages.append(msg)
client.bulk_action(
create_items=items,
create_edges=edges
)
```
%% Output
BULK: Writing 81/81 items/edges
Completed Bulk action, written 81 items/edges
True
%% Cell type:code id:29b9c9cb tags:
``` python
# hide
dataframe = dataset.to("pd", columns=["content", "sender.owner.firstName", "label.name"])
dataframe.head()
assert isinstance(dataframe, pd.DataFrame)
dataframe.head()
```
%% Output
id content sender.owner.firstName \
0 9ec52c4d3f4945b1a57393a16845d6ac content_0 firstname_0
1 55a419ca44684a72af05254575c7bed1 content_1 firstname_1
2 9d6fa4d89a7c46cb837ee161950ca54d content_2 firstname_2
3 b9d511c8a3b84ae1ba697b6b456bd396 content_3 firstname_3
4 dce44cc400ce47d8b5763d960744440e content_4 firstname_4
label.name
0 label_0
1 label_1
2 label_2
3 label_3
4 label_4
%% Cell type:code id:824e0b06 tags:
``` python
# # hide
# with tempfile.TemporaryFile(mode='w+') as f:
# dataset.save(f, columns=["content", "sender.owner.firstName", "label.name"])
# f.seek(0)
# result = pd.read_csv(f)
# assert result.equals(dataframe)
```
%% Cell type:code id:23cc738a tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
%% Cell type:code id:ffcf3000 tags:
``` python
# default_exp exporters.exporters
%load_ext autoreload
%autoreload 2
```
%% Cell type:code id:c251979b tags:
``` python
# export
# hide
from typing import Dict, List, Optional, Iterable, Any
import pandas as pd
import json
from pymemri.pod.client import PodClient
from pymemri.data.itembase import Item
```
%% Cell type:markdown id:58299d4f tags:
# Query
With the `Query` class you can query the Pod with a list of properties, and export the result to a tabular dataformat. These properties can include edges that point to different items.
%% Cell type:code id:8d32b240 tags:
``` python
# export
class Query:
def __init__(self, *properties: List[str]):
"""
A Query implements functionality to retrieve data from the pod to a tabular format.
Given a list of `properties`, the `execute` method queries the pod for a set of given items,
and retrieves the properties for each item if it exists. Note that a properties can be nested behind
multiple edges, such as "sender.owner.firstName".
"""
self.properties = list(properties)
def traverse_edges(self, client: PodClient, items: List[Item], edges: List[str]) -> List[Item]:
def traverse_edges(self, client: "PodClient", items: List[Item], edges: List[str]) -> List[Item]:
items = items.copy()
for edge in edges:
ids_to_query = list()
query_item_idx = list()
for i in range(len(items)):
item = items[i]
if item is None:
continue
# Replace item with target item. If the edge is empty, it has to be queried again.
try:
if edge not in item.edges:
items[i] = None
else:
items[i] = getattr(item, edge)[0]
except Exception:
ids_to_query.append(item.id)
query_item_idx.append(i)
items[i] = None
new_items = client.search({"ids": ids_to_query})
for i, new_item in zip(query_item_idx, new_items):
try:
items[i] = getattr(new_item, edge)[0]
except Exception:
items[i] = None
return items
def get_property_values(
self, client: PodClient, prop: str, items: List[Item]
self, client: "PodClient", prop: str, items: List[Item]
) -> list:
edges, prop_name = self.parse_property(prop)
target_items = self.traverse_edges(client, items, edges)
result = [getattr(item, prop_name, None) for item in target_items]
return result
@staticmethod
def parse_property(prop: str):
prop = prop.split(".")
edges = prop[:-1]
prop = prop[-1]
return edges, prop
def convert_dtype(self, result, dtype):
if dtype == "dict":
return result
elif dtype == "list":
return [result[prop] for prop in self.properties]
elif dtype in {"pandas", "pd", "df"}:
return pd.DataFrame.from_dict(result)
else:
raise ValueError(f"Unknown dtype: {dtype}")
def execute(self, client: PodClient, items: List[Item], dtype="dict") -> Any:
def execute(self, client: "PodClient", items: List[Item], dtype="dict") -> Any:
result = {
prop: self.get_property_values(client, prop, items) for prop in self.properties
}
return self.convert_dtype(result, dtype)
```
%% Cell type:markdown id:708f56ad tags:
## Tests
%% Cell type:code id:be433175 tags:
``` python
# hide
from pymemri.pod.client import PodClient
from pymemri.data.schema import Account, Person, Message, Label
from pymemri.data.itembase import Edge
import random
```
%% Cell type:markdown id:84e3be22 tags:
### Create dummy data for dataset-
%% Cell type:code id:8cd43657 tags:
``` python
# hide
client = PodClient()
client.add_to_schema(Account, Person, Message)
num_items = 10
messages = []
items = []
edges = []
for i in range(num_items):
msg = Message(content=f"content_{i}", service="my_service")
account = Account(handle=f"account_{i}")
person = Person(firstName=f"firstname_{i}")
label = Label(name=f"label_{i}")
items.extend([msg, account, person, label])
edges.extend([
Edge(msg, account, "sender"),
Edge(msg, label, "label"),
Edge(account, person, "owner")
])
messages.append(msg)
# Dataset is not perfect, drop some random edges
edges = random.sample(edges, int(len(edges)*0.8))
client.bulk_action(
create_items=items,
create_edges=edges
)
```
%% Output
BULK: Writing 64/64 items/edges
Completed Bulk action, written 64 items/edges
True
%% Cell type:markdown id:3440b126 tags:
### Example usage
For example, if we have a list of Messages and we want to get message content,
the accompanying account handles and account owner names, we can query:
%% Cell type:code id:c6e44269 tags:
``` python
messages = client.search({"type": "Message", "service": "my_service"})
query = Query("content", "sender.handle", "sender.owner.firstName")
query.execute(client, messages, dtype="pandas")
```
%% Output
content sender.handle sender.owner.firstName
0 content_4 account_4 firstname_4
1 content_1 account_1 None
2 content_2 account_2 firstname_2
0 content_0 account_0 firstname_0
1 content_1 account_1 firstname_1
2 content_2 account_2 None
3 content_3 account_3 firstname_3
4 content_5 account_5 firstname_5
5 content_6 account_6 firstname_6
6 content_0 account_0 firstname_0
7 content_7 account_7 None
4 content_4 account_4 firstname_4
5 content_5 account_5 None
6 content_6 account_6 None
7 content_7 account_7 firstname_7
8 content_8 account_8 firstname_8
9 content_9 account_9 None
9 content_9 None None
%% Cell type:code id:e79fe310 tags:
``` python
q = Query("content", "label.name", "sender.owner.firstName", "sender.handle", "wrong_property")
result = q.execute(client, messages)
result
```
%% Output
{'content': ['content_4',
{'content': ['content_0',
'content_1',
'content_2',
'content_3',
'content_4',
'content_5',
'content_6',
'content_0',
'content_7',
'content_8',
'content_9'],
'label.name': [None,
'label.name': ['label_0',
'label_1',
'label_2',
None,
None,
'label_4',
'label_5',
'label_6',
'label_0',
'label_7',
None,
'label_8',
'label_9'],
'sender.owner.firstName': ['firstname_4',
'sender.owner.firstName': ['firstname_0',
'firstname_1',
None,
'firstname_2',
'firstname_3',
'firstname_5',
'firstname_6',
'firstname_0',
'firstname_4',
None,
None,
'firstname_7',
'firstname_8',
None],
'sender.handle': ['account_4',
'sender.handle': ['account_0',
'account_1',
'account_2',
'account_3',
'account_4',
'account_5',
'account_6',
'account_0',
'account_7',
'account_8',
'account_9'],
None],
'wrong_property': [None,
None,
None,
None,
None,
None,
None,
None,
None,
None]}
%% Cell type:code id:6fcef105 tags:
``` python
# hide
q = Query("content", "label.name", "sender.owner.firstName", "sender.handle", "wrong_property")
result = q.execute(client, messages)
assert all(len(vals) == len(result["content"]) for vals in result.values())
assert len(result["content"]) == num_items
# Check if columns all align
valid_props = ["label.name", "sender.owner.firstName", "sender.handle"]
for i in range(num_items):
row = [result[prop][i] for prop in valid_props]
row_idx = [val[-1] for val in row if val is not None]
assert len(set(row_idx)) <= 1
assert all(val is None for val in result["wrong_property"])
```
%% Cell type:code id:5013f1bb tags:
``` python
q = Query("content", "label.name", "sender.owner.firstName", "sender.handle")
result = q.execute(client, messages, dtype="pandas")
result.head()
```
%% Output
content label.name sender.owner.firstName sender.handle
0 content_0 label_0 None None
0 content_0 label_0 firstname_0 account_0
1 content_1 label_1 firstname_1 account_1
2 content_2 None firstname_2 account_2
3 content_3 label_3 None account_3
4 content_4 label_4 None None
%% Cell type:markdown id:4022a364 tags:
# Exporting Datasets
%% Cell type:code id:a36944a3 tags:
``` python
# export
class Dataset(Item):
"""
Temporary dataset schema, remove when MVP2 is done.
"""
properties= Item.properties + ["name", "queryStr"]
edges = Item.edges + ["item"]
def __init__(self, name: str = None, queryStr: str = None, item: list = None, **kwargs):
super().__init__(**kwargs)
self.queryStr = queryStr
self.name = name
self.item: list = item if item is not None else []
```
%% Cell type:code id:3771c21f tags:
``` python
# export
def filter_missing(dataset: dict) -> dict:
missing_idx = set()
for column in dataset.values():
missing_idx.update([i for i, val in enumerate(column) if val is None])
return {
k: [item for i, item in enumerate(v) if i not in missing_idx] for k, v in dataset.items()
}
def export_dataset(
client: PodClient,
dataset: Dataset,
content_fields: List[str] = ["content"],
label_field: str = "label.value",
missing_values: bool = False,
dtype: str = "dict",
):
items = dataset.item
query = Query("id", *content_fields, label_field)
result = query.execute(client, dataset.item)
if not missing_values:
result = filter_missing(result)
return query.convert_dtype(result, dtype)
```
%% Cell type:code id:80d46659 tags:
``` python
# hide
client.add_to_schema(Dataset)
search_query = json.dumps({"type": "Message"})
messages = [client.get(msg.id) for msg in messages]
dataset = Dataset(search_query)
edges = [
Edge(dataset, msg, "item") for msg in messages
]
client.bulk_action(
create_items = [dataset],
create_edges = edges
)
```
%% Output
BULK: Writing 11/11 items/edges
Completed Bulk action, written 11 items/edges
True
%% Cell type:code id:4736e26a tags:
``` python
# hide
dataset = client.get(dataset.id)
df = export_dataset(client, dataset, dtype="pandas")
df
```
%% Output
id content label.name
0 96edc0082df740b8a704ceb5c7447f13 content_0 label_0
1 55f8381205ef4ac98e47e17254aa26e1 content_1 label_1
2 4adcf318e6254947b113b8d65d387dc2 content_3 label_3
3 10b8af9fcf9e4a619bba7310083dcdd2 content_4 label_4
4 ea76f206a6b1422aaef77162abceb1a7 content_5 label_5
5 2cd1dedf4fa245feaec64c3ceddd3ba6 content_6 label_6
6 62ce7bda27fc4347b284e1d1329ad785 content_7 label_7
7 eeeecc2165ed4051aaa248aa03458e0d content_8 label_8
8 4f7389b314cf46559dac6601f8d46588 content_9 label_9
2 content_2 None None account_2
3 content_3 None firstname_3 account_3
4 content_4 label_4 firstname_4 account_4
%% Cell type:code id:958ba72c tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.imports import *
from hashlib import sha256
from pymemri.pod.db import DB
from pymemri.pod.utils import *
from pymemri.plugin.schema import *
from pymemri.test_utils import get_ci_variables
from pymemri.pod.api import PodAPI, PodError, DEFAULT_POD_ADDRESS, POD_VERSION
from typing import List, Union
import uuid
import urllib
from datetime import datetime
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
# Mapping from python type to schema type
# TODO move to data.schema once schema is refactored
TYPE_TO_SCHEMA = {
bool: "Bool",
str: "Text",
int: "Integer",
float: "Real",
datetime: "DateTime",
}
def __init__(
self,
url=DEFAULT_POD_ADDRESS,
version=POD_VERSION,
database_key=None,
owner_key=None,
auth_json=None,
register_base_schema=True,
verbose=False,
):
self.verbose = verbose
self.database_key = (
database_key if database_key is not None else self.generate_random_key()
)
self.owner_key = (
owner_key if owner_key is not None else self.generate_random_key()
)
self.api = PodAPI(
database_key=self.database_key,
owner_key=self.owner_key,
url=url,
version=version,
auth_json=auth_json,
verbose=verbose,
)
self.api.test_connection()
self.local_db = DB()
self.registered_classes = dict()
self.register_base_schemas()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
return cls(
database_key=read_pod_key("database_key"),
owner_key=read_pod_key("owner_key"),
**kwargs,
)
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def register_base_schemas(self):
assert self.add_to_schema(PluginRun, CVUStoredDefinition, Account, Photo)
def add_to_db(self, node):
existing = self.local_db.get(node.id)
if existing is None and node.id is not None:
self.local_db.add(node)
def reset_local_db(self):
self.local_db = DB()
def get_create_dict(self, node):
properties = node.to_json()
properties = {k: v for k, v in properties.items() if v != []}
return properties
def create(self, node):
create_dict = self.get_create_dict(node)
try:
result = self.api.create_item(create_dict)
node.id = result
self.add_to_db(node)
node._set_client(self)
return True
except Exception as e:
print(e)
return False
def create_photo(self, photo):
file = photo.file[0]
# create the photo
items_edges_success = self.bulk_action(
create_items=[photo, file], create_edges=photo.get_edges("file")
)
if not items_edges_success:
raise ValueError("Could not create file or photo item")
return self._upload_image(photo.data)
def _property_dicts_from_instance(self, node):
create_items = []
attributes = node.to_json()
for k, v in attributes.items():
if type(v) not in self.TYPE_TO_SCHEMA:
raise ValueError(f"Could not add property {k} with type {type(v)}")
value_type = self.TYPE_TO_SCHEMA[type(v)]
create_items.append(
{
"type": "ItemPropertySchema",
"itemType": attributes["type"],
"propertyName": k,
"valueType": value_type,
}
)
return create_items
def _property_dicts_from_type(self, item):
create_items = []
for property, p_type in item.get_property_types().items():
p_type = self.TYPE_TO_SCHEMA[p_type]
create_items.append(
{
"type": "ItemPropertySchema",
"itemType": item.__name__,
"propertyName": property,
"valueType": p_type,
}
)
return create_items
def add_to_schema(self, *items: List[Union[object, type]]):
create_items = []
for item in items:
if isinstance(item, type):
property_dicts = self._property_dicts_from_type(item)
else:
property_dicts = self._property_dicts_from_instance(item)
item = type(item)
create_items.extend(property_dicts)
self.registered_classes[item.__name__] = item
try:
self.api.bulk(create_items=create_items)
return True
except Exception as e:
print(e)
return False
def _upload_image(self, img):
if isinstance(img, np.ndarray):
return self.upload_file(img.tobytes())
elif isinstance(img, bytes):
return self.upload_file(img)
else:
raise ValueError(f"Unknown image data type {type(img)}")
def upload_file(self, file):
try:
self.api.upload_file(file)
return True
except PodError as e:
# 409 = CONFLICT, file already exists
if e.status == 409:
return True
return False
def get_file(self, sha):
return self.api.get_file(sha)
def get_photo(self, id, size=640):
photo = self.get(id)
self._load_photo_data(photo, size=size)
return photo
def _load_photo_data(self, photo, size=None):
if len(photo.file) > 0 and photo.data is None:
file = self.get_file(photo.file[0].sha256)
if file is None:
print(
f"Could not load data of {photo} attached file item does not have data in pod"
)
return
photo.data = file
else:
print(f"could not load data of {photo}, no file attached")
def create_if_external_id_not_exists(self, node):
if not self.external_id_exists(node):
self.create(node)
def external_id_exists(self, node):
if node.externalId is None:
return False
existing = self.search({"externalId": node.externalId})
return len(existing) > 0
def create_edges(self, edges):
return self.bulk_action(create_edges=edges)
def delete_items(self, items):
return self.bulk_action(delete_items=items)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
@staticmethod
def gather_batch(items, start_idx, start_size=0, max_size=5000000):
idx = start_idx
total_size = start_size
batch_items = []
for i, x in enumerate(items):
if i < idx:
continue
elif len(str(x)) > max_size:
idx = i + 1
print("Could not add item: Item exceeds max item size")
elif total_size + len(str(x)) < max_size:
batch_items.append(x)
total_size += len(str(x))
idx = i + 1
else:
break
return batch_items, idx, total_size
def bulk_action(
self, create_items=None, update_items=None, create_edges=None, delete_items=None
):
all_items = []
if create_items:
all_items += create_items
if update_items:
all_items += update_items
for item in all_items:
item._set_client(self)
# we need to set the id to not lose the reference
if create_items is not None:
for c in create_items:
if c.id is None:
c.id = uuid.uuid4().hex
create_items = (
[self.get_create_dict(i) for i in create_items]
if create_items is not None
else []
)
update_items = (
[self.get_update_dict(i) for i in update_items]
if update_items is not None
else []
)
create_edges = (
[self.get_create_edge_dict(i) for i in create_edges]
if create_edges is not None
else []
)
# Note: skip delete_items without id, as items that are not in pod cannot be deleted
delete_items = (
[item.id for item in delete_items if item.id is not None]
if delete_items is not None
else []
)
n_total = len(create_items + update_items + create_edges + delete_items)
n = 0
i_ci, i_ui, i_ce, i_di = 0, 0, 0, 0
while not (
i_ci == len(create_items)
and i_ui == len(update_items)
and i_ce == len(create_edges)
and i_di == len(delete_items)
):
batch_size = 0
create_items_batch, i_ci, batch_size = self.gather_batch(
create_items, i_ci, start_size=batch_size
)
update_items_batch, i_ui, batch_size = self.gather_batch(
update_items, i_ui, start_size=batch_size
)
delete_items_batch, i_di, batch_size = self.gather_batch(
delete_items, i_di, start_size=batch_size
)
if i_ci == len(create_items):
create_edges_batch, i_ce, batch_size = self.gather_batch(
create_edges, i_ce, start_size=batch_size
)
else:
create_edges_batch = []
n_batch = len(
create_items_batch
+ update_items_batch
+ create_edges_batch
+ delete_items_batch
)
n += n_batch
print(f"BULK: Writing {n}/{n_total} items/edges")
try:
result = self.api.bulk(
create_items_batch,
update_items_batch,
create_edges_batch,
delete_items_batch,
)
except PodError as e:
print(e)
print("could not complete bulk action, aborting")
return False
print(f"Completed Bulk action, written {n} items/edges")
return True
def get_create_edge_dict(self, edge):
return {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
def create_edge(self, edge):
edge_dict = self.get_create_edge_dict(edge)
try:
self.api.create_edge(edge_dict)
return True
except PodError as e:
print(e)
return False
def get(self, id, expanded=True, include_deleted=False):
if not expanded:
res = self._get_item_with_properties(id)
else:
res = self._get_item_expanded(id)
if res is None:
raise ValueError(f"Item with id {id} does not exist")
elif res.deleted and not include_deleted:
print(f"Item with id {id} has been deleted")
return
return res
def get_all_items(self):
raise NotImplementedError()
def filter_deleted(self, items):
return [i for i in items if not i.deleted == True]
def _get_item_expanded(self, id, include_deleted=False):
item = self.get(id, expanded=False, include_deleted=include_deleted)
edges = self.get_edges(id)
for e in edges:
item.add_edge(e["name"], e["item"])
return item
def get_edges(self, id):
try:
result = self.api.get_edges(id)
for d in result:
d["item"] = self.item_from_json(d["item"])
return result
except PodError as e:
print(e)
return
def _get_item_with_properties(self, id):
try:
result = self.api.get_item(str(id))
if not len(result):
return
return self.item_from_json(result[0])
except PodError as e:
print(e)
return
def get_update_dict(self, node):
properties = node.to_json(dates=False)
properties.pop("type", None)
properties.pop("deleted", None)
return properties
def update_item(self, node):
data = self.get_update_dict(node)
try:
self.api.update_item(data)
node._set_client(self)
return True
except PodError as e:
print(e)
return False
def exists(self, id):
try:
result = self.api.get_item(str(id))
if isinstance(result, list) and len(result) > 0:
return True
return False
except PodError as e:
print(e)
return False
def search_paginate(self, fields_data, limit=50, include_edges=True, add_to_local_db: bool = True):
if "ids" in fields_data:
raise NotImplementedError("Searching by multiple IDs is not implemented for paginated search.")
extra_fields = {"[[edges]]": {}} if include_edges else {}
query = {**fields_data, **extra_fields}
try:
for page in self.api.search_paginate(query, limit):
result = [self._item_from_search(item, add_to_local_db=add_to_local_db) for item in page]
yield self.filter_deleted(result)
except PodError as e:
print(e)
def search(self, fields_data, include_edges: bool = True, add_to_local_db: bool = True):
extra_fields = {"[[edges]]": {}} if include_edges else {}
query = {**fields_data, **extra_fields}
# Special key "ids" for searching a list of ids.
# Requires /bulk search instead of /search.
if "ids" in query:
ids = query.pop("ids")
bulk_query = [{"id": uid, **query} for uid in ids]
try:
result = self.api.bulk(search=bulk_query)["search"]
except PodError as e:
print(e)
result = [item for sublist in result for item in sublist]
else:
try:
result = self.api.search(query)
except PodError as e:
print(e)
result = [self._item_from_search(item, add_to_local_db=add_to_local_db) for item in result]
return self.filter_deleted(result)
def _item_from_search(self, item_json: dict, add_to_local_db: bool = True):
# search returns different fields w.r.t. edges compared to `get` api,
# different method to keep `self.get` clean.
item = self.item_from_json(item_json)
for edge_json in item_json.get("[[edges]]", []):
edge_name = edge_json["_edge"]
try:
edge_item = self.item_from_json(edge_json["_item"])
item.add_edge(edge_name, edge_item)
except Exception as e:
print(f"Could not attach edge {edge_json['_item']} to {item}")
print(e)
continue
return item
def search_last_added(self, type=None, with_prop=None, with_val=None):
query = {"_limit": 1, "_sortOrder": "Desc"}
if type is not None:
query["type"] = type
if with_prop is not None:
query[f"{with_prop}=="] = with_val
return self.search(query)[0]
def item_from_json(self, json, add_client_ref: bool = True):
plugin_class = json.get("pluginClass", None)
plugin_package = json.get("pluginPackage", None)
constructor = get_constructor(
json["type"],
plugin_class,
plugin_package=plugin_package,
extra=self.registered_classes,
)
new_item = constructor.from_json(json)
existing = self.local_db.get(new_item.id)
# TODO: cleanup
if existing is not None:
if not existing.is_expanded() and new_item.is_expanded():
for edge_name in new_item.get_all_edge_names():
edges = new_item.get_edges(edge_name)
for e in edges:
e.source = existing
existing.__setattr__(edge_name, edges)
for prop_name in new_item.get_property_names():
existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
existing._set_client(self) if add_client_ref else None
return existing
else:
new_item._set_client(self) if add_client_ref else None
return new_item
def get_properties(self, expanded):
properties = copy(expanded)
if ALL_EDGES in properties:
del properties[ALL_EDGES]
return properties
def send_email(self, to, subject="", body=""):
try:
self.api.send_email(to, subject, body)
print(f"succesfully sent email to {to}")
return True
except PodError as e:
print(e)
return False
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys, you can just omit the keys when initializing the `PodClient`, which creates a new user by defining random keys.
If you want to use the same keys for different `PodClient` instances, you can store a random key pair locally with the `store_keys` CLI, and create a new client with `PodClient.from_local_keys()`. When you are using the app, setting the keys in the pod, and passing them when calling a plugin is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
client.registered_classes["Photo"]
```
%% Output
pymemri.data.photo.Photo
---------------------------------------------------------------------------
NameError Traceback (most recent call last)
/tmp/ipykernel_8011/1587609234.py in <module>
----> 1 client = PodClient()
2 client.registered_classes["Photo"]
/tmp/ipykernel_8011/1146046149.py in __init__(self, url, version, database_key, owner_key, auth_json, register_base_schema, verbose)
39 self.local_db = DB()
40 self.registered_classes = dict()
---> 41 self.register_base_schemas()
42
43 @classmethod
/tmp/ipykernel_8011/1146046149.py in register_base_schemas(self)
54
55 def register_base_schemas(self):
---> 56 assert self.add_to_schema(PluginRun, CVUStoredDefinition, Account, Photo)
57
58 def add_to_db(self, node):
NameError: name 'PluginRun' is not defined
%% Cell type:code id: tags:
``` python
# hide
success = client.api.test_connection()
assert success
```
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. To create an item in the pod, you have to add the schema first. Schemas can be added as follows
%% Cell type:code id: tags:
``` python
from pymemri.data.schema import EmailMessage, Address, PhoneNumber
succes = client.add_to_schema(EmailMessage, Address, PhoneNumber)
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
```
%% Cell type:markdown id: tags:
We can now create an item with one of the above item definitions. As a side-effect, our item will be assigned an id.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
print(email_item.id)
```
%% Output
32cc6109a357186ee028e1fe88416a96
%% Cell type:markdown id: tags:
The types of items in the pod are not limited to definitions to the pymemri schema. We can easily define our own types, or overwrite existing item definitions with the same `add_to_schema` method.
Note that all keyword arguments need to be added to the `properties` class variable to let the pod know what the properties of our item are. Additionally, properties in the Pod are statically typed, and have to be inferred from type the annotations of our `__init__` method.
%% Cell type:code id: tags:
``` python
# export
class Dog(Item):
properties = Item.properties + ["name", "age", "bites", "weight"]
def __init__(self, name: str=None, age: int=None, bites: bool=False, weight: float=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
self.bites = bites
self.weight = weight
```
%% Cell type:code id: tags:
``` python
client.add_to_schema(Dog)
dog2 = Dog(name="bob", age=3, weight=33.2)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
# hide
client.reset_local_db()
dog_from_db = client.get(dog2.id)
assert dog_from_db.name == "bob"
assert dog_from_db.age == 3
assert dog_from_db.weight == 33.2
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
print(client.get_edges(email_item.id))
```
%% Output
[{'item': Person (#e8612025cf0f439120750e8b8e41948a), 'name': 'sender'}]
%% Cell type:code id: tags:
``` python
# hide
assert item_succes
assert edge_succes
```
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
print(email_from_db.sender)
```
%% Output
[Person (#e8612025cf0f439120750e8b8e41948a)]
%% Cell type:code id: tags:
``` python
# hide
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
client.create(person_item)
# Retrieve person from Pod
person_from_db = client.get(person_item.id, expanded=False)
```
%% Cell type:code id: tags:
``` python
# hide
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
```
%% Cell type:markdown id: tags:
Appart from creating, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
print(person_from_db.lastName)
```
%% Output
Awesome
%% Cell type:code id: tags:
``` python
# hide
assert person_from_db.lastName == "Awesome"
```
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:markdown id: tags:
the `PodClient` can search through the pod with the `search` or `search_paginate` methods, which return the results of a search as a list or generator respectively. Search uses the same arguments as the Pod search API, which can be found [here](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#post-v4owner_keysearch).
To display how search works, we first add a few new items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
person_account = Account(service="testService")
client.create(person_item2)
client.create(person_account)
person_item2.add_edge("account", person_account)
client.create_edges(person_item2.get_edges("account"));
```
%% Output
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_6076/495361486.py in <module>
5
6 person_item2.add_edge("account", person_account)
----> 7 client.create_edges(person_item2.get_edges("account"));
/tmp/ipykernel_6076/3366515080.py in create_edges(self, edges)
193
194 def create_edges(self, edges):
--> 195 return self.bulk_action(create_edges=edges)
196
197 def delete_items(self, items):
/tmp/ipykernel_6076/3366515080.py in bulk_action(self, create_items, update_items, create_edges, delete_items)
224 self, create_items=None, update_items=None, create_edges=None, delete_items=None
225 ):
--> 226 for items in create_items + update_items:
227 item._client = self
228
TypeError: unsupported operand type(s) for +: 'NoneType' and 'NoneType'
%% Cell type:code id: tags:
``` python
# hide
client.reset_local_db()
```
%% Cell type:code id: tags:
``` python
# Search for all Persons in the pod
all_people = client.search({"type": "Person"}, include_edges=True)
print("Number of results:", len(all_people))
```
%% Cell type:code id: tags:
``` python
# hide
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert any([len(p.account) for p in all_people])
```
%% Cell type:code id: tags:
``` python
# hide
# search without returning edges
all_people = client.search({"type": "Person"}, include_edges=False)
assert len(all_people)
assert([len(person.get_all_edges())==0 for person in all_people])
```
%% Cell type:code id: tags:
``` python
# hide
assert len(all_people)
```
%% Cell type:code id: tags:
``` python
# hide
# Search with edges
all_people = client.search({"type": "Person"}, include_edges=True)
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
assert any([len(p.account) for p in all_people])
```
%% Cell type:code id: tags:
``` python
# Search by IDs
ids = [person.id for person in all_people]
result = client.search({"ids": ids})
assert [item.id for item in result] == ids
```
%% Cell type:markdown id: tags:
To hande large volumes of Items, the `PodClient.search_paginate` method can search through the pod and return a generator which yields batches of items. This method uses the same search arguments as the `search` method:
%% Cell type:code id: tags:
``` python
# Create 100 accounts to search
client.bulk_action(
create_items=[
Account(identifier=str(i), service="paginate_test") for i in range(100)
]
)
generator = client.search_paginate({"type": "Account", "service": "paginate_test"}, limit=10)
for page in generator:
# process accounts
pass
```
%% Cell type:code id: tags:
``` python
# hide
# Test pagination
accounts = client.search({"type": "Account", "service": "paginate_test"})
generator = client.search_paginate({"type": "Account", "service": "paginate_test"}, limit=10)
accounts_paginated = []
for page in generator:
accounts_paginated.extend(page)
assert len(accounts_paginated) == 100
assert [a.id for a in accounts] == [a.id for a in accounts_paginated]
```
%% Cell type:code id: tags:
``` python
# hide
# Search, edge cases
result = client.search({"type": "Account", "service": "NonExistentService"})
assert result == []
paginator = client.search_paginate({"type": "Account", "service": "NonExistentService"})
try:
next(paginator)
except Exception as e:
if not isinstance(e, StopIteration):
assert False
```
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2)
last_added = client.search_last_added(type="Person")
```
%% Cell type:code id: tags:
``` python
# hide
assert last_added.firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files like Photos or Videos, the `PodClient` has a separate file api. This api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
For example, we can upload a photo with the file API as follows:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(photo.data)
```
%% Cell type:code id: tags:
``` python
# hide
assert succes
assert succes2
data = client.get_file(file.sha256)
photo.data = data
arr = photo.to_np()
assert (arr == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
The PodClient implements an easier API for photos separately, which uses the same file API under the hood
%% Cell type:code id: tags:
``` python
print(client.registered_classes["Photo"])
# client.add_to_schema(Photo)
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = Photo.from_np(x)
client.create_photo(photo);
photo.file
```
%% Cell type:code id: tags:
``` python
# hide
res = client.get_photo(photo.id)
print(res.id)
res.file[0].sha256
assert (res.to_np() == x).all()
```
%% Cell type:markdown id: tags:
Some photos come as bytes, for example when downloading them from a third party service. We can use `photo.from_bytes` to initialize these photos:
%% Cell type:code id: tags:
``` python
byte_photo = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\xe1\x00\x00\x00\xe1\x08\x03\x00\x00\x00\tm"H\x00\x00\x003PLTE\x04\x02\x04\x00\x00\x00\xa0\xa0\xa0\xa3\xa3\xa3\xaa\xaa\xaa\xb4\xb4\xb4\xbd\xbe\xbd\xbb\xbc\xbb\xde\xde\xde\x9b\x9a\x9b\xfe\xfe\xfe\xf2\xf3\xf2\xe5\xe6\xe5\xd8\xd9\xd8\xd1\xd1\xd1\xc9\xca\xc9\xae\xae\xae\x80k\x98\xfc\x00\x00\x01TIDATx\x9c\xed\xdd;r\xc2P\x00\x04A!\x90\x84\xfc\x01\xee\x7fZ\x138\xb1\x13S\xceF\xaf\xfb\x06\x93o\xd5No\xef\x1f\x9f\xb7\xfb}]\xd7my\xba|;\xff4\xff\xdf\xf9O\x97W<\x96W\xac\xbfm\xd7i9\x1d\xdb\xfe,\x9c\x8e\xec4+\xac{\x16^\x14\xb6)\xecS\xd8\xa7\xb0Oa\x9f\xc2\xbe!\n\xcf\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xce\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87\xc4bHa\x9c\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x86xaQ\x18\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd87D\xe1\xe3\xf0\x85\x8b\xc26\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}\n\xfb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14\xae\n\xdb\x14\xf6)\xecS\xd8\xa7\xb0Oa\x9f\xc2>\x85}C\x14n\xa7c\xdb\xa7\xeb>\x1f\xd9~\xfb\x02\xee\x7f\r\xe5\xe1h\x04"\x00\x00\x00\x00IEND\xaeB`\x82'
photo = Photo.from_bytes(byte_photo)
client.create_photo(photo);
```
%% Cell type:code id: tags:
``` python
# hide
# Test on a new client to prevent caching
new_client = PodClient(database_key=client.database_key, owner_key=client.owner_key)
res = new_client.get_photo(photo.id, size=225)
assert res.data == photo.data
```
%% Cell type:markdown id: tags:
## Bulk API
%% Cell type:markdown id: tags:
Adding each item separately to the pod with the `create` method can take a lot of time. For this reason, using the bulk API is faster and more convenient in most cases. Here we show creating items and edges, updating and deleting is also possible.
%% Cell type:code id: tags:
``` python
# Create 100 Dogs to add to the pod, and two edges to a new person
dogs = [Dog(name=f"dog number {i}") for i in range(100)]
person = Person(firstName="Alice")
edge1 = Edge(dogs[0], person, "label")
edge2 = Edge(dogs[1], person, "label")
# Simultaneously add the dogs, person, and edges with the bulk API
success = client.bulk_action(create_items=dogs + [person], create_edges=[edge1, edge2])
```
%% Cell type:code id: tags:
``` python
# hide
dogs = client.search({"type": "Dog"})
dogs_with_edge = [item for item in dogs if len(item.get_all_edges())]
print(len(dogs_with_edge))
assert len(dogs_with_edge) == 2
for d in dogs_with_edge:
assert len(d.label) > 0
```
%% Cell type:code id: tags:
``` python
# hide
# test bulk delete and update
# Change person name, delete first dog :(
person.firstName = "Bob"
to_delete = [dogs[0]]
to_update = [person]
client.bulk_action(delete_items=to_delete, update_items=to_update)
dogs_with_edge = [
item for item in client.search({"type": "Dog"}) if item.name.startswith("dog number 0") or item.name.startswith("dog number 1 ")
]
assert len(dogs_with_edge) == 1
dog = dogs_with_edge[0]
assert dog.label[0].firstName == "Bob"
```
%% Cell type:markdown id: tags:
# Sending emails -
%% Cell type:code id: tags:
``` python
# hide
# skip
to = "myemail@gmail.com"
client.send_email(to=to, subject="test", body="test2")
```
%% Cell type:markdown id: tags:
# Create items that do not exist in the Pod
The `PodClient` can deduplicate items with the same externalId with the `create_if_external_id_not_exists` method.
%% Cell type:code id: tags:
``` python
person_item = Person(firstName="Eve", externalId="gmail_1")
person_item2 = Person(firstName="Eve2", externalId="gmail_1")
client.create_if_external_id_not_exists(person_item)
client.create_if_external_id_not_exists(person_item2)
existing = client.search({"externalId": "gmail_1"})
```
%% Cell type:code id: tags:
``` python
# hide
assert len(existing) == 1
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
......
......@@ -15,13 +15,12 @@ index = {"read_file": "basic.ipynb",
"CVU_BASE_PATH": "cvu.utils.ipynb",
"get_default_cvu": "cvu.utils.ipynb",
"list_default_cvus": "cvu.utils.ipynb",
"filter_rows": "data.dataset.ipynb",
"Dataset": "data.dataset.ipynb",
"DEFAULT_ENCODING": "data.photo.ipynb",
"show_images": "data.photo.ipynb",
"Photo": "data.photo.ipynb",
"Query": "exporters.exporters.ipynb",
"Dataset": "exporters.exporters.ipynb",
"filter_missing": "exporters.exporters.ipynb",
"export_dataset": "exporters.exporters.ipynb",
"ALL_EDGES": "itembase.ipynb",
"Edge": "itembase.ipynb",
"ItemBase": "itembase.ipynb",
......@@ -96,6 +95,7 @@ index = {"read_file": "basic.ipynb",
modules = ["data/basic.py",
"cvu/utils.py",
"data/dataset.py",
"data/photo.py",
"exporters/exporters.py",
"data/itembase.py",
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/data.dataset.ipynb (unless otherwise specified).
__all__ = ['filter_rows', 'Dataset']
# Cell
# hide
from typing import List, Union
from pathlib import Path
from .itembase import Item
from ..exporters.exporters import Query
# Cell
def filter_rows(dataset: dict, filter_val=None) -> dict:
missing_idx = set()
for column in dataset.values():
missing_idx.update([i for i, val in enumerate(column) if val == filter_val])
return {
k: [item for i, item in enumerate(v) if i not in missing_idx] for k, v in dataset.items()
}
# Cell
class Dataset(Item):
"""
Temporary dataset schema, needs update when MVP2 is done.
"""
properties= Item.properties + ["name", "queryStr"]
edges = Item.edges + ["item"]
def __init__(self, name: str = None, queryStr: str = None, item: list = None, **kwargs):
super().__init__(**kwargs)
self.queryStr = queryStr
self.name = name
self.item: list = item if item is not None else []
def _get_items(self):
if self._client is None:
raise ValueError("Dataset does not have associated PodClient.")
if not len(self.item):
edges = self._client.get_edges(self.id)
for e in self._client.get_edges(self.id):
self.add_edge(e["name"], e["item"])
return self.item
def _get_data(self, dtype: str, columns: List[str], filter_missing: bool = True):
if self._client is None:
raise ValueError("Dataset does not have associated PodClient.")
items = self._get_items()
query = Query("id", *columns)
result = query.execute(self._client, items)
if filter_missing:
result = filter_rows(result, filter_val=None)
return query.convert_dtype(result, dtype)
def to(self, dtype: str, columns: List[str], filter_missing: bool = True):
return self._get_data(dtype, columns, filter_missing)
def save(self, path: Union[Path, str], columns: List[str], filter_missing: bool = True):
result = self._get_data("pandas", columns, filter_missing)
result.to_csv(path, index=False)
\ No newline at end of file
......@@ -2,6 +2,7 @@ import random, string
from .itembase import ItemBase, Edge, Item
from ._central_schema import *
from .photo import Photo
from .dataset import Dataset
def get_constructor(_type, plugin_class=None, plugin_package=None, extra=None):
......
from .exporters import export_dataset
__all__ = ["export_dataset"]
\ No newline at end of file
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/exporters.exporters.ipynb (unless otherwise specified).
__all__ = ['Query', 'Dataset', 'filter_missing', 'export_dataset']
__all__ = ['Query']
# Cell
# hide
......@@ -8,7 +8,6 @@ from typing import Dict, List, Optional, Iterable, Any
import pandas as pd
import json
from ..pod.client import PodClient
from ..data.itembase import Item
# Cell
......@@ -23,7 +22,7 @@ class Query:
"""
self.properties = list(properties)
def traverse_edges(self, client: PodClient, items: List[Item], edges: List[str]) -> List[Item]:
def traverse_edges(self, client: "PodClient", items: List[Item], edges: List[str]) -> List[Item]:
items = items.copy()
for edge in edges:
......@@ -54,7 +53,7 @@ class Query:
return items
def get_property_values(
self, client: PodClient, prop: str, items: List[Item]
self, client: "PodClient", prop: str, items: List[Item]
) -> list:
edges, prop_name = self.parse_property(prop)
target_items = self.traverse_edges(client, items, edges)
......@@ -79,48 +78,8 @@ class Query:
else:
raise ValueError(f"Unknown dtype: {dtype}")
def execute(self, client: PodClient, items: List[Item], dtype="dict") -> Any:
def execute(self, client: "PodClient", items: List[Item], dtype="dict") -> Any:
result = {
prop: self.get_property_values(client, prop, items) for prop in self.properties
}
return self.convert_dtype(result, dtype)
# Cell
class Dataset(Item):
"""
Temporary dataset schema, remove when MVP2 is done.
"""
properties= Item.properties + ["name", "queryStr"]
edges = Item.edges + ["item"]
def __init__(self, name: str = None, queryStr: str = None, item: list = None, **kwargs):
super().__init__(**kwargs)
self.queryStr = queryStr
self.name = name
self.item: list = item if item is not None else []
# Cell
def filter_missing(dataset: dict) -> dict:
missing_idx = set()
for column in dataset.values():
missing_idx.update([i for i, val in enumerate(column) if val is None])
return {
k: [item for i, item in enumerate(v) if i not in missing_idx] for k, v in dataset.items()
}
def export_dataset(
client: PodClient,
dataset: Dataset,
content_fields: List[str] = ["content"],
label_field: str = "label.value",
missing_values: bool = False,
dtype: str = "dict",
):
items = dataset.item
query = Query("id", *content_fields, label_field)
result = query.execute(client, dataset.item)
if not missing_values:
result = filter_missing(result)
return query.convert_dtype(result, dtype)
\ No newline at end of file
return self.convert_dtype(result, dtype)
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment