Commit 0fa6b693 authored by Alp Deniz Ogut's avatar Alp Deniz Ogut
Browse files

Init pluginbase with runtime

Showing with 198 additions and 883 deletions
+198 -883
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin.pluginbase
```
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.schema import *
from pymemri.pod.client import PodClient, DEFAULT_POD_ADDRESS
from pymemri.imports import *
from os import environ
from abc import ABCMeta
import abc
import json
import importlib
import string
import time
from enum import Enum
from pymemri.pod.client import PodClient
from fastscript import *
import os
from pymemri.plugin.schema import PluginRun, PersistentState
from pymemri.plugin.schema import PluginRun
from pymemri.data.basic import *
```
%% Cell type:code id: tags:
``` python
# hide
from nbdev.showdoc import *
```
%% Cell type:markdown id: tags:
# Plugins
%% Cell type:markdown id: tags:
PluginBase is the plugin class that the simplest plugin inherits.
Inheriting class should implement:
- run() that implements the logic of the plugin
- add_to_schema() for plugin specific item types
%% Cell type:code id: tags:
``` python
# export
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
POD_TARGET_ITEM_ENV = 'POD_TARGET_ITEM'
POD_OWNER_KEY_ENV = 'POD_OWNER'
POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
```
%% Cell type:code id: tags:
``` python
# export
# hide
class PluginBase(Item, metaclass=ABCMeta):
"""Base class for plugins"""
properties = Item.properties + ["name", "repository", "icon", "data_query", "bundleImage",
properties = Item.properties + ["name", "repository", "icon", "query", "bundleImage",
"runDestination", "pluginClass"]
edges = Item.edges
def __init__(self, pluginRun=None, persistentState=None, name=None, repository=None, icon=None,
def __init__(self, client=None, run_id=None, name=None, repository=None, icon=None,
query=None, bundleImage=None, runDestination=None, pluginClass=None, **kwargs):
if pluginClass is None: pluginClass=self.__class__.__name__
super().__init__(**kwargs)
self.pluginRun = pluginRun
self.persistentState = persistentState
self.client = client
self.run_id = run_id
self.name = name
self.repository = repository
self.icon = icon
self.query = query
self.bundleImage = bundleImage
self.runDestination = runDestination
self.pluginClass = pluginClass
def get_run(self):
return self.client.get(self.run_id, expanded=False)
def get_state(self):
return self.get_run().state
def get_account(self):
run = self.get_run()
return run.account[0] if len(run.account) > 0 else None
def get_settings(self):
run = self.get_run(self.client)
return json.loads(run.settings)
def set_vars(self, vars):
run = self.get_run()
for k,v in vars.items():
if hasattr(run, k):
setattr(run, k, v)
run.update(self.client)
def set_state(self, state, message=None):
self.set_vars({'state': state, 'message': message})
def set_account(self, account):
existing = self.get_account()
if existing:
account.id = existing.id
account.update(self.client)
else:
run = self.get_run()
run.add_edge('account', account)
run.update(self.client)
def set_settings(self, settings):
self.set_vars({'settings': json.dumps(settings)})
@abc.abstractmethod
def run(self, client):
def run(self):
raise NotImplementedError()
@abc.abstractmethod
def add_to_schema(self, client):
def add_to_schema(self):
raise NotImplementedError()
```
%% Cell type:markdown id: tags:
## Creating a plugin
%% Cell type:markdown id: tags:
The memri [pod](https://gitlab.memri.io/memri/pod) uses a plugin system to add features to the backend memri backend. Plugins can import your data (importers), change your data (indexers), or call other serivces. Users can define their own plugins to add new behaviour to their memri app. Let's use the following plugin as an example of how we can start plugins.
%% Cell type:code id: tags:
``` python
# export
# hide
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties + ["containerImage"]
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
def run(self, client):
def run(self):
print("running")
client.create(MyItem("some person", 20))
self.client.create(MyItem("some person", 20))
def add_to_schema(self, client):
client.add_to_schema(MyItem("my name", 10))
def add_to_schema(self):
self.client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:code id: tags:
``` python
# hide
MyPlugin()
```
%% Output
MyPlugin (#None)
%% Cell type:markdown id: tags:
```python
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
def run(self, client):
def run(self):
print("running")
client.create(MyItem("some person", 20))
self.client.create(MyItem("some person", 20))
def add_to_schema(self, client):
client.add_to_schema(MyItem("my name", 10))
def add_to_schema(self):
self.client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:markdown id: tags:
Memri plugins need to define at least 2 methods: `.run()` and `.add_to_schema()`. `.run()` defines the logic of the plugin. `.add_to_schema()` defines the schema for the plugin in the pod. Note that currently, `add_to_schema` requires all item to **have all properties defined that are used in the plugin**. In the future, we might replace add_to_schema, to be done automatically, based on a declarative schema defined in the plugin.
%% Cell type:markdown id: tags:
## Helper classes -
%% Cell type:code id: tags:
``` python
# hide
# export
def get_plugin_cls(plugin_module, plugin_name):
try:
module = importlib.import_module(plugin_module)
plugin_cls = getattr(module, plugin_name)
return plugin_cls
except (ImportError, AttributeError):
raise ImportError(f"Unknown plugin: {plugin_module}.{plugin_name}")
def get_plugin_state(run):
"""
Returns the PersistentState associated with the run.
Returns `None` if no state was found, and the `PersistentState` if a single state is found.
Raises `ValueError` if more than one `PersistentState` is registered in run.
"""
plugin_state = run.get_edges("persistentState")
if len(plugin_state) == 0:
return None
elif len(plugin_state) == 1:
return plugin_state[0]
else:
raise ValueError(f"Expected a single PersistentState for plugin, found {len(plugin_state)}.")
def run_plugin_from_run_id(run_id, client):
"""
Args:
run_id (int): id of the PluginRun
client (PodClient): client containing PluginRun
return_plugin (bool): Returns created plugin instance for testing purposes.
"""
run = client.get(run_id)
plugin_state = get_plugin_state(run)
run = client.get(run_id)
plugin_cls = get_plugin_cls(run.pluginModule, run.pluginName)
plugin = plugin_cls(pluginRun=run, persistentState=plugin_state)
plugin.add_to_schema(client)
plugin = plugin_cls(client=client, run_id=run_id)
plugin.add_to_schema()
# TODO handle plugin status before run
plugin.run(client)
plugin.run()
return plugin
```
%% Cell type:code id: tags:
``` python
# export
# hide
def register_base_schemas(client):
try:
assert client.add_to_schema(PluginRun("", "", "", state="", error="", targetItemId=""))
assert client.add_to_schema(PersistentState(pluginName="", state="", account=""))
assert client.add_to_schema(CVUStoredDefinition(name="", definition=""))
assert client.add_to_schema(PluginRun("", "", "", state="", message="", targetItemId=""))
assert client.add_to_schema(Account(service="", identifier="", secret=""))
except Exception as e:
raise ValueError("Could not add base schema")
```
%% Cell type:markdown id: tags:
### Test Plugin Runtime
%% Cell type:code id: tags:
``` python
# hide
from pymemri.pod.client import PodClient
from pymemri.data.schema import Account
from pymemri.plugin.pluginbase import PluginRun, register_base_schemas, run_plugin_from_run_id
client = PodClient()
register_base_schemas(client)
# Create a dummy account to use for authentication within the plugin
account = Account(service="my_plugin_service", identifier="username", secret="password")
account.update(client)
# Create a run to enable plugin runtime
run = PluginRun("pymemri", "pymemri.plugin.pluginbase", "MyPlugin", account=[account])
run.update(client)
plugin = run_plugin_from_run_id(run.id, client)
# check if account is accessible
assert plugin.get_account().identifier == "username"
# set a state
plugin.set_state("test_state", message="test_message")
assert plugin.get_state() == "test_state"
```
%% Cell type:markdown id: tags:
## Run from id test -
%% Cell type:code id: tags:
``` python
```
%% Cell type:code id: tags:
``` python
# hide
# skip
client = PodClient()
run = PluginRun("pymemri", "pymemri.plugin.pluginbase", "MyPlugin", state="not started")
assert client.add_to_schema(PluginRun("", "", "", "", ""))
assert client.create(run)
run = client.get(run.id)
run_plugin_from_run_id(run.id, client)
```
%% Output
running
MyPlugin (#None)
%% Cell type:code id: tags:
``` python
# export
def _run_plugin(client, plugin_run_id=None, verbose=False):
"""Runs an plugin, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with plugins)."""
register_base_schemas(client)
run_plugin_from_run_id(plugin_run_id, client)
```
%% Cell type:markdown id: tags:
## _run_plugin test -
%% Cell type:code id: tags:
``` python
# hide
# skip
_run_plugin(client=client, plugin_run_id=run.id)
```
%% Output
running
%% Cell type:code id: tags:
``` python
# export
# hide
def _parse_env():
env = os.environ
print("Reading `run_plugin()` parameters from environment variables")
try:
pod_full_address = env.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
plugin_run_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
print(plugin_run_json)
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
# database_key = pod_service_payload[DATABASE_KEY_ENV]
# owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
```
%% Cell type:markdown id: tags:
## Running your plugin using the CLI
%% Cell type:markdown id: tags:
Plugins can be started using the pymemri `run_plugin` or `simulate_run_plugin_from_frontend` CLI. With `run_plugin` the plugin is invoked directly by spawning a new python process, while `simulate_run_plugin_from_frontend` requests the pod to spawn a new process, docker container, or kubernetes container, which in calls `run_plugin` (for more info see `simulate_run_plugin_from_frontend`. When using `run_plugin`, you can either pass your run arguments as parameters, or set them as environment variables. If both are set, the CLI will use the passed arguments.
%% Cell type:code id: tags:
``` python
# export
PYMEMRI_FOLDER = ".pymemri"
POD_KEYS_FOLDER = "pod_keys"
POD_KEYS_FILENAME = "keys.json"
POD_KEYS_FULL_FOLDER = Path.home() / ".pymemri" / POD_KEYS_FOLDER
DEFAULT_POD_KEY_PATH = POD_KEYS_FULL_FOLDER / POD_KEYS_FILENAME
```
%% Cell type:code id: tags:
``` python
# export
# hide
@call_parse
def store_keys(path:Param("path to store the keys", str)=DEFAULT_POD_KEY_PATH,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None):
if database_key is None: database_key = PodClient.generate_random_key()
if owner_key is None: owner_key = PodClient.generate_random_key()
obj = {"database_key": database_key,
"owner_key": owner_key}
Path(path).parent.mkdir(parents=True, exist_ok=True)
if path.exists():
timestr = time.strftime("%Y%m%d-%H%M%S")
path.rename(POD_KEYS_FULL_FOLDER / f"keys-{timestr}.json")
write_json(obj, path)
```
%% Cell type:code id: tags:
``` python
# hide
store_keys()
```
%% Cell type:code id: tags:
``` python
# export
# hide
def read_pod_key(key_type, file=DEFAULT_POD_KEY_PATH):
try:
json = read_json(file)
except:
raise ValueError(f"Trying to read key from {file}, but file or key does not exist") from None
try:
key = json[key_type]
print(f"reading {key_type} from {file}")
return key
except:
raise ValueError(f"{key_type} not specified in {file}") from None
```
%% Cell type:code id: tags:
``` python
# export
@call_parse
def run_plugin(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_ADDRESS,
plugin_run_id:Param("Run id of the plugin to be executed", str)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
read_args_from_env:Param("Owner key of the pod", bool)=False):
if read_args_from_env:
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env(env)
database_key=None
else:
if database_key is None: database_key = read_pod_key("database_key")
if owner_key is None: owner_key = read_pod_key("owner_key")
pod_auth_json = None
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
auth_json=pod_auth_json)
print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
_run_plugin(client=client, plugin_run_id=plugin_run_id)
```
%% Cell type:markdown id: tags:
To start a plugin on your local machine, you can use the CLI. This will create a client for you, and run the code defined in `<myplugin>.run()`
%% Cell type:code id: tags:
``` python
client = PodClient(database_key=read_pod_key("database_key"), owner_key=read_pod_key("owner_key"))
run = PluginRun(containerImage="no_container", pluginModule="pymemri.plugin.pluginbase",
plugin="pymemri.plugin.pluginbase.MyPlugin",
pluginName="MyPlugin", state="not started")
assert client.add_to_schema(PluginRun("", "", "", "", "")) and client.create(run)
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
%% Cell type:code id: tags:
``` python
run_plugin(plugin_run_id=run.id)
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
pod_full_address=http://localhost:3030
owner_key=3789141683232392970480152516863298047903446543357097566923540000
running
%% Cell type:markdown id: tags:
> Note: The data that is created here should be in the pod in order for this to work
%% Cell type:markdown id: tags:
## Run from pod
%% Cell type:markdown id: tags:
In production, we start plugins by making an API call to the pod, which in turn creates an environment for the plugin and starts it using docker containers, kubernetes containers or a shell script. We can start this process using the `simulate_run_plugin_from_frontend` CLI. **Note that when using docker, provided container name should be "installed" within the Pod environemnt (e.g. `docker build -t pymemri .` for this repo) in order to start it.**
%% Cell type:markdown id: tags:
![running a plugin](images/running_a_plugin.svg)
%% Cell type:code id: tags:
``` python
# export
from fastcore.script import call_parse, Param
import os
@call_parse
def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_ADDRESS,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
plugin_id:Param("Pod ID of the plugin", str)=None,
container:Param("Pod container to run frod", str)=None,
plugin_path:Param("Plugin path", str)=None,
account_id:Param("Account id to be used inside the plugin", str)=None,
settings_file:Param("Plugin settings (json)", str)=None):
# TODO remove container, plugin_module, plugin_name and move to Plugin item.
# Open question: This presumes Plugin item is already in pod before simulate_run_plugin_from_frontend is called.
if database_key is None: database_key = read_pod_key("database_key")
if owner_key is None: owner_key = read_pod_key("owner_key")
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
params = [pod_full_address, database_key, owner_key, container, plugin_module, plugin_name]
if (None in params):
raise ValueError(f"Defined some params to run indexer, but not all")
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
for name, val in [("pod_full_address", pod_full_address), ("owner_key", owner_key)]:
print(f"{name}={val}")
if settings_file is not None:
with open(settings_file, 'r') as f:
settings = f.read()
else:
settings = None
if account_id is None:
account = None
else:
account = client.get(account_id)
print(f"Using {account}")
register_base_schemas(client)
run = PluginRun(container, plugin_module, plugin_name)
if plugin_id is not None:
persistent_state = client.get(plugin_id)
run.add_edge("persistentState", persistent_state)
client.create_edge(run.get_edges("persistentState")[0])
run = PluginRun(container, plugin_module, plugin_name, account=[account])
print(f"\ncalling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {run.id}.")
print(f"*Check the pod log/console for debug output.*")
client.create(run)
print(f"Created PluginRun: {run.id}")
return run
```
%% Cell type:code id: tags:
``` python
client = PodClient()
```
%% Cell type:code id: tags:
``` python
!simulate_run_plugin_from_frontend --plugin_path="pymemri.plugin.pluginbase.MyPlugin"
```
%% Output
reading database_key from /Users/koen/.pymemri/pod_keys/keys.json
reading owner_key from /Users/koen/.pymemri/pod_keys/keys.json
Inferred 'pymemri' as plugin container name
pod_full_address=http://localhost:3030
owner_key=3789141683232392970480152516863298047903446543357097566923540000
calling the `create` api on http://localhost:3030 to make your Pod start a plugin with id 06f965E82dee89cDC15B3CdFc9e1c1BA.
*Check the pod log/console for debug output.*
Created PluginRun: 06f965E82dee89cDC15B3CdFc9e1c1BA
%% Cell type:markdown id: tags:
## Appendix -
%% Cell type:code id: tags:
``` python
# hide
# client.start_plugin("pymemri", run.id)
```
%% Cell type:code id: tags:
``` python
# hide
# class StartPlugin(Item):
# properties = Item.properties + ["container", "targetItemId"]
# edges = Item.edges
# def __init__(self, container=None, targetItemId=None, **kwargs):
# super().__init__(**kwargs)
# self.container = container
# self.targetItemId = targetItemId
```
%% Cell type:code id: tags:
``` python
# hide
# class PluginSettings(Item):
# def __init__(self, settings_dict):
# self.settings_dict=settings_dict
# def __getattr__(self, k):
# if k in self.settings_dict:
# return self.settings_dict[k]
# @classmethod
# def from_string(cls, s):
# objs = json.loads(s)
# cls(objs)
```
%% Cell type:code id: tags:
``` python
# hide
# # export
# def generate_test_env(client, indexer_run):
# payload = json.dumps({DATABASE_KEY_ENV: client.database_key, OWNER_KEY_ENV: client.owner_key})
# return {POD_FULL_ADDRESS_ENV: DEFAULT_POD_ADDRESS,
# POD_TARGET_ITEM: indexer_run.id,
# POD_SERVICE_PAYLOAD_ENV: payload}
```
%% Cell type:code id: tags:
``` python
# hide
# run_plugin(env=generate_test_env(client, run))
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted pod.client.ipynb.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id:5b788afc tags:
%% Cell type:code id:48d3c421 tags:
``` python
# default_exp plugin.schema
```
%% Cell type:code id:f8766d7d tags:
%% Cell type:code id:54a8301b tags:
``` python
# export
# hide
import random, string
from pymemri.data.itembase import Item
```
%% Cell type:code id:21341d97 tags:
%% Cell type:code id:ec7eeeb1 tags:
``` python
# export
# hide
class PluginRun(Item):
properties = Item.properties + ["containerImage", "pluginModule", "pluginName", "state", "targetItemId",
"oAuthUrl", "error", "config"]
edges = Item.edges + ["view", "persistentState"]
"oAuthUrl", "message", "config"]
edges = Item.edges + ["account"]
def __init__(self, containerImage, pluginModule, pluginName, state=None, config=None, view=None, targetItemId=None, oAuthUrl=None, error=None, persistentState=None,
def __init__(self, containerImage, pluginModule, pluginName, account=None, state=None, settings=None, targetItemId=None, oAuthUrl=None, message=None,
**kwargs):
"""
PluginRun defines a the run of plugin `plugin_module.plugin_name`,
with an optional `config` string.
Args:
plugin_module (str): module of the plugin.
plugin_name (str): class name of the plugin.
config (str, optional): Optional plugin configuration. For example,
this could be a `json.dumps` of a configuration dict. Defaults to None.
"""
super().__init__(**kwargs)
self.pluginModule = pluginModule
self.pluginName = pluginName
self.containerImage=containerImage
id_ = "".join([random.choice(string.hexdigits) for i in range(32)]) if targetItemId is None else targetItemId
self.containerImage = containerImage
id_ = ''.join([random.choice(string.hexdigits) for i in range(32)]) if targetItemId is None else targetItemId
self.targetItemId=id_
self.id=id_
self.state = state # for stateful plugins
self.config = config
self.settings = settings
self.oAuthUrl = oAuthUrl # for authenticated plugins
self.error = error # universa
self.message = message # universa
self.persistentState = persistentState if persistentState is not None else []
self.view = view if view is not None else []
```
%% Cell type:code id:a8f74669 tags:
``` python
# export
# hide
class PersistentState(Item):
""" Persistent state variables saved for plugin such as views, accounts, the last state to resume from etc. """
properties = Item.properties + ["pluginId", "state"]
edges = Item.edges + ["account", "view"]
def __init__(self, pluginName=None, state=None, account=None, view=None, **kwargs):
super().__init__(**kwargs)
self.pluginName = pluginName
self.state = state
self.account = account if account is not None else []
self.view = view if view is not None else []
def get_state(self):
return self.state
def set_state(self, client, state_str):
self.state = state_str
client.update_item(self)
def get_account(self):
if len(self.account) == 0:
return None
else:
return self.account[0]
def set_account(self, client, account):
if len(self.account) == 0:
if not account.id:
client.create(account)
self.add_edge('account', account)
self.update(client)
else:
existing_account = self.account[0]
for prop in account.properties:
value = getattr(account, prop, None)
if value and hasattr(existing_account, prop):
setattr(existing_account, prop, value)
existing_account.update(client)
def get_view_by_name(self, view_name):
for cvu in self.view:
if cvu.name == view_name:
return cvu
def set_views(self, client, views=None):
for view in views:
client.create(view)
self.add_edge('view', view)
self.update(client)
return True
```
......
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin.stateful
```
%% Cell type:code id: tags:
``` python
#export
from pymemri.data.schema import Item
from pymemri.plugin.pluginbase import PluginBase
import logging
```
%% Cell type:code id: tags:
``` python
#hide
from nbdev.showdoc import *
```
%% Cell type:markdown id: tags:
# Stateful Plugins
%% Cell type:markdown id: tags:
If a plugin with persistent and runtime states are going to be used, this includes the plugins with user interaction, then StatefulPlugin should be used.
If a plugin needs views, an account to login or a last state to continue then a persistent state should be deployed into the pod.
%% Cell type:markdown id: tags:
## Persistent state of a plugin
%% Cell type:code id: tags:
``` python
# export
class PersistentState(Item):
""" Persistent state variables saved for plugin such as views, accounts, the last state to resume from etc. """
properties = Item.properties + ["pluginId", "state"]
edges = Item.edges + ["account", "view"]
def __init__(self, pluginName=None, state=None, account=None, view=None, **kwargs):
super().__init__(**kwargs)
self.pluginName = pluginName
self.state = state
self.account = account if account is not None else []
self.view = view if view is not None else []
def get_state(self):
return self.state
def set_state(self, client, state_str):
self.state = state_str
client.update_item(self)
def get_account(self):
if len(self.account) == 0:
return None
else:
return self.account[0]
def set_account(self, client, account):
if len(self.account) == 0:
if not account.id:
client.create(account)
self.add_edge('account', account)
self.update(client)
else:
existing_account = self.account[0]
for prop in account.properties:
value = getattr(account, prop, None)
if value and hasattr(existing_account, prop):
setattr(existing_account, prop, value)
existing_account.update(client)
def get_view_by_name(self, view_name):
for cvu in self.view:
if cvu.name == view_name:
return cvu
def set_views(self, client, views=None):
for view in views:
client.create(view)
self.add_edge('view', view)
self.update(client)
return True
```
%% Cell type:markdown id: tags:
## Runtime state of a plugin
%% Cell type:code id: tags:
``` python
# export
RUN_IDLE = 'idle' #1
RUN_INITIALIZED = 'initilized' #2
RUN_USER_ACTION_NEEDED = 'userActionNeeded' # 2-3
RUN_USER_ACTION_COMPLETED = 'ready' # 2-3
RUN_STARTED = 'start' #3
RUN_FAILED = 'error' # 3-4
RUN_COMPLETED = 'done' #4
logging.basicConfig(format='%(asctime)s [%(levelname)s] - %(message)s')
```
%% Cell type:markdown id: tags:
StatefulPlugin is a sub-class of PluginBase that allows setting both runtime and persistent states.
%% Cell type:code id: tags:
``` python
# export
class StatefulPlugin(PluginBase):
""" Provides state/view setter and getter functions to plugin runtime """
properties = PluginBase.properties + ["runId", "persistenceId"]
edges = PluginBase.edges
def __init__(self, runId=None, persistenceId=None, **kwargs):
super().__init__(**kwargs)
self.runId = runId
self.persistenceId = persistenceId
def persist(self, client, pluginName, views=None, account=None):
persistence = PersistentState(pluginName=pluginName)
client.create(persistence)
self.persistenceId = persistence.id
if views:
persistence.set_views(client, views)
if account:
persistence.set_account(account)
def get_state(self, client, pluginName=None):
if self.persistenceId:
return client.get(self.persistenceId)
elif pluginName:
result = client.search({'type': 'PersistentState', 'pluginName': pluginName})
if len(result) > 0:
self.persistenceId = result[0].id
return self.get_state(client)
def set_account(self, client, account):
state = self.get_state(client)
state.set_account(account)
def set_state_str(self, client, state_str):
state = self.get_state(client)
state.set_state(client, state_str)
def initialized(self, client):
logging.warning("PLUGIN run is initialized")
self.set_run_vars(client, {'state':RUN_INITIALIZED})
def started(self, client):
logging.warning("PLUGIN run is started")
self.set_run_vars(client, {'state':RUN_STARTED})
def failed(self, client, error):
logging.error(f"PLUGIN run is failed: {error}")
print("Exception while running plugin:", error)
self.set_run_vars(client, {'state':RUN_FAILED, 'error': str(error)})
def completed(self, client):
logging.warning("PLUGIN run is completed")
self.set_run_vars(client, {'state':RUN_COMPLETED})
def complete_action(self, client):
self.set_run_vars(client, {'state': RUN_USER_ACTION_COMPLETED})
def action_required(self, client):
self.set_run_vars(client, {'state': RUN_USER_ACTION_NEEDED})
def is_action_required(self, client):
return self.get_run_state(client) == RUN_USER_ACTION_NEEDED
def is_action_completed(self, client):
return self.get_run_state(client) == RUN_USER_ACTION_COMPLETED
def is_completed(self, client):
return self.get_run_state(client) == RUN_COMPLETED
def is_failed(self, client):
return self.get_run_state(client) == RUN_FAILED
def is_daemon(self, client):
run = self.get_run(client, expanded=False)
return run.interval and run.interval > 0
def get_run(self, client, expanded=False):
return client.get(self.runId, expanded=expanded)
def get_run_state(self, client):
start_plugin = self.get_run(client)
return start_plugin.state
def set_run_vars(self, client, vars):
start_plugin = client.get(self.runId, expanded=False)
for k,v in vars.items():
if hasattr(start_plugin, k):
setattr(start_plugin, k, v)
client.update_item(start_plugin)
def get_run_view(self, client):
run = self.get_run(client, expanded=True)
if run:
for view in run.view:
return view
def set_run_view(self, client, view_name):
state = self.get_state(client)
view = state.get_view_by_name(view_name)
if view:
attached_CVU_edge = self.get_run_view(client) # index error here if there is no already bound CVU
if attached_CVU_edge:
logging.warning(f"Plugin Run already has a view. Updating with {view_name}")
attached_CVU_edge.target = view # update CVU
attached_CVU_edge.update(client) # having doubts if this really updates the existing edge
else:
logging.warning(f"Plugin Run does not have a view. Creating {view_name}")
run = self.get_run(client)
run.add_edge('view', view)
run.update(client)
return True
return False
def add_to_schema(self, client):
assert client.add_to_schema(PersistentState("", ""))
```
%% Cell type:markdown id: tags:
## Setting and communicating states through PluginRun
An example stateful plugin is below.
%% Cell type:code id: tags:
``` python
# export
# hide
from pymemri.data.schema import Person
class MyStatefulPlugin(StatefulPlugin):
def __init__(self, runId=None, **kwargs):
super().__init__(runId=runId, **kwargs)
def run(self, client):
# plugin's magic happens here
# manipulate run state
self.set_run_vars({'state': 'Running'})
# create items in POD
imported_person = Person(firstName="Hari", lastName="Seldon")
client.create(imported_person)
# set persistent state
self.set_state_str("continue_from:5021")
def add_to_schema(self, client):
print("Adding schema")
super().add_to_schema(client)
# add plugin-specific schemas here
client.add_to_schema(Person(firstName="", lastName=""))
pass
```
%% Cell type:markdown id: tags:
Through this inherited class you can set states to control the flow and views that are displayed to user.
%% Cell type:markdown id: tags:
```python
from pymemri.data.schema import Person
class MyStatefulPlugin(StatefulPlugin):
def __init__(self, runId=None, views=None, **kwargs):
super().__init__(runId=runId, views=None, **kwargs)
def run(self, client):
# plugin's magic happens here
# manipulate run state
self.set_run_vars(client, {'state': 'Running'})
# set UI view (CVU)
self.set_run_view(client, 'splash-screen')
# create items in POD
imported_person = Person(firstName="Hari", lastName="Seldon")
client.create(imported_person)
# set persistent state
self.set_state_str("continue_from:5021")
return
def add_to_schema(self, client):
print("Adding schema")
super().add_to_schema(client)
# add plugin-specific schemas here
client.add_to_schema(Person(firstName="", lastName=""))
pass```
%% Cell type:code id: tags:
``` python
# hide
from pymemri.pod.client import PodClient
from pymemri.data.schema import CVUStoredDefinition, Account
from pymemri.plugin.pluginbase import PluginRun, register_base_schemas
from pymemri.plugin.stateful import StatefulPlugin, PersistentState, MyStatefulPlugin, RUN_STARTED
client = PodClient()
register_base_schemas(client)
# Create and persist plugin if not already
stateful = MyStatefulPlugin()
# Add plugin-specific item types to schema
stateful.add_to_schema(client)
# Fetches persistent state of a plugin by name
persistence = stateful.get_state(client, "myStatefulPlugin")
if not persistence:
views = [CVUStoredDefinition(name="login-view"), CVUStoredDefinition(name="other-view")]
stateful.persist(client, "myStatefulPlugin", views=views, account=None)
# create a run
run = PluginRun(containerImage="stateful_plugin", pluginModule="pymemri.plugin.stateful", pluginName="MyStatefulPlugin")
client.create(run)
# set run id of the plugin
stateful.runId = run.id
# set START state
stateful.started(client)
assert stateful.get_run_state(client) == RUN_STARTED
assert stateful.set_run_view(client, "non-existent-view") == False
assert stateful.set_run_view(client, "login-view") == True
assert stateful.get_run_view(client).name == 'login-view'
# set a new persistent account
stateful.set_state_str(client, "the last state")
assert stateful.get_state(client).state == "the last state"
```
......@@ -45,7 +45,6 @@ index = {"read_file": "basic.ipynb",
"MyItem": "plugin.pluginbase.ipynb",
"MyPlugin": "plugin.pluginbase.ipynb",
"get_plugin_cls": "plugin.pluginbase.ipynb",
"get_plugin_state": "plugin.pluginbase.ipynb",
"run_plugin_from_run_id": "plugin.pluginbase.ipynb",
"register_base_schemas": "plugin.pluginbase.ipynb",
"PYMEMRI_FOLDER": "plugin.pluginbase.ipynb",
......@@ -58,16 +57,6 @@ index = {"read_file": "basic.ipynb",
"run_plugin": "plugin.pluginbase.ipynb",
"simulate_run_plugin_from_frontend": "plugin.pluginbase.ipynb",
"PluginRun": "plugin.schema.ipynb",
"PersistentState": "plugin.stateful.ipynb",
"RUN_IDLE": "plugin.stateful.ipynb",
"RUN_INITIALIZED": "plugin.stateful.ipynb",
"RUN_USER_ACTION_NEEDED": "plugin.stateful.ipynb",
"RUN_USER_ACTION_COMPLETED": "plugin.stateful.ipynb",
"RUN_STARTED": "plugin.stateful.ipynb",
"RUN_FAILED": "plugin.stateful.ipynb",
"RUN_COMPLETED": "plugin.stateful.ipynb",
"StatefulPlugin": "plugin.stateful.ipynb",
"MyStatefulPlugin": "plugin.stateful.ipynb",
"DEFAULT_POD_ADDRESS": "pod.client.ipynb",
"POD_VERSION": "pod.client.ipynb",
"PodClient": "pod.client.ipynb",
......@@ -81,7 +70,6 @@ modules = ["data/basic.py",
"data/itembase.py",
"plugin/pluginbase.py",
"plugin/schema.py",
"plugin/stateful.py",
"pod/client.py",
"pod/db.py"]
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.pluginbase.ipynb (unless otherwise specified).
__all__ = ['POD_FULL_ADDRESS_ENV', 'POD_TARGET_ITEM_ENV', 'POD_OWNER_KEY_ENV', 'POD_AUTH_JSON_ENV', 'PluginBase',
'MyItem', 'MyPlugin', 'get_plugin_cls', 'get_plugin_state', 'run_plugin_from_run_id',
'register_base_schemas', 'PYMEMRI_FOLDER', 'POD_KEYS_FOLDER', 'POD_KEYS_FILENAME', 'POD_KEYS_FULL_FOLDER',
'DEFAULT_POD_KEY_PATH', 'store_keys', 'read_pod_key', 'run_plugin', 'simulate_run_plugin_from_frontend']
'MyItem', 'MyPlugin', 'get_plugin_cls', 'run_plugin_from_run_id', 'register_base_schemas', 'PYMEMRI_FOLDER',
'POD_KEYS_FOLDER', 'POD_KEYS_FILENAME', 'POD_KEYS_FULL_FOLDER', 'DEFAULT_POD_KEY_PATH', 'store_keys',
'read_pod_key', 'run_plugin', 'simulate_run_plugin_from_frontend']
# Cell
from ..data.schema import *
......@@ -20,7 +20,7 @@ from enum import Enum
from ..pod.client import PodClient
from fastscript import *
import os
from .schema import PluginRun, PersistentState
from .schema import PluginRun
from ..data.basic import *
# Cell
......@@ -33,16 +33,16 @@ POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
# hide
class PluginBase(Item, metaclass=ABCMeta):
"""Base class for plugins"""
properties = Item.properties + ["name", "repository", "icon", "data_query", "bundleImage",
properties = Item.properties + ["name", "repository", "icon", "query", "bundleImage",
"runDestination", "pluginClass"]
edges = Item.edges
def __init__(self, pluginRun=None, persistentState=None, name=None, repository=None, icon=None,
def __init__(self, client=None, run_id=None, name=None, repository=None, icon=None,
query=None, bundleImage=None, runDestination=None, pluginClass=None, **kwargs):
if pluginClass is None: pluginClass=self.__class__.__name__
super().__init__(**kwargs)
self.pluginRun = pluginRun
self.persistentState = persistentState
self.client = client
self.run_id = run_id
self.name = name
self.repository = repository
self.icon = icon
......@@ -51,12 +51,49 @@ class PluginBase(Item, metaclass=ABCMeta):
self.runDestination = runDestination
self.pluginClass = pluginClass
def get_run(self):
return self.client.get(self.run_id, expanded=False)
def get_state(self):
return self.get_run().state
def get_account(self):
run = self.get_run()
return run.account[0] if len(run.account) > 0 else None
def get_settings(self):
run = self.get_run(self.client)
return json.loads(run.settings)
def set_vars(self, vars):
run = self.get_run()
for k,v in vars.items():
if hasattr(run, k):
setattr(run, k, v)
run.update(self.client)
def set_state(self, state, message=None):
self.set_vars({'state': state, 'message': message})
def set_account(self, account):
existing = self.get_account()
if existing:
account.id = existing.id
account.update(self.client)
else:
run = self.get_run()
run.add_edge('account', account)
run.update(self.client)
def set_settings(self, settings):
self.set_vars({'settings': json.dumps(settings)})
@abc.abstractmethod
def run(self, client):
def run(self):
raise NotImplementedError()
@abc.abstractmethod
def add_to_schema(self, client):
def add_to_schema(self):
raise NotImplementedError()
# Cell
......@@ -77,12 +114,12 @@ class MyPlugin(PluginBase):
def __init__(self, **kwargs):
super().__init__(**kwargs)
def run(self, client):
def run(self):
print("running")
client.create(MyItem("some person", 20))
self.client.create(MyItem("some person", 20))
def add_to_schema(self, client):
client.add_to_schema(MyItem("my name", 10))
def add_to_schema(self):
self.client.add_to_schema(MyItem("my name", 10))
# Cell
# export
......@@ -94,22 +131,6 @@ def get_plugin_cls(plugin_module, plugin_name):
except (ImportError, AttributeError):
raise ImportError(f"Unknown plugin: {plugin_module}.{plugin_name}")
def get_plugin_state(run):
"""
Returns the PersistentState associated with the run.
Returns `None` if no state was found, and the `PersistentState` if a single state is found.
Raises `ValueError` if more than one `PersistentState` is registered in run.
"""
plugin_state = run.get_edges("persistentState")
if len(plugin_state) == 0:
return None
elif len(plugin_state) == 1:
return plugin_state[0]
else:
raise ValueError(f"Expected a single PersistentState for plugin, found {len(plugin_state)}.")
def run_plugin_from_run_id(run_id, client):
"""
Args:
......@@ -117,15 +138,14 @@ def run_plugin_from_run_id(run_id, client):
client (PodClient): client containing PluginRun
return_plugin (bool): Returns created plugin instance for testing purposes.
"""
run = client.get(run_id)
plugin_state = get_plugin_state(run)
run = client.get(run_id)
plugin_cls = get_plugin_cls(run.pluginModule, run.pluginName)
plugin = plugin_cls(pluginRun=run, persistentState=plugin_state)
plugin.add_to_schema(client)
plugin = plugin_cls(client=client, run_id=run_id)
plugin.add_to_schema()
# TODO handle plugin status before run
plugin.run(client)
plugin.run()
return plugin
......@@ -133,9 +153,8 @@ def run_plugin_from_run_id(run_id, client):
# hide
def register_base_schemas(client):
try:
assert client.add_to_schema(PluginRun("", "", "", state="", error="", targetItemId=""))
assert client.add_to_schema(PersistentState(pluginName="", state="", account=""))
assert client.add_to_schema(CVUStoredDefinition(name="", definition=""))
assert client.add_to_schema(PluginRun("", "", "", state="", message="", targetItemId=""))
assert client.add_to_schema(Account(service="", identifier="", secret=""))
except Exception as e:
raise ValueError("Could not add base schema")
......@@ -234,9 +253,9 @@ import os
def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full address", str)=DEFAULT_POD_ADDRESS,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
plugin_id:Param("Pod ID of the plugin", str)=None,
container:Param("Pod container to run frod", str)=None,
plugin_path:Param("Plugin path", str)=None,
account_id:Param("Account id to be used inside the plugin", str)=None,
settings_file:Param("Plugin settings (json)", str)=None):
# TODO remove container, plugin_module, plugin_name and move to Plugin item.
# Open question: This presumes Plugin item is already in pod before simulate_run_plugin_from_frontend is called.
......@@ -261,12 +280,14 @@ def simulate_run_plugin_from_frontend(pod_full_address:Param("The pod full addre
else:
settings = None
if account_id is None:
account = None
else:
account = client.get(account_id)
print(f"Using {account}")
register_base_schemas(client)
run = PluginRun(container, plugin_module, plugin_name)
if plugin_id is not None:
persistent_state = client.get(plugin_id)
run.add_edge("persistentState", persistent_state)
client.create_edge(run.get_edges("persistentState")[0])
run = PluginRun(container, plugin_module, plugin_name, account=[account])
print(f"\ncalling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {run.id}.")
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.schema.ipynb (unless otherwise specified).
__all__ = ['PluginRun', 'PersistentState']
__all__ = ['PluginRun']
# Cell
# hide
......@@ -11,10 +11,10 @@ from ..data.itembase import Item
# hide
class PluginRun(Item):
properties = Item.properties + ["containerImage", "pluginModule", "pluginName", "state", "targetItemId",
"oAuthUrl", "error", "config"]
edges = Item.edges + ["view", "persistentState"]
"oAuthUrl", "message", "config"]
edges = Item.edges + ["account"]
def __init__(self, containerImage, pluginModule, pluginName, state=None, config=None, view=None, targetItemId=None, oAuthUrl=None, error=None, persistentState=None,
def __init__(self, containerImage, pluginModule, pluginName, account=None, state=None, settings=None, targetItemId=None, oAuthUrl=None, message=None,
**kwargs):
"""
PluginRun defines a the run of plugin `plugin_module.plugin_name`,
......@@ -29,68 +29,13 @@ class PluginRun(Item):
super().__init__(**kwargs)
self.pluginModule = pluginModule
self.pluginName = pluginName
self.containerImage=containerImage
id_ = "".join([random.choice(string.hexdigits) for i in range(32)]) if targetItemId is None else targetItemId
self.containerImage = containerImage
id_ = ''.join([random.choice(string.hexdigits) for i in range(32)]) if targetItemId is None else targetItemId
self.targetItemId=id_
self.id=id_
self.state = state # for stateful plugins
self.config = config
self.settings = settings
self.oAuthUrl = oAuthUrl # for authenticated plugins
self.error = error # universa
self.message = message # universa
self.persistentState = persistentState if persistentState is not None else []
self.view = view if view is not None else []
# Cell
# hide
class PersistentState(Item):
""" Persistent state variables saved for plugin such as views, accounts, the last state to resume from etc. """
properties = Item.properties + ["pluginId", "state"]
edges = Item.edges + ["account", "view"]
def __init__(self, pluginName=None, state=None, account=None, view=None, **kwargs):
super().__init__(**kwargs)
self.pluginName = pluginName
self.state = state
self.account = account if account is not None else []
self.view = view if view is not None else []
def get_state(self):
return self.state
def set_state(self, client, state_str):
self.state = state_str
client.update_item(self)
def get_account(self):
if len(self.account) == 0:
return None
else:
return self.account[0]
def set_account(self, client, account):
if len(self.account) == 0:
if not account.id:
client.create(account)
self.add_edge('account', account)
self.update(client)
else:
existing_account = self.account[0]
for prop in account.properties:
value = getattr(account, prop, None)
if value and hasattr(existing_account, prop):
setattr(existing_account, prop, value)
existing_account.update(client)
def get_view_by_name(self, view_name):
for cvu in self.view:
if cvu.name == view_name:
return cvu
def set_views(self, client, views=None):
for view in views:
client.create(view)
self.add_edge('view', view)
self.update(client)
return True
\ No newline at end of file
self.account = account if account is not None else []
\ No newline at end of file
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.stateful.ipynb (unless otherwise specified).
__all__ = ['PersistentState', 'RUN_IDLE', 'RUN_INITIALIZED', 'RUN_USER_ACTION_NEEDED', 'RUN_USER_ACTION_COMPLETED',
'RUN_STARTED', 'RUN_FAILED', 'RUN_COMPLETED', 'StatefulPlugin', 'MyStatefulPlugin']
# Cell
from ..data.schema import Item
from .pluginbase import PluginBase
import logging
# Cell
class PersistentState(Item):
""" Persistent state variables saved for plugin such as views, accounts, the last state to resume from etc. """
properties = Item.properties + ["pluginId", "state"]
edges = Item.edges + ["account", "view"]
def __init__(self, pluginName=None, state=None, account=None, view=None, **kwargs):
super().__init__(**kwargs)
self.pluginName = pluginName
self.state = state
self.account = account if account is not None else []
self.view = view if view is not None else []
def get_state(self):
return self.state
def set_state(self, client, state_str):
self.state = state_str
client.update_item(self)
def get_account(self):
if len(self.account) == 0:
return None
else:
return self.account[0]
def set_account(self, client, account):
if len(self.account) == 0:
if not account.id:
client.create(account)
self.add_edge('account', account)
self.update(client)
else:
existing_account = self.account[0]
for prop in account.properties:
value = getattr(account, prop, None)
if value and hasattr(existing_account, prop):
setattr(existing_account, prop, value)
existing_account.update(client)
def get_view_by_name(self, view_name):
for cvu in self.view:
if cvu.name == view_name:
return cvu
def set_views(self, client, views=None):
for view in views:
client.create(view)
self.add_edge('view', view)
self.update(client)
return True
# Cell
RUN_IDLE = 'idle' #1
RUN_INITIALIZED = 'initilized' #2
RUN_USER_ACTION_NEEDED = 'userActionNeeded' # 2-3
RUN_USER_ACTION_COMPLETED = 'ready' # 2-3
RUN_STARTED = 'start' #3
RUN_FAILED = 'error' # 3-4
RUN_COMPLETED = 'done' #4
logging.basicConfig(format='%(asctime)s [%(levelname)s] - %(message)s')
# Cell
class StatefulPlugin(PluginBase):
""" Provides state/view setter and getter functions to plugin runtime """
properties = PluginBase.properties + ["runId", "persistenceId"]
edges = PluginBase.edges
def __init__(self, runId=None, persistenceId=None, **kwargs):
super().__init__(**kwargs)
self.runId = runId
self.persistenceId = persistenceId
def persist(self, client, pluginName, views=None, account=None):
persistence = PersistentState(pluginName=pluginName)
client.create(persistence)
self.persistenceId = persistence.id
if views:
persistence.set_views(client, views)
if account:
persistence.set_account(account)
def get_state(self, client, pluginName=None):
if self.persistenceId:
return client.get(self.persistenceId)
elif pluginName:
result = client.search({'type': 'PersistentState', 'pluginName': pluginName})
if len(result) > 0:
self.persistenceId = result[0].id
return self.get_state(client)
def set_account(self, client, account):
state = self.get_state(client)
state.set_account(account)
def set_state_str(self, client, state_str):
state = self.get_state(client)
state.set_state(client, state_str)
def initialized(self, client):
logging.warning("PLUGIN run is initialized")
self.set_run_vars(client, {'state':RUN_INITIALIZED})
def started(self, client):
logging.warning("PLUGIN run is started")
self.set_run_vars(client, {'state':RUN_STARTED})
def failed(self, client, error):
logging.error(f"PLUGIN run is failed: {error}")
print("Exception while running plugin:", error)
self.set_run_vars(client, {'state':RUN_FAILED, 'error': str(error)})
def completed(self, client):
logging.warning("PLUGIN run is completed")
self.set_run_vars(client, {'state':RUN_COMPLETED})
def complete_action(self, client):
self.set_run_vars(client, {'state': RUN_USER_ACTION_COMPLETED})
def action_required(self, client):
self.set_run_vars(client, {'state': RUN_USER_ACTION_NEEDED})
def is_action_required(self, client):
return self.get_run_state(client) == RUN_USER_ACTION_NEEDED
def is_action_completed(self, client):
return self.get_run_state(client) == RUN_USER_ACTION_COMPLETED
def is_completed(self, client):
return self.get_run_state(client) == RUN_COMPLETED
def is_failed(self, client):
return self.get_run_state(client) == RUN_FAILED
def is_daemon(self, client):
run = self.get_run(client, expanded=False)
return run.interval and run.interval > 0
def get_run(self, client, expanded=False):
return client.get(self.runId, expanded=expanded)
def get_run_state(self, client):
start_plugin = self.get_run(client)
return start_plugin.state
def set_run_vars(self, client, vars):
start_plugin = client.get(self.runId, expanded=False)
for k,v in vars.items():
if hasattr(start_plugin, k):
setattr(start_plugin, k, v)
client.update_item(start_plugin)
def get_run_view(self, client):
run = self.get_run(client, expanded=True)
if run:
for view in run.view:
return view
def set_run_view(self, client, view_name):
state = self.get_state(client)
view = state.get_view_by_name(view_name)
if view:
attached_CVU_edge = self.get_run_view(client) # index error here if there is no already bound CVU
if attached_CVU_edge:
logging.warning(f"Plugin Run already has a view. Updating with {view_name}")
attached_CVU_edge.target = view # update CVU
attached_CVU_edge.update(client) # having doubts if this really updates the existing edge
else:
logging.warning(f"Plugin Run does not have a view. Creating {view_name}")
run = self.get_run(client)
run.add_edge('view', view)
run.update(client)
return True
return False
def add_to_schema(self, client):
assert client.add_to_schema(PersistentState("", ""))
# Cell
# hide
from ..data.schema import Person
class MyStatefulPlugin(StatefulPlugin):
def __init__(self, runId=None, **kwargs):
super().__init__(runId=runId, **kwargs)
def run(self, client):
# plugin's magic happens here
# manipulate run state
self.set_run_vars({'state': 'Running'})
# create items in POD
imported_person = Person(firstName="Hari", lastName="Seldon")
client.create(imported_person)
# set persistent state
self.set_state_str("continue_from:5021")
def add_to_schema(self, client):
print("Adding schema")
super().add_to_schema(client)
# add plugin-specific schemas here
client.add_to_schema(Person(firstName="", lastName=""))
pass
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment