use crate::{
api_model::{
CreateItem, PluginCallApiReq, PluginCallApiRes, PluginGetApiReq, PluginGetApiRes,
PluginIdentifier, PluginStatus, PluginStatusMetadata, PluginStatusReq, Search,
},
command_line_interface::CliOptions,
database_api::{self, in_read_transaction},
database_pool::PooledConnection,
db_model::{ItemWithBase, PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, ErrorContext, Result},
internal_api,
internal_api::{new_random_string, search},
plugin_auth_crypto::DatabaseKey,
schema::Schema,
};
use futures::future;
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
collections::HashMap,
ffi::OsStr,
fmt::Debug,
process::{Child, Command},
sync::atomic::AtomicU32,
time::Duration,
};
use tracing::{debug, info, warn};
use warp::http::status::StatusCode;
use duct;
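/// Label key attached to containers started for a PluginRun that has `unique_id` set.
/// When set, at most one plugin instance with that id is kept running (any previous
/// instance is stopped first); a quick workaround for the Twitter API rate limiting.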
const UNIQUE_ID: &str = "unique_id";
// TODO: this is barely maintainable, split it into a strategy pattern: Docker and K8s, probably sharing one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
/// to the containerization / deployment processes.
///
/// Internally dispatches to docker / kubernetes / local scripts,
/// depending on how the Pod is configured by the user.
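///
/// Dispatch order: a local script override from `cli_options.insecure_plugin_script`
/// (matched against the plugin's container image) wins, then Kubernetes when
/// `cli_options.use_kubernetes` is set, otherwise plain Docker.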
#[allow(clippy::too_many_arguments)]
pub fn run_plugin_container(
tx: &Transaction,
schema: &Schema,
triggered_by_item_id: &str,
plugin: &PluginRunItem,
pod_owner: &str,
database_key: &DatabaseKey,
cli_options: &CliOptions,
) -> Result<()> {
info!(
"Trying to run plugin container for target_item_id {}",
plugin.target_item_id
);
let target_item = internal_api::get_item_tx(tx, schema, plugin.target_item_id.as_str())?;
let target_item = target_item.into_iter().next().ok_or_else(|| Error {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Failed to find target item {} to run a plugin against",
plugin.target_item_id
),
})?;
// TODO: this should not be passed to the plugin; the plugin should request that data from the DB,
// passing the plugin run id is enough.
let target_item_json = serde_json::to_string(&target_item)?;
let auth = database_key.create_plugin_auth()?;
let auth = serde_json::to_string(&auth)?;
let script_override = cli_options
.insecure_plugin_script
.iter()
.find(|(image, _script)| image == &plugin.container_image)
.map(|(_image, script)| script);
if let Some(script_path) = script_override {
run_local_script(
&plugin.container_image,
script_path,
&target_item_json,
pod_owner,
&auth,
triggered_by_item_id,
cli_options,
)
} else if cli_options.use_kubernetes {
if !cli_options.kubernetes_ignore_limits {
check_kubernetes_limits(
plugin,
pod_owner,
cli_options.kubernetes_plugin_limit_per_owner,
)?;
}
if let Err(e) = k8s_enforce_unique_instance(plugin) {
warn!("Enforcing K8s container unique instance failed because: {e:?}");
}
run_kubernetes_container(
&target_item_json,
pod_owner,
&auth,
triggered_by_item_id,
plugin,
cli_options,
)
} else {
if let Err(e) = docker_enforce_unique_instance(plugin) {
warn!("Enforcing docker container unique instance failed because: {e:?}");
}
run_docker_container(
&target_item_json,
pod_owner,
&auth,
triggered_by_item_id,
plugin,
cli_options,
)
}
}
fn run_local_script(
_container: &str,
plugin_path: &str,
target_item: &str,
pod_owner: &str,
pod_auth: &str,
triggered_by_item_id: &str,
cli_options: &CliOptions,
) -> Result<()> {
let pod_full_address = callback_address(cli_options, false);
let args: Vec<String> = Vec::new();
let mut env_vars = HashMap::new();
env_vars.insert("POD_FULL_ADDRESS", pod_full_address.as_str());
env_vars.insert("POD_TARGET_ITEM", target_item);
env_vars.insert("POD_PLUGINRUN_ID", triggered_by_item_id);
env_vars.insert("POD_OWNER", pod_owner);
env_vars.insert("POD_AUTH_JSON", pod_auth);
run_any_command(plugin_path, &args, &env_vars, triggered_by_item_id)
}
// Example:
// docker run \
// --network=host \
// --env=POD_FULL_ADDRESS="http://localhost:3030" \
// --env=POD_TARGET_ITEM="{...json...}" \
// --env=POD_OWNER="...64-hex-chars..." \
// --env=POD_AUTH_JSON="{...json...}" \
// --env=POD_PLUGINRUN_ID="$trigger_item_id" \
//
// --name="$containerId" \
// --rm \
// -- \
// "$containerImage"
fn run_docker_container(
target_item_json: &str,
pod_owner: &str,
pod_auth: &str,
triggered_by_item_id: &str,
plugin: &PluginRunItem,
cli_options: &CliOptions,
) -> Result<()> {
#[cfg(target_os = "macos")]
let os_specific_args = [
"--platform".to_string(),
"linux/amd64".to_string(),
// macOS Docker does not support --network=host,
// so we expose the port to the host instead
"-p".to_string(),
format!("{}:{}", plugin.webserver_port, plugin.webserver_port),
];
#[cfg(target_os = "linux")]
let os_specific_args = {
let docker_network = match &cli_options.plugins_docker_network {
Some(net) => net.to_string(),
None => "host".to_string(),
};
[format!("--network={}", docker_network)]
};
let mut args: Vec<String> = vec![
"run".to_string(),
"--rm".to_string(),
"--pull=always".to_string(),
format!("-h={triggered_by_item_id}"),
format!(
"--env=POD_FULL_ADDRESS={}",
callback_address(cli_options, true)
),
format!("--env=POD_TARGET_ITEM={}", target_item_json),
format!("--env=POD_PLUGINRUN_ID={}", triggered_by_item_id),
format!("--env=POD_OWNER={}", pod_owner),
format!("--env=POD_AUTH_JSON={}", pod_auth),
// Makes logs from the Python plugin visible on the POD console immediately
"--env=PYTHONUNBUFFERED=1".to_string(),
format!(
"--name={}",
sanitize_docker_name(plugin.container_id.as_str())
),
];
args.extend_from_slice(&os_specific_args);
if let Some(ref unique_id) = plugin.unique_id {
args.extend_from_slice(&["--label".to_string(), format!("{UNIQUE_ID}={unique_id}")]);
}
args.extend_from_slice(&["--".to_string(), plugin.container_image.clone()]);
let envs: HashMap<&str, &str> = HashMap::new();
run_any_command("docker", &args, &envs, triggered_by_item_id)
}
static IMPORTERS_PLUGINS: [&str; 3] = [
"gitlab.memri.io:5050/memri/plugins/whatsapp-multi-device",
"gitlab.memri.io:5050/memri/plugins/twitter",
"registry.digitalocean.com/polis/memri-plugins",
];
const K8S_CONTAINER_PORT: u32 = 8080;
/// Example:
/// kubectl run $owner-$containerImage-$targetItem-$randomHex \
/// --image="$containerImage" \
/// --env=POD_FULL_ADDRESS="http://localhost:3030" \
/// --env=POD_TARGET_ITEM="{...json...}" \
/// --env=POD_OWNER="...64-hex-chars..." \
/// --env=POD_AUTH_JSON="{...json...}" \
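/// --restart=Never \
/// --labels="app=$containerId,type=plugin,owner=$(md5 of owner)" \
/// --port=8080 \
/// --image-pull-policy=Always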
fn run_kubernetes_container(
target_item_json: &str,
pod_owner: &str,
pod_auth: &str,
triggered_by_item_id: &str,
plugin: &PluginRunItem,
cli_options: &CliOptions,
) -> Result<()> {
let resources = if cli_options.kubernetes_ignore_limits {
json!({})
} else if IMPORTERS_PLUGINS
.iter()
.any(|importer| plugin.container_image.contains(importer))
{
// Importer plugin
json!({
"requests": {"cpu":"0.5", "memory": "200Mi"},
"limits": {"cpu":"1", "memory": "500Mi"}
})
} else {
// Non-importer plugin
json!({
"requests": {"cpu":"1","memory": "2Gi",},
"limits": {"cpu":"2", "memory": "3.5Gi"}
})
};
let overrides = json!({
"apiVersion": "v1",
"kind": "Pod",
"spec": {
"containers": [{
"name": plugin.container_id,
"image": plugin.container_image,
"ports": [{ "containerPort": K8S_CONTAINER_PORT, "protocol": "TCP" }],
"env": [
{"name": "POD_FULL_ADDRESS", "value": callback_address(cli_options, false)},
{"name": "POD_TARGET_ITEM", "value":target_item_json},
{"name": "POD_PLUGINRUN_ID", "value":triggered_by_item_id},
{"name": "POD_OWNER", "value":pod_owner},
{"name": "POD_AUTH_JSON", "value":pod_auth},
{"name": "PLUGIN_DNS", "value":plugin.webserver_url},
{"name": "PYTHONUNBUFFERED", "value":"1"}
],
"imagePullPolicy":"Always",
"resources": resources
}]
}
});
let mut labels = format!(
"app={},type=plugin,owner={:x}",
plugin.container_id,
// The owner key exceeds the character limit of a k8s label value,
// so shorten it with an md5 hash.
md5::compute(pod_owner)
);
if let Some(ref unique_id) = plugin.unique_id {
labels = format!("{labels},{UNIQUE_ID}={unique_id}");
}
let args: Vec<String> = vec![
"run".to_string(),
"--restart=Never".to_string(),
plugin.container_id.clone(),
format!("--labels={labels}"),
format!("--port={K8S_CONTAINER_PORT}"),
"--image-pull-policy=Always".to_string(),
format!("--image={}", plugin.container_image),
format!("--overrides={}", overrides),
];
run_any_command("kubectl", &args, &HashMap::new(), triggered_by_item_id)
}
pub fn get_logs(tx: &Transaction, plugin_run_id: &str, cli_options: &CliOptions) -> Result<Value> {
let schema = database_api::get_schema(tx)?;
let query = serde_json::from_value(json!({
"id": plugin_run_id,
}))?;
let result = search(tx, &schema, query)?;
let result = result.into_iter().next().ok_or_else(|| Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to find PluginRun item {plugin_run_id}"),
})?;
let container_id = result
.get("containerId")
.and_then(|val| val.as_str())
.ok_or_else(|| Error {
code: StatusCode::BAD_REQUEST,
msg: "PluginRun item has no containerId field".to_string(),
})?;
let args: Vec<String> = vec!["logs".to_string(), container_id.to_string()];
// kubernetes or docker logs, depending on how this Pod runs plugins
let cmd = if cli_options.use_kubernetes {
"kubectl".to_string()
} else {
"docker".to_string()
};
let logs = run_any_command_for_logs(cmd, args);
Ok(json!({
"logs": logs,
}))
}
fn run_any_command_for_logs(cmd: String, args: Vec<String>) -> String {
let debug_args = args
.iter()
.map(|p| escape_bash_arg(p))
.collect::<Vec<_>>()
.join(" ");
info!("Getting logs {} {}", cmd, debug_args);
duct::cmd(cmd, args)
.stderr_to_stdout()
.read()
.expect("failed to fetch logs")
}
fn run_any_command(
cmd: &str,
args: &[String],
envs: &HashMap<&str, &str>,
container_id: &str,
) -> Result<()> {
let debug_envs = envs
.iter()
.map(|(a, b)| format!("{}={} ", escape_bash_arg(a), escape_bash_arg(b)))
.collect::<Vec<_>>()
.join("");
let debug_args = args
.iter()
.map(|p| escape_bash_arg(p))
.collect::<Vec<_>>()
.join(" ");
info!("Starting command {}{} {}", debug_envs, cmd, debug_args);
tokio::task::block_in_place(|| {
let command = Command::new(cmd).args(args).envs(envs).spawn();
match command {
Ok(mut child) => {
debug!(
"Successfully started {} process for Plugin container item {}",
cmd, container_id
);
// Give some time for command to execute
std::thread::sleep(Duration::from_secs(1));
check_command_status(&mut child)
}
Err(err) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!(
"Failed to execute {}, a plugin container triggered by item.rowid {}, {}",
cmd, container_id, err
),
}),
}
})
}
fn check_command_status(child: &mut Child) -> Result<()> {
match child.try_wait() {
Ok(Some(status)) => {
info!("Command exited with status {status}");
if status.success() {
Ok(())
} else {
Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Command execution failed with status {status}"),
})
}
}
Ok(None) => {
// Command is still running: it may be fetching the image or already executing user code
Ok(())
}
Err(e) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Error while attempting to wait for command status, error {e:?}"),
}),
}
}
/// Determine the callback address to use for plugins.
/// A callback address is an environment variable passed to the Plugin
/// so that the plugin can call the Pod back when it needs to.
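///
/// For illustration (hypothetical options: no callback override, TLS disabled, port 3030):
/// a Docker plugin on macOS gets `http://host.docker.internal:3030`, while on Linux,
/// for Kubernetes, or for local scripts it gets `http://localhost:3030`.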
fn callback_address(cli_options: &CliOptions, docker_magic: bool) -> String {
if let Some(address_override) = &cli_options.plugins_callback_address {
address_override.to_string()
} else {
// The plugin container needs to have access to the host
// This is currently done differently on MacOS and Linux
// https://stackoverflow.com/questions/24319662/from-inside-of-a-docker-container-how-do-i-connect-to-the-localhost-of-the-mach
let pod_domain = if docker_magic && !cfg!(target_os = "linux") {
"host.docker.internal"
} else {
"localhost"
};
let is_https = cli_options.insecure_non_tls.is_none() && !cli_options.non_tls;
let scheme = if is_https { "https" } else { "http" };
format!("{}://{}:{}", scheme, pod_domain, cli_options.port)
}
}
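/// Restrict a string to a Docker-safe subset of characters: ASCII alphanumerics, `-` and `_`
/// are kept, everything else becomes `_`.
///
/// For illustration:
/// ```ignore
/// assert_eq!(sanitize_docker_name("owner/image:tag"), "owner_image_tag");
/// ```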
pub fn sanitize_docker_name(input: &str) -> String {
input
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || "-_".contains(c) {
c
} else {
'_'
}
})
.collect()
}
/// Escape a string so it can be shown as a bash argument in debug output.
/// Never use this for running real code; for debugging it is good enough.
///
/// From bash manual:
/// > Enclosing characters in single quotes preserves the literal value of each character
/// > within the quotes. A single quote may not occur between single quotes,
/// > even when preceded by a backslash.
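///
/// For illustration (expected behaviour of the rules above):
/// ```ignore
/// assert_eq!(escape_bash_arg("plain_arg-1"), "plain_arg-1");
/// assert_eq!(escape_bash_arg("it's"), "'it'\\''s'");
/// ```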
pub fn escape_bash_arg(str: &str) -> String {
let ok = str
.chars()
.all(|c| c.is_ascii_alphanumeric() || "_-+=%".contains(c));
if ok {
str.to_string()
} else {
let quoted = str.replace('\'', "'\\''"); // end quoting, append the literal, start quoting
format!("'{}'", quoted)
}
}
/// Get the next available port number for a PluginRun instance.
pub fn get_free_port(cli: &CliOptions) -> u32 {
if (cli.use_kubernetes && cli.plugins_public_domain.is_some())
|| cli.plugins_docker_network.is_some()
{
// Containers have unique IPs, so the same port can be reused
return K8S_CONTAINER_PORT;
}
// Containers use the host network stack, so hand out a unique port number
static FREE_PORT_NUMBER: AtomicU32 = AtomicU32::new(8008);
FREE_PORT_NUMBER.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
/// Figure out the correct URL, depending on the environment the Plugin will run in.
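///
/// For illustration (hypothetical values): on Kubernetes with `cli.plugins_public_domain`
/// set to `plugins.example.com` the result is `https://<containerId>.plugins.example.com`;
/// with a custom Docker network it is `http://<pluginRunId>`; otherwise `http://localhost`.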
pub fn resolve_webserver_url(item: &CreateItem, cli: &CliOptions) -> Result<String> {
let plugin_run_id = item.id.as_ref().ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: "Could not get required field Id".to_string(),
})?;
let container_id = item
.fields
.get(CONTAINER_ID)
.and_then(|val| val.as_str())
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Could not get required field {CONTAINER_ID}"),
})?;
let webserver_url = if cli.use_kubernetes {
// Kubernetes
if let Some(base) = &cli.plugins_public_domain {
format!("https://{}.{}", container_id, base)
} else {
warn!("Plugin public DNS is not configured for the Pod but a kubernetes Plugin is run. Using localhost as the public domain...");
"http://localhost".to_string()
}
} else {
// Docker
match &cli.plugins_docker_network {
// There is a custom network provided: access the container from the POD using its hostname,
// which is the id of the PluginRun item in the database.
// Note: this works out of the box if the POD itself runs in a container.
// If the POD runs on the host, an additional DNS service must run to resolve container hostnames to IPs.
// That DNS service can be started by calling examples/run_docker_dns_service.sh
// TODO: use the docker API to start such a service? That will change the host network configuration.
Some(_net) => format!("http://{plugin_run_id}"),
// Network is not set, use host mode, and access the container from the POD using localhost
None => "http://localhost".to_string(),
}
};
Ok(webserver_url)
}
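/// Build a container name from the owner key, the container image and the PluginRun id
/// (all filtered down to ASCII alphanumerics).
/// On Kubernetes the parts are truncated to 10/20/20 characters and a random 8-char suffix
/// is appended, e.g. (hypothetical values)
/// `c3af1b2c9d0-gitlabmemriio5050mem-f4a1b2c3d4e5f6a7b8c9-1a2b3c4d`;
/// the Docker variant keeps the full filtered parts and adds no suffix.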
pub fn get_container_id(item: &CreateItem, cli: &CliOptions, pod_owner: &str) -> Result<String> {
let container_image = item
.fields
.get(CONTAINER_IMAGE)
.and_then(|val| val.as_str())
.ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Could not get required field {CONTAINER_IMAGE}"),
})?;
let plugin_run_id = item.id.as_ref().ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: "Could not get required field Id".to_string(),
})?;
let container_id = if cli.use_kubernetes {
format!(
"c{}-{}-{}-{}",
pod_owner
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.take(10)
.collect::<String>(),
container_image
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.take(20)
.collect::<String>(),
plugin_run_id
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.take(20)
.collect::<String>(),
new_random_string(8)
)
} else {
format!(
"{}-{}-{}",
pod_owner
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.collect::<String>(),
container_image
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.collect::<String>(),
plugin_run_id
.chars()
.filter(|c| c.is_ascii_alphanumeric())
.collect::<String>()
)
};
Ok(container_id)
}
/// Current limits are:
/// - exactly one plugin of a given kind running,
/// - total number of plugins running per owner: 5.
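///
/// The check is implemented with two `kubectl` queries (Running and Pending pods cannot be
/// combined in one field selector), roughly:
/// ```text
/// kubectl get pods -o json -l owner=<md5(pod_owner)> --field-selector=status.phase=Running
/// kubectl get pods -o json -l owner=<md5(pod_owner)> --field-selector=status.phase=Pending
/// ```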
fn check_kubernetes_limits(
plugin: &PluginRunItem,
pod_owner: &str,
plugin_limit: usize,
) -> Result<()> {
// The IO operation, as well as the output processing, might take some time;
// move it out of the async context to not block the executor.
// Yes, we are in an async context, even though the function is not marked async.
let response = tokio::task::block_in_place(|| -> Result<K8SPodsInfo> {
let owner_digest = format!("{:x}", md5::compute(pod_owner));
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Running".to_string(),
];
let mut response = kubernetes_get_pods_info(&args)?;
// We haven't found a way to filter by status Running OR Pending, so a separate call needs to be made
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Pending".to_string(),
];
let mut pending = kubernetes_get_pods_info(&args)?;
response.items.append(&mut pending.items);
Ok(response)
})?;
if response.items.len() > plugin_limit {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: "You have reached max number of running plugins ".to_string(),
})
} else if response.items.iter().any(|el| {
el.spec
.containers
.first()
.expect("k8s pod does not have a container")
.image
== plugin.container_image
}) {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Plugin from {} is already running", plugin.container_image),
})
} else {
Ok(())
}
}
/// Look for containers with the given unique_id and kill any that exist.
/// Currently used to mitigate the Twitter API's rate limiting.
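///
/// Effectively runs (built from the args below):
/// ```text
/// kubectl delete pod -l unique_id=<unique_id> --ignore-not-found=true --now
/// ```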
fn k8s_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> {
let Some(ref unique_id) = plugin.unique_id else {
return Ok(())
};
info!(
"Going to cleanup k8s containers with unique_id {}...",
unique_id
);
let args = vec![
"delete".to_string(),
"pod".to_string(),
"-l".to_string(),
format!("{UNIQUE_ID}={unique_id}"),
"--ignore-not-found=true".to_string(),
"--now".to_string(),
];
let std_out = run_command("kubectl", args)?;
debug!("Cleanup result: {std_out}");
Ok(())
}
/// Look for containers with the given unique_id and kill any that exist.
/// Currently used to mitigate the Twitter API's rate limiting.
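///
/// Effectively runs (built from the args below):
/// ```text
/// docker ps --filter label=unique_id=<unique_id> -q
/// docker container stop <ids...>
/// ```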
fn docker_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> {
let Some(ref unique_id) = plugin.unique_id else {
return Ok(())
};
info!(
"Going to cleanup docker containers with unique_id {}...",
unique_id
);
// Get containers with given label
let args = vec![
"ps".to_string(),
"--filter".to_string(),
format!("label={UNIQUE_ID}={unique_id}"),
"-q".to_string(),
];
let std_out = run_command("docker", args)?;
let running_containers: Vec<&str> = std_out.split_terminator('\n').collect();
debug!("Running containers with unique_id {unique_id}: {running_containers:?}");
// Stop containers with the given label, if any are running
// (running `docker container stop` with no ids would fail)
if !running_containers.is_empty() {
let args = ["container", "stop"]
.iter()
.chain(running_containers.iter());
let _ = run_command("docker", args)?;
}
Ok(())
}
/// Runs command, waits for finish, returns Ok(std_out) or Err(std_err)
fn run_command<I, S>(command: &str, args: I) -> Result<String>
where
I: IntoIterator<Item = S> + Debug,
S: AsRef<OsStr>,
{
// The IO operation, as well as the output processing, might take some time;
// move it out of the async context to not block the executor.
// Yes, we are in an async context, even though the function is not marked async.
tokio::task::block_in_place(|| {
let cmd_with_args = format!("{command} {args:?}");
info!("Running command {cmd_with_args}");
let output = Command::new(command)
.args(args)
.output()
.context_str("failed to start command")?;
if output.status.success() {
let std_out = std::str::from_utf8(&output.stdout)
.context_str("failed to retrieve command stdout")?;
Ok(std_out.to_string())
} else {
let std_err = std::str::from_utf8(&output.stderr)
.context_str("failed to retrieve command stderr")?;
warn!("Command {cmd_with_args} stderr {std_err}");
Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("command failed, {std_err}"),
})
}
})
}
#[derive(Deserialize, Debug)]
struct K8SPodsInfo {
items: Vec<Item>,
}
#[derive(Deserialize, Debug)]
struct Item {
spec: Spec,
}
#[derive(Deserialize, Debug)]
struct Spec {
containers: Vec<Container>,
}
#[derive(Deserialize, Debug)]
struct Container {
image: String,
}
fn kubernetes_get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> {
let mut args = vec![
"get".to_string(),
"pods".to_string(),
"-o".to_string(),
"json".to_string(),
];
args.extend_from_slice(filter);
debug!("Running kubectl with args {args:?}");
let output = Command::new("kubectl")
.args(&args)
.output()
.context_str("failed to start k8s command")?;
if !output.status.success() {
let std_err =
std::str::from_utf8(&output.stderr).context_str("failed to retrieve kubectl stderr")?;
warn!("k8s stderr {std_err}");
return Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("k8s command failed, {std_err}"),
});
}
match serde_json::from_slice::<K8SPodsInfo>(&output.stdout) {
Ok(response) => {
debug!("kubectl response: {response:#?}");
Ok(response)
}
Err(e) => {
warn!("k8s stdout {:?}", std::str::from_utf8(&output.stdout));
Err(e.into())
}
}
}
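/// Load all non-deleted PluginRun items from the database, optionally narrowed down to the
/// plugins matching the given ids / aliases. An empty `filter` returns every plugin.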
fn find_plugins<T>(
conn: &mut PooledConnection,
filter: &[T],
) -> Result<Vec<ItemWithBase<PluginRunItem>>>
where
T: AsRef<PluginIdentifier>,
{
in_read_transaction(conn, move |tx| {
let schema = database_api::get_schema(tx)?;
let query = Search {
_type: Some("PluginRun".to_string()),
deleted: Some(false),
limit: u64::MAX,
..Default::default()
};
let mut plugins = search(tx, &schema, query)?
.into_iter()
.map(|element| {
serde_json::from_value::<ItemWithBase<PluginRunItem>>(element).unwrap_or_else(
|err| panic!("Unable to deserialize value into PluginRun, reason {err:?}"),
)
})
.collect::<Vec<_>>();
// Narrow the result to requested plugins
if !filter.is_empty() {
plugins.retain(|plugin| {
filter.iter().any(|plugin_id| match plugin_id.as_ref() {
PluginIdentifier::Id(id) => &plugin.base.id == id,
PluginIdentifier::Alias(alias) => {
if let Some(ref plugin_alias) = plugin.item.plugin_alias {
plugin_alias == alias
} else {
false
}
}
})
})
}
Ok(plugins)
})
}
pub async fn get_plugins_status(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: &PluginStatusReq,
) -> Result<HashMap<String, PluginStatusMetadata>> {
debug!("Get status for {payload:?}");
let plugins = find_plugins(conn, payload.plugins.as_ref())?;
let client = reqwest::Client::new();
let statuses = plugins.into_iter().map(|el| async {
let status = query_for_status(&client, &el, cli).await;
(
el.base.id,
PluginStatusMetadata {
status,
alias: el.item.plugin_alias,
},
)
});
let statuses = future::join_all(statuses).await;
debug!("Plugin statuses {statuses:#?}");
Ok(statuses.into_iter().collect())
}
async fn query_for_status(
client: &reqwest::Client,
plugin_instance: &ItemWithBase<PluginRunItem>,
cli: &CliOptions,
) -> PluginStatus {
let endpoint = format!(
"{}/v1/health",
get_plugin_base_endpoint(&plugin_instance.item, cli)
);
match client.get(endpoint).send().await {
Ok(response) => {
// Able to reach the webserver and get the response back
// TODO: to be more granular in the future
let plugin_state = match response.json::<Value>().await {
Ok(state) => state,
Err(_e) => {
json!("UnknownState")
}
};
PluginStatus::Running(plugin_state)
}
Err(_) => PluginStatus::Unreachable,
}
}
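/// Base URL used to reach a plugin's web server: on Kubernetes with a public plugins domain
/// the per-plugin DNS name is used as-is (standard HTTP/HTTPS ports), otherwise
/// `webserver_url:webserver_port`.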
pub fn get_plugin_base_endpoint(plugin_instance: &PluginRunItem, cli: &CliOptions) -> String {
let host = &plugin_instance.webserver_url;
if cli.use_kubernetes && cli.plugins_public_domain.is_some() {
// On Kubernetes, reach the plugin via port 80/443
host.to_string()
} else {
let port = plugin_instance.webserver_port;
format!("{host}:{port}")
}
}
pub async fn get_plugin_api(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: &PluginGetApiReq,
) -> Result<PluginGetApiRes> {
debug!("Get plugin API for {:?}", payload.id);
let plugin = find_plugins(conn, &[&payload.id])?.pop().ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("failed to find plugin with id {:?}", payload.id),
})?;
let client = reqwest::Client::new();
let api = query_for_api(&client, &plugin, cli).await?;
debug!("Plugin api {api:#?}");
Ok(api)
}
async fn query_for_api(
client: &reqwest::Client,
plugin_instance: &ItemWithBase<PluginRunItem>,
cli: &CliOptions,
) -> Result<PluginGetApiRes> {
const API: &str = "/v1/api";
let endpoint = format!(
"{}{API}",
get_plugin_base_endpoint(&plugin_instance.item, cli)
);
let resp = client
.get(endpoint)
.send()
.await
.context(|| format!("cannot reach webapi while accessing {API}"))?;
if resp.status().is_success() {
resp.json::<PluginGetApiRes>()
.await
.context(|| format!("failed to parse {API} response"))
} else {
Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!(
"Unsuccessful call to {API}, status {}, reason {:#?}",
resp.status(),
resp.bytes().await
),
})
}
}
pub async fn call_plugin_api(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: &PluginCallApiReq,
) -> Result<PluginCallApiRes> {
debug!("Call plugin API for {:?}", payload.id);
let plugin = find_plugins(conn, &[&payload.id])?.pop().ok_or(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("failed to find plugin with id {:?}", payload.id),
})?;
let endpoint = format!(
"{}{}",
get_plugin_base_endpoint(&plugin.item, cli),
payload.endpoint
);
let client = reqwest::Client::new();
let req = client
.request((&payload.method).into(), endpoint)
.json(&payload.json_body)
.query(&payload.query)
.build()?;
debug!("Request: {req:#?}");
debug!(
"body: {:#}",
payload.json_body.clone().unwrap_or_else(|| json!("None"))
);
let resp = client
.execute(req)
.await
.context_str("cannot reach webapi")?;
let resp = PluginCallApiRes {
status_code: resp.status().as_u16(),
body: resp
.json::<Value>()
.await
.context_str("failed to parse response from plugin api call")?,
};
debug!("response {resp:#?}");
Ok(resp)
}