Commit 1398b360 authored by Eelco van der Wel's avatar Eelco van der Wel :speech_balloon:
Browse files

allow running webserver without pluginRun

parent b458e229
Pipeline #9469 passed with stage
in 1 minute and 24 seconds
Showing with 48 additions and 4 deletions
+48 -4
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin.pluginbase
```
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.schema import *
from pymemri.pod.client import *
from pymemri.imports import *
from pymemri.plugin.states import *
from pymemri.pod.utils import *
from pymemri.plugin.listeners import get_abort_plugin_listener
from pymemri.webserver.webserver import WebServer
from os import environ
from abc import ABCMeta
import abc
import json
import importlib
import string
import time
from enum import Enum
from fastscript import *
import os
from pymemri.plugin.schema import Account, PluginRun
from pymemri.data.basic import *
from pymemri.pod.client import Dog, PodClient, DEFAULT_POD_ADDRESS
import warnings
from pymemri.data.basic import write_json
from pymemri.plugin.authenticators.credentials import PLUGIN_DIR
from fastcore.script import call_parse, Param
import os
import traceback
```
%% Cell type:code id: tags:
``` python
# hide
from nbdev.showdoc import *
```
%% Cell type:code id: tags:
``` python
# export
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
POD_TARGET_ITEM_ENV = 'POD_TARGET_ITEM'
POD_OWNER_KEY_ENV = 'POD_OWNER'
POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
POD_PLUGIN_DNS_ENV = 'PLUGIN_DNS'
```
%% Cell type:code id: tags:
``` python
# export
class PluginBase(metaclass=ABCMeta):
"""Base class for plugins"""
schema_classes = []
def __init__(self, pluginRun=None, client=None, **kwargs):
super().__init__()
if pluginRun is None:
warnings.warn(
"Plugin needs a pluginRun as kwarg, running without will only work in development.",
RuntimeWarning)
self.pluginRun = pluginRun
if client is None:
raise ValueError("Plugins need a `client: PodClient` as kwarg to run.")
self.client = client
self._status_listeners = []
self._config_dict = kwargs
self._webserver = WebServer(pluginRun.webserverPort or 8080)
if pluginRun is None:
self._webserver = WebServer(8080)
else:
self._webserver = WebServer(pluginRun.webserverPort or 8080)
def set_run_status(self, status):
# TODO sync before setting status (requires pod_client.sync())
if self.pluginRun and self.client:
self.pluginRun.status = status
self.client.update_item(self.pluginRun)
def set_progress(self, progress):
if self.pluginRun and self.client:
self.pluginRun.progress = progress
self.client.update_item(self.pluginRun)
def setup(self):
if self.client and self.pluginRun:
status_abort_listener = get_abort_plugin_listener(self.client, self.pluginRun.id)
self._status_listeners.append(status_abort_listener)
self._webserver.run()
def teardown(self):
for listener in self._status_listeners:
listener.stop()
def _run(self):
self.setup()
self.run()
self.teardown()
@abc.abstractmethod
def run(self):
raise NotImplementedError()
def add_to_schema(self):
"""
Add all schema classes required by the plugin to self.client here.
"""
if len(self.schema_classes):
self.client.add_to_schema(*self.schema_classes)
@classmethod
def get_schema_properties(cls):
schema = []
for item in cls.schema_classes:
item_schema = PodClient._property_dicts_from_type(item)
schema.extend(item_schema)
return schema
@classmethod
def get_schema_edges(cls):
schema = []
for item in cls.schema_classes:
edge_types = item.get_edge_types()
edge_schema = [
{"type": "ItemEdgeSchema",
"edgeName": k,
"sourceType": s,
"targetType": t}
for (k, s, t) in edge_types
]
schema.extend(edge_schema)
return schema
@classmethod
def get_schema(cls, include_edges: bool = True):
schema = cls.get_schema_properties()
if include_edges:
edges = cls.get_schema_edges()
schema.extend(edges)
return schema
```
%% Cell type:code id: tags:
``` python
# export
# hide
class PluginError(Exception):
"""Generic class for plugin errors. This error is raised when a plugin raises an unexpected exception."""
pass
```
%% Cell type:markdown id: tags:
# Plugins
%% Cell type:markdown id: tags:
`PluginBase` is the base class of all Pymemri plugins. You can either build your plugin from scratch, or start with one of our [Plugins Templates](https://gitlab.memri.io/plugins/plugin-templates).
All inheriting classes should implement:
- `PluginBase.run`, which implements the logic of the plugin
- `PluginBase.add_to_schema`, for adding plugin specific item types to the Pod
Note that both the `pluginRun` and `client` arguments are mandatory for running a plugin. When using the `run_plugin` CLI, these are handled for you. For local development and testing, a plugin can be initialized with just a `client`, which results in a `RuntimeWarning`.
%% Cell type:markdown id: tags:
## Example plugin
%% Cell type:markdown id: tags:
Let's use the following plugin as an example of how we can define and run plugins.
%% Cell type:markdown id: tags:
```python
from pymemri.pod.client import Dog
class ExamplePlugin(PluginBase):
schema_classes = [Dog]
def __init__(self, dog_name: str = "Bob", **kwargs):
super().__init__(**kwargs)
self.dog_name = dog_name
def run(self):
print("Started plugin run...")
dog = Dog(self.dog_name, 10)
self.client.create(dog)
print("Run success!")
```
%% Cell type:code id: tags:
``` python
# export
# hide
class ExamplePlugin(PluginBase):
schema_classes = [Dog, Message]
def __init__(self, dog_name: str = "Bob", **kwargs):
super().__init__(**kwargs)
self.dog_name = dog_name
def run(self):
print("Started plugin run...")
dog = Dog(self.dog_name, 10)
self.client.create(dog)
print("Run success!")
```
%% Cell type:code id: tags:
``` python
# hide
example_schema = ExamplePlugin.get_schema()
assert isinstance(example_schema, list)
assert len(example_schema)
```
%% Cell type:markdown id: tags:
### Authentication
%% Cell type:markdown id: tags:
Many plugins use authentication for external services that require passwords or oauth authentication. Pymemri implements some common cases, see `OAuthAuthenticator` or `PasswordAuthenticator`.
%% Cell type:markdown id: tags:
# Helper methods -
%% Cell type:code id: tags:
``` python
# export
# hide
def write_run_info(plugin, id_):
try:
if plugin is None:
raise ValueError("Empty container")
run_path = PLUGIN_DIR / plugin / "current_run.json"
run_path.parent.mkdir(parents=True, exist_ok=True)
print(f"writing run info to {run_path}")
write_json({"id": id_}, run_path)
except Exception as e:
print(f"""failed to write run info to {run_path}\n{e}""")
```
%% Cell type:code id: tags:
``` python
# hide
# export
def get_plugin_cls(plugin_module, plugin_name):
try:
module = importlib.import_module(plugin_module)
plugin_cls = getattr(module, plugin_name)
return plugin_cls
except (ImportError, AttributeError):
raise ImportError(f"Unknown plugin: {plugin_module}.{plugin_name}")
def run_plugin_from_run_id(run_id, client, **kwargs):
"""
Runs a plugin from run_id, initialized with **kwargs.
Args:
client (PodClient): client containing PluginRun
run_id (int): id of the PluginRun
"""
run = client.get(run_id)
write_run_info(run.pluginModule.split(".")[0] if run.pluginModule is not None else run.containerImage, run.id)
plugin_cls = get_plugin_cls(run.pluginModule, run.pluginName)
plugin = plugin_cls(pluginRun=run, client=client, **kwargs)
plugin.add_to_schema()
plugin.set_run_status(RUN_STARTED)
plugin._run()
plugin.pluginRun = plugin.client.get(run_id)
plugin.set_run_status(RUN_COMPLETED)
return plugin
```
%% Cell type:markdown id: tags:
## Run from id test -
%% Cell type:code id: tags:
``` python
# hide
# skip
client = PodClient()
run = PluginRun(
containerImage="pymemri",
pluginModule="pymemri.plugin.pluginbase",
pluginName="ExamplePlugin",
status="not started")
account = Account(identifier="login", secret="password")
run.add_edge("account", account)
assert client.add_to_schema(PluginRun("", "", "", "", ""))
assert client.create(run)
assert client.create(account)
assert client.create_edge(run.get_edges("account")[0])
print(run.to_json())
run_plugin_from_run_id(run.id, client);
run = client.get(run.id)
assert run.status == RUN_COMPLETED
```
%% Cell type:code id: tags:
``` python
# export
# hide
def _parse_env():
env = os.environ
print("Reading `run_plugin()` parameters from environment variables")
try:
pod_full_address = env.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
plugin_run_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
print(plugin_run_json)
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
```
%% Cell type:markdown id: tags:
## Running your plugin using the CLI
%% Cell type:markdown id: tags:
Plugins can be started using the pymemri `run_plugin` or `simulate_run_plugin_from_frontend` CLI. With `run_plugin` the plugin is invoked directly by spawning a new python process, while `simulate_run_plugin_from_frontend` requests the pod to spawn a new process, docker container, or kubernetes container, which in calls `run_plugin` (for more info see `simulate_run_plugin_from_frontend`. When using `run_plugin`, you can either pass your run arguments as parameters, or set them as environment variables. If both are set, the CLI will use the passed arguments.
%% Cell type:code id: tags:
``` python
# export
# hide
@call_parse
def store_keys(path:Param("path to store the keys", str)=DEFAULT_POD_KEY_PATH,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None,
replace: Param("Replace existing stored keys", str)=True):
if not replace:
try:
read_pod_key("database_key")
read_pod_key("owner_key")
print("Existing stored keys found, exiting without generating new keys.")
return
except ValueError:
pass
if database_key is None: database_key = PodClient.generate_random_key()
if owner_key is None: owner_key = PodClient.generate_random_key()
obj = {"database_key": database_key,
"owner_key": owner_key}
Path(path).parent.mkdir(parents=True, exist_ok=True)
if path.exists():
timestr = time.strftime("%Y%m%d-%H%M%S")
path.rename(POD_KEYS_FULL_FOLDER / f"keys-{timestr}.json")
write_json(obj, path)
```
%% Cell type:code id: tags:
``` python
# hide
store_keys(replace=False)
```
%% Cell type:code id: tags:
``` python
# export
# hide
def parse_metadata(fn, remove_container=False):
metadata = read_json(fn)
for k in ["pluginModule", "pluginName"]:
if k not in metadata:
raise ValueError(f"Missing metadata: {k}")
run_vars = {k: v for k, v in metadata.items() if k in PluginRun.properties}
run = PluginRun.from_json(run_vars)
if remove_container:
run.containerImage = "none"
if "account" in metadata:
account = Account.from_json(metadata["account"])
run.add_edge("account", account)
return run
def parse_config(run_config, config_file=None, remove_container=False):
"""
Parse the configuration of the plugin. A configuration is a dict that is passed to the plugin init as kwargs.
If configuration file is defined, the run_config is ignored.
"""
if config_file is not None:
config = read_json(config_file)
elif isinstance(run_config, str) and len(run_config):
config = json.loads(run_config)
else:
config = dict()
if not isinstance(config, dict):
raise ValueError(f"Incorrect plugin config format, expected a dict, got a {type(config)}")
return config
def create_run_expanded(client, run):
client.create(run)
accounts = run.account
if accounts:
account=accounts[0]
client.create(account)
client.create_edge(run.get_edges("account")[0])
```
%% Cell type:code id: tags:
``` python
# export
@call_parse
def run_plugin(
pod_full_address: Param("The pod full address", str) = DEFAULT_POD_ADDRESS,
plugin_run_id: Param("Run id of the plugin to be executed", str) = None,
database_key: Param("Database key of the pod", str) = None,
owner_key: Param("Owner key of the pod", str) = None,
read_args_from_env: Param("Read the args from the environment", bool) = False,
metadata: Param("metadata file for the PluginRun", str) = None,
config_file: Param(
"A plugin configuration, overwrites the configuration of the PluginRun", str
) = None,
):
if read_args_from_env:
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env()
database_key = None
else:
if database_key is None:
database_key = read_pod_key("database_key")
if owner_key is None:
owner_key = read_pod_key("owner_key")
pod_auth_json = None
if POD_PLUGIN_DNS_ENV in os.environ:
print(f"Plugin accessible via {os.environ.get(POD_PLUGIN_DNS_ENV)}:8080")
client = PodClient(
url=pod_full_address,
database_key=database_key,
owner_key=owner_key,
auth_json=pod_auth_json,
)
print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
if metadata is not None:
run = parse_metadata(metadata, remove_container=True)
create_run_expanded(client, run)
plugin_run_id = run.id
else:
run = client.get(plugin_run_id)
plugin_config = parse_config(run.config, config_file)
try:
run_plugin_from_run_id(
plugin_run_id, client, **plugin_config
)
except Exception as e:
run = client.get(plugin_run_id)
run.status = RUN_FAILED
client.update_item(run)
print(traceback.format_exc(), flush=True)
raise PluginError("The plugin quit unexpectedly.") from None
```
%% Cell type:markdown id: tags:
To start a plugin on your local machine, you can use the CLI. This will create a client for you, initialize the plugin, and run the code defined in the run method of your plugin.
%% Cell type:code id: tags:
``` python
!run_plugin --metadata "../example_plugin.json"
```
%% Cell type:markdown id: tags:
### Plugin configuration
Often, plugins require some configuration for a run. For example, our `ExamplePlugin` has a `dog_name` argument, which could be different for different runs.
Pymemri handles plugin configuration by passing a dictionary of configuration values to the `__init__` method of the plugin. A possible configuration for the `ExamplePlugin` could be:
```json
{"dog_name": "Alice"}
```
Configuration can be passed to the `run_plugin` CLI in two ways:
- Defined in the `PluginRun` item, as `config` property. This value should be a json serialized dictionary, which is deserialized and passed to the plugin by the CLI
- Defined in a json file and passed to `run_plugin` as a `--config_file` argument. If this option is used, the `config` property from the `PluginRun` is ignored.
%% Cell type:markdown id: tags:
## Run from pod
%% Cell type:markdown id: tags:
In production, we start plugins by making an API call to the pod, which in turn creates an environment for the plugin and starts it using docker containers, kubernetes containers or a shell script. We can start this process using the `simulate_run_plugin_from_frontend` CLI. **Note that when using docker, provided container name should be built within the Pod environment (e.g. `docker build -t pymemri .` for this repo), or available on the memri gitlab container repository.**
%% Cell type:markdown id: tags:
![running a plugin](images/running_a_plugin.svg)
%% Cell type:code id: tags:
``` python
# export
@call_parse
def simulate_run_plugin_from_frontend(
pod_full_address: Param("The pod full address", str) = DEFAULT_POD_ADDRESS,
database_key: Param("Database key of the pod", str) = None,
owner_key: Param("Owner key of the pod", str) = None,
container: Param("Pod container to run frod", str) = None,
plugin_path: Param("Plugin path", str) = None,
metadata: Param("metadata file for the PluginRun", str) = None,
config_file: Param(
"A plugin configuration, overwrites the configuration of the PluginRun", str
) = None,
account_id: Param("Account id to be used inside the plugin", str) = None,
):
if database_key is None:
database_key = read_pod_key("database_key")
if owner_key is None:
owner_key = read_pod_key("owner_key")
params = [pod_full_address, database_key, owner_key]
if None in params:
raise ValueError(f"Missing Pod credentials")
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
print(f"pod_full_address={pod_full_address}\nowner_key={owner_key}\n")
if metadata is not None:
run = parse_metadata(metadata)
create_run_expanded(client, run)
else:
if container is None:
container = plugin_path.split(".", 1)[0]
print(f"Inferred '{container}' as plugin container name")
plugin_module, plugin_name = plugin_path.rsplit(".", 1)
run = PluginRun(container, plugin_module, plugin_name)
if account_id is not None:
account = client.get(account_id)
run.add_edge("account", account)
print(f"Using existing {account}")
client.create(run)
print(
f"Created pluginrun with id {run.id} on {pod_full_address}"
)
plugin_dir = run.containerImage
write_run_info(plugin_dir, run.id)
print(f"*Check the pod log/console for debug output.*")
return run
```
%% Cell type:markdown id: tags:
### Example usage
%% Cell type:code id: tags:
``` python
!simulate_run_plugin_from_frontend --metadata "../example_plugin.json"
```
%% Cell type:markdown id: tags:
## Appendix -
%% Cell type:code id: tags:
``` python
# hide
# client.start_plugin("pymemri", run.id)
```
%% Cell type:code id: tags:
``` python
# hide
# # export
# def generate_test_env(client, indexer_run):
# payload = json.dumps({DATABASE_KEY_ENV: client.database_key, OWNER_KEY_ENV: client.owner_key})
# return {POD_FULL_ADDRESS_ENV: DEFAULT_POD_ADDRESS,
# POD_TARGET_ITEM: indexer_run.id,
# POD_SERVICE_PAYLOAD_ENV: payload}
```
%% Cell type:code id: tags:
``` python
# hide
# run_plugin(env=generate_test_env(client, run))
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted Untitled.ipynb.
Converted Untitled1.ipynb.
Converted Untitled2.ipynb.
Converted basic.ipynb.
Converted cvu.utils.ipynb.
Converted data.dataset.ipynb.
Converted data.loader.ipynb.
Converted data.oauth.ipynb.
Converted data.photo.ipynb.
Converted exporters.exporters.ipynb.
Converted gitlab_api.ipynb.
Converted index.ipynb.
Converted itembase.ipynb.
Converted plugin.authenticators.credentials.ipynb.
Converted plugin.authenticators.oauth.ipynb.
Converted plugin.listeners.ipynb.
Converted plugin.pluginbase.ipynb.
Converted plugin.states.ipynb.
Converted plugins.authenticators.password.ipynb.
Converted pod.api.ipynb.
Converted pod.client.ipynb.
Converted pod.db.ipynb.
Converted pod.utils.ipynb.
Converted template.config.ipynb.
Converted template.formatter.ipynb.
Converted test_owner_key.ipynb.
Converted test_schema.ipynb.
Converted test_utils.ipynb.
Converted wa_dummy_data.ipynb.
......
......@@ -60,7 +60,11 @@ class PluginBase(metaclass=ABCMeta):
self.client = client
self._status_listeners = []
self._config_dict = kwargs
self._webserver = WebServer(pluginRun.webserverPort or 8080)
if pluginRun is None:
self._webserver = WebServer(8080)
else:
self._webserver = WebServer(pluginRun.webserverPort or 8080)
def set_run_status(self, status):
# TODO sync before setting status (requires pod_client.sync())
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment