Commit 7ffa112e authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

wip use kubectl to get status, update the db

parent 5d5d6c65
Showing with 138 additions and 25 deletions
+138 -25
use serde::{Deserialize, Serialize};
use crate::api_model::Platforms;
use crate::{api_model::Platforms, plugin_run::PluginState};
pub type Rowid = i64;
pub type DbTime = i64;
......@@ -20,13 +20,23 @@ pub struct ItemBase {
pub deleted: bool,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ItemWithBase<T> {
#[serde(flatten)]
pub base: ItemBase,
#[serde(flatten)]
pub item : T
}
/// Property name of PluginRun object, containing port on which plugin starts to
/// listen it's webserver.
pub const WEBSERVER_PORT: &str = "webserverPort";
pub const WEBSERVER_URL: &str = "webserverUrl";
pub const CONTAINER_ID: &str = "containerId";
pub const CONTAINER_IMAGE: &str = "containerImage";
#[derive(Deserialize, Debug)]
pub const CONTAINER_STATE: &str = "containerState";
#[derive(Deserialize, Debug, Hash, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct PluginRunItem {
pub container_image: String,
......@@ -35,6 +45,7 @@ pub struct PluginRunItem {
pub webserver_url: String,
pub container_id: String,
pub status: Option<String>,
pub container_state: PluginState
}
#[derive(Serialize, Deserialize, Debug)]
......
use crate::{
api_model::CreateItem,
api_model::{CreateItem, Search},
command_line_interface::CliOptions,
database_api,
db_model::{PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, Result},
db_model::{ItemWithBase, PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE, CONTAINER_STATE},
error::{Error, ErrorContext, Result},
internal_api,
internal_api::{new_random_string, search},
plugin_auth_crypto::DatabaseKey,
......@@ -12,8 +12,12 @@ use crate::{
};
use duct;
use rusqlite::Transaction;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::{collections::HashMap, sync::atomic::AtomicU32};
use std::{
collections::{HashMap, HashSet},
sync::atomic::AtomicU32,
};
use tracing::{info, warn};
use warp::http::status::StatusCode;
......@@ -269,7 +273,25 @@ pub fn get_container_id(item: &CreateItem, cli: &CliOptions, pod_owner: &str) ->
Ok(container_id)
}
pub async fn get_plugins_status(owner: &str, cli: &CliOptions) -> Result<()> {
pub type PluginsStatuses = Vec<PluginStatus>;
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Hash)]
#[serde(rename_all = "camelCase")]
pub enum PluginState {
Running,
Waiting,
Terminating,
Terminated,
}
#[derive(Debug, Hash, Eq, PartialEq)]
pub struct PluginStatus {
pub image: String,
pub name: String,
pub state: PluginState,
}
pub async fn get_plugins_status(owner: &str, cli: &CliOptions) -> Result<PluginsStatuses> {
if !cli.use_kubernetes {
return Err(Error {
code: StatusCode::BAD_REQUEST,
......@@ -279,3 +301,56 @@ pub async fn get_plugins_status(owner: &str, cli: &CliOptions) -> Result<()> {
kubernetes::get_plugins_status(owner).await
}
pub fn update_statuses(
tx: &Transaction,
schema: &mut Schema,
statuses: &PluginsStatuses,
) -> Result<()> {
let query = Search {
_type: Some("PluginRun".to_string()),
// TODO: deleted?
// deleted: Some(false),
limit: u64::MAX,
..Default::default()
};
let mut plugins_run = search(tx, schema, query)?
.into_iter()
.map(serde_json::from_value)
.collect::<std::result::Result<Vec<ItemWithBase<PluginRunItem>>, serde_json::Error>>()
.context_str("Failed to retrieve PluginRunItem from the DB")?;
for status in statuses {
let (idx, plugin_run) = plugins_run
.iter()
.enumerate()
.find(|(_idx, plugin_run)| plugin_run.item.container_image == status.image)
// TODO: dont do hard stop
// Element is in runtime, but there is no DB entry - should never happen
.expect(&format!(
"Unable to find a record in DB for plugin that has status {status:?}"
));
internal_api::update_item_tx(
tx,
schema,
&plugin_run.base.id,
HashMap::from([(CONTAINER_STATE.to_string(), json!(status.state))]),
)
.expect(&format!("Failed to update plugin status in the DB"));
// Remove item that occurs in runtime and in DB
plugins_run.remove(idx);
}
// Items that are not in runtime, but still in DB
for plugin_run in plugins_run {
internal_api::delete_item_by_id(tx, &plugin_run.base.id)
.expect("Failed to remove PluginRunItem from DB");
}
// Plugins that are present in k8s but no record in the database
// That should never happen
todo!()
}
......@@ -9,9 +9,11 @@ use crate::command_line_interface::CliOptions;
use crate::db_model::PluginRunItem;
use crate::error::{Error, ErrorContext, Result};
use crate::plugin_run::common::{callback_address, K8S_CONTAINER_PORT};
use crate::plugin_run::PluginStatus;
use tracing::{debug, warn};
use super::common::run_any_command;
use super::{PluginState, PluginsStatuses};
static IMPORTERS_PLUGINS: [&str; 3] = [
"gitlab.memri.io:5050/memri/plugins/whatsapp-multi-device",
......@@ -156,29 +158,42 @@ fn check_limits(plugin: &PluginRunItem, pod_owner: &str, plugin_limit: usize) ->
}
}
pub async fn get_plugins_status(pod_owner: &str) -> Result<()> {
pub async fn get_plugins_status(pod_owner: &str) -> Result<PluginsStatuses> {
let owner_digest = format!("{:x}", md5::compute(pod_owner));
let response: Vec<ContainerStatus> = tokio::task::spawn_blocking(move || -> Result<K8SPodsInfo> {
let args = ["-l".to_string(), format!("owner={owner_digest}")];
// TODO: does not report terminating, wtf
let response: PluginsStatuses = tokio::task::spawn_blocking(move || -> Result<K8SPodsInfo> {
let args = ["-l".to_string(), format!("owner={owner_digest},type=plugin")];
get_pods_info(&args)
}).await??
})
.await??
.items
.into_iter()
.map(|item| {
item.status.container_statuses
// item.status.container_statuses.into_iter().map(|status| {
// status.
// })
// item.status.container_statuses.pop().ok_or(Error {
// code: StatusCode::INTERNAL_SERVER_ERROR,
// msg: "Failed to retrieve container status, list is empty".to_string(),
// })
item.status
.container_statuses
.into_iter()
.map(|container_status| container_status.into())
.collect::<PluginsStatuses>()
})
.flatten()
.collect();
todo!();
Ok(response)
}
impl std::convert::Into<PluginStatus> for ContainerStatus {
fn into(self) -> PluginStatus {
PluginStatus {
image: self.image,
name: self.name,
state: match self.state {
ContainerState::Running(_) => PluginState::Running,
ContainerState::Terminated(_) => PluginState::Terminated,
ContainerState::Waiting(_) => PluginState::Waiting,
},
}
}
}
/// Schema definition at:
......@@ -189,10 +204,21 @@ struct K8SPodsInfo {
}
#[derive(Deserialize, Debug)]
struct Item {
metadata: Metadata,
spec: Spec,
status: Status,
}
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct Metadata {
// It's hard to believe, but "Terminating" state is not taken
// from the field `status.Phase`, but by checking presence of
// deletionTimestamp. Great design decision k8s..
// https://github.com/kubernetes/kubernetes/issues/22839
deletion_timestamp: Option<String>
}
#[derive(Deserialize, Debug)]
struct Spec {
containers: Vec<Container>,
......@@ -209,9 +235,9 @@ struct Status {
struct ContainerStatus {
image: String,
name: String,
ready: bool,
restart_count: u32,
started: bool,
_ready: bool,
_restart_count: u32,
_started: bool,
state: ContainerState,
}
......@@ -226,7 +252,7 @@ enum ContainerState {
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct ContainerStateInner {
started_at: String,
_started_at: String,
}
#[derive(Deserialize, Debug)]
......
......@@ -741,6 +741,7 @@ async fn _plugins_status(
let statuses = plugin_run::get_plugins_status(&owner, cli).await?;
in_write_transaction(&mut conn, move |tx| {
todo!()
let mut schema = database_api::get_schema(tx)?;
plugin_run::update_statuses(tx, &mut schema, &statuses)
})
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment