Commit 181a84e5 authored by Bijun Li's avatar Bijun Li
Browse files

Add run_services API

parent c655dae3
Showing with 76 additions and 55 deletions
+76 -55
......@@ -106,23 +106,19 @@ pub struct RunIndexer {
pub service_payload: Value,
}
//
// Files:
//
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GetFile {
pub sha256: String,
pub struct RunService {
pub uid: i64,
pub service_payload: Value,
}
//
// Actions:
// Files:
//
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Action {
pub action_type: String,
pub content: Value,
pub struct GetFile {
pub sha256: String,
}
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::{RunDownloader, RunService};
use crate::command_line_interface::CLIOptions;
use crate::error::Error;
use crate::error::Result;
......@@ -144,6 +144,49 @@ pub fn run_indexers(
}
}
pub fn run_services(
conn: &Connection,
payload: RunService,
cli_options: &CLIOptions,
) -> Result<()> {
info!("Trying to run service on item {}", payload.uid);
let result = internal_api::get_item(conn.deref(), payload.uid)?;
if result.first().is_none() {
return Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to get item {}", payload.uid),
});
};
let mut args: Vec<String> = Vec::new();
args.push("run".to_string());
for arg in docker_arguments(cli_options) {
args.push(arg);
}
args.push(format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
));
args.push("--rm".to_string());
args.push("--name=memri-services_1".to_string());
args.push(format!("--env=RUN_UID={}", payload.uid));
args.push(format!(
"{}:latest",
result.get(3).expect("Failed to get dataType").to_string()
));
log::debug!("Starting service docker command {:?}", args);
let command = Command::new("docker").args(&args).spawn();
match command {
Ok(_child) => {
log::debug!("Successfully started service for {}", payload.uid);
Ok(())
}
Err(err) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to run service with uid {}, {}", payload.uid, err),
}),
}
}
fn docker_arguments(cli_options: &CLIOptions) -> Vec<String> {
let is_https = cli_options.insecure_non_tls.is_none() && !cli_options.non_tls;
let schema = if is_https { "https" } else { "http" };
......
use crate::api_model::Action;
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::GetFile;
......@@ -7,6 +6,7 @@ use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::RunService;
use crate::api_model::SearchByFields;
use crate::api_model::UpdateItem;
use crate::command_line_interface::CLIOptions;
......@@ -53,9 +53,6 @@ pub async fn run_server(cli_options: &CLIOptions) {
let file_api = warp::path("v2")
.and(warp::body::content_length_limit(500 * 1024 * 1024))
.and(warp::post());
let action_api = warp::path("v2")
.and(warp::body::content_length_limit(1024 * 1024))
.and(warp::post());
let initialized_databases_arc = Arc::new(RwLock::new(HashSet::<String>::new()));
......@@ -204,6 +201,18 @@ pub async fn run_server(cli_options: &CLIOptions) {
respond_with_result(result.map(|()| warp::reply::json(&serde_json::json!({}))))
});
let init_db = initialized_databases_arc.clone();
let cli_options_arc = Arc::new(cli_options.clone());
let run_service = services_api
.and(warp::path!(String / "run_service"))
.and(warp::path::end())
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<RunService>| {
let cli: &CLIOptions = &cli_options_arc.deref();
let result = warp_endpoints::run_service(owner, init_db.deref(), body, cli);
respond_with_result(result.map(|()| warp::reply::json(&serde_json::json!({}))))
});
let init_db = initialized_databases_arc.clone();
let upload_file = file_api
.and(warp::path!(String / "upload_file" / String / String))
......@@ -232,16 +241,6 @@ pub async fn run_server(cli_options: &CLIOptions) {
respond_with_result(result.map(|result| result))
});
let do_action = action_api
.and(warp::path!(String / "do_action"))
.and(warp::path::end())
.and(warp::body::json())
.map(move |owner: String, body: Action| {
let result = warp_endpoints::do_action(owner, body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let insecure_http_headers = Arc::new(cli_options.insecure_http_headers);
let origin_request =
warp::options()
......@@ -286,9 +285,9 @@ pub async fn run_server(cli_options: &CLIOptions) {
.or(run_downloader.with(&headers))
.or(run_importer.with(&headers))
.or(run_indexer.with(&headers))
.or(run_service.with(&headers))
.or(upload_file.with(&headers))
.or(get_file.with(&headers))
.or(do_action.with(&headers))
.or(origin_request);
if cli_options.non_tls || cli_options.insecure_non_tls.is_some() {
......
use crate::api_model::Action;
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::GetFile;
......@@ -7,6 +6,7 @@ use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::RunService;
use crate::api_model::SearchByFields;
use crate::api_model::UpdateItem;
use crate::command_line_interface;
......@@ -167,6 +167,17 @@ pub fn run_indexer(
services_api::run_indexers(&conn, body.payload, cli_options)
}
pub fn run_service(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<RunService>,
cli_options: &CLIOptions,
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?;
services_api::run_services(&conn, body.payload, cli_options)
}
pub fn upload_file(
owner: String,
init_db: &RwLock<HashSet<String>>,
......@@ -193,34 +204,6 @@ pub fn get_file(
})
}
pub fn do_action(owner: String, body: Action) -> Result<Value> {
Ok(Value::Null)
// check_owner(&owner)?;
// let res = match body.action_type.as_str() {
// "mx_matrix_register" => whatsapp::matrix_register(body.content),
// "mx_matrix_login" => whatsapp::matrix_login(body.content),
// "mx_create_room" => whatsapp::create_room(body.content),
// "mx_get_joined_rooms" => whatsapp::get_joined_rooms(body.content),
// "mx_invite_user_to_join" => whatsapp::invite_user_to_join(body.content),
// "mx_get_joined_members" => whatsapp::get_joined_members(body.content),
// "mx_send_messages" => whatsapp::send_messages(body.content),
// "mx_sync_events" => whatsapp::sync_events(body.content),
// "mx_get_messages" => whatsapp::get_messages(body.content),
// "mx_get_qrcode" => whatsapp::get_qrcode(body.content),
// e => Err(whatsapp::error::Error {
// code: StatusCode::BAD_REQUEST,
// msg: format!("Unsupported function: {}", e),
// }),
// };
// match res {
// Ok(var) => Ok(var),
// Err(e) => Err(Error {
// code: StatusCode::BAD_REQUEST,
// msg: e.msg,
// }),
// }
}
//
// helper functions:
//
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment