Commit 9f68c317 authored by Koen van der Veen's avatar Koen van der Veen
Browse files

Run plugin from pod

parent e2150b13
Showing with 338 additions and 127 deletions
+338 -127
......@@ -14,7 +14,7 @@ RUN touch ./README.md
# Install dependencies
RUN python3 setup.py egg_info
RUN pip3 install -r integrators.egg-info/requires.txt
RUN pip3 install -r pymemri.egg-info/requires.txt
# Copy the real project-s sources (docker caching is broken from here onwards)
......@@ -23,10 +23,10 @@ COPY ./README.md ./README.md
COPY ./tools ./tools
COPY ./pymemri ./pymemri
COPY ./nbs ./nbs
COPY ./test ./test
# Build the final image
RUN pip3 install --editable .
# CMD ["python3", "tools/run_integrator.py"]
CMD ["run_plugin"]
......@@ -254,7 +254,7 @@ nb_path: "nbs/plugin.pluginbase.ipynb"
<div class="output_markdown rendered_html output_subarea ">
<h4 id="run_plugin" class="doc_header"><code>run_plugin</code><a href="https://gitlab.memri.io/memri/pymemri/tree/prod/pymemri/plugin/pluginbase.py#L145" class="source_link" style="float:right">[source]</a></h4><blockquote><p><code>run_plugin</code>(<strong><code>pod_full_address</code></strong>:<code>Param object at 0x7fad54cdaa10&gt;</code>=<em><code>None</code></em>, <strong><code>plugin_run_id</code></strong>:<code>Param object at 0x7fad54cdaa50&gt;</code>=<em><code>None</code></em>, <strong><code>database_key</code></strong>:<code>Param object at 0x7fad54cdacd0&gt;</code>=<em><code>None</code></em>, <strong><code>owner_key</code></strong>:<code>Param object at 0x7fad54cdaa90&gt;</code>=<em><code>None</code></em>)</p>
<h4 id="run_plugin" class="doc_header"><code>run_plugin</code><a href="https://gitlab.memri.io/memri/pymemri/tree/prod/pymemri/plugin/pluginbase.py#L139" class="source_link" style="float:right">[source]</a></h4><blockquote><p><code>run_plugin</code>(<strong><code>pod_full_address</code></strong>:<code>Param object at 0x7fdc47df5e90&gt;</code>=<em><code>None</code></em>, <strong><code>plugin_run_id</code></strong>:<code>Param object at 0x7fdc47a4d7d0&gt;</code>=<em><code>None</code></em>, <strong><code>database_key</code></strong>:<code>Param object at 0x7fdc496dcf10&gt;</code>=<em><code>None</code></em>, <strong><code>owner_key</code></strong>:<code>Param object at 0x7fdc496dcf90&gt;</code>=<em><code>None</code></em>, <strong><code>from_pod</code></strong>:<code>Param object at 0x7fdc44203c50&gt;</code>=<em><code>False</code></em>, <strong><code>container</code></strong>:<code>Param object at 0x7fdc4a0afad0&gt;</code>=<em><code>None</code></em>)</p>
</blockquote>
</div>
......@@ -274,6 +274,13 @@ nb_path: "nbs/plugin.pluginbase.ipynb"
</div>
{% endraw %}
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>To start a plugin on your local machine, you can use the CLI. This will create a client for you, and run the code defined in <code>&lt;myplugin&gt;.run()</code></p>
</div>
</div>
</div>
{% raw %}
<div class="cell border-box-sizing code_cell rendered">
......@@ -297,9 +304,9 @@ nb_path: "nbs/plugin.pluginbase.ipynb"
<div class="output_subarea output_stream output_stdout output_text">
<pre>Used arguments passed to `run_plugin()` (ignoring environment)
pod_full_address=http://localhost:3030
plugin_run_id=50a298368a040592adfd8c1c801af959
database_key=6184197375534391316725372949916181999760839224725764742871635308
owner_key=3780693072719641989650646548306482314355897960660743337420186235
plugin_run_id=0f32df2e21355877f6a7a5a52121ff9e
owner_key=1818730013390615608004006544815007352548160219130568937676646480
auth_json=None
running
</pre>
......@@ -312,6 +319,59 @@ running
</div>
{% endraw %}
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>In production, we start plugins by making an API call to the pod, which in turn creates an environment for the plugin and starts it (currently on docker is supported). We can start this process using the CLI by provding <code>--from_pod==True</code> and providing a <code>--container</code> (the docker container used by the pod). <strong>Note that the provided docker container should be installed within the Pod environemnt (e.g. <code>docker build -t pymemri .</code> for this repo) in order to start it.</strong></p>
</div>
</div>
</div>
{% raw %}
<div class="cell border-box-sizing code_cell rendered">
<div class="input">
<div class="inner_cell">
<div class="input_area">
<div class=" highlight hl-ipython3"><pre><span></span><span class="o">!</span>run_plugin --pod_full_address<span class="o">=</span><span class="nv">$DEFAULT_POD_ADDRESS</span> --plugin_run_id<span class="o">=</span><span class="nv">$run</span>.id --owner_key<span class="o">=</span><span class="nv">$client</span>.owner_key <span class="err">\</span>
<span class="o">--</span><span class="n">database_key</span><span class="o">=</span><span class="err">$</span><span class="n">client</span><span class="o">.</span><span class="n">database_key</span> <span class="o">--</span><span class="n">from_pod</span><span class="o">=</span><span class="kc">True</span><span class="p">,</span> <span class="o">--</span><span class="n">container</span><span class="o">=</span><span class="s2">&quot;pymemri&quot;</span>
</pre></div>
</div>
</div>
</div>
<div class="output_wrapper">
<div class="output">
<div class="output_area">
<div class="output_subarea output_stream output_stdout output_text">
<pre>Used arguments passed to `run_plugin()` (ignoring environment)
pod_full_address=http://localhost:3030
plugin_run_id=0f32df2e21355877f6a7a5a52121ff9e
owner_key=1818730013390615608004006544815007352548160219130568937676646480
auth_json=None
calling the `create` api on http://localhost:3030 to make your Pod start a plugin with id 0f32df2e21355877f6a7a5a52121ff9e.
*Check the pod log/console for debug output.*
</pre>
</div>
</div>
</div>
</div>
</div>
{% endraw %}
<div class="cell border-box-sizing text_cell rendered"><div class="inner_cell">
<div class="text_cell_render border-box-sizing rendered_html">
<p>{% include note.html content='The data that was created earlier (PluginRun, plugin) should be in the pod in order for this to work' %}</p>
</div>
</div>
</div>
</div>
......@@ -54,7 +54,7 @@ nb_path: "nbs/pod.client.ipynb"
<div class="output_markdown rendered_html output_subarea ">
<h2 id="PodClient" class="doc_header"><code>class</code> <code>PodClient</code><a href="https://gitlab.memri.io/memri/pymemri/tree/prod/pymemri/pod/client.py#L18" class="source_link" style="float:right">[source]</a></h2><blockquote><p><code>PodClient</code>(<strong><code>url</code></strong>=<em><code>'http://localhost:3030'</code></em>, <strong><code>version</code></strong>=<em><code>'v3'</code></em>, <strong><code>database_key</code></strong>=<em><code>None</code></em>, <strong><code>owner_key</code></strong>=<em><code>None</code></em>)</p>
<h2 id="PodClient" class="doc_header"><code>class</code> <code>PodClient</code><a href="https://gitlab.memri.io/memri/pymemri/tree/prod/pymemri/pod/client.py#L18" class="source_link" style="float:right">[source]</a></h2><blockquote><p><code>PodClient</code>(<strong><code>url</code></strong>=<em><code>'http://localhost:3030'</code></em>, <strong><code>version</code></strong>=<em><code>'v3'</code></em>, <strong><code>database_key</code></strong>=<em><code>None</code></em>, <strong><code>owner_key</code></strong>=<em><code>None</code></em>, <strong><code>auth_json</code></strong>=<em><code>None</code></em>)</p>
</blockquote>
</div>
......@@ -364,7 +364,7 @@ Property myAge not defined in Schema (attempted to use it for json value 20) -->
<div class="output_text output_subarea output_execute_result">
<pre>[{&#39;item&#39;: Person (#806a88787321a9a81054e63d17ad2fbb), &#39;name&#39;: &#39;sender&#39;}]</pre>
<pre>[{&#39;item&#39;: Person (#1282de00d7dbd7d3e24d36ffe180b0d3), &#39;name&#39;: &#39;sender&#39;}]</pre>
</div>
</div>
......@@ -540,9 +540,9 @@ Property myAge not defined in Schema (attempted to use it for json value 20) -->
<div class="output_text output_subarea output_execute_result">
<pre>[Person (#806a88787321a9a81054e63d17ad2fbb),
Person (#1f8e823a9b9ddaedf8dd37c9682beb0b),
Person (#cba8e451efdfe7be87dfeb10444ee670)]</pre>
<pre>[Person (#1282de00d7dbd7d3e24d36ffe180b0d3),
Person (#4726309701370ded76ba0a734cf04b55),
Person (#9241966f2df21487b5f83a0926c7bd49)]</pre>
</div>
</div>
......@@ -790,7 +790,7 @@ Property myAge not defined in Schema (attempted to use it for json value 20) -->
<div class="output_text output_subarea output_execute_result">
<pre>IPhoto (#424cce3929a63f5849c40c181e0ce252)</pre>
<pre>IPhoto (#84f2b73f6ea6326a9cb26b47ba62cc41)</pre>
</div>
</div>
......
%% Cell type:code id: tags:
``` python
# hide
from pymemri import *
from pymemri.plugin.pluginbase import run_plugin
```
%% Cell type:markdown id: tags:
# pymemri
> Pymemri is a python client for the Memri Personal online datastore (pod). This client can be used to build plugins in python. Plugins connect and add the information to your Pod. Plugins that <b>import your data from external services</b> are called **Importers** (Gmail, WhatsApp, etc.). Plugins that <b>connect new data to the existing data</b> are called **indexers** (face recognition, spam detection, object detection, etc.). Lastly there are plugins that <b>execute actions</b> (sending messages, uploading files).
%% Cell type:markdown id: tags:
[![Gitlab pipeline status (self-hosted)](https://img.shields.io/gitlab/pipeline/memri/pymemri/dev?gitlab_url=https%3A%2F%2Fgitlab.memri.io&label=CI&logo=gitlab&style=plastic)](https://gitlab.memri.io/memri/pymemri/-/pipelines/latest)
[![Discourse status](https://img.shields.io/discourse/status?color=00A850&label=Discourse&logo=data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAACAAAAAhCAYAAAC4JqlRAAAEhUlEQVR42r2XA5RjyxqFM7Y90ymd6uCNbdu2bdu2bdu2bdu27dmvUtfpk9zu5K6utXZU+Pb/l3IsISlCiFiCiBKS0m4GpdMNxpa5JCmbZFDeQTKWR0oZyfJfF8l5IQVdr6DfDcrwL3qnDM3knKf2GyysIoMacKcSfNAvg7AllNIkvrDDCMJ6uQZRgj9S2Xjpmppgk1MlShRNULrcD6iZvhqM1QkOP5xqvMnTQIFKJQlHZ6WhAQJDrALtrFz/JoNjhLHaXumqwRizjjYlF/S4gt5PZuAst+Fk9lS4UDg9LhRJj9O502B3quTowARs3k18Vusqo/lKJ7yiWafsCrwtwMA9BT6eISWe106DtzUF3jV04kP/Avg0tQo+z6iGj0OK4nXzzDhSMRtyO6Q3E3ftdnuMf8CTJ08eUVXccG+ck3KcVPArARL3q2XS4A/dc+DboT7Ah2XA11X/1JeV+HFxFB6Pr4Y6xTJ4McF7/jP1lLZ3b+RQ2quivqrgz2upqOtIfN3YVkPcwKa6cmwASlXO7vGsMAwjoYar/RFe/fDIvVF/K9dpf1g9A97WDcT3k4NNQN7V52B3JM+R0dSEIKz3n6ece2VypYvJBI5nTKXT/m1755DCtZ68W4hMi1pABDqCmiDs2u/p5yPcK+sSHT1e1EmDj4MK+wL/U0V3tQBp1R2mu4sQoQywPe4Vo6wcp6RdR68WlV8Gep/tDT57CLg9RdBpoLS6y8BD94qFAQKncqfF+/aZ/lx0vmrejRGwL2uJBJXbmmSB9nMZ+OxesUoZuFwqo9rjlf2Bay25NQpyTX3E6LbQbB1MsZjNzXSrwNVymfB1XSu/DYy/MhBsdW1EGnfILAPzTA30JBwX1EHydUt7vw00PtYRtlWNEHHSCbMMTDY1kJcwfex+WdnML/ivrythXVcN9sVdEGnM/iAcyVgfbcBMa41AfJhYyS8Da++OhWVpacSbOR8xuswxux2reDRQmnBcq5UT+LzCJ/hX1S/VljoIWF0DlpUvkLB8syAMIQTxaCA/5bhcMSt+vV3ik4FWx7vo6BPMnoqwi+6bnIb0vEUVjwaaE4EXE2v4BO9+uqeG25a1hWXVa8Sr3cvsLujs1cC0XGnw8+bEEIHvv12A4nuaa7h9RUtYVjxB5JG7Ibh0j/65lDKmVwNbmxX/x+A/v6zCroeTsPX+BLz7uPTP31+9X4z1d8eh7pH2iLS8HKIsLw86b7CK/CUiTjwG7kxttvrbargnA4FKN1d21YAjj6ei9f4+SL+0M6yzh4HMHq73dZxVlRB1RXkdbfhlZcFW14FY0BfhF17QaY/RbQG4zWn2L/kyYyyyVwNZUjoxeHpHFOrVGLJuQyQpVA3WTHkhpB0sZQbEbjEG4WddRvhF55QuqVX+REPDLnqA6L2WICBncc8PLYT8T4PdDPgkki4HkuUpg6QFKsGaOR+ECPTW/ofBWBkN9cGAXxKUfZCMldbAUDdA2D1JaVoNC10D+mlo7J/bLfQM0PuSsMH6mA1OUS6n+iNB+UQF7a8+15aEJLeEsPwfm4dbxw/8yD4AAAAASUVORK5CYII=&server=https%3A%2F%2Fdiscourse.memri.io)](https://discourse.memri.io)
[![Twitter URL](https://img.shields.io/twitter/url?label=%40YourMemri&logo=twitter&style=plastic&url=https%3A%2F%2Ftwitter.com%2FYourMemri)](https://twitter.com/YourMemri)
%% Cell type:markdown id: tags:
This repository is built with [nbdev](https://github.com/fastai/nbdev), which means that the repo structure has a few differences compared to a standard python repo.
%% Cell type:markdown id: tags:
## Installing
%% Cell type:markdown id: tags:
### As a package
```bash
pip install pymemri
```
%% Cell type:markdown id: tags:
### Development
To install the Python package, and correctly setup nbdev for development run:
```bash
pip install -e . && nbdev_install_git_hooks
```
The last command configures git to automatically clean metadata from your notebooks before a commit.
%% Cell type:markdown id: tags:
## Quickstart
To use the pymemri `PodClient`, we first need to have a pod running. The quickest way to do this is to install from the [pod repo](https://gitlab.memri.io/memri/pod), and run `./examples/run_development.sh` from within that repo.
```
from pymemri.data.schema import *
from pymemri.pod.client import *
class Dog(Item):
def __init__(self, name, age, id=None, deleted=None):
super().__init__(id=id, deleted=deleted)
self.name = name
self.age = age
@classmethod
def from_json(cls, json):
id = json.get("id", None)
name = json.get("name", None)
age = json.get("age", None)
return cls(id=id,name=name,age=age)
client = PodClient()
example_dog = Dog("max", 2)
client.add_to_schema(example_dog)
dog = Dog("bob", 3)
client.create(dog)
```
%% Cell type:markdown id: tags:
## Running a plugin
%% Cell type:markdown id: tags:
After installation, users can use the plugin CLI to manually run a plugin. For more information, see `run_plugin`.
%% Cell type:markdown id: tags:
```bash
run_plugin --pod_full_address=<pod_address> --plugin_run_id=<plugin_run_id> --owner_key=<owner_key> \
--database_key=<dabase_key>
```
%% Cell type:markdown id: tags:
## Docs
- [pymemri docs](http://memri.docs.memri.io/pymemri/pod.client.html#File-API)
- [plugin tutorial](https://blog.memri.io/getting-started-building-a-plugin/)
%% Cell type:markdown id: tags:
## Nbdev & Jupyter Notebooks
The Python integrators are written in [nbdev](https://nbdev.fast.ai/) ([video](https://www.youtube.com/watch?v=9Q6sLbz37gk&t=1301s)). With nbdev, it is encouraged to write code in
[Jupyter Notebooks](https://jupyter.readthedocs.io/en/latest/install/notebook-classic.html). Nbdev syncs all the notebooks in `/nbs` with the python code in `/pymemri`. Tests are written side by side with the code in the notebooks, and documentation is automatically generated from the code and markdown in the notebooks and exported into the `/docs` folder. Check out the [nbdev quickstart](wiki/nbdev_quickstart.md) for an introduction, **watch the video linked above**, or see the [nbdev documentation](https://nbdev.fast.ai/) for a all functionalities and tutorials.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
%load_ext autoreload
%autoreload 2
# default_exp plugin.pluginbase
```
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.schema import *
from pymemri.pod.client import PodClient, DEFAULT_POD_ADDRESS
from pymemri.imports import *
from os import environ
from abc import ABCMeta
import abc
import json
```
%% Cell type:code id: tags:
``` python
# hide
from nbdev.showdoc import *
```
%% Cell type:markdown id: tags:
# Plugins
%% Cell type:code id: tags:
``` python
# export
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
RUN_UID_ENV = 'RUN_ID'
POD_SERVICE_PAYLOAD_ENV = 'POD_SERVICE_PAYLOAD'
DATABASE_KEY_ENV = 'databaseKey'
OWNER_KEY_ENV = 'ownerKey'
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
POD_TARGET_ITEM_ENV = 'POD_TARGET_ITEM'
POD_OWNER_KEY_ENV = 'POD_OWNER'
POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
```
%% Cell type:code id: tags:
``` python
# export
# hide
class PluginBase(Item, metaclass=ABCMeta):
"""Base class for plugins"""
properties = Item.properties + ["name", "repository", "icon", "data_query", "bundleImage",
"runDestination", "pluginClass", "pluginPackage"]
edges = Item.edges + ["PluginRun"]
def __init__(self, name=None, repository=None, icon=None, query=None, bundleImage=None, runDestination=None,
pluginClass=None, indexerRun=None, **kwargs):
if pluginClass is None: pluginClass=self.__class__.__name__
self.pluginPackage=None
super().__init__(**kwargs)
self.name = name
self.repository = repository
self.icon = icon
self.query = query
self.bundleImage = bundleImage
self.runDestination = runDestination
self.pluginClass = pluginClass
self.indexerRun = indexerRun if indexerRun is not None else []
@abc.abstractmethod
def run(self):
raise NotImplementedError()
@abc.abstractmethod
def add_to_schema(self):
raise NotImplementedError()
```
%% Cell type:code id: tags:
``` python
# export
# hide
class PluginRun(Item):
properties = Item.properties
edges = Item.edges + ["plugin"]
def __init__(self, plugin=None, **kwargs):
super().__init__(**kwargs)
self.plugin=plugin if plugin is not None else []
```
%% Cell type:markdown id: tags:
## Creating a plugin
%% Cell type:markdown id: tags:
The memri [pod](https://gitlab.memri.io/memri/pod) uses a plugin system to add features to the backend memri backend. Plugins, can import your data (importers), change your data (indexers), or call other serivces. Users can define their own plugins to add new behaviour to their memri app. Let's use the following plugin as an example of how we can start plugins.
%% Cell type:code id: tags:
``` python
# export
# hide
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.pluginPackage="pymemri.plugin.pluginbase"
def run(self, run, client):
print("running")
client.create(MyItem("some person", 20))
def add_to_schema(self, client):
client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:markdown id: tags:
Memri plugins need to define at least 2 methods: `.run()` and `.add_to_schema()`. `.run()` defines the logic of the plugin. `.add_to_schema()` defines the schema for the plugin in the pod. Note that currently, `add_to_schema` requires all item to **have all properties defined that are used in the plugin**. In the future, we might replace add_to_schema, to be done automatically, based on a declarative schema defined in the plugin.
%% Cell type:code id: tags:
``` python
MyPlugin()
```
%% Output
MyPlugin (#None)
%% Cell type:markdown id: tags:
```python
class MyItem(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
class MyPlugin(PluginBase):
""""""
properties = PluginBase.properties
edges= PluginBase.edges
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.pluginPackage="pymemri.plugin.pluginbase"
def run(self, run, client):
print("running")
client.create(MyItem("some person", 20))
def add_to_schema(self, client):
client.add_to_schema(MyItem("my name", 10))
```
%% Cell type:code id: tags:
``` python
from pymemri.pod.client import PodClient
client = PodClient()
```
%% Cell type:code id: tags:
``` python
assert client.add_to_schema(MyPlugin(name="abc", data_query="abc"))
assert client.add_to_schema(PluginRun())
```
%% Cell type:code id: tags:
``` python
plugin = MyPlugin(name="abc", data_query="abc")
run = PluginRun()
run.add_edge("plugin", plugin)
```
%% Cell type:code id: tags:
``` python
client.create(run)
client.create(plugin)
client.create_edge(run.get_edges("plugin")[0]);
```
%% Cell type:code id: tags:
``` python
run = client.get(run.id)
```
%% Cell type:markdown id: tags:
## Running your plugin
%% Cell type:markdown id: tags:
Plugins can be started using the pymemri `run_plugin` CLI. To use the CLI, you can either pass your run arguments as parameters, or set them as environment variables. If both are set, the CLI will prefer the passed arguments.
%% Cell type:code id: tags:
``` python
# hide
# export
def run_plugin_from_run_id(run_id, client):
run = client.get(run_id)
plugins = run.plugin
if len(plugins) == 0:
raise ValueError(f"plugin run {run_id} has no plugin attached to it. Make sure there is a 'plugin' \
edge from your run to the actual plugin object.")
if len(plugins) > 1:
raise ValueError("Too many plugins attached to run")
plugin = plugins[0]
plugin.add_to_schema(client)
plugin.run(run, client)
```
%% Cell type:code id: tags:
``` python
# export
# hide
def register_base_classes(client):
try:
assert client.add_to_schema(PluginRun())
except Exception as e:
raise ValueError("Could not add base schema")
```
%% Cell type:code id: tags:
``` python
# hide
run_plugin_from_run_id(run.id, client)
```
%% Output
running
%% Cell type:code id: tags:
``` python
# export
def _run_plugin(pod_full_address=None, plugin_run_id=None, database_key=None, owner_key=None,
verbose=False):
def _run_plugin(client, plugin_run_id=None, verbose=False):
"""Runs an plugin, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with plugins)."""
if verbose:
for name, val in [("pod_full_address", pod_full_address), ("plugin_run_id", plugin_run_id),
("database_key", database_key), ("owner_key", owner_key)]:
print(f"{name}={val}")
print()
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
register_base_classes(client)
run_plugin_from_run_id(plugin_run_id, client)
```
%% Cell type:code id: tags:
``` python
# hide
_run_plugin(pod_full_address=DEFAULT_POD_ADDRESS, plugin_run_id=run.id,
database_key=client.database_key, owner_key=client.owner_key)
_run_plugin(client=client, plugin_run_id=run.id)
```
%% Output
running
%% Cell type:markdown id: tags:
### CLI
%% Cell type:code id: tags:
``` python
# export
# hide
def _parse_env(env):
try:
pod_full_address = env.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
plugin_run_id = str(env[RUN_UID_ENV])
pod_service_payload = json.loads(env[POD_SERVICE_PAYLOAD_ENV])
database_key = pod_service_payload[DATABASE_KEY_ENV]
owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_service_payload, database_key, owner_key
plugin_run_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
print(plugin_run_json)
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
# database_key = pod_service_payload[DATABASE_KEY_ENV]
# owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
```
%% Cell type:code id: tags:
``` python
# export
from fastscript import *
import os
@call_parse
def run_plugin(pod_full_address:Param("The pod full address", str)=None,
plugin_run_id:Param("Run id of the plugin to be executed", str)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None):
owner_key:Param("Owner key of the pod", str)=None,
from_pod:Param("Run by calling the pod", bool)=False,
container:Param("Pod container to run frod", str)=None):
env = os.environ
params = [pod_full_address, plugin_run_id, database_key, owner_key]
if all([p is None for p in params]):
print("Reading `run_plugin()` parameters from environment variables")
pod_full_address, plugin_run_id, pod_service_payload, database_key, owner_key = _parse_env(env)
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env(env)
database_key=None
else:
print("Used arguments passed to `run_plugin()` (ignoring environment)")
pod_auth_json=None
if (None in params):
raise ValueError(f"Defined some params to run indexer, but not all. Missing \
{[p for p in params if p is None]}")
_run_plugin(pod_full_address=pod_full_address, plugin_run_id=plugin_run_id,
database_key=database_key, owner_key=owner_key, verbose=True)
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
auth_json=pod_auth_json)
for name, val in [("pod_full_address", pod_full_address), ("plugin_run_id", plugin_run_id),
("owner_key", owner_key), ("auth_json", pod_auth_json)]:
print(f"{name}={val}")
print()
if from_pod:
print(f"calling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {plugin_run_id}.")
print(f"*Check the pod log/console for debug output.*")
client.start_plugin("pymemri", plugin_run_id)
else:
_run_plugin(client=client, plugin_run_id=plugin_run_id)
```
%% Cell type:markdown id: tags:
To start a plugin on your local machine, you can use the CLI. This will create a client for you, and run the code defined in `<myplugin>.run()`
%% Cell type:code id: tags:
``` python
!run_plugin --pod_full_address=$DEFAULT_POD_ADDRESS --plugin_run_id=$run.id --owner_key=$client.owner_key \
--database_key=$client.database_key
```
%% Output
Used arguments passed to `run_plugin()` (ignoring environment)
pod_full_address=http://localhost:3030
plugin_run_id=50a298368a040592adfd8c1c801af959
database_key=6184197375534391316725372949916181999760839224725764742871635308
owner_key=3780693072719641989650646548306482314355897960660743337420186235
plugin_run_id=0f32df2e21355877f6a7a5a52121ff9e
owner_key=1818730013390615608004006544815007352548160219130568937676646480
auth_json=None
running
%% Cell type:markdown id: tags:
## Run plugin from pod -
%% Cell type:code id: tags:
``` python
# export
# hide
class StartPlugin(Item):
properties = Item.properties + ["container", "targetItemId"]
edges = Item.edges
def __init__(self, container=None, targetItemId=None, **kwargs):
super().__init__(**kwargs)
self.container = container
self.targetItemId = targetItemId
```
%% Cell type:markdown id: tags:
In production, we start plugins by making an API call to the pod, which in turn creates an environment for the plugin and starts it (currently on docker is supported). We can start this process using the CLI by provding `--from_pod==True` and providing a `--container` (the docker container used by the pod). **Note that the provided docker container should be installed within the Pod environemnt (e.g. `docker build -t pymemri .` for this repo) in order to start it.**
%% Cell type:code id: tags:
``` python
!run_plugin --pod_full_address=$DEFAULT_POD_ADDRESS --plugin_run_id=$run.id --owner_key=$client.owner_key \
--database_key=$client.database_key --from_pod=True, --container="pymemri"
```
%% Output
Used arguments passed to `run_plugin()` (ignoring environment)
pod_full_address=http://localhost:3030
plugin_run_id=0f32df2e21355877f6a7a5a52121ff9e
owner_key=1818730013390615608004006544815007352548160219130568937676646480
auth_json=None
calling the `create` api on http://localhost:3030 to make your Pod start a plugin with id 0f32df2e21355877f6a7a5a52121ff9e.
*Check the pod log/console for debug output.*
%% Cell type:markdown id: tags:
> Note: The data that was created earlier (PluginRun, plugin) should be in the pod in order for this to work
%% Cell type:code id: tags:
``` python
# hide
# client.start_plugin("pymemri", run.id)
```
%% Cell type:markdown id: tags:
## Running a Plugin by providing environment variables -
%% Cell type:code id: tags:
``` python
# hide
# # export
# def generate_test_env(client, indexer_run):
# payload = json.dumps({DATABASE_KEY_ENV: client.database_key, OWNER_KEY_ENV: client.owner_key})
# return {POD_FULL_ADDRESS_ENV: DEFAULT_POD_ADDRESS,
# RUN_UID_ENV: indexer_run.id,
# POD_TARGET_ITEM: indexer_run.id,
# POD_SERVICE_PAYLOAD_ENV: payload}
```
%% Cell type:code id: tags:
``` python
# hide
# run_plugin(env=generate_test_env(client, run))
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted pod.client.ipynb.
%% Cell type:code id: tags:
``` python
```
......
%% Cell type:code id: tags:
``` python
# default_exp pod.client
%load_ext autoreload
%autoreload 2
```
%% Cell type:markdown id: tags:
# Pod Client
%% Cell type:code id: tags:
``` python
# export
from pymemri.data.basic import *
from pymemri.data.schema import *
from pymemri.data.itembase import Edge, ItemBase, Item
from pymemri.data.photo import Photo
from pymemri.imports import *
from hashlib import sha256
```
%% Cell type:code id: tags:
``` python
# export
DEFAULT_POD_ADDRESS = "http://localhost:3030"
POD_VERSION = "v3"
```
%% Cell type:code id: tags:
``` python
# export
class PodClient:
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None):
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
auth_json=None):
self.url = url
self.version = POD_VERSION
self.test_connection(verbose=False)
self.database_key=database_key if database_key is not None else self.generate_random_key()
self.owner_key=owner_key if owner_key is not None else self.generate_random_key()
self.base_url = f"{url}/{version}/{self.owner_key}"
self.auth_json = {"type":"ClientAuth", "databaseKey": self.database_key}
self.auth_json = {"type":"ClientAuth","databaseKey":self.database_key} if auth_json is None \
else {**{"type": "PluginAuth"}, **auth_json}
self.registered_classes=dict()
@staticmethod
def generate_random_key():
return "".join([str(random.randint(0, 9)) for i in range(64)])
def test_connection(self, verbose=True):
try:
res = requests.get(self.url)
if verbose: print("Succesfully connected to pod")
return True
except requests.exceptions.RequestException as e:
print("Could no connect to backend")
return False
def create(self, node):
if isinstance(node, Photo) and not self.create_photo_file(node): return False
try:
properties = self.get_properties_json(node)
properties = {k:v for k, v in properties.items() if v != []}
body = {"auth": self.auth_json, "payload":properties}
result = requests.post(f"{self.base_url}/create_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
id = result.json()
node.id = id
ItemBase.add_to_db(node)
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def add_to_schema(self, node):
self.registered_classes[node.__class__.__name__] = type(node)
attributes = self.get_properties_json(node)
for k, v in attributes.items():
if not isinstance(v, list) and k != "type":
if isinstance(v, str):
value_type = "Text"
elif isinstance(v, int):
value_type = "Integer"
payload = {"type": "ItemPropertySchema", "itemType": attributes["type"],
"propertyName": k, "valueType": value_type}
body = {"auth": self.auth_json, "payload": payload }
try:
result = requests.post(f"{self.base_url}/create_item", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
id = result.json()
node.id = id
ItemBase.add_to_db(node)
except requests.exceptions.RequestException as e:
print(e)
return False
return True
def create_photo_file(self, photo):
file = photo.file[0]
self.create(file)
return self._upload_image(photo.data)
def _upload_image(self, arr):
return self.upload_file(arr.tobytes())
def upload_file(self, file):
# TODO: currently this only works for numpy images
try:
sha = sha256(file).hexdigest()
result = requests.post(f"{self.base_url}/upload_file/{self.database_key}/{sha}", data=file)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def get_file(self, sha):
# TODO: currently this only works for numpy images
try:
body= {"auth": self.auth_json,
"payload": {"sha256": sha}}
result = requests.post(f"{self.base_url}/get_file", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
return result.content
except requests.exceptions.RequestException as e:
print(e)
return None
def get_photo(self, id, size=640):
photo = self.get(id)
self._load_photo_data(photo, size=size)
return photo
def _load_photo_data(self, photo, size=None):
if len(photo.file) > 0 and photo.data is None:
file = self.get_file(photo.file[0].sha256)
if file is None:
print(f"Could not load data of {photo} attached file item does not have data in pod")
return
data = np.frombuffer(file, dtype=np.uint8)
c = photo.channels
shape = (photo.height,photo.width, c) if c is not None and c > 1 else (photo.height, photo.width)
data = data.reshape(shape)
if size is not None: data = resize(data, size)
photo.data = data
return
print(f"could not load data of {photo}, no file attached")
def create_if_external_id_not_exists(self, node):
if not self.external_id_exists(node):
self.create(node)
def external_id_exists(self, node):
if node.externalId is None: return False
existing = self.search({"externalId": node.externalId})
return len(existing) > 0
def create_edges(self, edges):
"""Create edges between nodes, edges should be of format [{"_type": "friend", "_source": 1, "_target": 2}]"""
create_edges = []
for e in edges:
src, target = e.source.id, e.target.id
if src is None or target is None:
print(f"Could not create edge {e} missing source or target id")
return False
data = {"_source": src, "_target": target, "_type": e._type}
if e.label is not None: data[LABEL] = e.label
if e.sequence is not None: data[SEQUENCE] = e.sequence
if e.reverse:
data2 = copy(data)
data2["_source"] = target
data2["_target"] = src
data2["_type"] = "~" + data2["_type"]
create_edges.append(data2)
create_edges.append(data)
return self.bulk_action(create_items=[], update_items=[],create_edges=create_edges)
def delete_items(self, items):
ids = [i.id for i in items]
return self.bulk_action(delete_items=ids)
def delete_all(self):
items = self.get_all_items()
self.delete_items(items)
def bulk_action(self, create_items=None, update_items=None, create_edges=None, delete_items=None):
create_items = create_items if create_items is not None else []
update_items = update_items if update_items is not None else []
create_edges = create_edges if create_edges is not None else []
delete_items = delete_items if delete_items is not None else []
edges_data = {"databaseKey": self.database_key, "payload": {
"createItems": create_items, "updateItems": update_items,
"createEdges": create_edges, "deleteItems": delete_items}}
try:
result = requests.post(f"{self.base_url}/bulk_action",
json=edges_data)
if result.status_code != 200:
if "UNIQUE constraint failed" in str(result.content):
print(result.status_code, "Edge already exists")
else:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
def create_edge(self, edge):
payload = {"_source": edge.source.id, "_target": edge.target.id, "_name": edge._type}
body = {"auth": self.auth_json,
"payload": payload}
try:
result = requests.post(f"{self.base_url}/create_edge", json=body)
if result.status_code != 200:
print(result, result.content)
return False
else:
return True
except requests.exceptions.RequestException as e:
print(e)
return False
return self.create_edges([edge])
def get(self, id, expanded=True):
if not expanded:
res = self._get_item_with_properties(id)
else:
res = self._get_item_expanded(id)
if res is None:
return None
elif res.deleted == True:
print(f"Item with id {id} does not exist anymore")
return None
else:
return res
def get_all_items(self):
raise NotImplementedError()
try:
body = { "databaseKey": self.database_key, "payload":None}
result = requests.post(f"{self.base_url}/get_all_items", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
res = [self.item_from_json(x) for x in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
print(e)
return None
def filter_deleted(self, items):
return [i for i in items if not i.deleted == True]
def _get_item_expanded(self, id):
item = self.get(id, expanded=False)
edges = self.get_edges(id)
for e in edges:
item.add_edge(e["name"], e["item"])
return item
# body = {"payload": [id],
# "databaseKey": self.database_key}
# try:
# result = requests.post(f"{self.base_url}/get_items_with_edges",
# json=body)
# if result.status_code != 200:
# print(result, result.content)
# return None
# else:
# json = result.json()[0]
# res = self.item_from_json(json)
# return res
# except requests.exceptions.RequestException as e:
# print(e)
# return None
def get_edges(self, id):
body = {"payload": {"item": str(id),
"direction": "Outgoing",
"expandItems": True},
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/get_edges", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
for d in json:
d["item"] = self.item_from_json(d["item"])
# res = self.item_from_json(json[0])
return json
except requests.exceptions.RequestException as e:
print(e)
return None
def _get_item_with_properties(self, id):
try:
body = {"auth": self.auth_json,
"payload": str(id)}
result = requests.post(f"{self.base_url}/get_item", json=body)
if result.status_code != 200:
print(result, result.content)
return None
else:
json = result.json()
if json == []:
return None
else:
res = self.item_from_json(json[0])
return res
except requests.exceptions.RequestException as e:
print(e)
return None
def get_properties_json(self, node, dates=True):
DATE_KEYS = ['dateCreated', 'dateModified', 'dateServerModified']
res = dict()
private = getattr(node, "private", [])
for k, v in node.__dict__.items():
if k[:1] != '_' and k != "private" and k not in private and not (isinstance(v, list)) \
and v is not None and (not (dates == False and k in DATE_KEYS)):
res[k] = v
res["type"] = self._get_schema_type(node)
return res
@staticmethod
def _get_schema_type(node):
for cls in node.__class__.mro():
# if cls.__module__ == "pymemri.data.schema" and cls.__name__ != "ItemBase":
if cls.__name__ != "ItemBase":
return cls.__name__
raise ValueError
def update_item(self, node):
data = self.get_properties_json(node, dates=False)
if "type" in data:
del data["type"]
if "deleted" in data:
del data["deleted"]
id = data["id"]
body = {"payload": data,
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/update_item",
json=body)
if result.status_code != 200:
print(result, result.content)
except requests.exceptions.RequestException as e:
print(e)
def search(self, fields_data):
body = {"payload": fields_data,
"auth": self.auth_json}
try:
result = requests.post(f"{self.base_url}/search", json=body)
json = result.json()
res = [self.item_from_json(item) for item in json]
return self.filter_deleted(res)
except requests.exceptions.RequestException as e:
return None
def search_last_added(self, type=None, with_prop=None, with_val=None):
query = {"_limit": 1, "_sortOrder": "Desc"}
if type is not None:
query["type"] = type
if with_prop is not None:
query[f"{with_prop}=="] = with_val
return client.search(query)[0]
def item_from_json(self, json):
plugin_class = json.get("pluginClass", None)
plugin_package = json.get("pluginPackage", None)
constructor = get_constructor(json["type"], plugin_class, plugin_package=plugin_package,
extra=self.registered_classes)
new_item = constructor.from_json(json)
existing = ItemBase.global_db.get(new_item.id)
# TODO: cleanup
if existing is not None:
if not existing.is_expanded() and new_item.is_expanded():
for edge_name in new_item.get_all_edge_names():
edges = new_item.get_edges(edge_name)
for e in edges:
e.source = existing
existing.__setattr__(edge_name, edges)
for prop_name in new_item.get_property_names():
existing.__setattr__(prop_name, new_item.__getattribute__(prop_name))
return existing
else:
return new_item
def get_properties(self, expanded):
properties = copy(expanded)
if ALL_EDGES in properties: del properties[ALL_EDGES]
return properties
def run_importer(self, id, servicePayload):
def start_plugin(self, container: str, target_item_id):
# to prevent circular dependency: REFACTOR
from pymemri.plugin.pluginbase import StartPlugin
start_plugin_item = StartPlugin(container=container, targetItemId=target_item_id)
self.create(start_plugin_item)
body = dict()
body["databaseKey"] = servicePayload["databaseKey"]
body["payload"] = {"id": id, "servicePayload": servicePayload}
print(body)
try:
res = requests.post(f"{self.base_url}/run_importer", json=body)
if res.status_code != 200:
print(f"Failed to start importer on {url}:\n{res.status_code}: {res.text}")
else:
print("Starting importer")
except requests.exceptions.RequestException as e:
print("Error with calling importer {e}")
# def run_importer(self, id, servicePayload):
# body = dict()
# body["databaseKey"] = servicePayload["databaseKey"]
# body["payload"] = {"id": id, "servicePayload": servicePayload}
# print(body)
# try:
# res = requests.post(f"{self.base_url}/run_importer", json=body)
# if res.status_code != 200:
# print(f"Failed to start importer on {url}:\n{res.status_code}: {res.text}")
# else:
# print("Starting importer")
# except requests.exceptions.RequestException as e:
# print("Error with calling importer {e}")
```
%% Cell type:markdown id: tags:
Pymemri communicates with the pod via the `PodClient`. The PodClient requires you to provide a [database key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials) and an [owner key](https://gitlab.memri.io/memri/pod/-/blob/dev/docs/HTTP_API.md#user-content-api-authentication-credentials). During development, you don't have to worry about these keys, you can just omit the keys when initializing the `PodClient`, which creates a new user by defining random keys. *Note that this will create a new database for your every time you create a PodClient, if you want to access the same database with multiple PodClients, you have to set the same keys* When you are using the app, setting the keys in the pod, and passing them when calling an integrator is handled for you by the app itself.
%% Cell type:code id: tags:
``` python
client = PodClient()
success = client.test_connection()
assert success
```
%% Output
Succesfully connected to pod
%% Cell type:markdown id: tags:
## Creating Items and Edges
%% Cell type:markdown id: tags:
Now that we have access to the pod, we can create items here and upload them to the pod. All items are defined in the schema of the pod. When Initializing an Item, always make sure to use the from_data classmethod to initialize.
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
email_item
```
%% Output
EmailMessage (#None)
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(email_item)
assert succes
```
%% Cell type:code id: tags:
``` python
email_item = EmailMessage.from_data(content="example content field")
client.create(email_item)
```
%% Output
True
%% Cell type:markdown id: tags:
<!-- [08:09:30 vasya@vn971 pod] curl -X POST -H "Content-Type: application/json" --insecure "http://localhost:3030/v3/03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314/create_item" -d '{"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99", "payload": {"type": "ItemPropertySchema", "itemType": "Person", "propertyName": "age", "valueType": "integer"}}'
16[08:09:31 vasya@vn971 pod]
[08:09:32 vasya@vn971 pod]
[08:09:32 vasya@vn971 pod]
[08:09:33 vasya@vn971 pod] curl -X POST -H "Content-Type: application/json" --insecure "http://localhost:3030/v3/03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314/create_item" -d '{"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99", "payload": {"type": "Person", "age": 20}}'
17[08:10:02 vasya@vn971 pod]
[08:10:04 vasya@vn971 pod]
[08:10:04 vasya@vn971 pod]
[08:10:05 vasya@vn971 pod] curl -X POST -H "Content-Type: application/json" --insecure "http://localhost:3030/v3/03170a2e7597b7b7e3d84c05391d139a62b157e78786d8c082f29dcf4c111314/create_item" -d '{"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99", "payload": {"type": "Person", "myAge": 20}}'
Property myAge not defined in Schema (attempted to use it for json value 20) -->
%% Cell type:markdown id: tags:
We can easily define our own types, and use them in the pod.
%% Cell type:code id: tags:
``` python
class Dog(Item):
properties = Item.properties + ["name", "age"]
edges = Item.edges
def __init__(self, name=None, age=None, **kwargs):
super().__init__(**kwargs)
self.name = name
self.age = age
```
%% Cell type:code id: tags:
``` python
dog = Dog("max", 2)
client.add_to_schema(dog);
dog2 = Dog("bob", 3)
client.create(dog2);
```
%% Cell type:code id: tags:
``` python
dog_from_db = client.get(dog2.id, expanded=False)
```
%% Cell type:markdown id: tags:
We can connect items using edges. Let's create another item, a person, and connect the email and the person.
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
succes = client.add_to_schema(person_item)
assert succes
```
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice", lastName="X")
item_succes = client.create(person_item)
edge = Edge(email_item, person_item, "sender")
edge_succes = client.create_edge(edge)
assert item_succes and edge_succes
```
%% Cell type:code id: tags:
``` python
client.get_edges(email_item.id)
```
%% Output
[{'item': Person (#806a88787321a9a81054e63d17ad2fbb), 'name': 'sender'}]
[{'item': Person (#1282de00d7dbd7d3e24d36ffe180b0d3), 'name': 'sender'}]
%% Cell type:markdown id: tags:
If we use the normal `client.get` (without `expanded=False`), we also get items directly connected to the Item.
%% Cell type:code id: tags:
``` python
email_from_db = client.get(email_item.id)
```
%% Cell type:code id: tags:
``` python
assert isinstance(email_from_db.sender[0], Person)
```
%% Cell type:markdown id: tags:
# Fetching and updating Items
%% Cell type:markdown id: tags:
## Normal Items
%% Cell type:markdown id: tags:
We can use the client to fetch data from the database. This is in particular useful for indexers, which often use data in the database as input for their models. The simplest form of querying the database is by querying items in the pod by their id (unique identifier).
%% Cell type:code id: tags:
``` python
person_item = Person.from_data(firstName="Alice")
assert client.create(person_item)
```
%% Cell type:code id: tags:
``` python
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db is not None
assert person_from_db == person_item
assert person_from_db.id is not None
```
%% Cell type:markdown id: tags:
Appart from creating, we might want to update existing items:
%% Cell type:code id: tags:
``` python
person_item.lastName = "Awesome"
client.update_item(person_item)
person_from_db = client.get(person_item.id, expanded=False)
assert person_from_db.lastName == "Awesome"
```
%% Cell type:markdown id: tags:
When we don't know the ids of the items we want to fetch, we can also search by property. We can use this for instance when we want to query all items from a particular type to perform some indexing on. We can get all `Person` Items from the db by:
%% Cell type:markdown id: tags:
## Search
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Bob")
client.create(person_item2);
all_people = client.search({"type": "Person"})
assert all([isinstance(p, Person) for p in all_people]) and len(all_people) > 0
all_people[:3]
```
%% Output
[Person (#806a88787321a9a81054e63d17ad2fbb),
Person (#1f8e823a9b9ddaedf8dd37c9682beb0b),
Person (#cba8e451efdfe7be87dfeb10444ee670)]
[Person (#1282de00d7dbd7d3e24d36ffe180b0d3),
Person (#4726309701370ded76ba0a734cf04b55),
Person (#9241966f2df21487b5f83a0926c7bd49)]
%% Cell type:markdown id: tags:
## Search last added items
%% Cell type:code id: tags:
``` python
person_item2 = Person.from_data(firstName="Last Person")
client.create(person_item2);
```
%% Cell type:code id: tags:
``` python
assert client.search_last_added(type="Person").firstName == "Last Person"
```
%% Cell type:markdown id: tags:
In the near future, Pod will support searching by user defined properties as well. This will allow for the following. **warning, this is currently not supported**
%% Cell type:markdown id: tags:
```client.search_last_added(type="Person", with_prop="ImportedBy", with_val="EmailImporter")```
%% Cell type:markdown id: tags:
## Uploading & downloading files
%% Cell type:markdown id: tags:
### File API
%% Cell type:markdown id: tags:
To work with files, the `PodClient` has a file api. The file api works by posting a blob to the `upload_file` endpoint, and creating an Item with a property with the same sha256 as the sha used in the endpoint.
%% Cell type:code id: tags:
``` python
from pymemri.data.photo import *
```
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
file = photo.file[0]
succes = client.create(file)
succes2 = client._upload_image(x)
assert succes
assert succes2
```
%% Cell type:code id: tags:
``` python
data = client.get_file(file.sha256)
arr = np.frombuffer(data, dtype=np.uint8)
assert (arr.reshape(640,640) == x).all()
```
%% Cell type:markdown id: tags:
### Photo API
%% Cell type:markdown id: tags:
For photos we do this automatically using `PodClient.create` on a Photo and `PodClient.get_photo`:
%% Cell type:code id: tags:
``` python
x = np.random.randint(0, 255+1, size=(640, 640), dtype=np.uint8)
photo = IPhoto.from_np(x)
```
%% Cell type:code id: tags:
``` python
succes = client.add_to_schema(IPhoto.from_np(x))
```
%% Cell type:code id: tags:
``` python
assert client.create(photo)
```
%% Cell type:code id: tags:
``` python
res = client.get_photo(photo.id, size=640)
```
%% Cell type:code id: tags:
``` python
res
```
%% Output
IPhoto (#424cce3929a63f5849c40c181e0ce252)
IPhoto (#84f2b73f6ea6326a9cb26b47ba62cc41)
%% Cell type:code id: tags:
``` python
assert (res.data == x).all()
```
%% Cell type:markdown id: tags:
# Check if an item exists -
%% Cell type:code id: tags:
``` python
# hide
# person_item = Person.from_data(firstName="Eve", externalId="gmail_1")
# person_item2 = Person.from_data(firstName="Eve2", externalId="gmail_1")
# client.create_if_external_id_not_exists(person_item)
# client.create_if_external_id_not_exists(person_item2)
# existing = client.search({"externalId": "gmail_1"})
# assert len(existing) == 1
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Resetting the db -
%% Cell type:code id: tags:
``` python
# client.delete_all()
```
%% Cell type:markdown id: tags:
# Export -
%% Cell type:code id: tags:
``` python
# hide
from nbdev.export import *
notebook2script()
```
%% Output
Converted basic.ipynb.
Converted data.photo.ipynb.
Converted importers.Importer.ipynb.
Converted importers.util.ipynb.
Converted index.ipynb.
Converted indexers.indexer.ipynb.
Converted itembase.ipynb.
Converted plugin.pluginbase.ipynb.
Converted pod.client.ipynb.
%% Cell type:code id: tags:
``` python
```
......
......@@ -25,10 +25,10 @@ index = {"read_file": "basic.ipynb",
"get_indexer_run_data": "indexers.indexer.ipynb",
"test_registration": "indexers.indexer.ipynb",
"POD_FULL_ADDRESS_ENV": "plugin.pluginbase.ipynb",
"RUN_UID_ENV": "plugin.pluginbase.ipynb",
"POD_SERVICE_PAYLOAD_ENV": "plugin.pluginbase.ipynb",
"DATABASE_KEY_ENV": "plugin.pluginbase.ipynb",
"OWNER_KEY_ENV": "plugin.pluginbase.ipynb",
"RUN_UID_ENV": "indexers.indexer.ipynb",
"POD_SERVICE_PAYLOAD_ENV": "indexers.indexer.ipynb",
"DATABASE_KEY_ENV": "indexers.indexer.ipynb",
"OWNER_KEY_ENV": "indexers.indexer.ipynb",
"run_importer": "indexers.indexer.ipynb",
"run_integrator_from_run_id": "indexers.indexer.ipynb",
"run_integrator": "indexers.indexer.ipynb",
......@@ -39,6 +39,9 @@ index = {"read_file": "basic.ipynb",
"Edge": "itembase.ipynb",
"ItemBase": "itembase.ipynb",
"Item": "itembase.ipynb",
"POD_TARGET_ITEM_ENV": "plugin.pluginbase.ipynb",
"POD_OWNER_KEY_ENV": "plugin.pluginbase.ipynb",
"POD_AUTH_JSON_ENV": "plugin.pluginbase.ipynb",
"PluginBase": "plugin.pluginbase.ipynb",
"PluginRun": "plugin.pluginbase.ipynb",
"MyItem": "plugin.pluginbase.ipynb",
......@@ -46,6 +49,7 @@ index = {"read_file": "basic.ipynb",
"run_plugin_from_run_id": "plugin.pluginbase.ipynb",
"register_base_classes": "plugin.pluginbase.ipynb",
"run_plugin": "plugin.pluginbase.ipynb",
"StartPlugin": "plugin.pluginbase.ipynb",
"DEFAULT_POD_ADDRESS": "pod.client.ipynb",
"POD_VERSION": "pod.client.ipynb",
"PodClient": "pod.client.ipynb"}
......
# AUTOGENERATED! DO NOT EDIT! File to edit: nbs/plugin.pluginbase.ipynb (unless otherwise specified).
__all__ = ['POD_FULL_ADDRESS_ENV', 'RUN_UID_ENV', 'POD_SERVICE_PAYLOAD_ENV', 'DATABASE_KEY_ENV', 'OWNER_KEY_ENV',
'PluginBase', 'PluginRun', 'MyItem', 'MyPlugin', 'run_plugin_from_run_id', 'register_base_classes',
'run_plugin']
__all__ = ['POD_FULL_ADDRESS_ENV', 'POD_TARGET_ITEM_ENV', 'POD_OWNER_KEY_ENV', 'POD_AUTH_JSON_ENV', 'PluginBase',
'PluginRun', 'MyItem', 'MyPlugin', 'run_plugin_from_run_id', 'register_base_classes', 'run_plugin',
'StartPlugin']
# Cell
from ..data.schema import *
......@@ -11,13 +11,13 @@ from ..imports import *
from os import environ
from abc import ABCMeta
import abc
import json
# Cell
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
RUN_UID_ENV = 'RUN_ID'
POD_SERVICE_PAYLOAD_ENV = 'POD_SERVICE_PAYLOAD'
DATABASE_KEY_ENV = 'databaseKey'
OWNER_KEY_ENV = 'ownerKey'
POD_FULL_ADDRESS_ENV = 'POD_FULL_ADDRESS'
POD_TARGET_ITEM_ENV = 'POD_TARGET_ITEM'
POD_OWNER_KEY_ENV = 'POD_OWNER'
POD_AUTH_JSON_ENV = 'POD_AUTH_JSON'
# Cell
# hide
......@@ -108,17 +108,9 @@ def register_base_classes(client):
raise ValueError("Could not add base schema")
# Cell
def _run_plugin(pod_full_address=None, plugin_run_id=None, database_key=None, owner_key=None,
verbose=False):
def _run_plugin(client, plugin_run_id=None, verbose=False):
"""Runs an plugin, you can either provide the run settings as parameters to this function (for local testing)
or via environment variables (this is how the pod communicates with plugins)."""
if verbose:
for name, val in [("pod_full_address", pod_full_address), ("plugin_run_id", plugin_run_id),
("database_key", database_key), ("owner_key", owner_key)]:
print(f"{name}={val}")
print()
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key)
register_base_classes(client)
run_plugin_from_run_id(plugin_run_id, client)
......@@ -128,12 +120,14 @@ def _run_plugin(pod_full_address=None, plugin_run_id=None, database_key=None, ow
def _parse_env(env):
try:
pod_full_address = env.get(POD_FULL_ADDRESS_ENV, DEFAULT_POD_ADDRESS)
plugin_run_id = str(env[RUN_UID_ENV])
pod_service_payload = json.loads(env[POD_SERVICE_PAYLOAD_ENV])
database_key = pod_service_payload[DATABASE_KEY_ENV]
owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_service_payload, database_key, owner_key
plugin_run_json = json.loads(str(env[POD_TARGET_ITEM_ENV]))
print(plugin_run_json)
plugin_run_id = plugin_run_json["id"]
owner_key = env.get(POD_OWNER_KEY_ENV)
pod_auth_json = json.loads(str(env.get(POD_AUTH_JSON_ENV)))
# database_key = pod_service_payload[DATABASE_KEY_ENV]
# owner_key = pod_service_payload[OWNER_KEY_ENV]
return pod_full_address, plugin_run_id, pod_auth_json, owner_key
except KeyError as e:
raise Exception('Missing parameter: {}'.format(e)) from None
......@@ -146,19 +140,43 @@ import os
def run_plugin(pod_full_address:Param("The pod full address", str)=None,
plugin_run_id:Param("Run id of the plugin to be executed", str)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None):
owner_key:Param("Owner key of the pod", str)=None,
from_pod:Param("Run by calling the pod", bool)=False,
container:Param("Pod container to run frod", str)=None):
env = os.environ
params = [pod_full_address, plugin_run_id, database_key, owner_key]
if all([p is None for p in params]):
print("Reading `run_plugin()` parameters from environment variables")
pod_full_address, plugin_run_id, pod_service_payload, database_key, owner_key = _parse_env(env)
pod_full_address, plugin_run_id, pod_auth_json, owner_key = _parse_env(env)
database_key=None
else:
print("Used arguments passed to `run_plugin()` (ignoring environment)")
pod_auth_json=None
if (None in params):
raise ValueError(f"Defined some params to run indexer, but not all. Missing \
{[p for p in params if p is None]}")
client = PodClient(url=pod_full_address, database_key=database_key, owner_key=owner_key,
auth_json=pod_auth_json)
for name, val in [("pod_full_address", pod_full_address), ("plugin_run_id", plugin_run_id),
("owner_key", owner_key), ("auth_json", pod_auth_json)]:
print(f"{name}={val}")
print()
if from_pod:
print(f"calling the `create` api on {pod_full_address} to make your Pod start "
f"a plugin with id {plugin_run_id}.")
print(f"*Check the pod log/console for debug output.*")
client.start_plugin("pymemri", plugin_run_id)
else:
_run_plugin(client=client, plugin_run_id=plugin_run_id)
_run_plugin(pod_full_address=pod_full_address, plugin_run_id=plugin_run_id,
database_key=database_key, owner_key=owner_key, verbose=True)
\ No newline at end of file
# Cell
# hide
class StartPlugin(Item):
properties = Item.properties + ["container", "targetItemId"]
edges = Item.edges
def __init__(self, container=None, targetItemId=None, **kwargs):
super().__init__(**kwargs)
self.container = container
self.targetItemId = targetItemId
\ No newline at end of file
......@@ -17,14 +17,17 @@ POD_VERSION = "v3"
# Cell
class PodClient:
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None):
def __init__(self, url=DEFAULT_POD_ADDRESS, version=POD_VERSION, database_key=None, owner_key=None,
auth_json=None):
self.url = url
self.version = POD_VERSION
self.test_connection(verbose=False)
self.database_key=database_key if database_key is not None else self.generate_random_key()
self.owner_key=owner_key if owner_key is not None else self.generate_random_key()
self.base_url = f"{url}/{version}/{self.owner_key}"
self.auth_json = {"type":"ClientAuth", "databaseKey": self.database_key}
self.auth_json = {"type":"ClientAuth","databaseKey":self.database_key} if auth_json is None \
else {**{"type": "PluginAuth"}, **auth_json}
self.registered_classes=dict()
@staticmethod
......@@ -418,18 +421,25 @@ class PodClient:
if ALL_EDGES in properties: del properties[ALL_EDGES]
return properties
def run_importer(self, id, servicePayload):
def start_plugin(self, container: str, target_item_id):
# to prevent circular dependency: REFACTOR
from ..plugin.pluginbase import StartPlugin
start_plugin_item = StartPlugin(container=container, targetItemId=target_item_id)
self.create(start_plugin_item)
body = dict()
body["databaseKey"] = servicePayload["databaseKey"]
body["payload"] = {"id": id, "servicePayload": servicePayload}
print(body)
try:
res = requests.post(f"{self.base_url}/run_importer", json=body)
if res.status_code != 200:
print(f"Failed to start importer on {url}:\n{res.status_code}: {res.text}")
else:
print("Starting importer")
except requests.exceptions.RequestException as e:
print("Error with calling importer {e}")
\ No newline at end of file
# def run_importer(self, id, servicePayload):
# body = dict()
# body["databaseKey"] = servicePayload["databaseKey"]
# body["payload"] = {"id": id, "servicePayload": servicePayload}
# print(body)
# try:
# res = requests.post(f"{self.base_url}/run_importer", json=body)
# if res.status_code != 200:
# print(f"Failed to start importer on {url}:\n{res.status_code}: {res.text}")
# else:
# print("Starting importer")
# except requests.exceptions.RequestException as e:
# print("Error with calling importer {e}")
\ No newline at end of file
......@@ -2,5 +2,5 @@
set -euETo pipefail
# run bash to play around
exec docker run --rm -it --init --name memri-pyintegrators memri-pyintegrators:latest bash
docker run --rm -it --init --name pymemri pymemri:latest bash
from fastscript import *
import os
from pymemri.plugin.pluginbase import run_plugin
@call_parse
def main(pod_full_address:Param("The pod full address", str)=None,
integrator_run_uid:Param("Run uid of the integrator to be executed", int)=None,
database_key:Param("Database key of the pod", str)=None,
owner_key:Param("Owner key of the pod", str)=None):
environment = os.environ
run_plugin(environ=environment, pod_full_address=pod_full_address, integrator_run_uid=integrator_run_uid,
database_key=database_key, owner_key=owner_key, verbose=True)
\ No newline at end of file
#!/usr/bin/env bash
export owner="SRANDOMSRANDOMSRANDOM" # replace with desired owner, or leave as-is for tests
export dbkey="" # note that the Plugin will not have access to this key, it'll only have `POD_AUTH_JSON`
export container="test"
data=$(cat <<-END
{
"auth": {
"type": "ClientAuth",
"databaseKey": "$dbkey"
},
"payload": {
"createItems": [
{"type": "Person", "id": "38583224e56e6d2385d36e05af9caa5e"},
{"type": "StartPlugin", "container": "$container", "targetItemId": "38583224e56e6d2385d36e05af9caa5e"}
],
"updateItems": [],
"deleteItems": []
}
}
END
)
echo "abc"
echo $owner
echo "http://localhost:3030/v3/${owner}/bulk"
curl -X POST -H "Content-Type: application/json" --insecure "http://localhost:3030/v3/$owner/bulk" -d "$data"
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment