Commit 471b5114 authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

query for health

parent 539c4e5e
Showing with 311 additions and 12 deletions
+311 -12
......@@ -412,6 +412,31 @@ See [Plugins](./Plugins.md) on how plugins are started exactly.
⚠️ UNSTABLE: We might require more properties for Plugins to start in the future,
e.g. permission limitation.
### POST /v4/$OWNER_KEY/plugin/status
```json
{
"auth": $auth_json,
"payload": {
"plugins": []
}
}
```
Returns statuses for selected plugins, `plugins` array accepts `PluginRun` id's.
If `plugins` array is empty, queries all plugins.
If `plugins` contains invalid id it's get discarded in the response
Returns `plugins` map, where key is the `PluginRun` id, and a value is the `PluginStatus`
### Example response
```json
{
"plugins": {
"4fa6094c-51d6-4874-a135-9017fcdedf18": "running"
}
}
```
# Logs API
### POST /v4/$owner_key/get_pluginrun_log
......@@ -560,3 +585,4 @@ Returns currently valid `access_token` for given `platform`
```
......@@ -270,6 +270,31 @@ pub struct SendEmail {
pub body: String,
}
#[derive(Deserialize, Debug)]
pub struct TraceRequest {
pub secret: String,
pub filter: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PluginStatusReq {
pub plugins: Vec<String>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub enum PluginStatus {
Running(Value),
Unreachable,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PluginStatusRes {
pub plugins: HashMap<String, PluginStatus>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum SortOrder {
/// Ascending
......@@ -307,12 +332,6 @@ pub struct GetFile {
pub sha256: String,
}
#[derive(Deserialize, Debug)]
pub struct TraceRequest {
pub secret: String,
pub filter: String,
}
#[cfg(test)]
mod tests {
use super::*;
......
......@@ -20,6 +20,15 @@ pub struct ItemBase {
pub deleted: bool,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ItemWithBase<T> {
#[serde(flatten)]
pub base: ItemBase,
#[serde(flatten)]
pub item: T,
}
/// Property name of PluginRun object, containing port on which plugin starts to
/// listen it's webserver.
pub const WEBSERVER_PORT: &str = "webserverPort";
......
use crate::{
api_model::CreateItem,
api_model::{CreateItem, PluginStatus, PluginStatusReq, PluginStatusRes, Search},
command_line_interface::CliOptions,
database_api,
db_model::{PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
database_api::{self, in_read_transaction},
database_pool::PooledConnection,
db_model::{ItemWithBase, PluginRunItem, CONTAINER_ID, CONTAINER_IMAGE},
error::{Error, ErrorContext, Result},
internal_api,
internal_api::{new_random_string, search},
plugin_auth_crypto::DatabaseKey,
schema::Schema,
};
use futures::future;
use md5;
use rusqlite::Transaction;
use serde::Deserialize;
......@@ -650,3 +652,90 @@ fn kubernetes_get_pods_info(filter: &[String]) -> Result<K8SPodsInfo> {
}
}
}
pub async fn get_plugins_status(
conn: &mut PooledConnection,
cli: &CliOptions,
payload: PluginStatusReq,
) -> Result<PluginStatusRes> {
debug!("Get status for {payload:?}");
let plugins = in_read_transaction(conn, move |tx| {
let schema = database_api::get_schema(tx)?;
let query = Search {
_type: Some("PluginRun".to_string()),
deleted: Some(false),
limit: u64::MAX,
..Default::default()
};
let mut plugins = search(tx, &schema, query)?
.into_iter()
.map(|element| {
serde_json::from_value::<ItemWithBase<PluginRunItem>>(element).unwrap_or_else(
|err| panic!("Unable to deserialize value into PluginRun, reason {err:?}"),
)
})
.collect::<Vec<_>>();
// Narrow the result to requested plugins
if !payload.plugins.is_empty() {
plugins.retain(|plugin| {
payload
.plugins
.iter()
.any(|plugin_id| &plugin.base.id == plugin_id)
})
}
Ok(plugins)
})?;
let client = reqwest::Client::new();
let statuses = plugins.into_iter().map(|el| async {
let status = query_for_status(&client, &el, cli).await;
(el.base.id, status)
});
let statuses = future::join_all(statuses).await;
debug!("Plugin statuses {statuses:#?}");
Ok(PluginStatusRes {
plugins: statuses.into_iter().collect(),
})
}
async fn query_for_status(
client: &reqwest::Client,
plugin_instance: &ItemWithBase<PluginRunItem>,
cli: &CliOptions,
) -> PluginStatus {
let host = &plugin_instance.item.webserver_url;
let endpoint = if cli.use_kubernetes && cli.plugins_public_domain.is_some() {
// On kubernetes reach the plugin via 80/443 port
format!("{host}/v1/health")
} else {
let port = plugin_instance.item.webserver_port;
format!("{host}:{port}/v1/health")
};
match client.get(endpoint).send().await {
Ok(response) => {
// Able to reach the webserver and get the response back
// TODO: to be more granular in the future
let plugin_state = match response.json::<Value>().await {
Ok(state) => state,
Err(_e) => {
json!("UnknownState")
}
};
PluginStatus::Running(plugin_state)
}
Err(_) => PluginStatus::Unreachable,
}
}
......@@ -45,12 +45,16 @@ name = "test_trigger_deregister"
path = "tests/test_trigger_deregister.rs"
required-features = ["include_slow_tests"]
[[test]]
name = "test_plugin_state"
path = "tests/test_plugin_state.rs"
required-features = ["include_slow_tests"]
[[test]]
name = "test_plugin"
path = "tests/test_plugin.rs"
required-features = ["include_slow_tests"]
[[test]]
name = "test_version"
path = "tests/test_version.rs"
......
......@@ -376,6 +376,19 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
)
.boxed();
let plugin_status = items_api
.and(warp::path!(PodOwner / "plugin" / "status"))
.and(warp::path::end())
.and(warp::body::bytes())
.and(with_init_db())
.and(with_cli())
.then(
|owner: PodOwner, body: Bytes, init_db: Arc<InitDb>, cli: Arc<CliOptions>| async move {
warp_endpoints::plugins_status(owner, init_db.deref(), cli.deref(), body).await
},
)
.boxed();
let origin_request = warp::options()
.and(warp::header::<String>("origin"))
.map(move |_origin| {
......@@ -430,6 +443,7 @@ pub async fn run_server<S: 'static>(cli_options: CliOptions, trace_handle: Handl
.or(oauth2_authorize.with(&headers))
.or(oauth2_access_token.with(&headers))
.or(trace_filter.with(&headers))
.or(plugin_status.with(&headers))
.boxed();
let not_found = warp::any().map(|| {
......
......@@ -4,7 +4,8 @@ use libpod::{
AuthKey, Bulk, BulkResponse, CreateEdge, CreateItem, DeleteEdgeBySourceTarget, GetEdges,
GetFile, Oauth2AccessTokenRequest, Oauth2AccessTokenResponse, Oauth2AuthUrlRequest,
Oauth2AuthorizeTokenRequest, OauthAccessTokenPayload, OauthRequestTokenPayload,
PayloadWrapper, PodOwner, Search, SendEmail, TraceRequest, UpdateItem,
PayloadWrapper, PluginStatusReq, PluginStatusRes, PodOwner, Search, SendEmail,
TraceRequest, UpdateItem,
},
command_line_interface,
command_line_interface::CliOptions,
......@@ -654,6 +655,34 @@ pub fn _trace_filter<S>(body: Bytes, trace_handler: Handle<EnvFilter, S>) -> Res
})
}
#[instrument(fields(uid=trace_uid(), %owner), skip_all)]
pub async fn plugins_status(
owner: PodOwner,
init_db: &InitDb,
cli: &CliOptions,
body: Bytes,
) -> Response {
let result = _plugins_status(owner, init_db, cli, body).await;
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
}
async fn _plugins_status(
owner: PodOwner,
init_db: &InitDb,
cli: &CliOptions,
body: Bytes,
) -> Result<PluginStatusRes> {
let body = &mut serde_json::Deserializer::from_slice(body.deref());
let body: PayloadWrapper<PluginStatusReq> = serde_path_to_error::deserialize(body)?;
let auth = body.auth;
let database_key = auth_to_database_key(auth)?;
let mut conn = check_owner_and_initialize_db(&owner, init_db, &database_key).await?;
plugin_run::get_plugins_status(&mut conn, cli, body.payload).await
}
//
// helper functions:
//
......
......@@ -79,7 +79,7 @@ fn get_pod_path_executable() -> PathBuf {
}
impl TestData {
async fn stop_containers(&self) {
pub async fn stop_containers(&self) {
let containers = self
.list_containers(ListContainersOptions {
filters: HashMap::from([("name", vec![self.owner_key.as_ref()])]),
......
mod common;
use crate::common::pod_request::{deploy_plugin, migrate_plugin_definition, start_plugin};
use common::test_data::TestData;
use libpod::api_model::{PluginStatus, PluginStatusRes};
use serde_json::json;
use test_context::test_context;
#[test_context(TestData)]
#[tokio::test]
async fn test_plugin_state(ctx: &mut TestData) {
migrate_plugin_definition(&ctx.pod_client).await;
// Request status of all plugins on fresh instance, should be none
let payload = json!(
{
"plugins" : []
}
);
let response = ctx.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
assert_eq!(
response
.json::<PluginStatusRes>()
.await
.unwrap()
.plugins
.len(),
0
);
deploy_plugin(&ctx.pod_client).await;
start_plugin(ctx).await;
// Request status of all plugins
let payload = json!(
{
"plugins" : []
}
);
let response = ctx.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
let response = response.json::<PluginStatusRes>().await.unwrap();
assert_eq!(response.plugins.len(), 1);
let plugin_run_id = "4fa2094c-51d6-4874-a135-9017fcdedf16";
assert!(matches!(
response.plugins.get(plugin_run_id).unwrap(),
PluginStatus::Running(_)
));
// Request status of selected plugins
let payload = json!(
{
"plugins" : [plugin_run_id, "invalid-id"]
}
);
let response = ctx.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
let response = response.json::<PluginStatusRes>().await.unwrap();
assert_eq!(response.plugins.len(), 1);
assert!(matches!(
response.plugins.get(plugin_run_id).unwrap(),
PluginStatus::Running(_)
));
// Request status of selected invalid plugin
let payload = json!(
{
"plugins" : ["invalid-id"]
}
);
let response = ctx.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
let response = response.json::<PluginStatusRes>().await.unwrap();
assert_eq!(response.plugins.len(), 0);
ctx.stop_containers().await;
// Request status of selected plugin, that does not run
let payload = json!(
{
"plugins" : [plugin_run_id, "invalid-id"]
}
);
let response = ctx.pod_client.post_to(payload, "plugin/status").await;
assert!(response.status().is_success());
let response = response.json::<PluginStatusRes>().await.unwrap();
assert_eq!(response.plugins.len(), 1);
assert!(matches!(
response.plugins.get(plugin_run_id).unwrap(),
PluginStatus::Unreachable
));
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment