// Commit 3a2b1cd1, authored by Szymon Zimnowoda:
// Added uniqueId for PluginRun that, if set, enforces that one and only one plugin with the
// given id is running. It's a QnD (quick-and-dirty) workaround for Twitter API rate limiting.
use crate::{
api_model::{
CreateItem, PluginCallApiReq, PluginCallApiRes, PluginGetApiReq, PluginGetApiRes,
PluginIdentifier, PluginStatus, PluginStatusMetadata, PluginStatusReq, Search,
},
command_line_interface::CliOptions,
database_api::{self, in_read_transaction},
database_pool::PooledConnection,
db_model::{ItemWithBase, PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, ErrorContext, Result},
internal_api,
internal_api::{new_random_string, search},
plugin_auth_crypto::DatabaseKey,
schema::Schema,
};
use futures::future;
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
collections::HashMap,
ffi::OsStr,
fmt::Debug,
process::{Child, Command},
sync::atomic::AtomicU32,
time::Duration,
};
use tracing::{debug, info, warn};
use warp::http::status::StatusCode;
use duct;
// NOTE(review): presumably the key/label name under which a plugin run's unique id is
// attached to its container, so that an already-running instance can be located — the
// usages are outside this view; confirm against the `*_enforce_unique_instance` helpers.
const UNIQUE_ID: &str = "unique_id";
// TODO: this is barely maintainable, split to strategy pattern: Docker, K8S, sharing probably one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
/// to the containerization / deployment processes.
///
/// Internally passes to docker / kubernetes / scripts
/// depending on how Pod is configured by the user. The backend is chosen in this order:
///
/// 1. A local script, when `cli_options.insecure_plugin_script` contains an entry whose
///    image equals `plugin.container_image`.
/// 2. Kubernetes, when `cli_options.use_kubernetes` is set. Per-owner limits are checked
///    first unless `cli_options.kubernetes_ignore_limits` is set.
/// 3. Docker otherwise.
///
/// For Kubernetes and Docker, unique-instance enforcement is attempted before the
/// container is started. That step is best-effort: a failure is only logged as a warning
/// and the plugin is started anyway.
///
/// # Errors
///
/// * `StatusCode::BAD_REQUEST` if no item is found for `plugin.target_item_id`.
/// * Any error propagated from looking up the target item, serializing the target item or
///   the plugin auth to JSON, creating the plugin auth, checking Kubernetes limits, or
///   from the selected backend's run function.
#[allow(clippy::too_many_arguments)]
pub fn run_plugin_container(
    tx: &Transaction,
    schema: &Schema,
    triggered_by_item_id: &str,
    plugin: &PluginRunItem,
    pod_owner: &str,
    database_key: &DatabaseKey,
    cli_options: &CliOptions,
) -> Result<()> {
    info!(
        "Trying to run plugin container for target_item_id {}",
        plugin.target_item_id
    );

    // Only the first item returned by the lookup is used as the plugin's target.
    let target_item = internal_api::get_item_tx(tx, schema, plugin.target_item_id.as_str())?;
    let target_item = target_item.into_iter().next().ok_or_else(|| Error {
        code: StatusCode::BAD_REQUEST,
        msg: format!(
            "Failed to find target item {} to run a plugin against",
            plugin.target_item_id
        ),
    })?;

    // TODO: that should be not passed to the plugin, plugin should request that data from the DB
    // passing plugin run id is enough.
    let target_item_json = serde_json::to_string(&target_item)?;

    // The auth is handed to the backends as a JSON string.
    let auth = database_key.create_plugin_auth()?;
    let auth = serde_json::to_string(&auth)?;

    // A configured script for this exact image takes precedence over any container backend.
    let script_override = cli_options
        .insecure_plugin_script
        .iter()
        .find(|(image, _script)| image == &plugin.container_image)
        .map(|(_image, script)| script);

    if let Some(script_path) = script_override {
        run_local_script(
            &plugin.container_image,
            script_path,
            &target_item_json,
            pod_owner,
            &auth,
            triggered_by_item_id,
            cli_options,
        )
    } else if cli_options.use_kubernetes {
        if !cli_options.kubernetes_ignore_limits {
            check_kubernetes_limits(
                plugin,
                pod_owner,
                cli_options.kubernetes_plugin_limit_per_owner,
            )?;
        }

        // Best-effort: a failure here must not prevent the plugin from starting.
        if let Err(e) = k8s_enforce_unique_instance(plugin) {
            warn!("Enforcing K8s container unique instance failed because: {e:?}");
        }

        run_kubernetes_container(
            &target_item_json,
            pod_owner,
            &auth,
            triggered_by_item_id,
            plugin,
            cli_options,
        )
    } else {
        // Best-effort: a failure here must not prevent the plugin from starting.
        if let Err(e) = docker_enforce_unique_instance(plugin) {
            warn!("Enforcing docker container unique instance failed because: {e:?}");
        }

        run_docker_container(
            &target_item_json,
            pod_owner,
            &auth,
            triggered_by_item_id,
            plugin,
            cli_options,
        )
    }
}
fn run_local_script(
_container: &str,
plugin_path: &str,
target_item: &str,
pod_owner: &str,
pod_auth: &str,
triggered_by_item_id: &str,
cli_options: &CliOptions,
) -> Result<()> {
let pod_full_address = callback_address(cli_options, false);
let args: Vec<String> = Vec::new();
let mut env_vars = HashMap::new();
env_vars.insert("POD_FULL_ADDRESS", pod_full_address.as_str());
env_vars.insert("POD_TARGET_ITEM", target_item);
env_vars.insert("POD_PLUGINRUN_ID", triggered_by_item_id);
env_vars.insert("POD_OWNER", pod_owner);
env_vars.insert("POD_AUTH_JSON", pod_auth);