Commit 3e6af9b4 authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

Merge branch 'sz/qnd_twitter_limiting' into 'dev'

Sz/qnd twitter limiting

See merge request memri/pod!381
parents c4fa8d1b 3a2b1cd1
Showing with 150 additions and 11 deletions
+150 -11
......@@ -1301,9 +1301,9 @@ dependencies = [
[[package]]
name = "once_cell"
version = "1.14.0"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f7254b99e31cad77da24b08ebf628882739a608578bb1bcdfc1f9c21260d7c0"
checksum = "86f0b0d4bf799edbc74508c1e8bf170ff5f41238e5f8225603ca7caaae2b7860"
[[package]]
name = "oorandom"
......
-- uniqueId
INSERT INTO items(id, type, dateCreated, dateModified, dateServerModified, deleted) VALUES(
"0cbd6502-4a6c-4be8-823f-c101cfaae933",
"ItemPropertySchema", 0, 0, 0, 0
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "0cbd6502-4a6c-4be8-823f-c101cfaae933"),
"itemType", "PluginRun"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "0cbd6502-4a6c-4be8-823f-c101cfaae933"),
"propertyName", "uniqueId"
);
INSERT INTO strings(item, name, value) VALUES(
(SELECT rowid FROM items WHERE id = "0cbd6502-4a6c-4be8-823f-c101cfaae933"),
"valueType", "Text"
);
......@@ -45,6 +45,7 @@ pub struct PluginRunItem {
pub container_id: String,
pub status: Option<String>,
pub plugin_alias: Option<String>,
pub unique_id: Option<String>,
}
#[derive(Serialize, Deserialize, Debug)]
......
......@@ -20,6 +20,8 @@ use serde::Deserialize;
use serde_json::{json, Value};
use std::{
collections::HashMap,
ffi::OsStr,
fmt::Debug,
process::{Child, Command},
sync::atomic::AtomicU32,
time::Duration,
......@@ -29,6 +31,7 @@ use warp::http::status::StatusCode;
use duct;
const UNIQUE_ID: &str = "unique_id";
// TODO: this is barely maintainable, split to strategy pattern: Docker, K8S, sharing probably one trait
/// Run a plugin, making sure that the correct ENV variables and settings are passed
......@@ -90,6 +93,10 @@ pub fn run_plugin_container(
)?;
}
if let Err(e) = k8s_enforce_unique_instance(plugin) {
warn!("Enforcing K8s container unique instance failed because: {e:?}");
}
run_kubernetes_container(
&target_item_json,
pod_owner,
......@@ -99,6 +106,9 @@ pub fn run_plugin_container(
cli_options,
)
} else {
if let Err(e) = docker_enforce_unique_instance(plugin) {
warn!("Enforcing docker container unique instance failed because: {e:?}");
}
run_docker_container(
&target_item_json,
pod_owner,
......@@ -195,6 +205,10 @@ fn run_docker_container(
args.extend_from_slice(&os_specific_args);
if let Some(ref unique_id) = plugin.unique_id {
args.extend_from_slice(&["--label".to_string(), format!("{UNIQUE_ID}={unique_id}")]);
}
args.extend_from_slice(&["--".to_string(), plugin.container_image.clone()]);
let envs: HashMap<&str, &str> = HashMap::new();
......@@ -267,17 +281,23 @@ fn run_kubernetes_container(
}
});
let mut labels = format!(
"app={},type=plugin,owner={:x}",
plugin.container_id,
// Owner key exceeds a limit of number of characters that k8s label can keep.
// Make it shorter by doing a md5 hash.
md5::compute(pod_owner)
);
if let Some(ref unique_id) = plugin.unique_id {
labels = format!("{labels},{UNIQUE_ID}={unique_id}");
}
let args: Vec<String> = vec![
"run".to_string(),
"--restart=Never".to_string(),
plugin.container_id.clone(),
format!(
"--labels=app={},type=plugin,owner={:x}",
plugin.container_id,
// Owner key exceeds a limit of number of characters that k8s label can keep.
// Make it shorter by doing a md5 hash.
md5::compute(pod_owner)
),
format!("--labels={labels}"),
format!("--port={K8S_CONTAINER_PORT}"),
"--image-pull-policy=Always".to_string(),
format!("--image={}", plugin.container_image),
......@@ -618,6 +638,103 @@ fn check_kubernetes_limits(
}
}
/// Look for containers for given unique_id, kill them if any exist.
/// Currently it's used for mitigating rate limiting twitter api has.
fn k8s_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> {
let Some(ref unique_id) = plugin.unique_id else {
return Ok(())
};
info!(
"Going to cleanup k8s containers with unique_id {}...",
unique_id
);
let args = vec![
"delete".to_string(),
"pod".to_string(),
"-l".to_string(),
format!("{UNIQUE_ID}={unique_id}"),
"--ignore-not-found=true".to_string(),
"--now".to_string(),
];
let std_out = run_command("kubectl", args)?;
debug!("Cleanup result: {std_out}");
Ok(())
}
/// Look for containers for given unique_id, kill them if any exist.
/// Currently it's used for mitigating rate limiting twitter api has.
fn docker_enforce_unique_instance(plugin: &PluginRunItem) -> Result<()> {
let Some(ref unique_id) = plugin.unique_id else {
return Ok(())
};
info!(
"Going to cleanup docker containers with unique_id {}...",
unique_id
);
// Get containers with given label
let args = vec![
"ps".to_string(),
"--filter".to_string(),
format!("label={UNIQUE_ID}={unique_id}"),
"-q".to_string(),
];
let std_out = run_command("docker", args)?;
let running_containers: Vec<&str> = std_out.split_terminator('\n').collect();
debug!("Running containers with unique_id {unique_id}: {running_containers:?}");
// Stop containers with given label
let args = ["container", "stop"]
.iter()
.chain(running_containers.iter());
let _ = run_command("docker", args)?;
Ok(())
}
/// Runs command, waits for finish, returns Ok(std_out) or Err(std_err)
fn run_command<I, S>(command: &str, args: I) -> Result<String>
where
I: IntoIterator<Item = S> + Debug,
S: AsRef<OsStr>,
{
// The IO operation as well as output processing, might take some time,
// move it out of the async context to not block the executor.
// Yes, we are in the async context, even tough function is not marked async.
tokio::task::block_in_place(|| {
let cmd_with_args = format!("{command} {args:?}");
info!("Running command {cmd_with_args}");
let output = Command::new(command)
.args(args)
.output()
.context_str("failed to start command")?;
if output.status.success() {
let std_out = std::str::from_utf8(&output.stdout)
.context_str("failed to retrieve command stdout")?;
Ok(std_out.to_string())
} else {
let std_err = std::str::from_utf8(&output.stderr)
.context_str("failed to retrieve command stderr")?;
warn!("Command {cmd_with_args} stderr {std_err}");
Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("command failed, {std_err}"),
})
}
})
}
#[derive(Deserialize, Debug)]
struct K8SPodsInfo {
items: Vec<Item>,
......
......@@ -253,6 +253,7 @@ where {
if item["type"] == "PluginRun" {
if let Err(e) = (|| {
let plugin: ItemWithBase<PluginRunItem> = serde_json::from_value(item.clone())?;
plugin_run::run_plugin_container(
tx,
schema,
......
......@@ -15,7 +15,8 @@ spec:
command:
- /pod
- '--owners=ANY'
- '--use-kubernetes=true'
- '--use-kubernetes'
# - '--kubernetes-ignore-limits'
- '--insecure-non-tls=0.0.0.0'
- '--plugins-callback-address=http://pod.test'
# - '--plugins-docker-network=pod_memri-net'
......@@ -28,7 +29,7 @@ spec:
protocol: TCP
env:
- name: RUST_LOG
value: pod=debug,info
value: pod=debug,libpod::plugin_run=debug,info
resources: {}
# volumeMounts:
# - name: data
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment