Commit 1ab51b98 authored by Szymon Zimnowoda
Browse files

Merge branch 'plugin-limits' into 'dev'

Plugin limits

See merge request memri/pod!349
parents 73447cc5 268ab252
Showing with 250 additions and 25 deletions
+250 -25
......@@ -978,6 +978,7 @@ dependencies = [
"lazy_static",
"lettre",
"libc",
"md5",
"num_cpus",
"oauth-client",
"oauth2",
......@@ -1048,6 +1049,12 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a3e378b66a060d48947b590737b30a1be76706c8dd7b8ba0f2fe3989c68a853f"
[[package]]
name = "md5"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
[[package]]
name = "memchr"
version = "2.5.0"
......
......@@ -50,6 +50,7 @@ graphql-parser = { git = "https://github.com/memri/graphql-parser" }
reqwest = { version = "0.11.11", features = ["blocking", "json"] }
num_cpus = "1.13.1"
oauth2 = "4.2.3"
md5 = "0.7.0"
[dev-dependencies]
criterion = "0.3.5"
......
......@@ -7,6 +7,8 @@ pub fn default_cli() -> CliOptions {
port: 3030,
owners: "ANY".to_string(),
use_kubernetes: false,
kubernetes_plugin_limit_per_owner: 5,
kubernetes_ignore_limits: false,
plugins_callback_address: None,
plugins_public_domain: None,
plugins_docker_network: None,
......
......@@ -38,6 +38,19 @@ pub struct CliOptions {
#[structopt(long)]
pub use_kubernetes: bool,
/// Sets maximum number of plugins that can be run at once per one POD owner.
#[structopt(
long,
name = "K8S_LIMIT",
env = "POD_KUBERNETES_PLUGIN_LIMIT_PER_OWNER",
default_value = "5"
)]
pub kubernetes_plugin_limit_per_owner: usize,
/// Ignores limits on kubernetes containers, it takes precedence to 'kubernetes-plugin-limit-per-owner' flag
#[structopt(long, env = "POD_KUBERNETES_IGNORE_LIMITS")]
pub kubernetes_ignore_limits: bool,
/// Set the callback address for plugins launched from within Pod.
/// This should be the Pod-s address as seen by external plugins.
/// It defaults to "pod:3030" if Pod is inside docker,
......@@ -177,6 +190,8 @@ pub mod tests {
port: 3030,
owners: "ANY".to_string(),
use_kubernetes: false,
kubernetes_plugin_limit_per_owner: 5,
kubernetes_ignore_limits: false,
plugins_callback_address: None,
plugins_public_domain: None,
plugins_docker_network: None,
......
......@@ -213,6 +213,27 @@ where
}
}
/// Converts an I/O failure into the crate's `Error`, reported as an
/// internal server error whose message carries the original I/O error text.
impl From<std::io::Error> for Error {
    fn from(io_err: std::io::Error) -> Self {
        Self {
            code: StatusCode::INTERNAL_SERVER_ERROR,
            msg: format!("IO error {}", io_err),
        }
    }
}
/// Converts a UTF-8 decoding failure into the crate's `Error`, reported as an
/// internal server error whose message carries the original decoding error text.
impl From<std::str::Utf8Error> for Error {
    fn from(utf8_err: std::str::Utf8Error) -> Self {
        Self {
            code: StatusCode::INTERNAL_SERVER_ERROR,
            msg: format!("UTF8 parse error {}", utf8_err),
        }
    }
}
pub trait ErrorContext<T> {
fn context<F>(self, context_add: F) -> Result<T>
where
......
......@@ -3,18 +3,26 @@ use crate::{
command_line_interface::CliOptions,
database_api,
db_model::{PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, Result},
error::{Error, ErrorContext, Result},
internal_api,
internal_api::{new_random_string, search},
plugin_auth_crypto::DatabaseKey,
schema::Schema,
};
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{collections::HashMap, process::Command, sync::atomic::AtomicU32};
use std::{
collections::HashMap,
process::{Child, Command},
sync::atomic::AtomicU32,
};
use tracing::{debug, info, warn};
use warp::http::status::StatusCode;
// TODO: this is barely maintainable, split to strategy pattern: Docker, K8S, sharing probably one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
/// to the containerization / deployment processes.
///
......@@ -66,6 +74,14 @@ pub fn run_plugin_container(
cli_options,
)
} else if cli_options.use_kubernetes {
if !cli_options.kubernetes_ignore_limits {
check_kubernetes_limits(
plugin,
pod_owner,
cli_options.kubernetes_plugin_limit_per_owner,
)?;
}
run_kubernetes_container(
&target_item_json,
pod_owner,
......@@ -105,7 +121,7 @@ fn run_local_script(
env_vars.insert("POD_OWNER", pod_owner);
env_vars.insert("POD_AUTH_JSON", pod_auth);
run_any_command(plugin_path, &args, &env_vars, triggered_by_item_id)
run_any_command(plugin_path, &args, &env_vars, triggered_by_item_id).map(|_c| ())
}
// Example:
......@@ -156,9 +172,15 @@ fn run_docker_container(
plugin.container_image.clone(),
];
let envs: HashMap<&str, &str> = HashMap::new();
run_any_command("docker", &args, &envs, triggered_by_item_id)
run_any_command("docker", &args, &envs, triggered_by_item_id).map(|_c| ())
}
/// Container images that are treated as "importer" plugins.
/// `run_kubernetes_container` gives these images smaller cpu/memory requests and limits
/// than any other plugin image.
// NOTE(review): the lookup compares `plugin.container_image` for exact string equality,
// so an image reference that carries a tag or digest would presumably not match — confirm.
static IMPORTERS_PLUGINS: [&str; 2] = [
"gitlab.memri.io:5050/memri/plugins/whatsapp-multi-device",
"gitlab.memri.io:5050/memri/plugins/twitter",
];
/// Port used for plugin containers: passed to `kubectl run` as `--port`, set as the
/// `containerPort` in the pod overrides, and returned by `get_free_port` when containers
/// have unique IPs.
const K8S_CONTAINER_PORT: u32 = 8080;
/// Example:
/// kubectl run $owner-$containerImage-$targetItem-$randomHex
/// --image="$containerImage" \
......@@ -174,26 +196,67 @@ fn run_kubernetes_container(
plugin: &PluginRunItem,
cli_options: &CliOptions,
) -> Result<()> {
let resources = if cli_options.kubernetes_ignore_limits {
json!({})
} else if IMPORTERS_PLUGINS
.iter()
.any(|importer| &plugin.container_image == importer)
{
// Importer plugin
json!({
"requests": {"cpu":"0.5", "memory": "200Mi"},
"limits": {"cpu":"1", "memory": "500Mi"}
})
} else {
// Non-importer like plugin
json!({
"requests": {"cpu":"1","memory": "2Gi",},
"limits": {"cpu":"2", "memory": "3.5Gi"}
})
};
let overrides = json!({
"apiVersion": "v1",
"kind": "Pod",
"spec": {
"containers": [{
"name": plugin.container_id,
"image": plugin.container_image,
"ports": [{ "containerPort": K8S_CONTAINER_PORT, "protocol": "TCP" }],
"env": [
{"name": "POD_FULL_ADDRESS", "value": callback_address(cli_options, false)},
{"name": "POD_TARGET_ITEM", "value":target_item_json},
{"name": "POD_PLUGINRUN_ID", "value":triggered_by_item_id},
{"name": "POD_OWNER", "value":pod_owner},
{"name": "POD_AUTH_JSON", "value":pod_auth},
{"name": "PLUGIN_DNS", "value":plugin.webserver_url},
{"name": "PYTHONUNBUFFERED", "value":"1"}
],
"imagePullPolicy":"Always",
"resources": resources
}]
}
});
let args: Vec<String> = vec![
"run".to_string(),
"--restart=Never".to_string(),
plugin.container_id.clone(),
format!("--labels=app={},type=plugin", plugin.container_id),
"--port=8080".to_string(),
"--image-pull-policy=Always".to_string(),
format!("--image={}", plugin.container_image),
format!(
"--env=POD_FULL_ADDRESS={}",
callback_address(cli_options, false)
"--labels=app={},type=plugin,owner={:x}",
plugin.container_id,
// Owner key exceeds a limit of number of characters that k8s label can keep.
// Make it shorter by doing a md5 hash.
md5::compute(pod_owner)
),
format!("--env=POD_TARGET_ITEM={}", target_item_json),
format!("--env=POD_PLUGINRUN_ID={}", triggered_by_item_id),
format!("--env=POD_OWNER={}", pod_owner),
format!("--env=POD_AUTH_JSON={}", pod_auth),
format!("--env=PLUGIN_DNS={}", plugin.webserver_url),
"--env=PYTHONUNBUFFERED=1".to_string(),
format!("--port={K8S_CONTAINER_PORT}"),
"--image-pull-policy=Always".to_string(),
format!("--image={}", plugin.container_image),
format!("--overrides={}", overrides),
];
run_any_command("kubectl", &args, &HashMap::new(), triggered_by_item_id)
run_any_command("kubectl", &args, &HashMap::new(), triggered_by_item_id).map(|_c| ())
}
pub fn get_logs(tx: &Transaction, plugin_run_id: &str, cli_options: &CliOptions) -> Result<Value> {
......@@ -245,7 +308,7 @@ fn run_any_command(
args: &[String],
envs: &HashMap<&str, &str>,
container_id: &str,
) -> Result<()> {
) -> Result<Child> {
let debug_envs = envs
.iter()
.map(|(a, b)| format!("{}={} ", escape_bash_arg(a), escape_bash_arg(b)))
......@@ -260,12 +323,13 @@ fn run_any_command(
let command = Command::new(cmd).args(args).envs(envs).spawn();
match command {
Ok(_child) => {
Ok(child) => {
debug!(
"Successfully started {} process for Plugin container item {}",
cmd, container_id
);
Ok(())
Ok(child)
}
Err(err) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
......@@ -336,7 +400,7 @@ pub fn get_free_port(cli: &CliOptions) -> u32 {
|| cli.plugins_docker_network.is_some()
{
// Container have unique IPs, same port might be used
return 8080;
return K8S_CONTAINER_PORT;
}
// Container use host network stack, give unique port number
......@@ -441,3 +505,115 @@ pub fn get_container_id(item: &CreateItem, cli: &CliOptions, pod_owner: &str) ->
Ok(container_id)
}
/// Current limits are:
/// Exactly one plugin o given kind running,
/// Total number of plugins running: 5
fn check_kubernetes_limits(
plugin: &PluginRunItem,
pod_owner: &str,
plugin_limit: usize,
) -> Result<()> {
// The IO operation as well as output processing, might take some time,
// move it out of the async context to not block the executor.
// Yes, we are in the async context, even tough function is not marked async.
let response = tokio::task::block_in_place(|| -> Result<K8SPodsInfo> {
let owner_digest = format!("{:x}", md5::compute(pod_owner));
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Running".to_string(),
];
let mut response = kubernetes_get_pods_info(&args)?;
// We haven't found a way to filter by status Running OR Pending, so separate call need to be done
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Pending".to_string(),
];
let mut pending = kubernetes_get_pods_info(&args)?;
response.items.append(&mut pending.items);
Ok(response)
})?;
if response.items.len() > plugin_limit {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: "You have reached max number of running plugins ".to_string(),
})
} else if response.items.iter().any(|el| {
el.spec
.containers
.first()
.expect("k8s pod does not have a container")
.image
== plugin.container_image
}) {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Plugin from {} is already running", plugin.container_image),
})
} else {
Ok(())
}
}
/// Subset of the JSON document printed by `kubectl get pods -o json` that the limit check
/// needs; deserialized from kubectl's stdout in `kubernetes_get_pods_info`.
#[derive(Deserialize, Debug)]
struct K8SPodsInfo {
// Pods matched by the kubectl query.
items: Vec<Item>,
}
/// One entry of `K8SPodsInfo::items`; only its `spec` field is deserialized.
#[derive(Deserialize, Debug)]
struct Item {
spec: Spec,
}
/// The `spec` section of a pod entry; only the container list is deserialized.
#[derive(Deserialize, Debug)]
struct Spec {
// `check_kubernetes_limits` inspects only the first element of this list.
containers: Vec<Container>,
}
/// A container entry of a pod spec; only the image name is deserialized.
#[derive(Deserialize, Debug)]
struct Container {
// Compared against `PluginRunItem::container_image` to detect an already running plugin.
image: String,
}
/// Runs `kubectl get pods -o json` with the additional `filter` arguments appended and
/// parses the printed pod list.
///
/// # Errors
/// Returns an error when `kubectl` cannot be started, when it exits with a non-success
/// status (the error message then contains kubectl's stderr), or when its stdout cannot be
/// deserialized into `K8SPodsInfo`.
fn kubernetes_get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> {
    let mut args = vec![
        "get".to_string(),
        "pods".to_string(),
        "-o".to_string(),
        "json".to_string(),
    ];
    args.extend_from_slice(filter);

    debug!("Running kubectl with args {args:?}");

    let output = Command::new("kubectl")
        .args(&args)
        .output()
        .context_str("failed to start k8s command")?;

    if !output.status.success() {
        // Decode lossily: a stray non-UTF-8 byte in stderr must not replace the actual
        // kubectl failure with an unrelated UTF-8 parsing error.
        let std_err = String::from_utf8_lossy(&output.stderr);

        warn!("k8s stderr {std_err}");

        return Err(Error {
            code: StatusCode::INTERNAL_SERVER_ERROR,
            msg: format!("k8s command failed, {std_err}"),
        });
    }

    match serde_json::from_slice::<K8SPodsInfo>(&output.stdout) {
        Ok(response) => {
            debug!("kubectl response: {response:#?}");
            Ok(response)
        }
        Err(e) => {
            // Log the raw output to make the deserialization failure diagnosable.
            warn!("k8s stdout {:?}", std::str::from_utf8(&output.stdout));
            Err(e.into())
        }
    }
}
......@@ -186,14 +186,17 @@ pub fn actions_after_item_create(
}
Ok(())
})?;
})
} else {
Ok(())
}
}
Err(e) => error!("Failed upon item triggering, reason {e}"),
Err(e) => {
error!("Failed upon item triggering, reason {e}");
Err(e)
}
}
Ok(())
}
/// After item is in the DB do appropriate actions.
/// Invariants:
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment