• Szymon Zimnowoda's avatar
    Sz/qnd twitter limiting · 3a2b1cd1
    Szymon Zimnowoda authored
    Added uniqueId for PluginRun, that, if set will enforce one and only one plugin with given id be running.
    It's a QnD workaround for twitter API rate limiting.
    3a2b1cd1
plugin_run.rs 31.33 KiB
use crate::{
    api_model::{
        CreateItem, PluginCallApiReq, PluginCallApiRes, PluginGetApiReq, PluginGetApiRes,
        PluginIdentifier, PluginStatus, PluginStatusMetadata, PluginStatusReq, Search,
    },
    command_line_interface::CliOptions,
    database_api::{self, in_read_transaction},
    database_pool::PooledConnection,
    db_model::{ItemWithBase, PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
    error::{Error, ErrorContext, Result},
    internal_api,
    internal_api::{new_random_string, search},
    plugin_auth_crypto::DatabaseKey,
    schema::Schema,
use futures::future;
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
    collections::HashMap,
    ffi::OsStr,
    fmt::Debug,
    process::{Child, Command},
    sync::atomic::AtomicU32,
    time::Duration,
use tracing::{debug, info, warn};
use warp::http::status::StatusCode;
use duct;
const UNIQUE_ID: &str = "unique_id";
// TODO: this is barely maintainable, split to strategy pattern: Docker, K8S, sharing probably one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
/// to the containerization / deployment processes.
///
/// Internally passes to docker / kubernetes / scripts
/// depending on how Pod is configured by the user.
#[allow(clippy::too_many_arguments)]
pub fn run_plugin_container(
    tx: &Transaction,
    schema: &Schema,
    triggered_by_item_id: &str,
    plugin: &PluginRunItem,
    pod_owner: &str,
    database_key: &DatabaseKey,
    cli_options: &CliOptions,
) -> Result<()> {
    info!(
        "Trying to run plugin container for target_item_id {}",
        plugin.target_item_id
    let target_item = internal_api::get_item_tx(tx, schema, plugin.target_item_id.as_str())?;
    let target_item = target_item.into_iter().next().ok_or_else(|| Error {
        code: StatusCode::BAD_REQUEST,
        msg: format!(
            "Failed to find target item {} to run a plugin against",
            plugin.target_item_id
    })?;
    // TODO: that should be not passed to the plugin, plugin should request that data from the DB
    // passing plugin run id is enough.
    let target_item_json = serde_json::to_string(&target_item)?;
    let auth = database_key.create_plugin_auth()?;
    let auth = serde_json::to_string(&auth)?;
7172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
let script_override = cli_options .insecure_plugin_script .iter() .find(|(image, _script)| image == &plugin.container_image) .map(|(_image, script)| script); if let Some(script_path) = script_override { run_local_script( &plugin.container_image, script_path, &target_item_json, pod_owner, &auth, triggered_by_item_id, cli_options, ) } else if cli_options.use_kubernetes { if !cli_options.kubernetes_ignore_limits { check_kubernetes_limits( plugin, pod_owner, cli_options.kubernetes_plugin_limit_per_owner, )?; } if let Err(e) = k8s_enforce_unique_instance(plugin) { warn!("Enforcing K8s container unique instance failed because: {e:?}"); } run_kubernetes_container( &target_item_json, pod_owner, &auth, triggered_by_item_id, plugin, cli_options, ) } else { if let Err(e) = docker_enforce_unique_instance(plugin) { warn!("Enforcing docker container unique instance failed because: {e:?}"); } run_docker_container( &target_item_json, pod_owner, &auth, triggered_by_item_id, plugin, cli_options, ) } } fn run_local_script( _container: &str, plugin_path: &str, target_item: &str, pod_owner: &str, pod_auth: &str, triggered_by_item_id: &str, cli_options: &CliOptions, ) -> Result<()> { let pod_full_address = callback_address(cli_options, false); let args: Vec<String> = Vec::new(); let mut env_vars = HashMap::new(); env_vars.insert("POD_FULL_ADDRESS", pod_full_address.as_str()); env_vars.insert("POD_TARGET_ITEM", target_item); env_vars.insert("POD_PLUGINRUN_ID", triggered_by_item_id); env_vars.insert("POD_OWNER", pod_owner); env_vars.insert("POD_AUTH_JSON", pod_auth);
141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
run_any_command(plugin_path, &args, &env_vars, triggered_by_item_id) } // Example: // docker run \ // --network=host \ // --env=POD_FULL_ADDRESS="http://localhost:3030" \ // --env=POD_TARGET_ITEM="{...json...}" \ // --env=POD_OWNER="...64-hex-chars..." \ // --env=POD_AUTH_JSON="{...json...}" \ // // --name="$containerImage-$trigger_item_id" \ // --rm \ // -- \ // "$containerImage" fn run_docker_container( target_item_json: &str, pod_owner: &str, pod_auth: &str, triggered_by_item_id: &str, plugin: &PluginRunItem, cli_options: &CliOptions, ) -> Result<()> { #[cfg(target_os = "macos")] let os_specific_args = [ "--platform".to_string(), "linux/amd64".to_string(), // macos docker does not support --network=host // we are exposing the port to the host instead "-p".to_string(), format!("{}:{}", plugin.webserver_port, plugin.webserver_port), ]; #[cfg(target_os = "linux")] let os_specific_args = { let docker_network = match &cli_options.plugins_docker_network { Some(net) => net.to_string(), None => "host".to_string(), }; [format!("--network={}", docker_network)] }; let mut args: Vec<String> = vec![ "run".to_string(), "--rm".to_string(), "--pull=always".to_string(), format!("-h={triggered_by_item_id}"), format!( "--env=POD_FULL_ADDRESS={}", callback_address(cli_options, true) ), format!("--env=POD_TARGET_ITEM={}", target_item_json), format!("--env=POD_PLUGINRUN_ID={}", triggered_by_item_id), format!("--env=POD_OWNER={}", pod_owner), format!("--env=POD_AUTH_JSON={}", pod_auth), // Makes logs from the Python visible on the POD console immediately "--env=PYTHONUNBUFFERED=1".to_string(), format!( "--name={}", sanitize_docker_name(plugin.container_id.as_str()) ), ]; args.extend_from_slice(&os_specific_args); if let Some(ref unique_id) = plugin.unique_id { args.extend_from_slice(&["--label".to_string(), format!("{UNIQUE_ID}={unique_id}")]); }
211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
args.extend_from_slice(&["--".to_string(), plugin.container_image.clone()]); let envs: HashMap<&str, &str> = HashMap::new(); run_any_command("docker", &args, &envs, triggered_by_item_id) } static IMPORTERS_PLUGINS: [&str; 3] = [ "gitlab.memri.io:5050/memri/plugins/whatsapp-multi-device", "gitlab.memri.io:5050/memri/plugins/twitter", "registry.digitalocean.com/polis/memri-plugins", ]; const K8S_CONTAINER_PORT: u32 = 8080; /// Example: /// kubectl run $owner-$containerImage-$targetItem-$randomHex /// --image="$containerImage" \ /// --env=POD_FULL_ADDRESS="http://localhost:3030" \ /// --env=POD_TARGET_ITEM="{...json...}" \ /// --env=POD_OWNER="...64-hex-chars..." \ /// --env=POD_AUTH_JSON="{...json...}" \ fn run_kubernetes_container( target_item_json: &str, pod_owner: &str, pod_auth: &str, triggered_by_item_id: &str, plugin: &PluginRunItem, cli_options: &CliOptions, ) -> Result<()> { let resources = if cli_options.kubernetes_ignore_limits { json!({}) } else if IMPORTERS_PLUGINS .iter() .any(|importer| plugin.container_image.contains(importer)) { // Importer plugin json!({ "requests": {"cpu":"0.5", "memory": "200Mi"}, "limits": {"cpu":"1", "memory": "500Mi"} }) } else { // Non-importer like plugin json!({ "requests": {"cpu":"1","memory": "2Gi",}, "limits": {"cpu":"2", "memory": "3.5Gi"} }) }; let overrides = json!({ "apiVersion": "v1", "kind": "Pod", "spec": { "containers": [{ "name": plugin.container_id, "image": plugin.container_image, "ports": [{ "containerPort": K8S_CONTAINER_PORT, "protocol": "TCP" }], "env": [ {"name": "POD_FULL_ADDRESS", "value": callback_address(cli_options, false)}, {"name": "POD_TARGET_ITEM", "value":target_item_json}, {"name": "POD_PLUGINRUN_ID", "value":triggered_by_item_id}, {"name": "POD_OWNER", "value":pod_owner}, {"name": "POD_AUTH_JSON", "value":pod_auth}, {"name": "PLUGIN_DNS", "value":plugin.webserver_url}, {"name": "PYTHONUNBUFFERED", "value":"1"} ], "imagePullPolicy":"Always", "resources": resources }]
281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
} }); let mut labels = format!( "app={},type=plugin,owner={:x}", plugin.container_id, // Owner key exceeds a limit of number of characters that k8s label can keep. // Make it shorter by doing a md5 hash. md5::compute(pod_owner) ); if let Some(ref unique_id) = plugin.unique_id { labels = format!("{labels},{UNIQUE_ID}={unique_id}"); } let args: Vec<String> = vec![ "run".to_string(), "--restart=Never".to_string(), plugin.container_id.clone(), format!("--labels={labels}"), format!("--port={K8S_CONTAINER_PORT}"), "--image-pull-policy=Always".to_string(), format!("--image={}", plugin.container_image), format!("--overrides={}", overrides), ]; run_any_command("kubectl", &args, &HashMap::new(), triggered_by_item_id) } pub fn get_logs(tx: &Transaction, plugin_run_id: &str, cli_options: &CliOptions) -> Result<Value> { let schema = database_api::get_schema(tx).unwrap(); let json = json!({ "id": plugin_run_id, }); let query = serde_json::from_value(json).unwrap(); let result = search(tx, &schema, query).unwrap(); let result = result.into_iter().next().unwrap(); let container_id = result.get("containerId").unwrap().as_str().unwrap(); let args: Vec<String> = vec!["logs".to_string(), container_id.to_string()]; if cli_options.use_kubernetes { //kubernetes logs let cmd = "kubectl".to_string(); let logs = run_any_command_for_logs(cmd, args); Ok(json!({ "logs": logs, })) } else { //docker logs let cmd = "docker".to_string(); let logs = run_any_command_for_logs(cmd, args); Ok(json!({ "logs": logs, })) } } fn run_any_command_for_logs(cmd: String, args: Vec<String>) -> String { let debug_args = args .iter() .map(|p| escape_bash_arg(p)) .collect::<Vec<_>>() .join(" "); info!("Getting logs {} {}", cmd, debug_args); duct::cmd(cmd, args) .stderr_to_stdout() .read() .expect("failed to fetch logs")
351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
} fn run_any_command( cmd: &str, args: &[String], envs: &HashMap<&str, &str>, container_id: &str, ) -> Result<()> { let debug_envs = envs .iter() .map(|(a, b)| format!("{}={} ", escape_bash_arg(a), escape_bash_arg(b))) .collect::<Vec<_>>() .join(""); let debug_args = args .iter() .map(|p| escape_bash_arg(p)) .collect::<Vec<_>>() .join(" "); info!("Starting command {}{} {}", debug_envs, cmd, debug_args); tokio::task::block_in_place(|| { let command = Command::new(cmd).args(args).envs(envs).spawn(); match command { Ok(mut child) => { debug!( "Successfully started {} process for Plugin container item {}", cmd, container_id ); // Give some time for command to execute std::thread::sleep(Duration::from_secs(1)); check_command_status(&mut child) } Err(err) => Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!( "Failed to execute {}, a plugin container triggered by item.rowid {}, {}", cmd, container_id, err ), }), } }) } fn check_command_status(child: &mut Child) -> Result<()> { match child.try_wait() { Ok(Some(status)) => { info!("Command exited with status {status}"); if status.success() { Ok(()) } else { Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!("Command execution failed with status {status}"), }) } } Ok(None) => { // Command still executes, fetches an image, or is already running user code Ok(()) } Err(e) => Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!("Error while attempting to wait for command status, error {e:?}"), }), } } /// Determine the callback address to use for plugins.
421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490
/// A callback address is an environment variable passed to the Plugin /// so that the plugin can call back Pod when it needs to. fn callback_address(cli_options: &CliOptions, docker_magic: bool) -> String { if let Some(address_override) = &cli_options.plugins_callback_address { address_override.to_string() } else { // The plugin container needs to have access to the host // This is currently done differently on MacOS and Linux // https://stackoverflow.com/questions/24319662/from-inside-of-a-docker-container-how-do-i-connect-to-the-localhost-of-the-mach let pod_domain = if docker_magic && !cfg!(target_os = "linux") { "host.docker.internal" } else { "localhost" }; let is_https = cli_options.insecure_non_tls.is_none() && !cli_options.non_tls; let schema = if is_https { "https" } else { "http" }; format!("{}://{}:{}", schema, pod_domain, cli_options.port) } } pub fn sanitize_docker_name(input: &str) -> String { input .chars() .map(|c| { if c.is_ascii_alphanumeric() || "-_".contains(c) { c } else { '_' } }) .collect() } /// Debug-print a bash argument. /// Never use this for running real code, but for debugging that's good enough. /// /// From bash manual: /// > Enclosing characters in single quotes preserves the literal value of each character /// > within the quotes. A single quote may not occur between single quotes, /// > even when preceded by a backslash. pub fn escape_bash_arg(str: &str) -> String { let ok = str .chars() .all(|c| c.is_ascii_alphanumeric() || "_-+=%".contains(c)); if ok { str.to_string() } else { let quoted = str.replace('\'', "'\\''"); // end quoting, append the literal, start quoting format!("'{}'", quoted) } } /// Get next available port number for PluginRun instance pub fn get_free_port(cli: &CliOptions) -> u32 { if (cli.use_kubernetes && cli.plugins_public_domain.is_some()) || cli.plugins_docker_network.is_some() { // Container have unique IPs, same port might be used return K8S_CONTAINER_PORT; } // Container use host network stack, give unique port number static FREE_PORT_NUMBER: AtomicU32 = AtomicU32::new(8008); FREE_PORT_NUMBER.fetch_add(1, std::sync::atomic::Ordering::Relaxed) } /// Figure out correct URL depending on the environnement the Plugin will be run on. pub fn resolve_webserver_url(item: &CreateItem, cli: &CliOptions) -> Result<String> { let plugin_run_id = item.id.as_ref().ok_or(Error { code: StatusCode::BAD_REQUEST,
491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560
msg: "Could not get required field Id".to_string(), })?; let container_id = item .fields .get(CONTAINER_ID) .and_then(|val| val.as_str()) .ok_or(Error { code: StatusCode::BAD_REQUEST, msg: format!("Could not get required field {CONTAINER_ID}"), })?; let webserver_url = if cli.use_kubernetes { // Kubernetes if let Some(base) = &cli.plugins_public_domain { format!("https://{}.{}", container_id, base) } else { warn!("Plugin public DNS is not configured for the Pod but a kubernetes Plugin is run. Using localhost as the public domain..."); "http://localhost".to_string() } } else { // Docker match &cli.plugins_docker_network { // There is a custom network provided, access the container from the POD using it's hostname, // Which is the ID of PluginRun object in the Database. // Note: it will work out of the box, if the POD is also running in the container, // If POD runs on the host, additional DNS service must run to resolve containers hostnames to IPs // The DNS can be executed by calling examples/run_docker_dns_service.sh // TODO: use docker API to start such a service? That will change host network configuration. Some(_net) => format!("http://{plugin_run_id}"), // Network is not set, use host mode, and access the container from the POD using localhost None => "http://localhost".to_string(), } }; Ok(webserver_url) } pub fn get_container_id(item: &CreateItem, cli: &CliOptions, pod_owner: &str) -> Result<String> { let container_image = item .fields .get(CONTAINER_IMAGE) .and_then(|val| val.as_str()) .ok_or(Error { code: StatusCode::BAD_REQUEST, msg: format!("Could not get required field {CONTAINER_IMAGE}"), })?; let plugin_run_id = item.id.as_ref().ok_or(Error { code: StatusCode::BAD_REQUEST, msg: "Could not get required field Id".to_string(), })?; let container_id = if cli.use_kubernetes { format!( "c{}-{}-{}-{}", pod_owner .chars() .filter(|c| c.is_ascii_alphanumeric()) .take(10) .collect::<String>(), container_image .chars() .filter(|c| c.is_ascii_alphanumeric()) .take(20) .collect::<String>(), plugin_run_id .chars() .filter(|c| c.is_ascii_alphanumeric()) .take(20)
561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630
.collect::<String>(), new_random_string(8) ) } else { format!( "{}-{}-{}", pod_owner .chars() .filter(|c| c.is_ascii_alphanumeric()) .collect::<String>(), container_image .chars() .filter(|c| c.is_ascii_alphanumeric()) .collect::<String>(), plugin_run_id .chars() .filter(|c| c.is_ascii_alphanumeric()) .collect::<String>() ) }; Ok(container_id) } /// Current limits are: /// Exactly one plugin o given kind running, /// Total number of plugins running: 5 fn check_kubernetes_limits( plugin: &PluginRunItem, pod_owner: &str, plugin_limit: usize, ) -> Result<()> { // The IO operation as well as output processing, might take some time, // move it out of the async context to not block the executor. // Yes, we are in the async context, even tough function is not marked async. let response = tokio::task::block_in_place(|| -> Result<K8SPodsInfo> { let owner_digest = format!("{:x}", md5::compute(pod_owner)); let args = [ "-l".to_string(), format!("owner={owner_digest}"), "--field-selector=status.phase=Running".to_string(), ]; let mut response = kubernetes_get_pods_info(&args)?; // We haven't found a way to filter by status Running OR Pending, so separate call need to be done let args = [ "-l".to_string(), format!("owner={owner_digest}"), "--field-selector=status.phase=Pending".to_string(), ]; let mut pending = kubernetes_get_pods_info(&args)?; response.items.append(&mut pending.items); Ok(response) })?; if response.items.len() > plugin_limit { Err(Error { code: StatusCode::BAD_REQUEST, msg: "You have reached max number of running plugins ".to_string(), }) } else if response.items.iter().any(|el| { el.spec .containers .first() .expect("k8s pod does not have a container") .image == plugin.container_image
631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700
}) { Err(Error { code: StatusCode::BAD_REQUEST, msg: format!("Plugin from {} is already running", plugin.container_image), }) } else { Ok(()) } } /// Look for containers for given unique_id, kill them if any exist. /// Currently it's used for mitigating rate limiting twitter api has. fn k8s_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> { let Some(ref unique_id) = plugin.unique_id else { return Ok(()) }; info!( "Going to cleanup k8s containers with unique_id {}...", unique_id ); let args = vec![ "delete".to_string(), "pod".to_string(), "-l".to_string(), format!("{UNIQUE_ID}={unique_id}"), "--ignore-not-found=true".to_string(), "--now".to_string(), ]; let std_out = run_command("kubectl", args)?; debug!("Cleanup result: {std_out}"); Ok(()) } /// Look for containers for given unique_id, kill them if any exist. /// Currently it's used for mitigating rate limiting twitter api has. fn docker_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> { let Some(ref unique_id) = plugin.unique_id else { return Ok(()) }; info!( "Going to cleanup docker containers with unique_id {}...", unique_id ); // Get containers with given label let args = vec![ "ps".to_string(), "--filter".to_string(), format!("label={UNIQUE_ID}={unique_id}"), "-q".to_string(), ]; let std_out = run_command("docker", args)?; let running_containers: Vec<&str> = std_out.split_terminator('\n').collect(); debug!("Running containers with unique_id {unique_id}: {running_containers:?}"); // Stop containers with given label let args = ["container", "stop"] .iter() .chain(running_containers.iter()); let _ = run_command("docker", args)?; Ok(()) }
701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770
/// Runs command, waits for finish, returns Ok(std_out) or Err(std_err) fn run_command<I, S>(command: &str, args: I) -> Result<String> where I: IntoIterator<Item = S> + Debug, S: AsRef<OsStr>, { // The IO operation as well as output processing, might take some time, // move it out of the async context to not block the executor. // Yes, we are in the async context, even tough function is not marked async. tokio::task::block_in_place(|| { let cmd_with_args = format!("{command} {args:?}"); info!("Running command {cmd_with_args}"); let output = Command::new(command) .args(args) .output() .context_str("failed to start command")?; if output.status.success() { let std_out = std::str::from_utf8(&output.stdout) .context_str("failed to retrieve command stdout")?; Ok(std_out.to_string()) } else { let std_err = std::str::from_utf8(&output.stderr) .context_str("failed to retrieve command stderr")?; warn!("Command {cmd_with_args} stderr {std_err}"); Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!("command failed, {std_err}"), }) } }) } #[derive(Deserialize, Debug)] struct K8SPodsInfo { items: Vec<Item>, } #[derive(Deserialize, Debug)] struct Item { spec: Spec, } #[derive(Deserialize, Debug)] struct Spec { containers: Vec<Container>, } #[derive(Deserialize, Debug)] struct Container { image: String, } fn kubernetes_get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> { let mut args = vec![ "get".to_string(), "pods".to_string(), "-o".to_string(), "json".to_string(), ]; args.extend_from_slice(filter); debug!("Running kubectl with args {args:?}"); let output = Command::new("kubectl") .args(&args) .output() .context_str("failed to start k8s command")?;
771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840
if !output.status.success() { let std_err = std::str::from_utf8(&output.stderr).context_str("failed to retrieve kubectl stderr")?; warn!("k8s stderr {std_err}"); return Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!("k8s command failed, {std_err}"), }); } match serde_json::from_slice::<K8SPodsInfo>(&output.stdout) { Ok(response) => { debug!("kubectl response: {response:#?}"); Ok(response) } Err(e) => { warn!("k8s stdout {:?}", std::str::from_utf8(&output.stdout)); Err(e.into()) } } } fn find_plugins<T>( conn: &mut PooledConnection, filter: &[T], ) -> Result<Vec<ItemWithBase<PluginRunItem>>> where T: AsRef<PluginIdentifier>, { in_read_transaction(conn, move |tx| { let schema = database_api::get_schema(tx)?; let query = Search { _type: Some("PluginRun".to_string()), deleted: Some(false), limit: u64::MAX, ..Default::default() }; let mut plugins = search(tx, &schema, query)? .into_iter() .map(|element| { serde_json::from_value::<ItemWithBase<PluginRunItem>>(element).unwrap_or_else( |err| panic!("Unable to deserialize value into PluginRun, reason {err:?}"), ) }) .collect::<Vec<_>>(); // Narrow the result to requested plugins if !filter.is_empty() { plugins.retain(|plugin| { filter.iter().any(|plugin_id| match plugin_id.as_ref() { PluginIdentifier::Id(id) => &plugin.base.id == id, PluginIdentifier::Alias(alias) => { if let Some(ref plugin_alias) = plugin.item.plugin_alias { plugin_alias == alias } else { false } } }) }) } Ok(plugins) }) } pub async fn get_plugins_status(
841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910
conn: &mut PooledConnection, cli: &CliOptions, payload: &PluginStatusReq, ) -> Result<HashMap<String, PluginStatusMetadata>> { debug!("Get status for {payload:?}"); let plugins = find_plugins(conn, payload.plugins.as_ref())?; let client = reqwest::Client::new(); let statuses = plugins.into_iter().map(|el| async { let status = query_for_status(&client, &el, cli).await; ( el.base.id, PluginStatusMetadata { status, alias: el.item.plugin_alias, }, ) }); let statuses = future::join_all(statuses).await; debug!("Plugin statuses {statuses:#?}"); Ok(statuses.into_iter().collect()) } async fn query_for_status( client: &reqwest::Client, plugin_instance: &ItemWithBase<PluginRunItem>, cli: &CliOptions, ) -> PluginStatus { let endpoint = format!( "{}/v1/health", get_plugin_base_endpoint(&plugin_instance.item, cli) ); match client.get(endpoint).send().await { Ok(response) => { // Able to reach the webserver and get the response back // TODO: to be more granular in the future let plugin_state = match response.json::<Value>().await { Ok(state) => state, Err(_e) => { json!("UnknownState") } }; PluginStatus::Running(plugin_state) } Err(_) => PluginStatus::Unreachable, } } pub fn get_plugin_base_endpoint(plugin_instance: &PluginRunItem, cli: &CliOptions) -> String { let host = &plugin_instance.webserver_url; if cli.use_kubernetes && cli.plugins_public_domain.is_some() { // On kubernetes reach the plugin via 80/443 port host.to_string() } else { let port = plugin_instance.webserver_port; format!("{host}:{port}") } } pub async fn get_plugin_api( conn: &mut PooledConnection, cli: &CliOptions,
911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980
payload: &PluginGetApiReq, ) -> Result<PluginGetApiRes> { debug!("Get plugin API for {:?}", payload.id); let plugin = find_plugins(conn, &[&payload.id])?.pop().ok_or(Error { code: StatusCode::BAD_REQUEST, msg: format!("failed to find plugin with id {:?}", payload.id), })?; let client = reqwest::Client::new(); let api = query_for_api(&client, &plugin, cli).await?; debug!("Plugin api {api:#?}"); Ok(api) } async fn query_for_api( client: &reqwest::Client, plugin_instance: &ItemWithBase<PluginRunItem>, cli: &CliOptions, ) -> Result<PluginGetApiRes> { const API: &str = "/v1/api"; let endpoint = format!( "{}{API}", get_plugin_base_endpoint(&plugin_instance.item, cli) ); let resp = client .get(endpoint) .send() .await .context(|| format!("cannot reach webapi while accessing {API}"))?; if resp.status().is_success() { resp.json::<PluginGetApiRes>() .await .context(|| format!("failed to parse {API} response")) } else { Err(Error { code: StatusCode::INTERNAL_SERVER_ERROR, msg: format!( "Unsuccessful call to {API}, status {}, reason {:#?}", resp.status(), resp.bytes().await ), }) } } pub async fn call_plugin_api( conn: &mut PooledConnection, cli: &CliOptions, payload: &PluginCallApiReq, ) -> Result<PluginCallApiRes> { debug!("Call plugin API for {:?}", payload.id); let plugin = find_plugins(conn, &[&payload.id])?.pop().ok_or(Error { code: StatusCode::BAD_REQUEST, msg: format!("failed to find plugin with id {:?}", payload.id), })?; let endpoint = format!( "{}{}", get_plugin_base_endpoint(&plugin.item, cli), payload.endpoint ); let client = reqwest::Client::new();
98198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010
let req = client .request((&payload.method).into(), endpoint) .json(&payload.json_body) .query(&payload.query) .build()?; debug!("Request: {req:#?}"); debug!( "body: {:#}", payload.json_body.clone().unwrap_or_else(|| json!("None")) ); let resp = client .execute(req) .await .context_str("cannot reach webapi")?; let resp = PluginCallApiRes { status_code: resp.status().as_u16(), body: resp .json::<Value>() .await .context_str("failed to parse response from plugin api call")?, }; debug!("response {resp:#?}"); Ok(resp) }