Commit e7b006e1 authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

wip strategy pattern?

No related merge requests found
Showing with 165 additions and 118 deletions
+165 -118
use std::process::Command;
use reqwest::StatusCode;
use serde::Deserialize;
use crate::db_model::PluginRunItem;
use crate::error::{Error, Result, ErrorContext};
use tracing::{debug, warn};
/// Current limits are:
/// Exactly one plugin o given kind running,
/// Total number of plugins running: 5
pub fn check_limits(
plugin: &PluginRunItem,
pod_owner: &str,
plugin_limit: usize,
) -> Result<()> {
// The IO operation as well as output processing, might take some time,
// move it out of the async context to not block the executor.
// Yes, we are in the async context, even tough function is not marked async.
let response = tokio::task::block_in_place(|| -> Result<K8SPodsInfo> {
let owner_digest = format!("{:x}", md5::compute(pod_owner));
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Running".to_string(),
];
let mut response = get_pods_info(&args)?;
// We haven't found a way to filter by status Running OR Pending, so separate call need to be done
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Pending".to_string(),
];
let mut pending = get_pods_info(&args)?;
response.items.append(&mut pending.items);
Ok(response)
})?;
if response.items.len() > plugin_limit {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: "You have reached max number of running plugins ".to_string(),
})
} else if response.items.iter().any(|el| {
el.spec
.containers
.first()
.expect("k8s pod does not have a container")
.image
== plugin.container_image
}) {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Plugin from {} is already running", plugin.container_image),
})
} else {
Ok(())
}
}
#[derive(Deserialize, Debug)]
struct K8SPodsInfo {
items: Vec<Item>,
}
#[derive(Deserialize, Debug)]
struct Item {
spec: Spec,
}
#[derive(Deserialize, Debug)]
struct Spec {
containers: Vec<Container>,
}
#[derive(Deserialize, Debug)]
struct Container {
image: String,
}
fn get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> {
let mut args = vec![
"get".to_string(),
"pods".to_string(),
"-o".to_string(),
"json".to_string(),
];
args.extend_from_slice(filter);
debug!("Running kubectl with args {args:?}");
let output = Command::new("kubectl")
.args(&args)
.output()
.context_str("failed to start k8s command")?;
if !output.status.success() {
let std_err =
std::str::from_utf8(&output.stderr).context_str("failed to retrieve kubectl stderr")?;
warn!("k8s stderr {std_err}");
return Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("k8s command failed, {std_err}"),
});
}
match serde_json::from_slice::<K8SPodsInfo>(&output.stdout) {
Ok(response) => {
debug!("kubectl response: {response:#?}");
Ok(response)
}
Err(e) => {
warn!("k8s stdout {:?}", std::str::from_utf8(&output.stdout));
Err(e.into())
}
}
}
......@@ -13,6 +13,7 @@ mod global_static;
mod graphql_utils;
pub mod internal_api;
pub mod json_utils;
mod kubernetes;
pub mod oauth1_api;
pub mod oauth2_api;
pub mod plugin_auth_crypto;
......
use crate::{
api_model::CreateItem,
command_line_interface::CliOptions,
command_line_interface::{CliOptions, PARSED},
database_api,
db_model::{PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, ErrorContext, Result},
error::{Error, Result},
internal_api,
internal_api::{new_random_string, search},
kubernetes,
plugin_auth_crypto::DatabaseKey,
schema::Schema,
};
use duct;
use lazy_static::lazy_static;
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::{json, Value};
use std::{
collections::HashMap,
......@@ -21,8 +23,44 @@ use std::{
use tracing::{debug, info, warn};
use warp::http::status::StatusCode;
use duct;
pub trait PluginExecutor : Sync {
}
pub struct Kubernetes;
impl Kubernetes {
pub fn new() -> Self {
Self
}
}
impl PluginExecutor for Kubernetes {
}
pub struct Docker;
impl Docker {
pub fn new() -> Self {
Self
}
}
impl PluginExecutor for Docker {
}
lazy_static!{
static ref EXECUTOR : Box<dyn PluginExecutor> =
if PARSED.use_kubernetes {
Box::new(Kubernetes::new())
} else {
Box::new(Docker::new())
};
}
// TODO: this is barely maintainable, split to strategy pattern: Docker, K8S, sharing probably one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
......@@ -77,7 +115,7 @@ pub fn run_plugin_container(
)
} else if cli_options.use_kubernetes {
if !cli_options.kubernetes_ignore_limits {
check_kubernetes_limits(
kubernetes::check_limits(
plugin,
pod_owner,
cli_options.kubernetes_plugin_limit_per_owner,
......@@ -385,7 +423,7 @@ pub fn sanitize_docker_name(input: &str) -> String {
/// > Enclosing characters in single quotes preserves the literal value of each character
/// > within the quotes. A single quote may not occur between single quotes,
/// > even when preceded by a backslash.
pub fn escape_bash_arg(str: &str) -> String {
fn escape_bash_arg(str: &str) -> String {
let ok = str
.chars()
.all(|c| c.is_ascii_alphanumeric() || "_-+=%".contains(c));
......@@ -508,115 +546,3 @@ pub fn get_container_id(item: &CreateItem, cli: &CliOptions, pod_owner: &str) ->
Ok(container_id)
}
/// Current limits are:
/// Exactly one plugin o given kind running,
/// Total number of plugins running: 5
fn check_kubernetes_limits(
plugin: &PluginRunItem,
pod_owner: &str,
plugin_limit: usize,
) -> Result<()> {
// The IO operation as well as output processing, might take some time,
// move it out of the async context to not block the executor.
// Yes, we are in the async context, even tough function is not marked async.
let response = tokio::task::block_in_place(|| -> Result<K8SPodsInfo> {
let owner_digest = format!("{:x}", md5::compute(pod_owner));
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Running".to_string(),
];
let mut response = kubernetes_get_pods_info(&args)?;
// We haven't found a way to filter by status Running OR Pending, so separate call need to be done
let args = [
"-l".to_string(),
format!("owner={owner_digest}"),
"--field-selector=status.phase=Pending".to_string(),
];
let mut pending = kubernetes_get_pods_info(&args)?;
response.items.append(&mut pending.items);
Ok(response)
})?;
if response.items.len() > plugin_limit {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: "You have reached max number of running plugins ".to_string(),
})
} else if response.items.iter().any(|el| {
el.spec
.containers
.first()
.expect("k8s pod does not have a container")
.image
== plugin.container_image
}) {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Plugin from {} is already running", plugin.container_image),
})
} else {
Ok(())
}
}
#[derive(Deserialize, Debug)]
struct K8SPodsInfo {
items: Vec<Item>,
}
#[derive(Deserialize, Debug)]
struct Item {
spec: Spec,
}
#[derive(Deserialize, Debug)]
struct Spec {
containers: Vec<Container>,
}
#[derive(Deserialize, Debug)]
struct Container {
image: String,
}
fn kubernetes_get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> {
let mut args = vec![
"get".to_string(),
"pods".to_string(),
"-o".to_string(),
"json".to_string(),
];
args.extend_from_slice(filter);
debug!("Running kubectl with args {args:?}");
let output = Command::new("kubectl")
.args(&args)
.output()
.context_str("failed to start k8s command")?;
if !output.status.success() {
let std_err =
std::str::from_utf8(&output.stderr).context_str("failed to retrieve kubectl stderr")?;
warn!("k8s stderr {std_err}");
return Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("k8s command failed, {std_err}"),
});
}
match serde_json::from_slice::<K8SPodsInfo>(&output.stdout) {
Ok(response) => {
debug!("kubectl response: {response:#?}");
Ok(response)
}
Err(e) => {
warn!("k8s stdout {:?}", std::str::from_utf8(&output.stdout));
Err(e.into())
}
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment