Commit c70feae5 authored by Koen van der Veen's avatar Koen van der Veen Committed by Alp Deniz Ogut
Browse files

rewrite cli's

parent 0fa6b693
Showing with 461 additions and 278 deletions
+461 -278
......@@ -28,5 +28,5 @@ COPY ./nbs ./nbs
RUN pip3 install --editable .
# CMD ["python3", "tools/run_integrator.py"]
CMD ["run_plugin"]
CMD ["run_plugin", "--read_args_from_env", "True"]
{
"containerImage": "pymemri",
"pluginModule": "pymemri.plugin.pluginbase",
"plugin": "pymemri.plugin.pluginbase.MyPlugin",
"pluginName": "MyPlugin",
"state": "not started",
"account": {
"service": "facebook",
"identifier": "myusername",
"secret": "mypassword"
},
"settings": {
"example_setting": "example_value"
}
}
\ No newline at end of file
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin.pluginbase
```
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.schema import *
from pymemri.pod.client import PodClient, DEFAULT_POD_ADDRESS
from pymemri.pod.client import *
from pymemri.imports import *
from pymemri.pod.utils import *
from os import environ
from abc import ABCMeta
import abc
import json
import importlib
import string
import time
from enum import Enum
from pymemri.pod.client import PodClient
from fastscript import *
import os
from pymemri.plugin.schema import PluginRun
from pymemri.data.basic import *
```
%% Cell type:code id: tags:
``` python
# hide
from nbdev.showdoc import *
```
%% Cell type:markdown id: tags:
# Plugins
%% Cell type:markdown id: tags:
PluginBase is the plugin class that the simplest plugin inherits.
Inheriting class should implement:
- run() that implements the logic of the plugin
- add_to_schema() for plugin specific item types
%% Cell type:code id: tags:
``` python
# export
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
POD_TARGET_ITEM_ENV = 'POD_TARGET_ITEM'
POD_OWNER_KEY_ENV = 'POD_OWNER'
POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
```
%% Cell type:code id: tags:
``` python
# export
# hide
class PluginBase(Item, metaclass=ABCMeta):
"""Base class for plugins"""
properties = Item.properties + ["name", "repository", "icon", "query", "bundleImage",
"runDestination", "pluginClass"]
edges = Item.edges
def __init__(self, client=None, run_id=None, name=None, repository=None, icon=None,
query=None, bundleImage=None, runDestination=None, pluginClass=None, **kwargs):
if pluginClass is None: pluginClass=self.__class__.__name__
super().__init__(**kwargs)
self.client = client
self.run_id = run_id
self.name = name
self.repository = repository
self.icon = icon
self.query = query
self.bundleImage = bundleImage
self.runDestination = runDestination
self.pluginClass = pluginClass
def get_run(self):
return self.client.get(self.run_id, expanded=False)
def get_state(self):
return self.get_run().state
def get_account(self):
run = self.get_run()
return run.account[0] if len(run.account) > 0 else None
def get_settings(self):
run = self.get_run(self.client)
return json.loads(run.settings)
def set_vars(self, vars):
run = self.get_run()
for k,v in vars.items():
if hasattr(run, k):
setattr(run, k, v)
run.update(self.client)
def set_state(self, state, message=None):
self.set_vars({'state': state, 'message': message})
def set_account(self, account):
existing = self.get_account()
if existing:
account.id = existing.id
account.update(self.client)
else:
run = self.get_run()
run.add_edge('account', account)
run.update(self.client)
def set_settings(self, settings):
self.set_vars({'settings': json.dumps(settings)})
@abc.abstractmethod
def run(self):
raise NotImplementedError()
@abc.abstractmethod
def add_to_schema(self):
raise NotImplementedError()
```
%% Cell type:markdown id: tags:
## Creating a plugin
%% Cell type:markdown id: tags:
The memri [pod](https://gitlab.memri.io/memri/pod) uses a plugin system to add features to the backend memri backend. Plugins can import your data (importers), change your data (indexers), or call other serivces. Users can define their own plugins to add new behaviour to their memri app. Let's use the following plugin as an example of how we can start plugins.
%% Cell type:code id: tags:
``` python
# export
# hide
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties + ["containerImage"]
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
def run(self):
print("running")
self.client.create(MyItem("some person", 20))
def add_to_schema(self):
self.client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:code id: tags:
``` python
# hide
MyPlugin()
```
%% Output
MyPlugin (#None)
%% Cell type:markdown id: tags:
```python
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
def run(self):
print("running")
self.client.create(MyItem("some person", 20))
def add_to_schema(self):
self.client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:markdown id: tags:
Memri plugins need to define at least 2 methods: `.run()` and `.add_to_schema()`. `.run()` defines the logic of the plugin. `.add_to_schema()` defines the schema for the plugin in the pod. Note that currently, `add_to_schema` requires all item to **have all properties defined that are used in the plugin**. In the future, we might replace add_to_schema, to be done automatically, based on a declarative schema defined in the plugin.
%% Cell type:markdown id: tags:
## Helper classes -
%% Cell type:code id: tags:
``` python
# hide
# export
def get_plugin_cls(plugin_module, plugin_name):
try:
module = importlib.import_module(plugin_module)
plugin_cls = getattr(module, plugin_name)
return plugin_cls
except (ImportError, AttributeError):
raise ImportError(f"Unknown plugin: {plugin_module}.{plugin_name}")
def run_plugin_from_run_id(run_id, client):
"""
Args:
run_id (int): id of the PluginRun
client (PodClient): client containing PluginRun
return_plugin (bool): Returns created plugin instance for testing purposes.
"""
run = client.get(run_id)
plugin_cls = get_plugin_cls(run.pluginModule, run.pluginName)
plugin = plugin_cls(client=client, run_id=run_id)
plugin.add_to_schema()
# TODO handle plugin status before run
plugin.run()
return plugin
```
%% Cell type:code id: tags:
``` python
# export
# hide
def register_base_schemas(client):
try:
assert client.add_to_schema(PluginRun("", "", "", state="", message="", targetItemId=""))
assert client.add_to_schema(Account(service="", identifier="", secret=""))
except Exception as e:
raise ValueError("Could not add base schema")
```
%% Cell type:markdown id: tags:
### Test Plugin Runtime
%% Cell type:code id: tags:
``` python
# hide
from pymemri.pod.client import PodClient
from pymemri.data.schema import Account
from pymemri.plugin.pluginbase import PluginRun, register_base_schemas, run_plugin_from_run_id
client = PodClient()
register_base_schemas(client)
# Create a dummy account to use for authentication within the plugin
account = Account(service="my_plugin_service", identifier="username", secret="password")
account.update(client)
# Create a run to enable plugin runtime
run = PluginRun("pymemri", "pymemri.plugin.pluginbase", "MyPlugin", account=[account])
run.update(client)
plugin = run_plugin_from_run_id(run.id, client)
# check if account is accessible
assert plugin.get_account().identifier == "username"
# set a state
plugin.set_state("test_state", message="test_message")
assert plugin.get_state() == "test_state"
```
%% Cell type:markdown id: tags:
## Run from id test -
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# hide
# skip
client = PodClient()
run = PluginRun("pymemri", "pymemri.plugin.pluginbase", "MyPlugin", state="not started")
account = Account(identifier="login", secret="password")
run.add_edge("account", account)
assert client.add_to_schema(PluginRun("", "", "", "", ""))
assert client.create(run)
assert client.create(account)
assert client.create_edge(run.get_edges("account")[0])
run = client.get(run.id)
run_plugin_from_run_id(run.id, client)
```
%% Output
running
MyPlugin (#None)
%% Cell type:code id: tags:
``` python
# export
def _run_plugin(client, plugin_run_id=None, verbose=False):
"""Runs an plugin, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with plugins)."""
register_base_schemas(client)
run_plugin_from_run_id(plugin_run_id, client)
```
%% Cell type:markdown id: tags:
## _run_plugin test -
%% Cell type:code id: tags:
``` python
# hide
# skip
_run_plugin(client=client, plugin_run_id=run.id)
run_plugin_from_run_id(run.id, client);
```
%% Output
running
logging in with account login and password password
%% Cell type:code id: tags:
``` python
# export
# hide
def _parse_env():
env = os.environ
print("Reading `run_plugin()` parameters from environment variables")
try:
pod_full_address = env.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
plugin_run_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
print(plugin_run_json)
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
# database_key = pod_service_payload[DATABASE_KEY_ENV]
# owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
```
%% Cell type:markdown id: tags:
## Running your plugin using the CLI
%% Cell type:markdown id: tags:
Plugins can be started using the pymemri `run_plugin` or `simulate_run_plugin_from_frontend` CLI. With `run_plugin` the plugin is invoked directly by spawning a new python process, while `simulate_run_plugin_from_frontend` requests the pod to spawn a new process, docker container, or kubernetes container, which in calls `run_plugin` (for more info see `simulate_run_plugin_from_frontend`. When using `run_plugin`, you can either pass your run arguments as parameters, or set them as environment variables. If both are set, the CLI will use the passed arguments.
%% Cell type:code id: tags:
``` python
# export
PYMEMRI_FOLDER = ".pymemri"
POD_KEYS_FOLDER = "pod_keys"
POD_KEYS_FILENAME = "keys.json"
POD_KEYS_FULL_FOLDER = Path.home() / ".pymemri" / POD_KEYS_FOLDER
DEFAULT_POD_KEY_PATH = POD_KEYS_FULL_FOLDER / POD_KEYS_FILENAME
```
%% Cell type:code id: tags:
``` python
# export
# hide
@call_parse
def store_keys(path:Param("path to store the keys", str)=DEFAULT_POD_KEY_PATH,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None):
if database_key is None: database_key = PodClient.generate_random_key()
if owner_key is None: owner_key = PodClient.generate_random_key()
obj = {"database_key": database_key,
"owner_key": owner_key}
Path(path).parent.mkdir(parents=True, exist_ok=True)
if path.exists():
timestr = time.strftime("%Y%m%d-%H%M%S")
path.rename(POD_KEYS_FULL_FOLDER / f"keys-{timestr}.json")
write_json(obj, path)
```
%% Cell type:code id: tags:
``` python
# hide
store_keys()
```
%% Cell type:code id: tags:
``` python
# export
# hide
def read_pod_key(key_type, file=DEFAULT_POD_KEY_PATH):
try:
json = read_json(file)
except:
raise ValueError(f"Trying to read key from {file}, but file or key does not exist") from None
try:
key = json[key_type]
print(f"reading {key_type} from {file}")
return key
except:
raise ValueError(f"{key_type} not specified in {file}") from None
def parse_config(file, remove_container=False):
json_dict = read_json(file)
account = Account.from_json(json_dict["account"])
del json_dict["account"]
settings = json.dumps(json_dict["settings"])
del json_dict["settings"]
run = PluginRun.from_json(json_dict)
run.settings = settings
run.add_edge("account", account)
if remove_container:
run.containerImage = "none"
return run
def create_run_expanded(client, run):
client.create(run)
accounts = run.account
if accounts:
account=accounts[0]
client.create(account)
client.create_edge(run.get_edges("account")[0])
```
%% Cell type:code id: tags:
``` python
# export
@call_parse
def run_plugin(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_ADDRESS,
plugin_run_id:Param("Run id of the plugin to be executed", str)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
read_args_from_env:Param("Owner key of the pod", bool)=False):
read_args_from_env:Param("Read the args from the environment", bool)=False,
config_file:Param("config file for the PluginRun", str)=None):
if read_args_from_env:
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env(env)
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env()
database_key=None
else:
if database_key is None: database_key = read_pod_key("database_key")
if owner_key is None: owner_key = read_pod_key("owner_key")
pod_auth_json = None
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
auth_json=pod_auth_json)
if config_file is not None:
run = parse_config(config_file, remove_container=True)
create_run_expanded(client, run)
plugin_run_id=run.id
print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
_run_plugin(client=client, plugin_run_id=plugin_run_id)
run_plugin_from_run_id(run_id=plugin_run_id, client=client)
```
%% Cell type:markdown id: tags:
To start a plugin on your local machine, you can use the CLI. This will create a client for you, and run the code defined in `<myplugin>.run()`
%% Cell type:code id: tags:
``` python
client = PodClient(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"))
run = PluginRun(containerImage="no_container", pluginModule="pymemri.plugin.pluginbase",
plugin="pymemri.plugin.pluginbase.MyPlugin",
pluginName="MyPlugin", state="not started")
assert client.add_to_schema(PluginRun("", "", "", "", "")) and client.create(run)
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
%% Cell type:code id: tags:
``` python
run_plugin(plugin_run_id=run.id)
run_plugin(config_file="../example_config.json")
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
pod_full_address=http://localhost:3030
owner_key=3789141683232392970480152516863298047903446543357097566923540000
owner_key=6455899399524782969362422414184641420500300004234408639997045006
running
logging in with account myusername and password mypassword
%% Cell type:markdown id: tags:
> Note: The data that is created here should be in the pod in order for this to work
%% Cell type:markdown id: tags:
## Run from pod
%% Cell type:markdown id: tags:
In production, we start plugins by making an API call to the pod, which in turn creates an environment for the plugin and starts it using docker containers, kubernetes containers or a shell script. We can start this process using the `simulate_run_plugin_from_frontend` CLI. **Note that when using docker, provided container name should be "installed" within the Pod environemnt (e.g. `docker build -t pymemri .` for this repo) in order to start it.**
%% Cell type:markdown id: tags:
![running a plugin](images/running_a_plugin.svg)
%% Cell type:code id: tags:
``` python
# export
from fastcore.script import call_parse, Param
import os
@call_parse
def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_ADDRESS,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
container:Param("Pod container to run frod", str)=None,
plugin_path:Param("Plugin path", str)=None,
account_id:Param("Account id to be used inside the plugin", str)=None,
settings_file:Param("Plugin settings (json)", str)=None):
# TODO remove container, plugin_module, plugin_name and move to Plugin item.
# Open question: This presumes Plugin item is already in pod before simulate_run_plugin_from_frontend is called.
if database_key is None: database_key = read_pod_key("database_key")
if owner_key is None: owner_key = read_pod_key("owner_key")
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
params = [pod_full_address, database_key, owner_key, container, plugin_module, plugin_name]
params = [pod_full_address, database_key, owner_key]
if (None in params):
raise ValueError(f"Defined some params to run indexer, but not all")
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
for name, val in [("pod_full_address", pod_full_address), ("owner_key", owner_key)]:
print(f"{name}={val}")
if settings_file is not None:
with open(settings_file, 'r') as f:
settings = f.read()
else:
settings = None
if account_id is None:
account = None
else:
account = client.get(account_id)
print(f"Using {account}")
register_base_schemas(client)
run = PluginRun(container, plugin_module, plugin_name, account=[account])
else:
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
run = PluginRun(container, plugin_module, plugin_name)
if plugin_id is not None:
persistent_state = client.get(plugin_id)
run.add_edge("persistentState", persistent_state)
client.create_edge(run.get_edges("persistentState")[0])
client.create(run)
print(f"\ncalling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {run.id}.")
print(f"*Check the pod log/console for debug output.*")
client.create(run)
print(f"Created PluginRun: {run.id}")
return run
```
%% Cell type:code id: tags:
``` python
client = PodClient()
```
%% Cell type:code id: tags:
``` python
!simulate_run_plugin_from_frontend --plugin_path="pymemri.plugin.pluginbase.MyPlugin"
!simulate_run_plugin_from_frontend --config_file="../example_config.json"
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
Inferred 'pymemri' as plugin container name
pod_full_address=http://localhost:3030
owner_key=3789141683232392970480152516863298047903446543357097566923540000
owner_key=6455899399524782969362422414184641420500300004234408639997045006
calling the `create` api on http://localhost:3030 to make your Pod start a plugin with id 06f965E82dee89cDC15B3CdFc9e1c1BA.
calling the `create` api on http://localhost:3030 to make your Pod start a plugin with id 62Dae38b0D800363FFdCa8b57687C0fa.
*Check the pod log/console for debug output.*
Created PluginRun: 06f965E82dee89cDC15B3CdFc9e1c1BA
%% Cell type:code id: tags:
``` python
# hide
# !simulate_run_plugin_from_frontend --plugin_path="pymemri.plugin.pluginbase.MyPlugin"
```
%% Cell type:markdown id: tags:
## Appendix -
%% Cell type:code id: tags:
``` python
# hide
# client.start_plugin("pymemri", run.id)
```
%% Cell type:code id: tags:
``` python
# hide
# class StartPlugin(Item):
# properties = Item.properties + ["container", "targetItemId"]
# edges = Item.edges
# def __init__(self, container=None, targetItemId=None, **kwargs):
# super().__init__(**kwargs)
# self.container = container
# self.targetItemId = targetItemId
```
%% Cell type:code id: tags:
``` python
# hide
# class PluginSettings(Item):
# def __init__(self, settings_dict):
# self.settings_dict=settings_dict
# def __getattr__(self, k):
# if k in self.settings_dict:
# return self.settings_dict[k]
# @classmethod
# def from_string(cls, s):
# objs = json.loads(s)
# cls(objs)
```
%% Cell type:code id: tags:
``` python
# hide
# # export
# def generate_test_env(client, indexer_run):
# payload = json.dumps({DATABASE_KEY_ENV: client.database_key, OWNER_KEY_ENV: client.owner_key})
# return {POD_FULL_ADDRESS_ENV: DEFAULT_POD_ADDRESS,
# POD_TARGET_ITEM: indexer_run.id,
# POD_SERVICE_PAYLOAD_ENV: payload}
```
%% Cell type:code id: tags:
``` python
# hide
# run_plugin(env=generate_test_env(client, run))
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted pod.client.ipynb.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id:48d3c421 tags:
``` python
# default_exp plugin.schema
```
%% Cell type:code id:54a8301b tags:
``` python
# export
# hide
import random, string
from pymemri.data.itembase import Item
```
%% Cell type:code id:ec7eeeb1 tags:
``` python
# export
# hide
class PluginRun(Item):
properties = Item.properties + ["containerImage", "pluginModule", "pluginName", "state", "targetItemId",
"oAuthUrl", "message", "config"]
edges = Item.edges + ["account"]
def __init__(self, containerImage, pluginModule, pluginName, account=None, state=None, settings=None, targetItemId=None, oAuthUrl=None, message=None,
**kwargs):
"""
PluginRun defines a the run of plugin `plugin_module.plugin_name`,
with an optional `config` string.
with an optional `settings` string.
Args:
plugin_module (str): module of the plugin.
plugin_name (str): class name of the plugin.
config (str, optional): Optional plugin configuration. For example,
settings (str, optional): Optional plugin configuration. For example,
this could be a `json.dumps` of a configuration dict. Defaults to None.
"""
super().__init__(**kwargs)
self.pluginModule = pluginModule
self.pluginName = pluginName
self.containerImage = containerImage
id_ = ''.join([random.choice(string.hexdigits) for i in range(32)]) if targetItemId is None else targetItemId
self.targetItemId=id_
self.id=id_
self.state = state # for stateful plugins
self.settings = settings
self.oAuthUrl = oAuthUrl # for authenticated plugins
self.message = message # universa
self.account = account if account is not None else []
```
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.data.photo import Photo
from pymemri.imports import *
from hashlib import sha256
from pymemri.pod.db import DB
from pymemri.pod.utils import *
from pymemri.plugin.schema import *
```
%% Cell type:code id: tags:
``` python
# export
DEFAULT_POD_ADDRESS = "http://localhost:3030"
POD_VERSION = "v4"
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
auth_json=None, verbose=False):
auth_json=None, verbose=False, register_base_schema=True):
self.verbose = verbose
self.url = url
self.version = POD_VERSION
self.test_connection(verbose=self.verbose)
self.database_key=database_key if database_key is not None else self.generate_random_key()
self.owner_key=owner_key if owner_key is not None else self.generate_random_key()
self.base_url = f"{url}/{version}/{self.owner_key}"
self.auth_json = {"type":"ClientAuth","databaseKey":self.database_key} if auth_json is None \
else {**{"type": "PluginAuth"}, **auth_json}
self.local_db = DB()
self.registered_classes=dict()
self.register_base_schemas()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
return cls(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"), **kwargs)
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def register_base_schemas(client):
try:
assert client.add_to_schema(PluginRun("", "", "", state="", error="", targetItemId="",
settings=""))
assert client.add_to_schema(PersistentState(pluginName="", state="", account=""))
assert client.add_to_schema(CVUStoredDefinition(name="", definition=""))
assert client.add_to_schema(Account(service="", identifier="", secret="", code="",
refreshToken="", errorMessage=""))
except Exception as e:
raise ValueError("Could not add base schema")
def add_to_db(self, node):
existing = self.local_db.get(node.id)
if existing is None and node.id is not None:
self.local_db.add(node)
def test_connection(self, verbose=True):
try:
res = requests.get(self.url)
if verbose: print("Succesfully connected to pod")
return True
except requests.exceptions.RequestException as e:
print("Could no connect to backend")
return False
def create(self, node):
if isinstance(node, Photo) and not self.create_photo_file(node): return False
try:
properties = self.get_properties_json(node)
properties = {k:v for k, v in properties.items() if v != []}
body = {"auth": self.auth_json, "payload":properties}
result = requests.post(f"{self.base_url}/create_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
id = result.json()
node.id = id
self.add_to_db(node)
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def add_to_schema(self, node):
self.registered_classes[node.__class__.__name__] = type(node)
attributes = self.get_properties_json(node)
for k, v in attributes.items():
if not isinstance(v, list) and k != "type":
if isinstance(v, str):
value_type = "Text"
elif isinstance(v, int):
value_type = "Integer"
payload = {"type": "ItemPropertySchema", "itemType": attributes["type"],
"propertyName": k, "valueType": value_type}
body = {"auth": self.auth_json, "payload": payload }
try:
result = requests.post(f"{self.base_url}/create_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
id = result.json()
node.id = id
self.add_to_db(node)
except requests.exceptions.RequestException as e:
print(e)
return False
return True
def create_photo_file(self, photo):
file = photo.file[0]
self.create(file)
return self._upload_image(photo.data)
def _upload_image(self, arr):
return self.upload_file(arr.tobytes())
def upload_file(self, file):
# TODO: currently this only works for numpy images
try:
sha = sha256(file).hexdigest()
result = requests.post(f"{self.base_url}/upload_file/{self.database_key}/{sha}", data=file)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def get_file(self, sha):
# TODO: currently this only works for numpy images
try:
body= {"auth": self.auth_json,
"payload": {"sha256": sha}}
result = requests.post(f"{self.base_url}/get_file", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
return result.content
except requests.exceptions.RequestException as e:
print(e)
return None
def get_photo(self, id, size=640):
photo = self.get(id)
self._load_photo_data(photo, size=size)
return photo
def _load_photo_data(self, photo, size=None):
if len(photo.file) > 0 and photo.data is None:
file = self.get_file(photo.file[0].sha256)
if file is None:
print(f"Could not load data of {photo} attached file item does not have data in pod")
return
data = np.frombuffer(file, dtype=np.uint8)
c = photo.channels
shape = (photo.height,photo.width, c) if c is not None and c > 1 else (photo.height, photo.width)
data = data.reshape(shape)
if size is not None: data = resize(data, size)
photo.data = data
return
print(f"could not load data of {photo}, no file attached")
def create_if_external_id_not_exists(self, node):
if not self.external_id_exists(node):
self.create(node)
def external_id_exists(self, node):
if node.externalId is None: return False
existing = self.search({"externalId": node.externalId})
return len(existing) > 0
def create_edges(self, edges):
"""Create edges between nodes, edges should be of format [{"_type": "friend", "_source": 1, "_target": 2}]"""
create_edges = []
for e in edges:
src, target = e.source.id, e.target.id
if src is None or target is None:
print(f"Could not create edge {e} missing source or target id")
return False
data = {"_source": src, "_target": target, "_name": e._type}
if e.label is not None: data[LABEL] = e.label
if e.sequence is not None: data[SEQUENCE] = e.sequence
if e.reverse:
data2 = copy(data)
data2["_source"] = target
data2["_target"] = src
data2["_name"] = "~" + data2["_name"]
create_edges.append(data2)
create_edges.append(data)
return self.bulk_action(create_items=[], update_items=[],create_edges=create_edges)
def delete_items(self, items):
ids = [i.id for i in items]
return self.bulk_action(delete_items=ids)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
def bulk_action(self, create_items=None, update_items=None, create_edges=None, delete_items=None):
create_items = create_items if create_items is not None else []
update_items = update_items if update_items is not None else []
create_edges = create_edges if create_edges is not None else []
delete_items = delete_items if delete_items is not None else []
edges_data = {"auth": self.auth_json, "payload": {
"createItems": create_items, "updateItems": update_items,
"createEdges": create_edges, "deleteItems": delete_items}}
try:
result = requests.post(f"{self.base_url}/bulk",
json=edges_data)
if result.status_code != 200:
if "UNIQUE constraint failed" in str(result.content):
print(result.status_code, "Edge already exists")
else:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def create_edge(self, edge):
payload = {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
body = {"auth": self.auth_json,
"payload": payload}
try:
result = requests.post(f"{self.base_url}/create_edge", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
return self.create_edges([edge])
def get(self, id, expanded=True):
if not expanded:
res = self._get_item_with_properties(id)
else:
res = self._get_item_expanded(id)
if res is None:
return None
elif res.deleted == True:
print(f"Item with id {id} does not exist anymore")
return None
else:
return res
def get_all_items(self):
raise NotImplementedError()
try:
body = { "databaseKey": self.database_key, "payload":None}
result = requests.post(f"{self.base_url}/get_all_items", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
res = [self.item_from_json(x) for x in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
print(e)
return None
def filter_deleted(self, items):
return [i for i in items if not i.deleted == True]
def _get_item_expanded(self, id):
item = self.get(id, expanded=False)
edges = self.get_edges(id)
for e in edges:
item.add_edge(e["name"], e["item"])
return item
def get_edges(self, id):
body = {"payload": {"item": str(id),
"direction": "Outgoing",
"expandItems": True},
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/get_edges", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
for d in json:
d["item"] = self.item_from_json(d["item"])
return json
except requests.exceptions.RequestException as e:
print(e)
return None
def _get_item_with_properties(self, id):
try:
body = {"auth": self.auth_json,
"payload": str(id)}
result = requests.post(f"{self.base_url}/get_item", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
if json == []:
return None
else:
res = self.item_from_json(json[0])
return res
except requests.exceptions.RequestException as e:
print(e)
return None
def get_properties_json(self, node, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(node, "private", [])
for k, v in node.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type(node)
return res
@staticmethod
def _get_schema_type(node):
for cls in node.__class__.mro():
if cls.__name__ != "ItemBase":
return cls.__name__
def update_item(self, node):
data = self.get_properties_json(node, dates=False)
if "type" in data:
del data["type"]
if "deleted" in data:
del data["deleted"]
id = data["id"]
body = {"payload": data,
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/update_item",
json=body)
if result.status_code != 200:
print(result, result.content)
except requests.exceptions.RequestException as e:
print(e)
def exists(self, id):
try:
body = {"auth": self.auth_json,
"payload": str(id)}
result = requests.post(f"{self.base_url}/get_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
json = result.json()
if isinstance(json, list) and len(json) > 0:
return True
except requests.exceptions.RequestException as e:
print(e)
return None
def search(self, fields_data):
body = {"payload": fields_data,
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/search", json=body)
json = result.json()
res = [self.item_from_json(item) for item in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
return None
def search_last_added(self, type=None, with_prop=None, with_val=None):
query = {"_limit": 1, "_sortOrder": "Desc"}
if type is not None:
query["type"] = type
if with_prop is not None:
query[f"{with_prop}=="] = with_val
return self.search(query)[0]
def item_from_json(self, json):
plugin_class = json.get("pluginClass", None)
plugin_package = json.get("pluginPackage", None)
constructor = get_constructor(json["type"], plugin_class, plugin_package=plugin_package,
extra=self.registered_classes)
new_item = constructor.from_json(json)
existing = self.local_db.get(new_item.id)
# TODO: cleanup
if existing is not None:
if not existing.is_expanded() and new_item.is_expanded():
for edge_name in new_item.get_all_edge_names():
edges = new_item.get_edges(edge_name)
for e in edges:
e.source = existing
existing.__setattr__(edge_name, edges)
for prop_name in new_item.get_property_names():
existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
return existing
else:
return new_item
def get_properties(self, expanded):
properties = copy(expanded)
if ALL_EDGES in properties: del properties[ALL_EDGES]
return properties
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys, you can just omit the keys when initializing the `PodClient`, which creates a new user by defining random keys. *Note that this will create a new database for your every time you create a PodClient, if you want to access the same database with multiple PodClients, you have to set the same keys* When you are using the app, setting the keys in the pod, and passing them when calling an integrator is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
success = client.test_connection()
assert success
```
%% Output
Succesfully connected to pod
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. When Initializing an Item, always make sure to use the from_data classmethod to initialize.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
email_item
```
%% Output
EmailMessage (#None)
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(email_item)
assert succes
```
%% Cell type:markdown id: tags:
We can now create our item. As a side-effect, our item will be assigned an id.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
```
%% Output
True
%% Cell type:code id: tags:
``` python
email_item.id
```
%% Output
'01db116ef3b5d631dfe810d9d63c2746'
'e21cf01d4c65cd6504b0aa4a49898209'
%% Cell type:markdown id: tags:
We can easily define our own types, and use them in the pod.
%% Cell type:code id: tags:
``` python
class Dog(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
```
%% Cell type:code id: tags:
``` python
dog = Dog("max", 2)
client.add_to_schema(dog);
dog2 = Dog("bob", 3)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
dog_from_db = client.get(dog2.id, expanded=False)
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
assert item_succes and edge_succes
```
%% Cell type:code id: tags:
``` python
client.get_edges(email_item.id)
```
%% Output
[{'item': Person (#18b8646793e51ae434549c42b60d0e2a), 'name': 'sender'}]
[{'item': Person (#469eb1ab5cfabdbdff7f8c57859f7c84), 'name': 'sender'}]
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
```
%% Cell type:code id: tags:
``` python
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
assert client.create(person_item)
```
%% Cell type:code id: tags:
``` python
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
```
%% Cell type:markdown id: tags:
Appart from creating, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db.lastName == "Awesome"
```
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
client.create(person_item2);
all_people = client.search({"type": "Person"})
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
all_people[:3]
```
%% Output
[Person (#18b8646793e51ae434549c42b60d0e2a),
Person (#4c0ed41801f3d4f942677d987675a86a),
Person (#364347a2b50365463cbc5fb03dfb1458)]
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2);
```
%% Cell type:code id: tags:
``` python
assert client.search_last_added(type="Person").firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files, the `PodClient` has a file api. The file api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
%% Cell type:code id: tags:
``` python
from pymemri.data.photo import *
```
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(x)
assert succes
assert succes2
```
%% Cell type:code id: tags:
``` python
data = client.get_file(file.sha256)
arr = np.frombuffer(data, dtype=np.uint8)
assert (arr.reshape(640,640) == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
For photos we do this automatically using `PodClient.create` on a Photo and `PodClient.get_photo`:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
```
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(IPhoto.from_np(x))
```
%% Cell type:code id: tags:
``` python
assert client.create(photo)
```
%% Cell type:code id: tags:
``` python
res = client.get_photo(photo.id, size=640)
```
%% Cell type:code id: tags:
``` python
res
```
%% Output
IPhoto (#415fc96bde3bb6e426ad2a792151e216)
%% Cell type:code id: tags:
``` python
assert (res.data == x).all()
```
%% Cell type:markdown id: tags:
# Check if an item exists -
%% Cell type:code id: tags:
``` python
# hide
# person_item = Person.from_data(firstName="Eve", externalId="gmail_1")
# person_item2 = Person.from_data(firstName="Eve2", externalId="gmail_1")
# client.create_if_external_id_not_exists(person_item)
# client.create_if_external_id_not_exists(person_item2)
# existing = client.search({"externalId": "gmail_1"})
# assert len(existing) == 1
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Resetting the db -
%% Cell type:code id: tags:
``` python
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted pod.client.ipynb.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id:f541251c tags:
``` python
# default_exp pod.utils
%load_ext autoreload
%autoreload 2
```
%% Output
The autoreload extension is already loaded. To reload it, use:
%reload_ext autoreload
%% Cell type:code id:f23b1d4a tags:
``` python
# export
from pathlib import Path
from pymemri.data.basic import *
```
%% Cell type:code id:d2d9948e tags:
``` python
# export
PYMEMRI_FOLDER = ".pymemri"
POD_KEYS_FOLDER = "pod_keys"
POD_KEYS_FILENAME = "keys.json"
POD_KEYS_FULL_FOLDER = Path.home() / ".pymemri" / POD_KEYS_FOLDER
DEFAULT_POD_KEY_PATH = POD_KEYS_FULL_FOLDER / POD_KEYS_FILENAME
```
%% Cell type:code id:75fed7bd tags:
``` python
# export
def read_pod_key(key_type, file=DEFAULT_POD_KEY_PATH):
try:
json = read_json(file)
except:
raise ValueError(f"Trying to read key from {file}, but file or key does not exist") from None
try:
key = json[key_type]
print(f"reading {key_type} from {file}")
return key
except:
raise ValueError(f"{key_type} not specified in {file}") from None
```
%% Cell type:code id:643bd34c tags:
``` python
```
......@@ -47,20 +47,22 @@ index = {"read_file": "basic.ipynb",
"get_plugin_cls": "plugin.pluginbase.ipynb",
"run_plugin_from_run_id": "plugin.pluginbase.ipynb",
"register_base_schemas": "plugin.pluginbase.ipynb",
"PYMEMRI_FOLDER": "plugin.pluginbase.ipynb",
"POD_KEYS_FOLDER": "plugin.pluginbase.ipynb",
"POD_KEYS_FILENAME": "plugin.pluginbase.ipynb",
"POD_KEYS_FULL_FOLDER": "plugin.pluginbase.ipynb",
"DEFAULT_POD_KEY_PATH": "plugin.pluginbase.ipynb",
"store_keys": "plugin.pluginbase.ipynb",
"read_pod_key": "plugin.pluginbase.ipynb",
"parse_config": "plugin.pluginbase.ipynb",
"create_run_expanded": "plugin.pluginbase.ipynb",
"run_plugin": "plugin.pluginbase.ipynb",
"simulate_run_plugin_from_frontend": "plugin.pluginbase.ipynb",
"PluginRun": "plugin.schema.ipynb",
"DEFAULT_POD_ADDRESS": "pod.client.ipynb",
"POD_VERSION": "pod.client.ipynb",
"PodClient": "pod.client.ipynb",
"DB": "pod.db.ipynb"}
"DB": "pod.db.ipynb",
"PYMEMRI_FOLDER": "pod.utils.ipynb",
"POD_KEYS_FOLDER": "pod.utils.ipynb",
"POD_KEYS_FILENAME": "pod.utils.ipynb",
"POD_KEYS_FULL_FOLDER": "pod.utils.ipynb",
"DEFAULT_POD_KEY_PATH": "pod.utils.ipynb",
"read_pod_key": "pod.utils.ipynb"}
modules = ["data/basic.py",
"data/photo.py",
......@@ -71,7 +73,8 @@ modules = ["data/basic.py",
"plugin/pluginbase.py",
"plugin/schema.py",
"pod/client.py",
"pod/db.py"]
"pod/db.py",
"pod/utils.py"]
doc_url = "http://memri.docs.memri.io/pymemri/"
......
import flask
from flask import render_template
from pymemri.pod.client import PodClient
from pymemri.plugin.pluginbase import POD_TARGET_ITEM_ENV
import os
import json
import traceback
from fastscript import call_parse, Param
app = flask.Flask(__name__, template_folder='template')
app.config["DEBUG"] = True
# def get_run_from_env(pod_client):
# try:
# env = os.environ
# run_item_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
# run_id = run_item_json["id"]
# run = pod_client.get(run_id)
# return run
# except Exception as e:
# traceback.print_exc()
# return None
@app.route('/qr')
def index():
# global qr_code_data
# qr_code_data =""
global pod_client
global _run_id
# run = get_run_from_env(pod_client)
try:
run = pod_client.get(_run_id)
account = run.account[0]
qr_code_data = account.code
except Exception as e:
traceback.print_exc()
qr_code_data = None
return render_template('images.html', chart_output=qr_code_data)
pod_client = PodClient.from_local_keys()
_run_id = None
@call_parse
def run_qr_simulator(run_id:Param("Run id, we attach qr code to run.account.code", str)=None):
assert run_id is not None
global _run_id
_run_id = run_id
app.run(host='0.0.0.0', port=8000)
# if __name__ == "__main__":
# run()
<html>
<meta http-equiv="refresh" content="2" />
<div>
<img src="{{chart_output|safe}}" />
</div>
</html>
\ No newline at end of file
......@@ -65,103 +65,103 @@ def get_constructor(_type, plugin_class=None, plugin_package=None, extra=None):
# An account or subscription, for instance for some online service, or a bank account or wallet.
class Account(Item):
def __init__(self, dateAccessed=None, dateCreated=None, dateModified=None, deleted=None,
externalId=None, itemDescription=None, starred=None, version=None, id=None, importJson=None,
handle=None, displayName=None, service=None, itemType=None, avatarUrl=None, changelog=None,
label=None, genericAttribute=None, measure=None, sharedWith=None, belongsTo=None, price=None,
location=None, organization=None, identifier=None, secret=None, code=None, accessToken=None,
refreshToken=None, errorMessage=None, contact=None):
super().__init__(dateAccessed=dateAccessed, dateCreated=dateCreated, dateModified=dateModified,
deleted=deleted, externalId=externalId, itemDescription=itemDescription, starred=starred,
version=version, id=id, importJson=importJson, changelog=changelog, label=label,
genericAttribute=genericAttribute, measure=measure, sharedWith=sharedWith)
self.handle = handle
self.displayName = displayName
self.service = service
self.itemType = itemType
self.avatarUrl = avatarUrl
self.identifier = identifier
self.secret = secret
self.code = code
self.accessToken = accessToken
self.refreshToken = refreshToken
self.errorMessage = errorMessage
self.contact = contact if contact is not None else []
self.belongsTo = belongsTo if belongsTo is not None else []
self.price = price if price is not None else []
self.location = location if location is not None else []
self.organization = organization if organization is not None else []
# class Account(Item):
# def __init__(self, dateAccessed=None, dateCreated=None, dateModified=None, deleted=None,
# externalId=None, itemDescription=None, starred=None, version=None, id=None, importJson=None,
# handle=None, displayName=None, service=None, itemType=None, avatarUrl=None, changelog=None,
# label=None, genericAttribute=None, measure=None, sharedWith=None, belongsTo=None, price=None,
# location=None, organization=None, identifier=None, secret=None, code=None, accessToken=None,
# refreshToken=None, errorMessage=None, contact=None):
# super().__init__(dateAccessed=dateAccessed, dateCreated=dateCreated, dateModified=dateModified,
# deleted=deleted, externalId=externalId, itemDescription=itemDescription, starred=starred,
# version=version, id=id, importJson=importJson, changelog=changelog, label=label,
# genericAttribute=genericAttribute, measure=measure, sharedWith=sharedWith)
# self.handle = handle
# self.displayName = displayName
# self.service = service
# self.itemType = itemType
# self.avatarUrl = avatarUrl
# self.identifier = identifier
# self.secret = secret
# self.code = code
# self.accessToken = accessToken
# self.refreshToken = refreshToken
# self.errorMessage = errorMessage
# self.contact = contact if contact is not None else []
# self.belongsTo = belongsTo if belongsTo is not None else []
# self.price = price if price is not None else []
# self.location = location if location is not None else []
# self.organization = organization if organization is not None else []
@classmethod
def from_json(cls, json):
all_edges = json.get("allEdges", None)
dateAccessed = json.get("dateAccessed", None)
dateCreated = json.get("dateCreated", None)
dateModified = json.get("dateModified", None)
deleted = json.get("deleted", None)
externalId = json.get("externalId", None)
itemDescription = json.get("itemDescription", None)
starred = json.get("starred", None)
version = json.get("version", None)
id = json.get("id", None)
importJson = json.get("importJson", None)
handle = json.get("handle", None)
displayName = json.get("displayName", None)
service = json.get("service", None)
itemType = json.get("itemType", None)
avatarUrl = json.get("avatarUrl", None)
identifier = json.get("identifier", None)
secret = json.get("secret", None)
code = json.get("code", None)
accessToken = json.get("accessToken", None)
refreshToken = json.get("refreshToken", None)
errorMessage = json.get("errorMessage", None)
# @classmethod
# def from_json(cls, json):
# all_edges = json.get("allEdges", None)
# dateAccessed = json.get("dateAccessed", None)
# dateCreated = json.get("dateCreated", None)
# dateModified = json.get("dateModified", None)
# deleted = json.get("deleted", None)
# externalId = json.get("externalId", None)
# itemDescription = json.get("itemDescription", None)
# starred = json.get("starred", None)
# version = json.get("version", None)
# id = json.get("id", None)
# importJson = json.get("importJson", None)
# handle = json.get("handle", None)
# displayName = json.get("displayName", None)
# service = json.get("service", None)
# itemType = json.get("itemType", None)
# avatarUrl = json.get("avatarUrl", None)
# identifier = json.get("identifier", None)
# secret = json.get("secret", None)
# code = json.get("code", None)
# accessToken = json.get("accessToken", None)
# refreshToken = json.get("refreshToken", None)
# errorMessage = json.get("errorMessage", None)
changelog = []
label = []
genericAttribute = []
measure = []
sharedWith = []
belongsTo = []
price = []
location = []
organization = []
contact = []
# changelog = []
# label = []
# genericAttribute = []
# measure = []
# sharedWith = []
# belongsTo = []
# price = []
# location = []
# organization = []
# contact = []
if all_edges is not None:
for edge_json in all_edges:
edge = Edge.from_json(edge_json)
if edge._type == "changelog" or edge._type == "~changelog":
changelog.append(edge)
elif edge._type == "label" or edge._type == "~label":
label.append(edge)
elif edge._type == "genericAttribute" or edge._type == "~genericAttribute":
genericAttribute.append(edge)
elif edge._type == "measure" or edge._type == "~measure":
measure.append(edge)
elif edge._type == "sharedWith" or edge._type == "~sharedWith":
sharedWith.append(edge)
elif edge._type == "belongsTo" or edge._type == "~belongsTo":
belongsTo.append(edge)
elif edge._type == "price" or edge._type == "~price":
price.append(edge)
elif edge._type == "location" or edge._type == "~location":
location.append(edge)
elif edge._type == "organization" or edge._type == "~organization":
organization.append(edge)
elif edge._type == "contact" or edge._type == "~contact":
contact.append(edge)
# if all_edges is not None:
# for edge_json in all_edges:
# edge = Edge.from_json(edge_json)
# if edge._type == "changelog" or edge._type == "~changelog":
# changelog.append(edge)
# elif edge._type == "label" or edge._type == "~label":
# label.append(edge)
# elif edge._type == "genericAttribute" or edge._type == "~genericAttribute":
# genericAttribute.append(edge)
# elif edge._type == "measure" or edge._type == "~measure":
# measure.append(edge)
# elif edge._type == "sharedWith" or edge._type == "~sharedWith":
# sharedWith.append(edge)
# elif edge._type == "belongsTo" or edge._type == "~belongsTo":
# belongsTo.append(edge)
# elif edge._type == "price" or edge._type == "~price":
# price.append(edge)
# elif edge._type == "location" or edge._type == "~location":
# location.append(edge)
# elif edge._type == "organization" or edge._type == "~organization":
# organization.append(edge)
# elif edge._type == "contact" or edge._type == "~contact":
# contact.append(edge)
res = cls(dateAccessed=dateAccessed, dateCreated=dateCreated, dateModified=dateModified,
deleted=deleted, externalId=externalId, itemDescription=itemDescription, starred=starred,
version=version, id=id, importJson=importJson, handle=handle, displayName=displayName,
service=service, itemType=itemType, avatarUrl=avatarUrl, changelog=changelog, label=label,
genericAttribute=genericAttribute, measure=measure, sharedWith=sharedWith, belongsTo=belongsTo,
price=price, location=location, organization=organization, identifier=identifier, secret=secret,
code=code, accessToken=accessToken, refreshToken=refreshToken, errorMessage=errorMessage, contact=contact)
for e in res.get_all_edges(): e.source = res
return res
# res = cls(dateAccessed=dateAccessed, dateCreated=dateCreated, dateModified=dateModified,
# deleted=deleted, externalId=externalId, itemDescription=itemDescription, starred=starred,
# version=version, id=id, importJson=importJson, handle=handle, displayName=displayName,
# service=service, itemType=itemType, avatarUrl=avatarUrl, changelog=changelog, label=label,
# genericAttribute=genericAttribute, measure=measure, sharedWith=sharedWith, belongsTo=belongsTo,
# price=price, location=location, organization=organization, identifier=identifier, secret=secret,
# code=code, accessToken=accessToken, refreshToken=refreshToken, errorMessage=errorMessage, contact=contact)
# for e in res.get_all_edges(): e.source = res
# return res
# A postal address.
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.pluginbase.ipynb (unless otherwise specified).
__all__ = ['POD_FULL_ADDRESS_ENV', 'POD_TARGET_ITEM_ENV', 'POD_OWNER_KEY_ENV', 'POD_AUTH_JSON_ENV', 'PluginBase',
'MyItem', 'MyPlugin', 'get_plugin_cls', 'run_plugin_from_run_id', 'register_base_schemas', 'PYMEMRI_FOLDER',
'POD_KEYS_FOLDER', 'POD_KEYS_FILENAME', 'POD_KEYS_FULL_FOLDER', 'DEFAULT_POD_KEY_PATH', 'store_keys',
'read_pod_key', 'run_plugin', 'simulate_run_plugin_from_frontend']
'MyItem', 'MyPlugin', 'get_plugin_cls', 'run_plugin_from_run_id', 'register_base_schemas', 'store_keys',
'parse_config', 'create_run_expanded', 'run_plugin', 'simulate_run_plugin_from_frontend']
# Cell
from ..data.schema import *
from ..pod.client import PodClient, DEFAULT_POD_ADDRESS
from ..pod.client import *
from ..imports import *
from ..pod.utils import *
from os import environ
from abc import ABCMeta
import abc
......@@ -17,7 +17,6 @@ import importlib
import string
import time
from enum import Enum
from ..pod.client import PodClient
from fastscript import *
import os
from .schema import PluginRun
......@@ -158,13 +157,6 @@ def register_base_schemas(client):
except Exception as e:
raise ValueError("Could not add base schema")
# Cell
def _run_plugin(client, plugin_run_id=None, verbose=False):
"""Runs an plugin, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with plugins)."""
register_base_schemas(client)
run_plugin_from_run_id(plugin_run_id, client)
# Cell
# hide
def _parse_env():
......@@ -177,19 +169,10 @@ def _parse_env():
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
# database_key = pod_service_payload[DATABASE_KEY_ENV]
# owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
# Cell
PYMEMRI_FOLDER = ".pymemri"
POD_KEYS_FOLDER = "pod_keys"
POD_KEYS_FILENAME = "keys.json"
POD_KEYS_FULL_FOLDER = Path.home() / ".pymemri" / POD_KEYS_FOLDER
DEFAULT_POD_KEY_PATH = POD_KEYS_FULL_FOLDER / POD_KEYS_FILENAME
# Cell
# hide
@call_parse
......@@ -210,17 +193,26 @@ def store_keys(path:Param("path to store the keys", str)=DEFAULT_POD_KEY_PATH,
# Cell
# hide
def read_pod_key(key_type, file=DEFAULT_POD_KEY_PATH):
try:
json = read_json(file)
except:
raise ValueError(f"Trying to read key from {file}, but file or key does not exist") from None
try:
key = json[key_type]
print(f"reading {key_type} from {file}")
return key
except:
raise ValueError(f"{key_type} not specified in {file}") from None
def parse_config(file, remove_container=False):
json_dict = read_json(file)
account = Account.from_json(json_dict["account"])
del json_dict["account"]
settings = json.dumps(json_dict["settings"])
del json_dict["settings"]
run = PluginRun.from_json(json_dict)
run.settings = settings
run.add_edge("account", account)
if remove_container:
run.containerImage = "none"
return run
def create_run_expanded(client, run):
client.create(run)
accounts = run.account
if accounts:
account=accounts[0]
client.create(account)
client.create_edge(run.get_edges("account")[0])
# Cell
@call_parse
......@@ -228,10 +220,11 @@ def run_plugin(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_A
plugin_run_id:Param("Run id of the plugin to be executed", str)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
read_args_from_env:Param("Owner key of the pod", bool)=False):
read_args_from_env:Param("Read the args from the environment", bool)=False,
config_file:Param("config file for the PluginRun", str)=None):
if read_args_from_env:
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env(env)
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env()
database_key=None
else:
if database_key is None: database_key = read_pod_key("database_key")
......@@ -241,9 +234,14 @@ def run_plugin(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_A
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
auth_json=pod_auth_json)
if config_file is not None:
run = parse_config(config_file, remove_container=True)
create_run_expanded(client, run)
plugin_run_id=run.id
print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
_run_plugin(client=client, plugin_run_id=plugin_run_id)
run_plugin_from_run_id(run_id=plugin_run_id, client=client)
# Cell
from fastcore.script import call_parse, Param
......@@ -261,12 +259,7 @@ def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full addre
# Open question: This presumes Plugin item is already in pod before simulate_run_plugin_from_frontend is called.
if database_key is None: database_key = read_pod_key("database_key")
if owner_key is None: owner_key = read_pod_key("owner_key")
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
params = [pod_full_address, database_key, owner_key, container, plugin_module, plugin_name]
params = [pod_full_address, database_key, owner_key]
if (None in params):
raise ValueError(f"Defined some params to run indexer, but not all")
......@@ -289,9 +282,18 @@ def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full addre
register_base_schemas(client)
run = PluginRun(container, plugin_module, plugin_name, account=[account])
else:
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
run = PluginRun(container, plugin_module, plugin_name)
if plugin_id is not None:
persistent_state = client.get(plugin_id)
run.add_edge("persistentState", persistent_state)
client.create_edge(run.get_edges("persistentState")[0])
client.create(run)
print(f"\ncalling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {run.id}.")
print(f"*Check the pod log/console for debug output.*")
client.create(run)
print(f"Created PluginRun: {run.id}")
return run
\ No newline at end of file
......@@ -18,12 +18,12 @@ class PluginRun(Item):
**kwargs):
"""
PluginRun defines a the run of plugin `plugin_module.plugin_name`,
with an optional `config` string.
with an optional `settings` string.
Args:
plugin_module (str): module of the plugin.
plugin_name (str): class name of the plugin.
config (str, optional): Optional plugin configuration. For example,
settings (str, optional): Optional plugin configuration. For example,
this could be a `json.dumps` of a configuration dict. Defaults to None.
"""
super().__init__(**kwargs)
......
......@@ -10,6 +10,8 @@ from ..data.photo import Photo
from ..imports import *
from hashlib import sha256
from .db import DB
from .utils import *
from ..plugin.schema import *
# Cell
DEFAULT_POD_ADDRESS = "http://localhost:3030"
......@@ -19,7 +21,7 @@ POD_VERSION = "v4"
class PodClient:
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
auth_json=None, verbose=False):
auth_json=None, verbose=False, register_base_schema=True):
self.verbose = verbose
self.url = url
self.version = POD_VERSION
......@@ -33,11 +35,27 @@ class PodClient:
self.local_db = DB()
self.registered_classes=dict()
self.register_base_schemas()
@classmethod
def from_local_keys(cls, path=DEFAULT_POD_KEY_PATH, **kwargs):
return cls(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"), **kwargs)
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def register_base_schemas(client):
try:
assert client.add_to_schema(PluginRun("", "", "", state="", error="", targetItemId="",
settings=""))
assert client.add_to_schema(PersistentState(pluginName="", state="", account=""))
assert client.add_to_schema(CVUStoredDefinition(name="", definition=""))
assert client.add_to_schema(Account(service="", identifier="", secret="", code="",
refreshToken="", errorMessage=""))
except Exception as e:
raise ValueError("Could not add base schema")
def add_to_db(self, node):
existing = self.local_db.get(node.id)
if existing is None and node.id is not None:
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/pod.utils.ipynb (unless otherwise specified).
__all__ = ['PYMEMRI_FOLDER', 'POD_KEYS_FOLDER', 'POD_KEYS_FILENAME', 'POD_KEYS_FULL_FOLDER', 'DEFAULT_POD_KEY_PATH',
'read_pod_key']
# Cell
from pathlib import Path
from ..data.basic import *
# Cell
PYMEMRI_FOLDER = ".pymemri"
POD_KEYS_FOLDER = "pod_keys"
POD_KEYS_FILENAME = "keys.json"
POD_KEYS_FULL_FOLDER = Path.home() / ".pymemri" / POD_KEYS_FOLDER
DEFAULT_POD_KEY_PATH = POD_KEYS_FULL_FOLDER / POD_KEYS_FILENAME
# Cell
def read_pod_key(key_type, file=DEFAULT_POD_KEY_PATH):
try:
json = read_json(file)
except:
raise ValueError(f"Trying to read key from {file}, but file or key does not exist") from None
try:
key = json[key_type]
print(f"reading {key_type} from {file}")
return key
except:
raise ValueError(f"{key_type} not specified in {file}") from None
\ No newline at end of file
......@@ -15,7 +15,7 @@ language = English
custom_sidebar = True
license = apache2
status = 2
console_scripts = run_plugin=pymemri.plugin.pluginbase:run_plugin simulate_run_plugin_from_frontend=pymemri.plugin.pluginbase:simulate_run_plugin_from_frontend store_keys=pymemri.plugin.pluginbase:store_keys
console_scripts = run_plugin=pymemri.plugin.pluginbase:run_plugin simulate_run_plugin_from_frontend=pymemri.plugin.pluginbase:simulate_run_plugin_from_frontend store_keys=pymemri.plugin.pluginbase:store_keys qr_simulator=pymemri.client_simulator.qr_simulator:run_qr_simulator
requirements = requests tqdm ipdb fastprogress fastscript opencv-python nbdev==1.1.5 matplotlib
nbs_path = nbs
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment