Commit 85cac2bb authored by Youp's avatar Youp
Browse files

- Refactored the pod_client to have an extra primitives-layer, decreasing...

- Refactored the pod_client to have an extra primitives-layer, decreasing coupling while increasing testability
- Started creating tests for the PodClient
- Not ready for merge yet
No related merge requests found
Showing with 631 additions and 201 deletions
+631 -201
......@@ -60,4 +60,4 @@ sidebars:
- home_sidebar
theme: jekyll-theme-cayman
baseurl: /integrators/
baseurl: /integrators
......@@ -172,6 +172,14 @@ class ItemBase():
for e in self.get_all_edges():
e.update(api)
def to_dict(self):
res = dict()
for k,v in self.__dict__.items():
if k[:1] != '_' and not (isinstance(v, list) and len(v)>0 and isinstance(v[0], Edge)) and v is not None:
res[k] = v
res["_type"] = self.__class__.__name__
return res
def exists(self, api):
res = api.search_by_fields({"uid": self.uid})
return len(res) == 1
......
......@@ -6,11 +6,13 @@ __all__ = ['API_URL', 'PodClient']
from ..itembase import Edge, ItemBase
from ..schema import *
from ..imports import *
import hashlib
# Cell
API_URL = "http://localhost:3030/v2"
# Cell
class PodClient:
def __init__(self, url=API_URL, database_key=None, owner_key=None):
......@@ -20,59 +22,79 @@ class PodClient:
self.database_key=database_key
self.owner_key=owner_key
def test_connection(self, verbose=True):
try:
res = requests.get(self.url)
if verbose: print("Succesfully connected to pod")
return True
except requests.exceptions.RequestException as e:
print("Could no connect to backend")
return False
# PRIMITIVE FUNCTIONS
def version(self):
result = requests.get(f'{self.url}/version',
verify=False)
print(result.content)
def create(self, node):
# if node.uid is None:
# print(f"Error, node {node} has no uid, not creating")
try:
body = { "databaseKey": self.database_key, "payload":self.get_properties_json(node) }
def get_item(self, uid):
wrapped_item = {
'databaseKey': self.database_key,
'payload': uid,
}
result = requests.post(f'{self.base_url}/get_item',
json=wrapped_item,
verify=False)
result = requests.post(f"{self.base_url}/create_item",
json=body)
if result.ok:
# TODO: get_item actually returns a list
return result.json()
else:
# TODO: better exception handling
raise Exception(result.status_code, result.text)
def get_all_items(self):
NotImplementedError()
def create_item(self, item, item_type=None):
# TODO: unify with create_item once https/http difference is resolved
if item_type != None:
item['_type'] = item_type
wrapped_item = {
'databaseKey': self.database_key,
'payload': item,
}
result = requests.post(f'{self.base_url}/create_item',
json=wrapped_item,
verify=False)
if result.ok:
return result.json()
else:
# TODO: better exception handling
raise Exception(result.status_code, result.text)
def update_item(self, item):
wrapped_item = {"databaseKey": self.database_key,
"payload": item}
try:
result = requests.post(f"{self.base_url}/update_item",
json=wrapped_item)
if result.status_code != 200:
print(result, result.content)
return False
else:
uid = int(result.json())
node.uid = uid
ItemBase.add_to_db(node)
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def create_edges(self, edges):
"""Create edges between nodes, edges should be of format [{"_type": "friend", "_source": 1, "_target": 2}]"""
edges_data = []
for e in edges:
src, target = e.source.uid, e.target.uid
data = {"_source": src, "_target": target, "_type": e._type}
if e.label is not None: data[LABEL] = e.label
if e.sequence is not None: data[SEQUENCE] = e.sequence
if e.reverse:
data2 = copy(data)
data2["_source"] = target
data2["_target"] = src
data2["_type"] = "~" + data2["_type"]
edges_data.append(data2)
edges_data.append(data)
def delete_item(self, item):
NotImplementedError()
edges_data = {"databaseKey": self.database_key, "payload": {
"createItems": [], "updateItems": [], "createEdges": edges_data}}
def bulk_action(self, create_items, update_items, delete_items, create_edges, delete_edges):
edges_data = {"databaseKey": self.database_key,
"payload": {
"createItems": create_items,
"updateItems": update_items,
"deleteItems": delete_items,
"createEdges": create_edges,
"deleteEdges": delete_edges
}}
try:
result = requests.post(f"{self.base_url}/bulk_action",
json=edges_data)
json=edges_data,
verify=False)
if result.status_code != 200:
if "UNIQUE constraint failed" in str(result.content):
print(result.status_code, "Edge already exists")
......@@ -85,25 +107,30 @@ class PodClient:
print(e)
return False
def create_edge(self, edge):
return self.create_edges([edge])
def search_by_fields(self, fields_data):
body = {"databaseKey": self.database_key,
"payload": fields_data}
try:
result = requests.post(f"{self.base_url}/search_by_fields",
json=body)
json = result.json()
return [self.item_from_json(item) for item in json]
except requests.exceptions.RequestException as e:
return None
def get(self, uid, expanded=True):
if not expanded:
return self._get_item_with_properties(uid)
else:
return self._get_item_expanded(uid)
def get_items_with_edges(self, item_uids):
body = {"databaseKey": self.database_key,
"payload": item_uids,}
def _get_item_expanded(self, uid):
body = {"payload": [uid],
"databaseKey": self.database_key}
try:
result = requests.post(f"{self.base_url}/get_items_with_edges",
json=body)
json=body,
verify=False)
if result.status_code != 200:
print(result, result.content)
return None
else:
print(result.json())
json = result.json()[0]
res = self.item_from_json(json)
return res
......@@ -112,55 +139,81 @@ class PodClient:
print(e)
return None
def _get_item_with_properties(uid):
try:
result = requests.get(f"{self.base_url}/items/{uid}")
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
if json == []:
return None
else:
return json
except requests.exceptions.RequestException as e:
print(e)
return None
def upload_file(self, file, hash):
result = requests.post(f'{self.base_url}/upload_file/{self.database_key}/{hash}',
file,
verify=False)
print(result.status_code)
def get_properties_json(self, node):
res = dict()
for k,v in node.__dict__.items():
if k[:1] != '_' and not (isinstance(v, list) and len(v)>0 and isinstance(v[0], Edge)) and v is not None:
res[k] = v
res["_type"] = node.__class__.__name__
return res
if result.ok:
return True
else:
if result.status_code == 409:
return False
raise Exception(result.status_code, result.text)
def update_item(self, node):
data = self.get_properties_json(node)
uid = data["uid"]
body = {"payload": data,
"databaseKey": self.database_key}
def get_file(self, hash):
wrapped_item = {
'databaseKey': self.database_key,
'payload': {
'sha256': hash,
}
}
try:
result = requests.post(f"{self.base_url}/update_item",
json=body)
if result.status_code != 200:
print(result, result.content)
except requests.exceptions.RequestException as e:
print(e)
result = requests.post(f'{self.base_url}/get_file',
json=wrapped_item,
verify=False)
def search_by_fields(self, fields_data):
if result.ok:
return result.content
else:
raise Exception(result.status_code, result.text)
body = {"payload": fields_data,
"databaseKey": self.database_key}
# END OF PRIMITIVE FUNCTIONS
def test_connection(self, verbose=True):
# TODO: change this function, remove get-request
try:
result = requests.post(f"{self.base_url}/search_by_fields",
json=body)
json = result.json()
return [self.item_from_json(item) for item in json]
res = requests.get(self.url, verify=False)
if verbose: print("Succesfully connected to pod")
return True
except requests.exceptions.RequestException as e:
return None
print("Could no connect to backend")
return False
def create(self, node):
self.create_item(node.to_dict())
# todo: ItemBase.add_to_db(node) if succesful
def create_edges(self, edges):
"""Create edges between nodes, edges should be of format [{"_type": "friend", "_source": 1, "_target": 2}]"""
edges_data = []
for e in edges:
src, target = e.source.uid, e.target.uid
data = {"_source": src, "_target": target, "_type": e._type}
if e.label is not None: data[LABEL] = e.label
if e.sequence is not None: data[SEQUENCE] = e.sequence
edges_data.append(data)
if e.reverse:
data2 = copy(data)
data2["_source"] = target
data2["_target"] = src
data2["_type"] = "~" + data2["_type"]
edges_data.append(data2)
self.bulk_action([], [], [], edges_data, [])
def create_edge(self, edge):
return self.create_edges([edge])
def get(self, uid, expanded=True):
if not expanded:
return self.get_item(uid)
else:
return self.get_items_with_edges([uid])
def item_from_json(self, json):
indexer_class = json.get("indexerClass", None)
......@@ -196,4 +249,20 @@ class PodClient:
else:
print("Starting importer")
except requests.exceptions.RequestException as e:
print("Error with calling importer {e}")
\ No newline at end of file
print("Error with calling importer {e}")
def upload_and_create_file(self, file):
hash = hashlib.sha256(file).hexdigest()
# TODO: check if creation was succesful
file_item = {
'sha256': hash
}
self.create_item(file_item, item_type='File')
# file_item = Item()
# file_item.sha256 = hash
# self.create(file_item)
self.upload_file(file, hash)
return file_item
\ No newline at end of file
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp itembase
```
%% Cell type:markdown id: tags:
# Itembase
%% Cell type:code id: tags:
``` python
# export
# hide
from integrators.imports import *
ALL_EDGES = "allEdges"
SOURCE, TARGET, TYPE, EDGE_TYPE, LABEL, SEQUENCE = "_source", "_target", "_type", "_type", "label", "sequence"
UID_GEN= 10000 + (random.randint(0, 1e3) * 1000)
```
%% Cell type:code id: tags:
``` python
# export
# hide
class DB():
def __init__(self):
self.nodes = dict()
def add(self, node):
uid = node.uid
if uid in self.nodes:
print(f"Error trying to add node, but node with with UID: {uid} is already in database")
self.nodes[uid] = node
def get(self, uid):
res = self.nodes.get(uid, None)
return res
def contains(node):
uid = node.get_property("uid")
return uid in self.nodes
def create(self, node):
existing = self.get(node.properties.get("uid", None))
if existing is not None:
if not existing._expanded:
existing.edges = node.edges
existing._expanded = node.edges is not None
return existing
else:
self.add(node)
return node
def parse_base_item_json(json):
uid = json.get("uid", None)
dateAccessed = json.get("dateAccessed", None)
dateCreated = json.get("dateCreated", None)
dateModified = json.get("dateModified", None)
deleted = json.get("deleted", None)
externalId = json.get("externalId", None)
itemDescription = json.get("itemDescription", None)
starred = json.get("starred", None)
version = json.get("version", None)
return uid, dateAccessed, dateCreated, dateModified, deleted, externalId, itemDescription, starred, version, None, None
```
%% Cell type:code id: tags:
``` python
# export
class Edge():
def __init__(self, source, target, _type, label=None, sequence=None, created=False, reverse=True):
self.source = source
self.target = target
self._type = _type
self.label = label
self.sequence = sequence
self.created = created
self.reverse = reverse
@classmethod
def from_json(cls, json):
from .schema import get_constructor
# we only set the target here
_type = json[EDGE_TYPE]
json_target = json[TARGET]
target_type = json_target["_type"]
indexer_class = json_target.get("indexerClass", None)
target_constructor = get_constructor(target_type, indexer_class)
target = target_constructor.from_json(json_target)
return cls(source=None, target=target, _type=_type)
def __repr__(self):
return f"{self.source} --{self._type}-> {self.target}"
def update(self, api):
if self.created:
api.create_edges([self])
def __eq__(self, other):
return self.source is other.source and self.target is other.target \
and self._type == other._type
def traverse(self, start):
if start == self.source:
return self.target
elif start == self.target:
return self.source
else:
raise ValueError
```
%% Cell type:code id: tags:
``` python
# export
class ItemBase():
global_db = DB()
def __init__(self, uid=None):
self.uid=uid
self.add_to_db(self)
@classmethod
def add_to_db(cls, node):
existing = cls.global_db.get(node.uid)
if existing is None and node.uid is not None:
cls.global_db.add(node)
def replace_self(self, other):
self.__dict__.update(other.__dict__)
def __getattribute__(self, name):
val = object.__getattribute__(self, name)
if isinstance(val, Edge):
edge = val
return edge.traverse(start=self)
if isinstance(val, list) and len(val) > 0 and isinstance(val[0], Edge):
edges = val
return [edge.traverse(start=self) for edge in edges]
else:
return val
def add_edge(self, name, val):
val = Edge(self, val, name, created=True)
if name not in self.__dict__:
raise NameError(f"object {self} does not have edge with name {name}")
existing = object.__getattribute__(self, name)
res = existing + [val]
self.__setattr__(name, res)
def is_expanded(self):
if "_expanded" in self.__dict__:
return self._expanded
else:
return False
# def __setattr__(self, name, val):
# if isinstance(val, ItemBase) or self.attr_is_edge(val):
# raise ValueError("Don't set edges directly, use node.add_edge instead")
# super().__setattr__(name, val)
def get_edges(self, name):
return object.__getattribute__(self, name)
def get_all_edges(self):
return [e for attr in self.__dict__.values() if self.attr_is_edge(attr) for e in attr]
def get_all_edge_names(self):
return [k for k,v in self.__dict__.items() if self.attr_is_edge(v)]
@staticmethod
def attr_is_edge(attr):
return isinstance(attr, list) and len(attr)>0 and isinstance(attr[0], Edge)
def update(self, api, edges=True, create_if_not_exists=True, skip_nodes=False):
if not self.exists(api):
print(f"creating {self}")
api.create(self)
else:
print(f"updating {self}")
api.put(self)
if edges:
for e in self.get_all_edges():
e.update(api)
def to_dict(self):
res = dict()
for k,v in self.__dict__.items():
if k[:1] != '_' and not (isinstance(v, list) and len(v)>0 and isinstance(v[0], Edge)) and v is not None:
res[k] = v
res["_type"] = self.__class__.__name__
return res
def exists(self, api):
res = api.search_by_fields({"uid": self.uid})
return len(res) == 1
def expand(self, api):
self._expanded = True
res = api.get(self.uid, expanded=True)
for edge_name in res.get_all_edge_names():
edges = res.get_edges(edge_name)
for e in edges:
e.source = self
self.__setattr__(edge_name, edges)
# self.edges = res.edges
return self
def __repr__(self):
uid = self.uid
_type = self.__class__.__name__
return f"{_type} (#{uid})"
@classmethod
def from_data(cls, *args, **kwargs):
edges = dict()
new_kwargs = dict()
for k, v in kwargs.items():
if isinstance(v, ItemBase):
edge = Edge(None, v, k)
edges[k] = edge
new_kwargs[k] = edge
else:
new_kwargs[k] = v
res = cls(*args, **new_kwargs)
for v in edges.values():
v.source = res
return res
```
%% Cell type:markdown id: tags:
With the Item and Edge classes we can create an item and its surrounding graph. The schema is defined in schema.py, in general we want to use the from_data staticmethod to generate new items, because it ensures that edges are linked from both the source and the target object. Let's make an email item and create it in the pod.
%% Cell type:code id: tags:
``` python
# hide
from integrators.schema import *
```
%% Cell type:code id: tags:
``` python
item = EmailMessage.from_data(content="example content field")
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted pod.client.ipynb.
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment