Commit 5ec31c9a authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

Sz/shared plugins

parent 2424473d
Showing with 737 additions and 179 deletions
+737 -179
{
"cSpell.words": [
"errno"
]
}
\ No newline at end of file
......@@ -262,13 +262,46 @@ version = "2.34.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0610544180c38b88101fecf2dd634b174a62eef6946f84dfc6a7127512b381c"
dependencies = [
"ansi_term",
"atty",
"bitflags",
"strsim 0.8.0",
"textwrap",
"unicode-width",
"vec_map",
]
[[package]]
name = "clap"
version = "4.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2148adefda54e14492fb9bddcc600b4344c5d1a3123bd666dcb939c6f0e0e57e"
dependencies = [
"atty",
"bitflags",
"clap_derive",
"clap_lex",
"once_cell",
"strsim",
"termcolor",
]
[[package]]
name = "clap_derive"
version = "4.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0177313f9f02afc995627906bbd8967e2be069f5261954222dac78290c2b9014"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "clap_lex"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d4198f73e42b4936b35b5bb248d81d2b595ecb170da0bac7655c54eedfa8da8"
dependencies = [
"os_str_bytes",
]
[[package]]
......@@ -317,7 +350,7 @@ checksum = "b01d6de93b2b6c65e17c634a26653a29d107b3c98c607c765bf38d041531cd8f"
dependencies = [
"atty",
"cast",
"clap",
"clap 2.34.0",
"criterion-plot",
"csv",
"itertools",
......@@ -442,7 +475,7 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
"strsim 0.10.0",
"strsim",
"syn",
]
......@@ -768,12 +801,9 @@ dependencies = [
[[package]]
name = "heck"
version = "0.3.3"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621efb26863f0e9924c6ac577e8275e5e6b77455db64ffa6c65c904e9e132c"
dependencies = [
"unicode-segmentation",
]
checksum = "2540771e65fc8cb83cd6e8a237f70c319bd5c29f78ed1084ba5d50eeac86f7f9"
[[package]]
name = "hermit-abi"
......@@ -1012,6 +1042,7 @@ name = "libpod"
version = "0.4.4"
dependencies = [
"chacha20poly1305",
"clap 4.0.26",
"criterion",
"duct",
"futures",
......@@ -1036,7 +1067,6 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"sha2",
"structopt",
"test-log",
"time",
"tokio",
......@@ -1342,6 +1372,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "os_str_bytes"
version = "6.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e"
[[package]]
name = "parking_lot"
version = "0.12.1"
......@@ -2080,42 +2116,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "strsim"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ea5119cdb4c55b55d432abb513a0429384878c15dde60cc77b1c99de1a95a6a"
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "structopt"
version = "0.3.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c6b5c64445ba8094a6ab0c3cd2ad323e07171012d9c98b0b15651daf1787a10"
dependencies = [
"clap",
"lazy_static",
"structopt-derive",
]
[[package]]
name = "structopt-derive"
version = "0.4.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb5ae327f9cc13b68763b5749770cb9e048a99bd9dfdfa58d0cf05d5f64afe0"
dependencies = [
"heck",
"proc-macro-error",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "subtle"
version = "2.4.1"
......@@ -2147,6 +2153,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "termcolor"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755"
dependencies = [
"winapi-util",
]
[[package]]
name = "test-context"
version = "0.1.4"
......@@ -2524,12 +2539,6 @@ dependencies = [
"tinyvec",
]
[[package]]
name = "unicode-segmentation"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8820f5d777f6224dc4be3632222971ac30164d4a258d595640799554ebfd99"
[[package]]
name = "unicode-width"
version = "0.1.9"
......@@ -2592,12 +2601,6 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
[[package]]
name = "vec_map"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1bddf1187be692e79c5ffeab891132dfb0f236ed36a43c7ed39f1165ee20191"
[[package]]
name = "version_check"
version = "0.9.4"
......
......@@ -5,11 +5,15 @@ Plugins are various "external" components that can enrich your data,
help you import your data from external services, push some data outside if you want, etc.
This page explains how Pod runs plugins, and how other components can interact with a running plugin.
## Types of plugins
The pod supports two types of plugins: *user plugins* and *shared plugins*. *User plugins* can be accesed only by the user that created them, while *shared plugins* can be accessed by any user of the pod.
## Starting a plugin
The pod launches a plugin when a `PluginRun` item is created by a client. This item contains the address of the docker image the Pod should launch, and an optional run configuration. Additionally, the plugin can record its `status` and `progress` in this `PluginRun` Item. The full schema of a `PluginRun` can be found in the [Memri schema](https://gitlab.memri.io/memri/schema/-/blob/dev/types/PluginRun.json).
The [Next section](Plugins.md#manually-trigger-a-plugin-via-pod-api) describes how to manually start any plugin. However, both the [Pymemri](https://gitlab.memri.io/memri/pymemri) and [Flutter](https://gitlab.memri.io/memri/flutter-app) clients have functionalties that handle this for you. **If you just want to run a specific plugin, the readme file of a plugin contains all information to run your plugin.** For example, check out the [readme of our Whatsapp importer](https://gitlab.memri.io/memri/plugins/whatsapp-multi-device/-/blob/dev/README.md).
The [Next section](Plugins.md#manually-trigger-a-plugin-via-pod-api) describes how to manually start a user plugin. However, both the [Pymemri](https://gitlab.memri.io/memri/pymemri) and [Flutter](https://gitlab.memri.io/memri/flutter-app) clients have functionalties that handle this for you. **If you just want to run a specific plugin, the readme file of a plugin contains all information to run your plugin.** For example, check out the [readme of our Whatsapp importer](https://gitlab.memri.io/memri/plugins/whatsapp-multi-device/-/blob/dev/README.md).
## Obtaining plugin logs
......@@ -22,7 +26,7 @@ Some plugins have a webserver running that other parts of the stack can communic
## Inspecting imported data
When running a plugin, you might want to inspect what data are imported to your Pod. To do this, you can go to the Pod explorer at https://data.memri.io/. Here you can browse all items present in your database.
# Manually start a plugin via the Pod HTTP API
# Manually start a user plugin via the Pod HTTP API
During development, you can use the following script to make Pod start a new Plugin (container)
```sh
owner="$RANDOM$RANDOM$RANDOM$RANDOM" # replace with desired owner, or leave as-is for tests
......@@ -93,6 +97,19 @@ docker run \
"$containerImage"
```
# Starting a shared plugin
Shared plugins are started when we launch the pod. In order to start a shared plugin we need to pass 1) global pod credentials (pod owner key and pod db key) 2) metadata for the shared plugin that we want to start. The structure of the docker command to do this is the following:
```
docker run \
--network=host \
--env=POD_FULL_ADDRESS="http://$network:3030" \
--db-key-for-pod=7320341416590555278508317261703440721943536547535252249031848567 \
--owner-key-for-pod=6720591057213996348155174697790657950314144236776158336148993643 \
--shared-plugins='{"type": "PluginRun","id": "566d75e8-6163-4f04-a5eb-7a6239dc7459", "containerImage": "gitlab.memri.io:5050/memri/<my-repo>:main-latest","pluginModule": "<my_module>","pluginName": "<my_class>","status": "idle","targetItemId": "566d75e8-6163-4f04-a5eb-7a6239dc7459"}' \
```
# Development script overrides
During development, you can override Pod to execute a local script in your system
instead of a docker container. This allows you testing new versions of your plugin incrementally,
......
......@@ -43,7 +43,7 @@ serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_path_to_error = { workspace = true }
sha2 = { workspace = true }
structopt = { version = "0.3.25", features = ["color", "suggestions"] }
clap = {version = "4", features = ["derive", "env", "color"]}
time = { version = "0.3.5", features = ["formatting", "macros"] }
tokio = { workspace = true }
tracing = { workspace = true }
......
......@@ -22,5 +22,8 @@ pub fn default_cli() -> CliOptions {
email_smtp_port: 465,
email_smtp_user: None,
email_smtp_password: None,
owner_key_for_pod: None,
db_key_for_pod: None,
shared_plugins: Vec::new(),
}
}
-- pluginModule
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"5156888c-d388-4680-a6b0-653f35b709ad",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "5156888c-d388-4680-a6b0-653f35b709ad"),
"itemType", "PluginRun"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "5156888c-d388-4680-a6b0-653f35b709ad"),
"propertyName", "pluginModule"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "5156888c-d388-4680-a6b0-653f35b709ad"),
"valueType", "Text"
);
-- pluginName
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"1009572e-4d42-4119-9f72-aaa423ed27a6",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1009572e-4d42-4119-9f72-aaa423ed27a6"),
"itemType", "PluginRun"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1009572e-4d42-4119-9f72-aaa423ed27a6"),
"propertyName", "pluginName"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1009572e-4d42-4119-9f72-aaa423ed27a6"),
"valueType", "Text"
);
-- status
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"968607b0-f5d7-4743-8172-3900d2454db6",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "968607b0-f5d7-4743-8172-3900d2454db6"),
"itemType", "PluginRun"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "968607b0-f5d7-4743-8172-3900d2454db6"),
"propertyName", "status"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "968607b0-f5d7-4743-8172-3900d2454db6"),
"valueType", "Text"
);
-- containerId
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"6c350679-d15a-4d5f-9c46-6c6c6ba5cbfe",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "6c350679-d15a-4d5f-9c46-6c6c6ba5cbfe"),
"itemType", "PluginRun"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "6c350679-d15a-4d5f-9c46-6c6c6ba5cbfe"),
"propertyName", "containerId"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "6c350679-d15a-4d5f-9c46-6c6c6ba5cbfe"),
"valueType", "Text"
);
......@@ -283,7 +283,7 @@ pub struct PluginStatusReq {
pub plugins: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub enum PluginStatus {
Running(Value),
......@@ -294,6 +294,7 @@ pub enum PluginStatus {
#[serde(rename_all = "camelCase")]
pub struct PluginStatusRes {
pub plugins: HashMap<String, PluginStatus>,
pub shared_plugins: HashMap<String, PluginStatus>,
}
#[derive(Serialize, Deserialize, Debug)]
......@@ -328,8 +329,8 @@ pub enum PluginCallMethod {
Patch,
}
impl From<PluginCallMethod> for Method {
fn from(plugin_call_method: PluginCallMethod) -> Self {
impl From<&PluginCallMethod> for Method {
fn from(plugin_call_method: &PluginCallMethod) -> Self {
match plugin_call_method {
PluginCallMethod::Get => Method::GET,
PluginCallMethod::Post => Method::POST,
......@@ -367,6 +368,17 @@ impl Debug for PluginCallApiRes {
}
}
#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PluginRunReq {
pub id: String,
pub container_image: String,
pub plugin_module: String,
pub plugin_name: String,
pub status: Option<String>,
pub target_item_id: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SortOrder {
/// Ascending
......
use clap::Parser;
use lazy_static::lazy_static;
use std::{error::Error, net::IpAddr};
use structopt::{clap::AppSettings, StructOpt};
use std::net::IpAddr;
#[derive(StructOpt, Debug, Clone)]
#[structopt(
#[derive(Parser, Debug, Clone)]
#[command(
name = "Pod, the open-source backend for Memri project.",
setting = AppSettings::DeriveDisplayOrder,
setting = AppSettings::UnifiedHelpMessage,
// setting = AppSettings::DeriveDisplayOrder,
// setting = AppSettings::UnifiedHelpMessage,
version=std::env!("GIT_DESCRIBE")
)]
#[structopt(version=std::env!("GIT_DESCRIBE"))]
pub struct CliOptions {
/// Port to listen to.
#[structopt(short, long, default_value = "3030")]
#[arg(short, long, default_value = "3030")]
pub port: u16,
/// Comma-separated list of Pod owners (hex-encoded hashes of public keys).
......@@ -24,8 +24,8 @@ pub struct CliOptions {
/// Pod does not store any data on owners in any external databases.
///
/// A magic value of "ANY" will allow any owner to connect to the Pod.
#[structopt(
short = "o",
#[arg(
short = 'o',
long,
name = "OWNERS",
required = true,
......@@ -35,11 +35,11 @@ pub struct CliOptions {
/// If specified, all Plugin containers will be started using kubernetes (`kubectl`).
/// Otherwise and by default, docker containers are used.
#[structopt(long)]
#[arg(long)]
pub use_kubernetes: bool,
/// Sets maximum number of plugins that can be run at once per one POD owner.
#[structopt(
#[arg(
long,
name = "K8S_LIMIT",
env = "POD_KUBERNETES_PLUGIN_LIMIT_PER_OWNER",
......@@ -48,7 +48,7 @@ pub struct CliOptions {
pub kubernetes_plugin_limit_per_owner: usize,
/// Ignores limits on kubernetes containers, it takes precedence to 'kubernetes-plugin-limit-per-owner' flag
#[structopt(long, env = "POD_KUBERNETES_IGNORE_LIMITS")]
#[arg(long, env = "POD_KUBERNETES_IGNORE_LIMITS")]
pub kubernetes_ignore_limits: bool,
/// Set the callback address for plugins launched from within Pod.
......@@ -56,8 +56,8 @@ pub struct CliOptions {
/// It defaults to "pod:3030" if Pod is inside docker,
/// or "localhost:3030" on Linux,
/// or "host.docker.internal:3030" on other operating systems.
#[structopt(
short = "s",
#[arg(
short = 's',
long,
name = "ADDRESS",
env = "POD_PLUGINS_CALLBACK_ADDRESS"
......@@ -68,7 +68,7 @@ pub struct CliOptions {
/// This is passed to the Plugins so that they can know (and advertise) where they are.
///
/// If not specified, Pod will assume the plugin runs on `localhost`.
#[structopt(long, env)]
#[arg(long, env)]
pub plugins_public_domain: Option<String>,
/// Docker network to use when running plugins, e.g. `docker run --network=XXX ...`
......@@ -77,7 +77,7 @@ pub struct CliOptions {
/// If Pod itself is running inside docker, please run both Pod and plugins
/// in identical network that will then not be shared with the host system
/// (this is covered in docker-compose.yml by default).
#[structopt(
#[arg(
long,
name = "PLUGINS_DOCKER_NETWORK",
env = "POD_PLUGINS_DOCKER_NETWORK"
......@@ -85,8 +85,8 @@ pub struct CliOptions {
pub plugins_docker_network: Option<String>,
/// File to read https public certificate from.
#[structopt(
short = "c",
#[arg(
short = 'c',
long,
default_value = "./data/certs/pod.crt",
name = "CERTIFICATE_FILE"
......@@ -94,8 +94,8 @@ pub struct CliOptions {
pub tls_pub_crt: String,
/// File to read https private key from.
#[structopt(
short = "k",
#[arg(
short = 'k',
long,
default_value = "./data/certs/pod.key",
name = "KEY_FILE"
......@@ -114,18 +114,14 @@ pub struct CliOptions {
/// Note that running scripts instead of containers is not secure. A container is limited
/// in its access to the filesystem, but running a script is only secure if you know and trust
/// the script.
#[structopt(
long,
parse(try_from_str = parse_key_val),
number_of_values = 1,
)]
#[arg(long, value_parser(parse_key_val), action(clap::ArgAction::Append))]
pub insecure_plugin_script: Vec<(String, String)>,
/// Do not use https when starting the server, instead run on http://127.0.0.1.
/// Running on loopback interface (127.0.0.1) means that only apps
/// from within the same computer will be able to access Pod.
/// This option might be used during development as an alternative to self-signed certificates.
#[structopt(short = "t", long, env = "POD_NON_TLS")]
#[arg(short = 't', long, env = "POD_NON_TLS")]
pub non_tls: bool,
/// Unsafe version of --non-tls that runs on a public network, e.g. "http://0.0.0.0".
......@@ -136,46 +132,58 @@ pub struct CliOptions {
/// could spoof the traffic sent to the server and do a MiTM attack.
/// Please consider running Pod on a non-public network (--non-tls),
/// or use Pod with https encryption.
#[structopt(long, name = "NETWORK_INTERFACE", env = "POD_INSECURE_NON_TLS")]
#[arg(long, name = "NETWORK_INTERFACE", env = "POD_INSECURE_NON_TLS")]
pub insecure_non_tls: Option<IpAddr>,
/// Run server as a "SharedServer". See `/docs/SharedServer.md` documentation
/// for details on what it is, and how it works.
#[structopt(long, env)]
#[arg(long, env)]
pub shared_server: bool,
/// SMTP relay server to use (advanced option).
#[structopt(long, env)]
#[arg(long, env)]
pub email_smtp_relay: Option<String>,
/// SMTP relay server port to use (advanced option).
#[structopt(long, default_value = "465", env)]
#[arg(long, default_value = "465", env)]
pub email_smtp_port: u16,
/// SMTP relay server user (advanced option).
#[structopt(long, env)]
#[arg(long, env)]
pub email_smtp_user: Option<String>,
/// SMTP relay server password (advanced option).
#[structopt(long, env)]
#[arg(long, env)]
pub email_smtp_password: Option<String>,
/// DB credentials used by the POD to store non volatile information
#[arg(long, env = "POD_OWNER_KEY_FOR_POD", requires = "db_key_for_pod")]
pub owner_key_for_pod: Option<String>,
/// DB credentials used by the POD to store non volatile information
#[arg(long, env = "POD_DB_KEY_FOR_POD", requires = "owner_key_for_pod")]
pub db_key_for_pod: Option<String>,
#[arg(long, env = "POD_SHARED_PLUGINS", action(clap::ArgAction::Append))]
pub shared_plugins: Vec<String>,
}
fn parse_key_val<T, U>(s: &str) -> Result<(T, U), Box<dyn Error>>
where
T: std::str::FromStr,
T::Err: Error + 'static,
U: std::str::FromStr,
U::Err: Error + 'static,
{
fn parse_key_val(s: &str) -> Result<(String, String), String> {
let pos = s
.find('=')
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{}`", s))?;
Ok((s[..pos].parse()?, s[pos + 1..].parse()?))
Ok((
s[..pos]
.parse()
.map_err(|e| format!("failed to parse KEY {e}"))?,
s[pos + 1..]
.parse()
.map_err(|e| format!("failed to parse value {e}"))?,
))
}
lazy_static! {
pub static ref PARSED: CliOptions = CliOptions::from_args();
pub static ref PARSED: CliOptions = CliOptions::parse();
}
#[cfg(test)]
......@@ -205,6 +213,9 @@ pub mod tests {
email_smtp_port: 465,
email_smtp_user: None,
email_smtp_password: None,
owner_key_for_pod: None,
db_key_for_pod: None,
shared_plugins: Vec::new(),
}
}
}
......@@ -6,12 +6,15 @@ use crate::{
command_line_interface::CliOptions,
database_api,
database_api::{
get_incoming_edges, get_outgoing_edges, DatabaseSearch, EdgePointer, GQLSearchArgs, Rowid,
get_incoming_edges, get_outgoing_edges, in_write_transaction, DatabaseSearch, EdgePointer,
GQLSearchArgs, Rowid,
},
database_pool::PooledConnection,
database_utils::{add_item_edge_properties, insert_edge, insert_property, item_base_to_json},
error::{Error, ErrorContext, Result},
graphql_utils,
graphql_utils::QueryASTNode,
plugin_auth_crypto::DatabaseKey,
schema,
schema::{validate_property_name, Schema},
triggers,
......@@ -1291,3 +1294,27 @@ mod tests {
);
}
}
// TODO: there is no proper way of starting plugins, it happens by creating the item in the DB
// Add dedicated endpoints for that
pub fn create_item(
payload: CreateItem,
conn: &mut PooledConnection,
owner: &str,
database_key: &DatabaseKey,
cli: &CliOptions,
) -> Result<String> {
// Commit the item to the DB
let item_id = in_write_transaction(conn, |tx| {
let mut schema = database_api::get_schema(tx)?;
create_item_tx(tx, &mut schema, payload, owner, cli)
});
if let Ok(id) = &item_id {
// After transaction commit, the item is visible to all readers, now it's safe to call triggers
triggers::actions_after_item_create(conn, &[id], owner, cli, database_key)?;
}
item_id
}
......@@ -18,4 +18,5 @@ pub mod plugin_auth_crypto;
pub mod plugin_run;
mod plugin_trigger;
mod schema;
pub mod shared_plugins;
pub mod triggers;
use crate::{
api_model::{PluginAuth, PluginAuthData},
api_model::{AuthKey, PluginAuth, PluginAuthData},
error::{Error, Result},
global_static,
};
......@@ -61,6 +61,13 @@ pub fn extract_database_key(plugin_auth: &PluginAuth) -> Result<DatabaseKey> {
})
}
pub fn auth_to_database_key(auth: AuthKey) -> Result<DatabaseKey> {
match auth {
AuthKey::ClientAuth(c) => DatabaseKey::from(c.database_key),
AuthKey::PluginAuth(p) => extract_database_key(&p),
}
}
/// Database key stored in raw String format (as supplied to sqlcipher).
pub struct DatabaseKey {
/// PRIVATE key (intentionally not public)!
......
use crate::{
api_model::{
CreateItem, PluginCallApiReq, PluginCallApiRes, PluginGetApiReq, PluginGetApiRes,
PluginStatus, PluginStatusReq, PluginStatusRes, Search,
PluginStatus, PluginStatusReq, Search,
},
command_line_interface::CliOptions,
database_api::{self, in_read_transaction},
......@@ -175,6 +175,7 @@ fn run_docker_container(
let mut args: Vec<String> = vec![
"run".to_string(),
"--rm".to_string(),
"--pull=always".to_string(),
format!("-h={triggered_by_item_id}"),
format!(
"--env=POD_FULL_ADDRESS={}",
......@@ -715,8 +716,8 @@ where
pub async fn get_plugins_status(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: PluginStatusReq,
) -> Result<PluginStatusRes> {
payload: &PluginStatusReq,
) -> Result<HashMap<String, PluginStatus>> {
debug!("Get status for {payload:?}");
let plugins = find_plugins(conn, payload.plugins.as_ref())?;
......@@ -732,9 +733,7 @@ pub async fn get_plugins_status(
debug!("Plugin statuses {statuses:#?}");
Ok(PluginStatusRes {
plugins: statuses.into_iter().collect(),
})
Ok(statuses.into_iter().collect())
}
async fn query_for_status(
......@@ -779,7 +778,7 @@ pub fn get_plugin_base_endpoint(plugin_instance: &PluginRunItem, cli: &CliOption
pub async fn get_plugin_api(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: PluginGetApiReq,
payload: &PluginGetApiReq,
) -> Result<PluginGetApiRes> {
debug!("Get plugin API for {}", payload.id);
......@@ -833,7 +832,7 @@ async fn query_for_api(
pub async fn call_plugin_api(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: PluginCallApiReq,
payload: &PluginCallApiReq,
) -> Result<PluginCallApiRes> {
debug!("Call plugin API for {}", payload.id);
......@@ -850,7 +849,7 @@ pub async fn call_plugin_api(
let client = reqwest::Client::new();
let req = client
.request(payload.method.into(), endpoint)
.request((&payload.method).into(), endpoint)
.json(&payload.json_body)
.query(&payload.query)
.build()?;
......@@ -858,7 +857,7 @@ pub async fn call_plugin_api(
debug!("Request: {req:#?}");
debug!(
"body: {:#}",
payload.json_body.unwrap_or_else(|| json!("None"))
payload.json_body.clone().unwrap_or_else(|| json!("None"))
);
let resp = client
......
use crate::{
api_model::{AuthKey, ClientAuth, CreateItem, Search},
command_line_interface::PARSED,
database_api::{self, dangerous_permament_remove_item, in_write_transaction},
database_pool::{get_db_connection, initialize_db, InitDb, PooledConnection},
db_model::ItemBase,
error::Result,
internal_api::{self, search},
plugin_auth_crypto::auth_to_database_key,
};
use std::ops::Deref;
use tracing::{debug, info, warn};
// TODO: there is no proper plugin state management
// if pod restarts - plugins are unable to reach it, plugin auth is broken
// plugins listens for that, and shuts down
// garbage in the db stays
// if plugin dies, pod does nothing with it
pub async fn db_connection(init_db: &InitDb) -> Result<Option<PooledConnection>> {
if let (Some(owner), Some(database_key)) = (
PARSED.owner_key_for_pod.clone(),
PARSED.db_key_for_pod.clone(),
) {
let database_key = auth_to_database_key(AuthKey::ClientAuth(ClientAuth { database_key }))?;
let conn = get_db_connection(&owner, init_db, &database_key).await?;
Ok(Some(conn))
} else {
Ok(None)
}
}
pub async fn initialize(init_db: &InitDb) -> Result<()> {
let (Some(owner_key), Some(database_key)) = (PARSED.owner_key_for_pod.clone(), PARSED.db_key_for_pod.clone()) else {
info!("No POD credentials provided, skipping startup of shared plugins");
return Ok(());
};
let database_key = auth_to_database_key(AuthKey::ClientAuth(ClientAuth { database_key }))?;
let _ = initialize_db(&owner_key, init_db, &database_key).await;
let mut conn = get_db_connection(&owner_key, init_db, &database_key).await?;
// Remove PluginRun from the DB before starting new plugins
// Currently plugins have listeners that will detect lack of POD connection
// and will close themselves.
// This is very simple approach, that might become invalid in the future.
// But for now it behaves well.
clean_db_from_plugins(&mut conn)?;
for plugin in &PARSED.shared_plugins {
let payload: CreateItem = serde_json::from_str(plugin)?;
let plugin_name = payload.fields.get("pluginName").cloned();
let id = payload.id.clone();
match internal_api::create_item(
payload,
&mut conn,
&owner_key,
&database_key,
PARSED.deref(),
) {
Ok(id) => info!("Started plugin {plugin_name:?} with id {id}"),
// TODO: check if plugin still exists, if not - delete from the db, using kubectl probably, because
// what if is in status pending - health endpoint would not work
Err(e) => warn!("Plugin with id {id:?} not started, reason {e}",),
}
}
Ok(())
}
fn clean_db_from_plugins(conn: &mut PooledConnection) -> Result<()> {
in_write_transaction(conn, |tx| {
let schema = database_api::get_schema(tx)?;
let query = Search {
_type: Some("PluginRun".to_string()),
limit: u64::MAX,
..Default::default()
};
let plugins = search(tx, &schema, query)?
.into_iter()
.map(|element| {
serde_json::from_value::<ItemBase>(element).unwrap_or_else(|err| {
panic!("Unable to deserialize value into ItemBase, reason {err:?}")
})
})
.collect::<Vec<_>>();
debug!("Number of plugins to remove {}", plugins.len());
for plugin in plugins {
debug!("Removing {}", plugin.id);
if let Err(e) = dangerous_permament_remove_item(tx, plugin.rowid) {
warn!(
"Failed to remove PluginRun with id {}, reason {e:?}",
plugin.id
);
}
}
Ok(())
})
}
......@@ -56,6 +56,12 @@ name = "test_plugin"
path = "tests/test_plugin.rs"
required-features = ["include_slow_tests"]
[[test]]
name = "test_shared_plugin"
path = "tests/test_shared_plugin.rs"
required-features = ["include_slow_tests"]
[[test]]
name = "test_create_account"
path = "tests/test_create_account.rs"
......
......@@ -5,7 +5,7 @@ use libpod::{
command_line_interface::CliOptions,
database_pool::{ConnectionPool, InitDb},
error::Result,
internal_api,
internal_api, shared_plugins,
};
use std::{
......@@ -31,8 +31,9 @@ use warp::{
/// Start web framework with specified APIs.
pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handle<EnvFilter, S>) {
let package_name = env!("CARGO_PKG_NAME").to_uppercase();
info!("Starting {} HTTP server", package_name);
let dbs_arc = Arc::new(tokio::sync::RwLock::new(
HashMap::<String, ConnectionPool>::new(),
));
let mut headers = HeaderMap::new();
headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
......@@ -64,14 +65,9 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
}
};
let with_init_db = {
let dbs_arc = Arc::new(tokio::sync::RwLock::new(
HashMap::<String, ConnectionPool>::new(),
));
move || {
let db = dbs_arc.clone();
warp::any().map(move || db.clone())
}
let with_init_db = || {
let db = dbs_arc.clone();
warp::any().map(move || db.clone())
};
let with_trace_reload_handler = move || warp::any().map(move || trace_handle.clone());
......@@ -499,7 +495,7 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
let filters = always_enabled_filters.or(sensitive_filters).or(not_found);
if cli_options.non_tls || cli_options.insecure_non_tls.is_some() {
let server_handle = if cli_options.non_tls || cli_options.insecure_non_tls.is_some() {
let ip = if let Some(ip) = cli_options.insecure_non_tls {
if ip.is_loopback() {
info!(
......@@ -522,7 +518,7 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
IpAddr::from([127, 0, 0, 1])
};
let socket = SocketAddr::new(ip, cli_options.port);
warp::serve(filters).run(socket).await
tokio::spawn(warp::serve(filters).run(socket))
} else {
let cert_path = &cli_options.tls_pub_crt;
let key_path = &cli_options.tls_priv_key;
......@@ -541,13 +537,21 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
std::process::exit(1)
};
let socket = SocketAddr::new(IpAddr::from([0, 0, 0, 0]), cli_options.port);
warp::serve(filters)
.tls()
.cert_path(cert_path)
.key_path(key_path)
.run(socket)
.await;
tokio::spawn(
warp::serve(filters)
.tls()
.cert_path(cert_path)
.key_path(key_path)
.run(socket),
)
};
if let Err(e) = shared_plugins::initialize(dbs_arc.deref()).await {
warn!("Failed to initialize shared plugins: {e}");
}
let _ = server_handle.await;
}
fn respond_with_result<T: Reply>(result: Result<T>) -> Response {
......
......@@ -16,9 +16,9 @@ use libpod::{
error::{Error, Result},
file_api, internal_api, oauth1_api,
oauth2_api::{self},
plugin_auth_crypto,
plugin_auth_crypto::auth_to_database_key,
plugin_auth_crypto::DatabaseKey,
plugin_run, triggers,
plugin_run, shared_plugins, triggers,
};
use serde_json::Value;
use sha2::{
......@@ -89,20 +89,7 @@ async fn _create_item(
let mut conn = check_owner_and_initialize_db(&owner, init_db, &database_key).await?;
let owner = owner;
// Commit the item to the DB
let item_id = in_write_transaction(&mut conn, |tx| {
let mut schema = database_api::get_schema(tx)?;
internal_api::create_item_tx(tx, &mut schema, payload, &owner, cli.deref())
});
if let Ok(id) = &item_id {
// After transaction commit, the item is visible to all readers, now it's safe to call triggers
triggers::actions_after_item_create(&mut conn, &[id], &owner, cli.deref(), &database_key)?;
}
item_id
internal_api::create_item(payload, &mut conn, &owner, &database_key, cli.deref())
}
#[instrument(skip(body))]
......@@ -687,7 +674,18 @@ async fn _plugins_status(
let database_key = auth_to_database_key(auth)?;
let mut conn = check_owner_and_initialize_db(&owner, init_db, &database_key).await?;
plugin_run::get_plugins_status(&mut conn, cli, body.payload).await
let plugins = plugin_run::get_plugins_status(&mut conn, cli, &body.payload).await?;
let shared_plugins = if let Some(mut conn) = shared_plugins::db_connection(init_db).await? {
plugin_run::get_plugins_status(&mut conn, cli, &body.payload).await?
} else {
HashMap::new()
};
Ok(PluginStatusRes {
plugins,
shared_plugins,
})
}
#[instrument(fields(uid=trace_uid(), %owner), skip_all)]
......@@ -715,7 +713,16 @@ async fn _plugin_api(
let database_key = auth_to_database_key(auth)?;
let mut conn = check_owner_and_initialize_db(&owner, init_db, &database_key).await?;
plugin_run::get_plugin_api(&mut conn, cli, body.payload).await
let mut plugin_api = plugin_run::get_plugin_api(&mut conn, cli, &body.payload).await;
if plugin_api.is_err() {
// Plugin not found, maybe it's shared plugin?
if let Some(mut conn) = shared_plugins::db_connection(init_db).await? {
plugin_api = plugin_run::get_plugin_api(&mut conn, cli, &body.payload).await
}
}
plugin_api
}
#[instrument(fields(uid=trace_uid(), %owner), skip_all)]
......@@ -743,20 +750,22 @@ async fn _plugin_api_call(
let database_key = auth_to_database_key(auth)?;
let mut conn = check_owner_and_initialize_db(&owner, init_db, &database_key).await?;
plugin_run::call_plugin_api(&mut conn, cli, body.payload).await
let mut api_response = plugin_run::call_plugin_api(&mut conn, cli, &body.payload).await;
if api_response.is_err() {
// Plugin not found, maybe it's shared plugin?
if let Some(mut conn) = shared_plugins::db_connection(init_db).await? {
api_response = plugin_run::call_plugin_api(&mut conn, cli, &body.payload).await
}
}
api_response
}
//
// helper functions:
//
fn auth_to_database_key(auth: AuthKey) -> Result<DatabaseKey> {
match auth {
AuthKey::ClientAuth(c) => DatabaseKey::from(c.database_key),
AuthKey::PluginAuth(p) => plugin_auth_crypto::extract_database_key(&p),
}
}
/// Two methods combined into one to prevent creating a database connection without owner checks.
/// As additional failsafe to the fact that non-owners don't have the database key.
async fn check_owner_and_initialize_db(
......
......@@ -7,8 +7,8 @@ use serde::Serialize;
use sha2::Digest;
use std::{
collections::HashMap, fs, hash::Hash, path::PathBuf, process::Stdio, sync::atomic::AtomicU32,
time::Duration,
collections::HashMap, ffi::OsStr, fs, hash::Hash, path::PathBuf, process::Stdio,
sync::atomic::AtomicU32, time::Duration,
};
use test_context::AsyncTestContext;
use tokio::{
......@@ -33,21 +33,21 @@ pub struct TestData {
}
/// Get next free port for POD instance
fn pod_port() -> u32 {
pub fn pod_port() -> u32 {
// NOTE: this is not isolated among processes
static NEXT_PORT: AtomicU32 = AtomicU32::new(3040);
NEXT_PORT.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
fn owner_key(port: u32) -> String {
pub fn owner_key(port: u32) -> String {
format!(
"133700000000000000000000000000000000000000000000000000000000{}",
port
)
}
fn database_key(port: u32) -> String {
pub fn database_key(port: u32) -> String {
format!(
"DBDBD0000000000000000000000000000000000000000000000000000000{}",
port
......@@ -55,7 +55,7 @@ fn database_key(port: u32) -> String {
}
/// Get absolute path to root of the POD project
fn root_path() -> PathBuf {
pub fn root_path() -> PathBuf {
PathBuf::from(env!("CARGO_MANIFEST_DIR"))
}
......@@ -82,15 +82,14 @@ fn get_pod_path_executable() -> PathBuf {
}
/// Creates a POD process for given owner, returns process handle, together with log handle
pub async fn create_pod(owner_key: &str, port: u32) -> (Child, UnboundedReceiver<String>) {
let owner_hash = sha2::Sha256::new_with_prefix(hex::decode(owner_key).unwrap()).finalize();
let owner_hash = hex::encode(owner_hash);
pub async fn create_pod<T, U>(args: T) -> (Child, UnboundedReceiver<String>)
where
T: IntoIterator<Item = U>,
U: AsRef<OsStr>,
{
let mut pod_handle = Command::new(get_pod_path_executable())
.env("RUST_LOG", option_env!("RUST_LOG").unwrap_or("info"))
.arg("--non-tls=true")
.arg(format!("--owners={owner_hash}"))
.arg(format!("--port={}", port))
.args(args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
......@@ -167,7 +166,15 @@ impl AsyncTestContext for TestData {
let pod_port = pod_port();
let owner_key = owner_key(pod_port);
let (pod_handle, pod_logs) = create_pod(&owner_key, pod_port).await;
let owner_hash = sha2::Sha256::new_with_prefix(hex::decode(&owner_key).unwrap()).finalize();
let owner_hash = hex::encode(owner_hash);
let (pod_handle, pod_logs) = create_pod([
format!("--owners={}", owner_hash),
"--non-tls".to_string(),
format!("--port={}", pod_port),
])
.await;
let database_key = database_key(pod_port);
let pod_url = format!("http://localhost:{pod_port}");
......
......@@ -163,7 +163,12 @@ async fn test_account_works_after_pod_restart(ctx: &mut TestData) {
let _ = ctx.pod_handle.kill().await;
ctx.pod_logs.close();
let (pod_handle, pod_logs) = create_pod(&ctx.owner_key, ctx.pod_port).await;
let (pod_handle, pod_logs) = create_pod([
"--owners=ANY",
"--non-tls",
&format!("--port={}", ctx.pod_port),
])
.await;
ctx.pod_handle = pod_handle;
ctx.pod_logs = pod_logs;
......
mod common;
use crate::common::{
pod_request::{migrate_plugin_definition, register_account},
test_data::root_path,
};
use bollard::{
container::{ListContainersOptions, StopContainerOptions},
Docker,
};
use common::{
pod_client::PodClient,
test_data::{create_pod, database_key, owner_key, pod_port, TestData},
};
use libpod::{
api_model::{PluginStatus, PluginStatusRes},
database_pool::owner_database_path,
};
use serde_json::json;
use std::{collections::HashMap, fs, time::Duration};
use test_context::{test_context, AsyncTestContext};
use tokio::time::timeout;
use tracing::info;
use tracing_subscriber::{EnvFilter, FmtSubscriber};
const SHARED_PLUGIN1: &str = "566d75e8-6163-4f04-a5eb-7a6239dc7437";
const SHARED_PLUGIN2: &str = "8562b3fd-f227-43ba-9d21-657a075651a9";
#[test_context(TestDataSharedPlugin)]
#[tokio::test]
async fn test_shared_plugin(ctx: &mut TestDataSharedPlugin) {
// POD started with 2 shared plugins, user creates own plugin:
register_account(&ctx.base.pod_client).await;
migrate_plugin_definition(&ctx.base.pod_client).await;
let payload = json!(
{
"createItems":[
{
// Valid plugin
"type": "PluginRun",
"id": "d669792f-cc3f-4598-984e-63f63931925d",
"containerImage": "gitlab.memri.io:5050/szimnowoda/plugin_to_trigger:main-latest",
"pluginModule": "plugin_to_trigger.plugin",
"pluginName": "ClassifierPlugin",
"status": "idle",
"targetItemId": "d669792f-cc3f-4598-984e-63f63931925d"
}
]
}
);
let resp = ctx.base.pod_client.post_to(payload, "bulk").await;
assert_eq!(resp.status(), reqwest::StatusCode::OK);
timeout(Duration::from_secs(30), async {
loop {
// Request status of all plugins, should contain shared, and user specific
let payload = json!(
{
"plugins" : []
}
);
let response = ctx.base.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
let response_plugins = response.json::<PluginStatusRes>().await.unwrap();
info!("plugins status: {response_plugins:#?}");
if response_plugins.plugins.len() == 1
&& response_plugins.shared_plugins.len() == 2
&& response_plugins
.plugins
.iter()
.all(|(_id, status)| status != &PluginStatus::Unreachable)
&& response_plugins
.shared_plugins
.iter()
.all(|(_id, status)| status != &PluginStatus::Unreachable)
{
break;
}
info!("Waiting for plugins to settle...");
tokio::time::sleep(Duration::from_secs(1)).await;
}
})
.await
.unwrap();
// It is possible to get API of shared plugin
let payload = json!({ "id": SHARED_PLUGIN1 });
let response = ctx.base.pod_client.post_to(payload, "plugin/api").await;
assert!(response.status().is_success());
// It is possible to call that API
let payload = json!({
"id": SHARED_PLUGIN1,
"method": "POST",
"endpoint": "/v1/echo",
"query": {
"x": 1234
},
"jsonBody": {
"a": ["string"],
"b": {},
"req": {
"x": 112233,
"y": false,
"d": "2022-10-24T19:38:09.718Z"
},
"t": [3,2,1]
}
});
let response = ctx
.base
.pod_client
.post_to(payload, "plugin/api/call")
.await;
assert!(response.status().is_success());
// TODO: check for shared plugins
// get health for all
// get api for shared
}
struct TestDataSharedPlugin {
pub base: TestData,
pub pod_owner_key: String,
}
impl TestDataSharedPlugin {
async fn stop_containers(&self) {
self.base.stop_containers().await;
let containers = self
.base
.list_containers(ListContainersOptions {
filters: HashMap::from([("name", vec![self.pod_owner_key.as_ref()])]),
..Default::default()
})
.await;
for container in containers {
info!("Stop shared container: {:?}", container.id);
let _res = self
.base
.docker_api
.stop_container(
container.id.as_ref().unwrap(),
Some(StopContainerOptions { t: 5 }),
)
.await;
}
}
async fn remove_db(&self) {
self.base.remove_db().await;
let database_path = owner_database_path(&self.pod_owner_key);
let path = root_path().join(database_path);
info!("Path to remove {path:?}");
let _ = fs::remove_file(path);
}
}
#[async_trait::async_trait]
impl AsyncTestContext for TestDataSharedPlugin {
async fn setup() -> Self {
let subscriber = FmtSubscriber::builder()
.with_env_filter(
EnvFilter::builder()
.with_default_directive("info".parse().unwrap())
.from_env_lossy(),
)
.without_time()
.finish();
let _ = tracing::subscriber::set_global_default(subscriber);
let pod_owner_key = owner_key(pod_port());
let pod_db_key = "9684544858277421540522235608665732630370684094198630717255861961";
let pod_port = pod_port();
let owner_key = owner_key(pod_port);
let args = vec![
"--non-tls".to_string(),
format!("--port={}", pod_port),
format!("--db-key-for-pod={pod_db_key}"),
format!("--owner-key-for-pod={pod_owner_key}"),
format!(
"--shared-plugins={}",
json!({
"type": "PluginRun",
"id": SHARED_PLUGIN1,
"containerImage": "gitlab.memri.io:5050/szimnowoda/plugin_to_trigger:main-latest",
"pluginModule": "plugin_to_trigger.plugin",
"pluginName": "ClassifierPlugin",
"status": "idle",
"targetItemId": SHARED_PLUGIN1
})
),
format!(
"--shared-plugins={}",
json!({
"type": "PluginRun",
"id": SHARED_PLUGIN2,
"containerImage": "gitlab.memri.io:5050/szimnowoda/plugin_to_trigger:main-latest",
"pluginModule": "plugin_to_trigger.plugin",
"pluginName": "ClassifierPlugin",
"status": "idle",
"targetItemId": SHARED_PLUGIN2
})
),
format!("--owners=ANY"),
];
let (pod_handle, pod_logs) = create_pod(args).await;
let database_key = database_key(pod_port);
let pod_url = format!("http://localhost:{pod_port}");
// Take some time to start the plugins, and the pod webserver
timeout(Duration::from_secs(30), async {
while reqwest::get(format!("{pod_url}/version")).await.is_err() {
tokio::time::sleep(Duration::from_millis(200)).await;
}
})
.await
.unwrap();
let docker_api = Docker::connect_with_local_defaults().unwrap();
Self {
base: TestData {
pod_client: PodClient::new(&database_key, &owner_key, &pod_url),
pod_logs,
pod_port,
owner_key,
database_key,
pod_url,
pod_handle,
docker_api,
},
pod_owner_key,
}
}
async fn teardown(mut self) {
self.stop_containers().await;
let _ = self.base.pod_handle.kill().await;
self.remove_db().await;
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment