Commit 8a600abf authored by Eelco van der Wel's avatar Eelco van der Wel 💬
Browse files

Merge branch 'listener-threads' into 'dev'

listeners with threading

See merge request !115
parents cd86b4f5 097bc3bf
Pipeline #4573 passed with stages
in 4 minutes and 4 seconds
......@@ -12,11 +12,11 @@
"# default_exp plugin.listeners"
]
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": null,
"id": "978bb73f",
"metadata": {},
"outputs": [],
"source": [
"# export\n",
......@@ -30,37 +30,41 @@
{
"cell_type": "code",
"execution_count": null,
"id": "8b001a81",
"metadata": {},
"outputs": [],
 
"source": [
"# export\n",
"def status_listener(client, run_id, status, callback, interval=5, verbose=False):\n",
" \"\"\"\n",
" Listens to status field of plugin, and executes callback when run.status == status.\n",
" \"\"\"\n",
" if verbose:\n",
" print(f\"Listening for status='{status}' on Item {run_id}\")\n",
" while True:\n",
" Listens to status field of plugin, and executes callback when run.status == status.\n",
" \"\"\"\n",
" if verbose:\n",
" print(f\"Listening for status='{status}' on Item {run_id}\")\n",
" while True:\n",
" time.sleep(interval)\n",
" callback()\n",
" except Exception as e:\n",
" try:\n",
" run = client.get(run_id)\n",
" if verbose:\n",
" print(\"run status:\", run.status)\n",
" if run.status == status:\n",
" callback()\n",
" except Exception as e:\n",
" print(f\"Could not get run in status listener: {e}\")"
" \n",
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8e80ddf0",
"metadata": {},
"outputs": [],
"source": [
"# export\n",
"def abort_plugin_callback():\n",
" print(\"Status aborted, killing plugin...\")\n",
" pid = os.getpid()\n",
" os.kill(pid, signal.SIGABRT)\n",
"\n",
"def get_abort_plugin_listener(client, run_id, **kwargs):\n",
" listener = Thread(\n",
......@@ -69,15 +73,11 @@
" kwargs=kwargs\n",
" )\n",
" listener.start()\n",
" return listener"
]
]
},
{
"cell_type": "code",
"execution_count": null,
},
{
"cell_type": "code",
"execution_count": null,
"id": "28289b0a",
"metadata": {},
......
......@@ -89,15 +89,14 @@
if client is None:
raise ValueError("Plugins need a `client: PodClient` as kwarg to run.")
self.client = client
self.persistentState = persistentState
# self.listeners = []
# if self.client and self.pluginRun:
# status_abort_listener = get_abort_plugin_listener(client, pluginRun.id)
# status_abort_listener.start()
# self.listeners.append(status_abort_listener)
self._status_listeners = []
if self.client and self.pluginRun:
status_abort_listener = get_abort_plugin_listener(client, pluginRun.id)
self._status_listeners.append(status_abort_listener)
def set_run_status(self, status):
# TODO sync before setting status (requires pod_client.sync())
if self.pluginRun and self.client:
self.pluginRun.status = status
......@@ -247,45 +246,10 @@
run = client.get(run.id)
assert run.status == RUN_COMPLETED
```
%%%% Output: error
---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
<ipython-input-10-8d03030f17d6> in <module>
16 print(run.to_json())
17
---> 18 run_plugin_from_run_id(run.id, client);
19
20 run = client.get(run.id)
<ipython-input-9-0059839fa739> in run_plugin_from_run_id(run_id, client)
21
22 plugin_cls = get_plugin_cls(run.pluginModule, run.pluginName)
---> 23 plugin = plugin_cls(pluginRun=run, client=client)
24 plugin.add_to_schema()
25
~/projects/pymemri/pymemri/plugin/pluginbase.py in __init__(self, **kwargs)
93
94 def __init__(self, **kwargs):
---> 95 super().__init__(**kwargs)
96
97 def run(self):
~/projects/pymemri/pymemri/plugin/pluginbase.py in __init__(self, pluginRun, client, persistentState, **kwargs)
62 self.listeners = []
63 if self.client and self.pluginRun:
---> 64 status_abort_listener = get_abort_plugin_listener(client, pluginRun.id)
65 status_abort_listener.start()
66 self.listeners.append(status_abort_listener)
~/projects/pymemri/pymemri/plugin/listeners.py in get_abort_plugin_listener(client, pluginRun)
37 return Process(
38 target=status_listener,
---> 39 args=(client, pluginRun.id, "aborted", abort_process_callback)
40 )
AttributeError: 'str' object has no attribute 'id'
%% Cell type:code id: tags:
``` python
# export
# hide
......@@ -419,55 +383,10 @@
``` python
run_plugin(config_file="../example_config.json")
```
%%%% Output: error
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-19-8327b2c35617> in <module>
----> 1 run_plugin(config_file="../example_config.json")
<ipython-input-15-d798f9c604c1> in run_plugin(pod_full_address, plugin_run_id, database_key, owner_key, read_args_from_env, config_file)
19 print(f"Plugin accesible via {os.environ.get(POD_PLUGIN_DNS_ENV)}:8080")
20
---> 21 client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
22 auth_json=pod_auth_json)
23
~/projects/pymemri/pymemri/pod/client.py in __init__(self, url, version, database_key, owner_key, auth_json, verbose, register_base_schema)
50 self.local_db = DB()
51 self.registered_classes=dict()
---> 52 self.register_base_schemas()
53
54 @classmethod
~/projects/pymemri/pymemri/pod/client.py in register_base_schemas(self)
62 def register_base_schemas(self):
63
---> 64 assert self.add_to_schema(
65 PluginRun, CVUStoredDefinition, Account, Photo
66 )
~/projects/pymemri/pymemri/pod/client.py in add_to_schema(self, *items)
146 for item in items:
147 if isinstance(item, type):
--> 148 property_dicts = self._property_dicts_from_type(item)
149 else:
150 property_dicts = self._property_dicts_from_instance(item)
~/projects/pymemri/pymemri/pod/client.py in _property_dicts_from_type(self, item)
132 def _property_dicts_from_type(self, item):
133 create_items = []
--> 134 for property, p_type in item.get_property_types().items():
135 p_type = self.TYPE_TO_SCHEMA[p_type]
136 create_items.append({
~/projects/pymemri/pymemri/data/itembase.py in get_property_types(cls, dates)
266 mro = cls.mro()
267 property_types = dict()
--> 268 for basecls in reversed(mro[:mro.index(ItemBase)]):
269 property_types.update(basecls.__init__.__annotations__)
270 property_types = {k: v for k, v in property_types.items() if k in cls.properties}
ValueError: <class 'pymemri.data.itembase.ItemBase'> is not in list
%% Cell type:markdown id: tags:
> Note: The data that is created here should be in the pod in order for this to work
%% Cell type:markdown id: tags:
......
......@@ -28,7 +28,7 @@ index = {"read_file": "basic.ipynb",
"read_username_password": "plugin.authenticators.credentials.ipynb",
"OAuthAuthenticator": "plugin.authenticators.oauth.ipynb",
"status_listener": "plugin.listeners.ipynb",
"abort_process_callback": "plugin.listeners.ipynb",
"abort_plugin_callback": "plugin.listeners.ipynb",
"get_abort_plugin_listener": "plugin.listeners.ipynb",
"POD_FULL_ADDRESS_ENV": "plugin.pluginbase.ipynb",
"POD_TARGET_ITEM_ENV": "plugin.pluginbase.ipynb",
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.listeners.ipynb (unless otherwise specified).
__all__ = ['status_listener', 'abort_process_callback', 'get_abort_plugin_listener']
__all__ = ['status_listener', 'abort_plugin_callback', 'get_abort_plugin_listener']
# Cell
import time
import os
import signal
from multiprocessing import Process
from threading import Thread
# Cell
def status_listener(client, run_id, status, callback, interval=5, verbose=False):
......@@ -21,20 +21,23 @@ def status_listener(client, run_id, status, callback, interval=5, verbose=False)
try:
run = client.get(run_id)
if verbose:
print(run.status)
print("run status:", run.status)
if run.status == status:
callback()
except Exception as e:
print(f"Could not get run in status listener: {e}")
def abort_process_callback():
# Cell
def abort_plugin_callback():
print("Status aborted, killing plugin...")
pgid = os.getpgid(os.getpid())
os.killpg(pgid, signal.SIGABRT)
pid = os.getpid()
os.kill(pid, signal.SIGABRT)
# Cell
def get_abort_plugin_listener(client, run_id):
return Process(
def get_abort_plugin_listener(client, run_id, **kwargs):
listener = Thread(
target=status_listener,
args=(client, run_id, "aborted", abort_process_callback)
)
\ No newline at end of file
args=(client, run_id, "aborted", abort_plugin_callback),
kwargs=kwargs
)
listener.start()
return listener
\ No newline at end of file
......@@ -59,11 +59,10 @@ class PluginBase(metaclass=ABCMeta):
self.client = client
self.persistentState = persistentState
# self.listeners = []
# if self.client and self.pluginRun:
# status_abort_listener = get_abort_plugin_listener(client, pluginRun.id)
# status_abort_listener.start()
# self.listeners.append(status_abort_listener)
self._status_listeners = []
if self.client and self.pluginRun:
status_abort_listener = get_abort_plugin_listener(client, pluginRun.id)
self._status_listeners.append(status_abort_listener)
def set_run_status(self, status):
# TODO sync before setting status (requires pod_client.sync())
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment