Commit 4e78437e authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

Add a field in regiter trigger request that specifies what

item property should be used during predicate evaluation
parent 512cbfd0
Pipeline #9424 passed with stage
in 6 minutes and 41 seconds
Showing with 142 additions and 88 deletions
+142 -88
-- Trigger {
-- action: "path/to/plugin/endpoint",
-- filterCreatedAfter: "DateTime INTEGER!"
-- filterCreatedAfterPropertyName: "string"
-- pluginId: "uuidv4",
-- triggerOn: "Type of item, for example, Message"
-- }
-- action: "path/to/endpoint"
-- what to do if trigger fired
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
......@@ -32,6 +31,7 @@ INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, delet
"1ad79d26-990a-48b1-8fda-7533a9ba173c",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1ad79d26-990a-48b1-8fda-7533a9ba173c"),
"itemType", "Trigger"
......@@ -41,12 +41,33 @@ INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1ad79d26-990a-48b1-8fda-7533a9ba173c"),
"propertyName", "filterCreatedAfter"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "1ad79d26-990a-48b1-8fda-7533a9ba173c"),
"valueType", "DateTime"
);
-- pluginId: "uuidv4",
-- filterCreatedAfterPropertyName: String
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"78552d91-19cb-4b8e-b686-d5b9e85e729d",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "78552d91-19cb-4b8e-b686-d5b9e85e729d"),
"itemType", "Trigger"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "78552d91-19cb-4b8e-b686-d5b9e85e729d"),
"propertyName", "filterCreatedAfterPropertyName"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "78552d91-19cb-4b8e-b686-d5b9e85e729d"),
"valueType", "Text"
);
-- pluginId: "uuidv4",
-- which plugin instance to use upon triggering
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"b44be0e9-7b41-475b-ac07-45be12546d1c",
......
......@@ -249,6 +249,7 @@ mod tests {
conn.query_row("pragma synchronous;", [], |row| row.get::<usize, u32>(0))
.unwrap()
);
assert_eq!(
MEMORY,
conn.query_row("pragma temp_store;", [], |row| row.get::<usize, u32>(0))
......
......@@ -40,6 +40,7 @@ pub struct PluginRunItem {
pub struct Trigger {
pub action: String,
pub filter_created_after: i64,
pub filter_created_after_property_name: String,
// TODO: use base item
pub id: String,
pub plugin_run_id: String,
......
use reqwest::StatusCode;
use serde_json::Value;
use crate::error::Error;
use crate::error::Result;
// Treat item as an JSON object and retrieve value from the property
pub fn get_property_value<'a>(item: &'a Value, property_name: &str) -> Result<&'a Value> {
item.as_object()
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: "Value is not an object".to_string(),
})?
.get(property_name)
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Could not get property {}", property_name),
})
}
......@@ -39,4 +39,5 @@ pub mod oauth2_api;
pub mod api_model;
pub mod constants;
pub mod db_model;
pub mod json_utils;
pub mod plugin_trigger;
......@@ -14,6 +14,7 @@ mod file_api;
mod global_static;
mod graphql_utils;
mod internal_api;
mod json_utils;
mod oauth2_api;
mod plugin_auth_crypto;
mod plugin_run;
......
......@@ -3,16 +3,22 @@ use crate::{
db_model::{ItemBase, PluginRunItem, Trigger},
error::{Error, ErrorContext, Result},
internal_api::{get_item_tx, search},
json_utils::get_property_value,
schema::Schema,
};
use log::debug;
use reqwest::header::CONTENT_TYPE;
use rusqlite::Transaction;
use serde_json::json;
use serde_json::{json, Value};
use tokio::task;
use warp::http::status::StatusCode;
pub fn on_new_item(tx: &Transaction, schema: &Schema, item_created: &ItemBase) -> Result<()> {
pub fn on_new_item(
tx: &Transaction,
schema: &Schema,
item_base: &ItemBase,
item_created: &Value,
) -> Result<()> {
let query = Search {
_type: Some("Trigger".to_string()),
deleted: Some(false),
......@@ -29,61 +35,78 @@ pub fn on_new_item(tx: &Transaction, schema: &Schema, item_created: &ItemBase) -
.context_str("Failed to retrieve Triggers from the DB")?;
for trigger in triggers {
if trigger.trigger_on == item_created._type
&& item_created.date_created > trigger.filter_created_after
{
let plugin_instance = get_item_tx(tx, schema, &trigger.plugin_run_id)?
.pop()
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Could not find PluginRun {}", trigger.plugin_run_id),
})?;
if trigger.trigger_on == item_base._type {
let created_after =
get_property_value(item_created, &trigger.filter_created_after_property_name)?
.as_i64()
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: "Cannot parse property to i64".to_string(),
})?;
let plugin_instance: PluginRunItem = serde_json::from_value(plugin_instance)
.context(|| format!("Cannot parse PluginRun with id {}", trigger.plugin_run_id))?;
if created_after > trigger.filter_created_after {
let plugin_instance = get_item_tx(tx, schema, &trigger.plugin_run_id)?
.pop()
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Could not find PluginRun {}", trigger.plugin_run_id),
})?;
let host = &plugin_instance.webserver_url;
let port = plugin_instance.webserver_port;
let endpoint = format!("{host}:{port}/{}", trigger.action);
let plugin_instance: PluginRunItem = serde_json::from_value(plugin_instance)
.context(|| {
format!("Cannot parse PluginRun with id {}", trigger.plugin_run_id)
})?;
let body =
json!({ "item_id": item_created.id.clone(), "trigger_id": trigger.id.clone() })
.to_string();
notify_plugin(&trigger, item_base, &plugin_instance)?;
}
}
}
debug!(
"Going to trigger PluginRun instance {} by Trigger {} because of arrival of {}",
trigger.plugin_run_id, trigger.id, item_created._type
);
debug!("POST: {endpoint}, with body {body:#}");
Ok(())
}
task::block_in_place(move || {
let client = reqwest::blocking::Client::new();
let res = client
.post(&endpoint)
.header(CONTENT_TYPE, "application/json")
.body(body.clone())
.send()
.context(|| {
format!(
"Failed to trigger plugin instance {}, endpoint {endpoint}",
trigger.plugin_run_id
)
})?;
fn notify_plugin(
trigger: &Trigger,
item_base: &ItemBase,
plugin_instance: &PluginRunItem,
) -> Result<()> {
let host = &plugin_instance.webserver_url;
let port = plugin_instance.webserver_port;
let endpoint = format!("{host}:{port}/{}", trigger.action);
debug!("Got POST {} response {res:?}", trigger.action);
let body =
json!({ "item_id": item_base.id.clone(), "trigger_id": trigger.id.clone() }).to_string();
if res.status() != StatusCode::OK {
return Err(Error {
code: res.status(),
msg: format!("Failure while triggering the plugin. PluginRun = {}, action = {} body = {}, error = {:?}",
trigger.plugin_run_id, trigger.action, body, res.text())
});
}
debug!(
"Going to trigger PluginRun instance {} by Trigger {} because of arrival of {}",
trigger.plugin_run_id, trigger.id, item_base._type
);
debug!("POST: {endpoint}, with body {body:#}");
Ok(())
task::block_in_place(move || {
let client = reqwest::blocking::Client::new();
let res = client
.post(&endpoint)
.header(CONTENT_TYPE, "application/json")
.body(body.clone())
.send()
.context(|| {
format!(
"Failed to trigger plugin instance {}, endpoint {endpoint}",
trigger.plugin_run_id
)
})?;
debug!("Got POST {} response {res:?}", trigger.action);
if res.status() != StatusCode::OK {
return Err(Error {
code: res.status(),
msg: format!("Failure while triggering the plugin. PluginRun = {}, action = {} body = {}, error = {:?}",
trigger.plugin_run_id, trigger.action, body, res.text())
});
}
}
Ok(())
Ok(())
})
}
......@@ -14,6 +14,7 @@ use crate::{
schema::{Schema, SchemaEdge, SchemaItem, ITEM_EDGE_SCHEMA, ITEM_PROPERTY_SCHEMA},
triggers::SchemaAdditionChange::*,
};
use rusqlite::Transaction as Tx;
use serde_json::json;
use warp::http::StatusCode;
......@@ -184,7 +185,7 @@ pub fn execute_actions_after_item_create(
cli,
)
}
_ => plugin_trigger::on_new_item(tx, schema, &item_base),
_ => plugin_trigger::on_new_item(tx, schema, &item_base, &item),
}
}
......
......@@ -6,7 +6,7 @@ use bollard::container::ListContainersOptions;
use common::test_data::TestData;
use common::pod_client::PodClient;
use pod::api_model::Bulk;
use pod::api_model::{Bulk, CreateItem};
use common::pod_client::find_in_logs;
use test_context::test_context;
......@@ -28,10 +28,7 @@ async fn deploy_plugin(client: &PodClient) {
{
"type":"Plugin",
"id":"b8d3cd38-1a0f-492d-8cbb-39dde5b5e090",
"dateCreated":1655388190477,
"dateModified":1655388190901,
"dateServerModified":null,
"deleted":false,
"gitProjectId":234,
"config":"{\"item_type\":\"Message\",\"model_name\":\"my_plugin_model\",\"model_version\":\"0.1\",\"isMock\":true}",
"configJson":"[\n {\n \"name\": \"item_type\",\n \"display\": \"Item Type\",\n \"data_type\": \"Text\",\n \"type\": \"textbox\",\n \"default\": \"Message\",\n \"optional\": true\n },\n {\n \"name\": \"item_service\",\n \"display\": \"Item Service\",\n \"data_type\": \"Text\",\n \"type\": \"textbox\",\n \"default\": null,\n \"optional\": true\n },\n {\n \"name\": \"model_name\",\n \"display\": \"Model Name\",\n \"data_type\": \"Text\",\n \"type\": \"textbox\",\n \"default\": \"my_plugin_model\",\n \"optional\": true\n },\n {\n \"name\": \"model_version\",\n \"display\": \"Model Version\",\n \"data_type\": \"Text\",\n \"type\": \"textbox\",\n \"default\": \"0.1\",\n \"optional\": true\n },\n {\n \"name\": \"isMock\",\n \"display\": \"Ismock\",\n \"data_type\": \"Bool\",\n \"type\": \"textbox\",\n \"default\": true,\n \"optional\": true\n }\n]",
......@@ -44,20 +41,12 @@ async fn deploy_plugin(client: &PodClient) {
},
{
"type":"Edge",
"id":"5c6ec341-182e-4e8f-9c31-5745a829b4b6",
"dateCreated":1655388190808,
"dateModified":1655388190808,
"dateServerModified":null,
"deleted":false
"id":"5c6ec341-182e-4e8f-9c31-5745a829b4b6"
},
{
"type": "Project",
"id": "e9ad0695-e874-4f01-86e0-08339bf1ee8a",
"dateCreated":1655388190808,
"dateModified":1655388190808,
"dateServerModified":null,
"deleted":false
"id": "e9ad0695-e874-4f01-86e0-08339bf1ee8a"
}
],
"createEdges":[
......@@ -84,10 +73,7 @@ async fn start_plugin(ctx: &mut TestData) {
{
"type": "PluginRun",
"id": "4fa2094c-51d6-4874-a135-9017fcdedf16",
"dateCreated": 1655376490634,
"dateModified": 1655376490848,
"dateServerModified": null,
"deleted": false,
"containerImage": "gitlab.memri.io:5050/szimnowoda/plugin_to_trigger:main-latest",
"pluginModule": "plugin_to_trigger.plugin",
"pluginName": "ClassifierPlugin",
......@@ -96,11 +82,7 @@ async fn start_plugin(ctx: &mut TestData) {
},
{
"type": "Edge",
"id": "f6632a28-312e-486e-a790-dd3b415098e3",
"dateCreated": 1655376490717,
"dateModified": 1655376490717,
"dateServerModified": null,
"deleted": false
"id": "f6632a28-312e-486e-a790-dd3b415098e3"
}
],
"createEdges": [
......@@ -181,22 +163,16 @@ async fn register_trigger(client: &PodClient) {
{
"type": "Trigger",
"id": "0a4a06a4-fb6c-4ff0-b12a-f64fb1f84e41",
"dateCreated": null,
"dateModified": null,
"dateServerModified": null,
"deleted": false,
"action": "v1/item/trigger",
"filterCreatedAfter": 1655170839992,
"pluginRunId": "4fa2094c-51d6-4874-a135-9017fcdedf16",
"triggerOn": "Message"
"triggerOn": "Message",
"filterCreatedAfter": 1655170839992,
"filterCreatedAfterPropertyName": "dateSent"
},
{
"type": "Edge",
"id": "68c13d4a-b8cc-49ff-ae56-cf4d0d9cf89b",
"dateCreated": null,
"dateModified": null,
"dateServerModified": null,
"deleted": false
"id": "68c13d4a-b8cc-49ff-ae56-cf4d0d9cf89b"
}
],
"createEdges": [
......@@ -216,7 +192,17 @@ async fn register_trigger(client: &PodClient) {
}
async fn send_fake_msg(client: &PodClient) {
let payload = HashMap::from([("type", "Message"), ("content", "this is a fake content")]);
let payload = serde_json::from_str::<CreateItem>(
r#"
{
"type": "Message",
"content": "this is a fake content",
"dateSent": 1655170839993
}
"#,
)
.unwrap();
let resp = client.post_to(payload, "create_item").await;
assert_eq!(resp.status(), reqwest::StatusCode::OK);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment