Commit af33c30a authored by Szymon Zimnowoda's avatar Szymon Zimnowoda
Browse files

wip benching

No related merge requests found
Showing with 1218 additions and 4 deletions
+1218 -4
-- "Message": {
-- "datePublished": DateTime,
-- "isMock": Bool,
-- "keyword": Text,
-- "subject": Text,
-- "transcript": Text,
-- "service": Text,
-- "textContent": Text,
-- "title": Text,
-- "externalId": Text,
-- "abstract": Text,
-- "dateSent": DateTime,
-- "dateReceived": DateTime,
-- "itemType": Text,
-- "content": Text,
-- },
CREATE TABLE Message (
rowid INTEGER PRIMARY KEY,
isMock INTEGER NULL,
-- dateCreated INTEGER /* datetime */ NOT NULL,
-- dateModified INTEGER /* datetime */ NOT NULL,
-- dateServerModified INTEGER /* datetime */ NOT NULL,
-- deleted INTEGER /* boolean */ NOT NULL
keyword TEXT NULL,
subject TEXT NULL,
transcript TEXT NULL,
service TEXT NULL,
textContent TEXT NULL,
title TEXT NULL,
externalId TEXT NULL,
abstract TEXT NULL,
dateSent INTEGER NULL,
dateReceived INTEGER NULL,
itemType TEXT NULL,
content TEXT NULL
);
\ No newline at end of file
......@@ -694,6 +694,10 @@ pub fn get_incoming_edges(tx: &Tx, target: Rowid) -> Result<Vec<EdgePointer>> {
}
pub fn get_schema(tx: &Tx) -> Result<Schema> {
// SELECT type, name, type from ItemPropertySchemas
// WHERE
let mut stmt = tx
.prepare_cached(
"SELECT itemType.value, propertyName.value, valueType.value \
......
......@@ -28,7 +28,8 @@ use crate::async_db_connection::AsyncTx as Tx;
use http::StatusCode;
use rand::Rng;
use serde_json::{Map, Value};
use std::{collections::HashMap, str};
use std::{collections::HashMap, str, time::Duration};
use tokio::time::Instant;
use tracing::{debug, info};
pub fn get_project_version() -> String {
......@@ -547,17 +548,39 @@ pub async fn create_item(
// Commit the item to the DB
let item_id = conn
.in_write_transaction(|tx: AsyncTx| async move {
let now = Instant::now();
let mut schema = database_api::get_schema(&tx)?;
create_item_tx(&tx, &mut schema, payload, owner, cli).await
let get_schema = Instant::now() - now;
static mut max_schema: Duration = Duration::ZERO;
unsafe {
if get_schema > max_schema {
max_schema = get_schema;
}
info!("get schema took {:?}", max_schema);
}
let now = Instant::now();
let res = create_item_tx(&tx, &mut schema, payload, owner, cli).await;
let create_dur = Instant::now() - now;
info!("create item took: {:?}", create_dur);
res
})
.await;
let now = Instant::now();
if let Ok(id) = &item_id {
// After transaction commit, the item is visible to all readers, now it's safe to call triggers
triggers::actions_after_item_create(conn, &[id], owner, cli, database_key).await?;
}
let actions_dur = Instant::now() - now;
info!("actions after item create took {actions_dur:?}");
item_id
}
......
......@@ -5,6 +5,7 @@ use crate::actix_endpoints::{
plugin_api_call, plugin_attach, plugins_status, search, trace_filter, trigger_status,
update_item, upload_file, upload_file_b, version,
};
use crate::v5;
use actix_cors::Cors;
use actix_web::{
self,
......@@ -109,7 +110,7 @@ pub async fn run_server<S: 'static>(
)
.service(get_logs)
.service(trigger_status),
)
).service(web::scope("v5").service(v5::create_item))
});
let (address, use_tls) = if cli_options.non_tls || cli_options.insecure_non_tls.is_some() {
......
mod actix_api;
mod actix_endpoints;
mod v5;
mod pod_handlers_v5;
mod pod_handlers;
use libpod::internal_api::get_project_version;
......
This diff is collapsed.
use std::fmt::Display;
use crate::pod_handlers_v5::{self, trace_uid};
use actix_web::error::ErrorBadRequest;
use actix_web::{get, post, web, HttpResponse, HttpResponseBuilder, Responder, ResponseError};
use http::StatusCode;
use libpod::{
api_model::PodOwner, command_line_interface::CliOptions, database_pool::InitDb, error::Result,
};
use serde::{Deserialize, Serialize};
use tracing::error;
use tracing::instrument;
use tracing_subscriber::{reload::Handle, EnvFilter};
// use warp::{http::status::StatusCode, hyper::body::Bytes, reply::Response, Reply};
pub async fn not_found() -> impl Responder {
HttpResponse::NotFound().json("Endpoint not found")
}
//
// Items API:
//
#[instrument(fields(uid=trace_uid(), %owner), skip_all)]
#[post("{owner}/create_item")]
pub async fn create_item(
owner: web::Path<PodOwner>,
init_db: web::Data<InitDb>,
body: web::Bytes,
cli: web::Data<CliOptions>,
) -> actix_web::Result<impl Responder> {
let body = extract_json(&body)?;
let result =
pod_handlers_v5::create_item(owner.to_owned(), &init_db.to_owned(), body, &cli.to_owned())
.await;
let result = result.map(web::Json);
respond_with_result(result)
}
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/get_item")]
// pub async fn get_item(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::get_item(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(skip(body))]
// #[post("{owner}/oauth1_request_token")]
// pub async fn oauth1_request_token(body: web::Bytes) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::oauth1_request_token(body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(skip(body))]
// #[post("{owner}/oauth1_access_token")]
// pub async fn oauth1_access_token(body: web::Bytes) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::oauth1_access_token(body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/oauth2/auth_url")]
// pub async fn oauth2_auth_url(
// owner: web::Path<PodOwner>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::oauth2_auth_url(body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/oauth2/authorize")]
// pub async fn oauth2_authorize(
// owner: web::Path<PodOwner>,
// body: web::Bytes,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::oauth2_authorize(
// owner.to_owned(),
// body,
// &init_db.to_owned(),
// &cli.to_owned(),
// )
// .await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/oauth2/access_token")]
// pub async fn oauth2_access_token(
// owner: web::Path<PodOwner>,
// body: web::Bytes,
// init_db: web::Data<InitDb>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::oauth2_access_token(owner.to_owned(), body, &init_db.to_owned()).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/update_item")]
// pub async fn update_item(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::update_item(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(|()| web::Json(serde_json::json!({})));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/bulk")]
// pub async fn bulk(
// owner: web::Path<PodOwner>,
// body: web::Bytes,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::bulk(owner.to_owned(), &init_db.to_owned(), body, &cli.to_owned()).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/delete_item")]
// pub async fn delete_item(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::delete_item(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(|()| web::Json(serde_json::json!({})));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/delete_user")]
// pub async fn delete_user(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::delete_user(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(|()| web::Json(serde_json::json!({})));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid()), skip_all)]
// #[post("/account")]
// pub async fn create_account(
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::create_account(&init_db.into_inner(), body).await;
// let result = result.map(|result| web::Json(serde_json::json!(result)));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/create_edge")]
// pub async fn create_edge(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::create_edge(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/delete_edge_by_source_target")]
// pub async fn delete_edge_by_source_target(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::delete_edge_by_source_target(owner.to_owned(), &init_db.to_owned(), body)
// .await;
// let result = result.map(|()| web::Json(serde_json::json!({})));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/get_edges")]
// pub async fn get_edges(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::get_edges(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/search")]
// pub async fn search(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::search(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/graphql")]
// pub async fn graphql(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::graphql(owner.to_owned(), &init_db.to_owned(), body).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// //
// // Files API:
// //
// #[instrument(fields(uid=trace_uid(), owner=%path.0), skip_all)]
// pub async fn upload_file(
// path: web::Path<(PodOwner, String, String)>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let (owner, database_key, expected_sha256) = path.into_inner();
// let result =
// pod_handlers::upload_file(owner, &init_db, database_key, expected_sha256, &body).await;
// respond_with_result(result.map(|()| web::Json(serde_json::json!({}))))
// }
// #[instrument(fields(uid=trace_uid(), owner=%path.0), skip_all)]
// pub async fn upload_file_b(
// path: web::Path<(PodOwner, String, String)>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let (owner, auth_json, expected_sha256) = path.into_inner();
// let result =
// pod_handlers::upload_file_b(owner, &init_db, auth_json, expected_sha256, &body).await;
// respond_with_result(result.map(|()| web::Json(serde_json::json!({}))))
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// pub async fn get_file(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// pod_handlers::get_file(owner.to_owned(), &init_db.to_owned(), body)
// .await
// .map_err(ErrorBadRequest)
// }
// //
// // Logs API
// //
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("{owner}/get_pluginrun_log")]
// pub async fn get_logs(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// cli: web::Data<CliOptions>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::get_logs(owner.to_owned(), &init_db.to_owned(), body, &cli.to_owned()).await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// //
// // Email API
// //
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/{owner}/send_email")]
// pub async fn send_email(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// body: web::Bytes,
// cli: web::Data<CliOptions>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::send_email(owner.to_owned(), &init_db.to_owned(), body, &cli.to_owned())
// .await;
// let result = result.map(|()| web::Json(serde_json::json!({})));
// respond_with_result(result)
// }
// //
// // Trigger status
// //
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/{owner}/trigger/status")]
// pub(crate) async fn trigger_status(
// owner: web::Path<PodOwner>,
// _init_db: web::Data<InitDb>,
// body: web::Bytes,
// _cli: web::Data<CliOptions>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::trigger_status(&body);
// respond_with_result(result)
// }
// #[instrument(skip_all)]
// pub async fn trace_filter<S>(
// body: web::Bytes,
// trace_handler: web::Data<Handle<EnvFilter, S>>,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result = pod_handlers::trace_filter(body, &trace_handler.into_inner());
// let result = result.map(|()| web::Json(serde_json::json!("ok")));
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/status")]
// pub async fn plugins_status(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::plugins_status(owner.to_owned(), &init_db.to_owned(), &cli.to_owned(), body)
// .await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/api")]
// pub(crate) async fn plugin_api(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::plugin_api(owner.to_owned(), &init_db.to_owned(), &cli.to_owned(), body)
// .await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/api/call")]
// pub async fn plugin_api_call(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::plugin_api_call(owner.to_owned(), &init_db.to_owned(), &cli.to_owned(), body)
// .await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
// #[instrument(fields(uid=trace_uid(), %owner), skip_all)]
// #[post("/attach")]
// pub async fn plugin_attach(
// owner: web::Path<PodOwner>,
// init_db: web::Data<InitDb>,
// cli: web::Data<CliOptions>,
// body: web::Bytes,
// ) -> actix_web::Result<impl Responder> {
// let body = extract_json(&body)?;
// let result =
// pod_handlers::plugin_attach(owner.to_owned(), &init_db.to_owned(), &cli.to_owned(), body)
// .await;
// let result = result.map(web::Json);
// respond_with_result(result)
// }
//
// helper functions:
//
pub fn respond_with_result<T: Serialize>(result: Result<T>) -> actix_web::Result<HttpResponse> {
let response = match result {
Ok(t) => HttpResponse::Ok().json(t),
Err(err) => {
let code = match &err.error {
libpod::error::ErrorType::AeadEncryption(_) => StatusCode::MISDIRECTED_REQUEST,
libpod::error::ErrorType::Database(_)
| libpod::error::ErrorType::Other { msg: _, source: _ }
| libpod::error::ErrorType::PluginNotFound(_)
| libpod::error::ErrorType::BadRequest(_) => StatusCode::BAD_REQUEST,
libpod::error::ErrorType::Any { code, msg: _ } => *code,
libpod::error::ErrorType::UnauthorizedDatabaseAccess(_) => StatusCode::UNAUTHORIZED,
libpod::error::ErrorType::Internal(_) => StatusCode::INTERNAL_SERVER_ERROR,
};
error!("Returning HTTP failure, code {code} error {err}");
let msg = format!("Failure: Error {code}, {err}");
HttpResponseBuilder::new(code).json(msg)
}
};
Ok(response)
}
#[derive(Debug)]
struct JsonExtractorError(String);
impl std::error::Error for JsonExtractorError {}
impl Display for JsonExtractorError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Extracting JSON from request failed: {}", self.0)
}
}
impl ResponseError for JsonExtractorError {
fn status_code(&self) -> StatusCode {
StatusCode::BAD_REQUEST
}
}
/// It's tempting to use web::Json<PayloadWrapper<T>>,
/// However that turned out to be significantly slower than extracting the bytes and then
/// deserializing manually. Moreover JSON extractor does not report anything on the console
/// if request body was in some sense invalid. That's really annoying and requires
/// looking at client logs for error response - making debugging the pod impossible.
fn extract_json<'a, T: Deserialize<'a>>(body: &'a web::Bytes) -> actix_web::Result<T> {
let deserializer = &mut serde_json::Deserializer::from_slice(body);
let result = serde_path_to_error::deserialize::<_, T>(deserializer)
.map_err(|e| JsonExtractorError(format!("{e}")));
if let Err(e) = &result {
error!("{e}");
}
Ok(result?)
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment