Commit 12efd075 authored by Szymon Zimnowoda

Changing webserver to actix-web
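For context, this replaces warp's filter-and-reject chains with actix-web's async handler functions. A minimal sketch of the target style; the route path and payload shape below are hypothetical, not taken from this diff:

use actix_web::{web, App, HttpServer, Responder};
use serde::Deserialize;

// Hypothetical payload type; the real request types live in api_model.
#[derive(Deserialize)]
struct SearchPayload {
    query: serde_json::Value,
}

// One async fn per endpoint replaces a warp filter + and_then chain.
async fn search(payload: web::Json<SearchPayload>) -> impl Responder {
    web::Json(serde_json::json!({ "received": payload.query }))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().route("/v4/search", web::post().to(search)))
        .bind(("127.0.0.1", 3030))?
        .run()
        .await
}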

parent c8f274d0
Showing with 1968 additions and 1515 deletions
@@ -53,7 +53,6 @@ publish_docker_images:
- dev
- main
pages:
stage: docs
script:
......
This diff is collapsed.
@@ -9,6 +9,7 @@ publish = false # Prevent accidental publishing
[workspace.dependencies]
hex = "0.4.3"
http = "*"
lazy_static = "1.4.0"
percent-encoding = "2.1.0"
reqwest = { version = "0.11.11" }
@@ -19,5 +20,4 @@ serde_path_to_error = "0.1.5"
tokio = { version = "1.14.0", features = ["full"] }
tracing = "0.1.36"
tracing-subscriber = { version = "0.3.15" }
warp = { version = "0.3.2" }
futures = "0.3.24"
@@ -9,6 +9,7 @@ publish.workspace = true
chacha20poly1305 = {version = "0.9.0", features = ["std"]}
graphql-parser = { git = "https://github.com/memri/graphql-parser" }
hex = { workspace = true }
http = { workspace = true }
lazy_static = { workspace = true }
lettre = { version = "0.10.0-rc.3", default-features = false, features = [
"builder",
@@ -47,14 +48,13 @@ clap = {version = "4", features = ["derive", "env", "color"]}
time = { version = "0.3.5", features = ["formatting", "macros"] }
tokio = { workspace = true }
tracing = { workspace = true }
warp = { workspace = true }
zeroize = "1.4.3"
duct = "0.13.5"
futures = { workspace = true }
thiserror = "1.0.38"
[dev-dependencies]
criterion = "0.3.5"
criterion = {version = "0.3.5", features = ["async_tokio"]}
test-log = { version = "0.2.11", default-features = false, features = [
"trace",
] }
......
use criterion::{criterion_group, criterion_main, Criterion};
use libpod::{
database_api, database_migrate_refinery,
async_db_connection::{AsyncConnection, AsyncTx},
database_api,
internal_api::{graphql, search},
test_helpers::{default_cli, get_memory_connection},
};
mod common;
mod test_dataset;
use serde_json::json;
#[cfg(test)]
pub fn deep_join(c: &mut Criterion) {
let mut conn = rusqlite::Connection::open_in_memory().unwrap();
conn.set_prepared_statement_cache_capacity(0);
// can this be done in a more elegant way?
{
let tx = conn.transaction().unwrap();
let _res = tx.execute("PRAGMA cache_size = 0;", []);
let _ = tx.commit();
}
database_migrate_refinery::embedded::migrations::runner()
.run(&mut conn)
.expect("Failed to run refinery migrations");
let tx = conn.transaction().unwrap();
let runtime = tokio::runtime::Runtime::new().unwrap();
let cli = common::default_cli();
let pod_owner = "";
let mut conn = get_memory_connection().unwrap();
test_dataset::prepare_graphql_dataset(&tx, pod_owner, cli);
runtime.block_on(prepare_database(&mut conn));
let schema = database_api::get_schema(&tx).unwrap();
let schema = runtime
.block_on(
conn.in_read_transaction(|tx: AsyncTx| async move { database_api::get_schema(&tx) }),
)
.unwrap();
// search
let search_json = json!({
@@ -39,48 +29,111 @@ pub fn deep_join(c: &mut Criterion) {
"[[edges]]": {}
});
let res = search(
&tx,
&schema,
serde_json::from_value(search_json.clone()).unwrap(),
)
.unwrap();
assert_eq!(res.len(), 1000);
// graphql search
let query = "
query {
Person1 {
id
friend1 {
id
friend2 {
id
}
}
}
}"
query {
Person1 {
id
friend1 {
id
friend2 {
id
}
}
}
}"
.to_owned();
let _res = graphql(&tx, &schema, query.clone());
// print!("{:}", serde_json::to_string_pretty(&json!(_res.unwrap())).unwrap());
c.bench_function("search api", |b| {
b.iter(|| {
search(
&tx,
&schema,
serde_json::from_value(search_json.clone()).unwrap(),
)
.unwrap()
})
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| {
let schema_2 = schema.clone();
let search_json_2 = search_json.clone();
let mut conn_2 = AsyncConnection::from_other(&conn);
async move {
conn_2
.in_read_transaction(|tx: AsyncTx| async move {
search(
&tx,
&schema_2,
serde_json::from_value(search_json_2).unwrap(),
)
})
.await
}
})
});
c.bench_function("graphql deep join", |b| {
b.iter(|| graphql(&tx, &schema, query.clone()).unwrap())
b.to_async(tokio::runtime::Runtime::new().unwrap())
.iter(|| {
let schema_2 = schema.clone();
let mut conn_2 = AsyncConnection::from_other(&conn);
let query_2 = query.clone();
async move {
conn_2
.in_read_transaction(|tx: AsyncTx| async move {
graphql(&tx, &schema_2, query_2)
})
.await
}
})
});
}
async fn prepare_database(conn: &mut AsyncConnection) {
conn.set_prepared_statement_cache_capacity(0);
conn.in_write_transaction(|tx: AsyncTx| async move {
tx.execute("PRAGMA cache_size = 0;", [])?;
Ok(())
})
.await
.unwrap();
conn.in_write_transaction(|tx: AsyncTx| async move {
let cli = default_cli();
let pod_owner = "";
test_dataset::prepare_graphql_dataset(&tx, pod_owner, cli).await;
let schema = database_api::get_schema(&tx).unwrap();
// search
let search_json = json!({
"type": "Person1",
"[[edges]]": {}
});
let res = search(&tx, &schema, serde_json::from_value(search_json).unwrap()).unwrap();
assert_eq!(res.len(), 1000);
// graphql search
let query = "
query {
Person1 {
id
friend1 {
id
friend2 {
id
}
}
}
}"
.to_owned();
let _res = graphql(&tx, &schema, query);
// print!("{:}", serde_json::to_string_pretty(&json!(_res.unwrap())).unwrap());
Ok(())
})
.await
.unwrap();
}
criterion_group!(benches, deep_join);
criterion_main!(benches);
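The `async_tokio` feature enabled for criterion in [dev-dependencies] above is what provides `Bencher::to_async`, used throughout this file. A minimal self-contained sketch of the pattern, independent of the Pod types:

use criterion::{criterion_group, criterion_main, Criterion};

// Requires criterion with the `async_tokio` feature and tokio as a dev-dependency.
fn bench_async(c: &mut Criterion) {
    c.bench_function("noop async", |b| {
        // `to_async` drives the future returned by the closure on the runtime.
        b.to_async(tokio::runtime::Runtime::new().unwrap())
            .iter(|| async { tokio::task::yield_now().await });
    });
}

criterion_group!(benches, bench_async);
criterion_main!(benches);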
use libpod::{
api_model::CreateItem,
async_db_connection::AsyncTx,
command_line_interface::CliOptions,
database_api,
internal_api::{create_edge, create_item_tx},
};
use rusqlite::Transaction as Tx;
use serde_json::json;
pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
pub async fn prepare_graphql_dataset(tx: &AsyncTx, pod_owner: &str, cli: CliOptions) {
// insert age prop to Person
let schema_json = json!({
"type": "ItemPropertySchema",
@@ -17,7 +17,9 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
});
let schema_struct: CreateItem = serde_json::from_value(schema_json).unwrap();
let mut schema = database_api::get_schema(tx).unwrap();
create_item_tx(tx, &mut schema, schema_struct, "", &cli).unwrap();
create_item_tx(tx, &mut schema, schema_struct, "", &cli)
.await
.unwrap();
let mut schema = database_api::get_schema(tx).unwrap();
// create level 1
@@ -28,7 +30,9 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
"age": i
});
let person: CreateItem = serde_json::from_value(person).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli)
.await
.unwrap();
}
// create level 2
@@ -39,7 +43,9 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
"age": i
});
let person: CreateItem = serde_json::from_value(person).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli)
.await
.unwrap();
}
// create level 3
@@ -49,7 +55,9 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
"id": "00000".to_owned() + &i.to_string()
});
let person: CreateItem = serde_json::from_value(person).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli).unwrap();
create_item_tx(tx, &mut schema, person, pod_owner, &cli)
.await
.unwrap();
}
// create schema for connections
@@ -66,7 +74,8 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
schema_struct,
pod_owner,
&cli,
);
)
.await;
assert!(result.is_ok());
let schema_json = json!({
@@ -82,7 +91,8 @@ pub fn prepare_graphql_dataset(tx: &Tx, pod_owner: &str, cli: CliOptions) {
schema_struct,
pod_owner,
&cli,
);
)
.await;
assert!(result.is_ok());
// Get updated schema
......
@@ -15,7 +15,7 @@ impl Display for PodOwner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = self.0.as_str();
if s.len() < 12 {
write!(f, "{}", s)
write!(f, "{s}")
} else {
write!(f, "{}..{}", &s[..5], &s[s.len() - 5..])
}
......
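For illustration, the abbreviation branch keeps the first and last five characters of long owner keys. Assuming `PodOwner` is a tuple struct over a String, as `self.0` suggests (the key below is made up):

// A made-up 20-char owner key renders as the first 5 chars + ".." + the last 5.
let owner = PodOwner("abcdefghijklmnopqrst".to_string());
assert_eq!(format!("{owner}"), "abcde..pqrst");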
use rusqlite::TransactionBehavior;
use std::ops::Deref;
use std::rc::Rc;
use tracing::error;
use crate::database_pool::PooledConnection;
use crate::error::{ErrorContext, Result};
use crate::internal_error;
pub trait AsyncTxHandler {
type Output;
type Future: core::future::Future<Output = Self::Output>;
fn call(self, tx: AsyncTx) -> Self::Future;
}
impl<Func, Fut> AsyncTxHandler for Func
where
Func: FnOnce(AsyncTx) -> Fut,
Fut: core::future::Future,
{
type Output = Fut::Output;
type Future = Fut;
#[inline]
fn call(self, tx: AsyncTx) -> Self::Future {
(self)(tx)
}
}
pub struct AsyncConnection {
pub inner: Rc<PooledConnection>,
}
impl AsyncConnection {
pub fn new(conn: PooledConnection) -> Self {
AsyncConnection {
inner: Rc::new(conn),
}
}
pub fn from_other(conn: &AsyncConnection) -> Self {
AsyncConnection {
inner: conn.inner.clone(),
}
}
/// Starts a transaction and acquires the write lock immediately;
/// might wait `busy_timeout` milliseconds if another write
/// transaction is in progress.
/// In SQLite there can be exactly one writer at a time, so write TXes
/// have to be serialized in order to avoid deadlocks.
pub async fn in_write_transaction<F, T>(&mut self, f: F) -> Result<T>
where
F: AsyncTxHandler<Output = Result<T>>,
{
// EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started immediately.
// EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in other journaling modes,
// EXCLUSIVE prevents other database connections from reading the database while
// the transaction is underway.
// That prevents writer starvation in other journaling modes.
let res = self.in_transaction(f, TransactionBehavior::Exclusive).await;
if let Err(e) = &res {
error!("Error {e} in async write transaction");
}
res
}
/// Executes F inside a transaction with the default "deferred" behavior, on the current thread.
/// Commits the transaction on success, rolls it back otherwise.
/// Returns the execution result.
/// There can be many read transactions in flight; they do not block each other.
/// Make sure all operations inside the TX are read-only; if they are not, that might
/// cause a deadlock and fail the TX immediately with an SQLITE_BUSY error.
pub async fn in_read_transaction<F, T>(&mut self, f: F) -> Result<T>
where
F: AsyncTxHandler<Output = Result<T>>,
{
let res = self.in_transaction(f, TransactionBehavior::Deferred).await;
if let Err(e) = &res {
error!("Error {e} in async read transaction");
}
res
}
// &mut self for exclusive access to inner connection
pub async fn in_transaction<F, T>(&mut self, f: F, behavior: TransactionBehavior) -> Result<T>
where
F: AsyncTxHandler<Output = Result<T>>,
{
// Hide usage of tx in here, to disallow misuse by the caller
let tx = AsyncTx::new(self.inner.clone(), behavior)?;
let res = f.call(tx.clone()).await;
if res.is_ok() {
tx.commit()
.context_str("while committing the transaction")?;
} else {
let _ = tx.rollback();
}
res
}
}
impl Deref for AsyncConnection {
type Target = PooledConnection;
fn deref(&self) -> &Self::Target {
self.inner.as_ref()
}
}
#[derive(Clone)]
pub struct AsyncTx {
pub inner: Rc<PooledConnection>,
}
impl AsyncTx {
fn new(conn: Rc<PooledConnection>, behavior: TransactionBehavior) -> Result<Self> {
let query = match behavior {
TransactionBehavior::Deferred => "BEGIN DEFERRED",
TransactionBehavior::Immediate => "BEGIN IMMEDIATE",
TransactionBehavior::Exclusive => "BEGIN EXCLUSIVE",
_ => return Err(internal_error!("Unsupported TX behavior to set")),
};
conn.execute_batch(query)?;
Ok(Self { inner: conn })
}
fn commit(&self) -> Result<()> {
self.inner.execute_batch("COMMIT")?;
Ok(())
}
fn rollback(&self) -> Result<()> {
self.inner.execute_batch("ROLLBACK")?;
Ok(())
}
}
impl Deref for AsyncTx {
type Target = PooledConnection;
// Yes, it's the famous Deref antipattern;
// in our defense, rusqlite does this too.
fn deref(&self) -> &Self::Target {
&self.inner
}
}
// ----------------------------------------------------
// Version 1: works somewhat with regular functions, but fails with closures - there is no way
// to spell out the lifetimes in a closure.
// trait MyHandler<'conn> {
// type Output;
// type Future: core::future::Future<Output = Self::Output>;
// fn call(self, tx: Rc<Tx<'conn>>) -> Self::Future;
// }
// impl<'conn, Func, Fut> MyHandler<'conn> for Func
// where
// Func: FnOnce(Rc<Tx<'conn>>) -> Fut,
// Fut: core::future::Future,
// {
// type Output = Fut::Output;
// type Future = Fut;
// #[inline]
// fn call(self, tx: Rc<Tx<'conn>>) -> Self::Future {
// (self)(tx)
// }
// }
// async fn in_tx<'conn, F, T>(conn: &'conn mut PooledConnection, f: F) -> Result<T>
// where
// F: MyHandler<'conn, Output = Result<T>>,
// {
// let tx = Rc::new(conn.transaction()?);
// let a = f.call(tx.clone()).await?;
// let tx = Rc::try_unwrap(tx).expect("Failed to unwrap transaction");
// let _ = tx.commit();
// Ok(a)
// }
// async fn some_async_fn(tx: Rc<Tx<'_>>) -> Result<()> {
// tx.prepare_cached("asdf")?;
// Ok(())
// }
@@ -171,7 +171,7 @@ pub struct CliOptions {
fn parse_key_val(s: &str) -> Result<(String, String), String> {
let pos = s
.find('=')
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{}`", s))?;
.ok_or_else(|| format!("invalid KEY=value: no `=` found in `{s}`"))?;
Ok((
s[..pos]
.parse()
......
This diff is collapsed.
use crate::{
any_error, bad_request, constants, database_migrate_refinery,
error::{Error, ErrorContext, Result},
any_error,
async_db_connection::AsyncConnection,
bad_request, constants, database_migrate_refinery,
error::{ErrorContext, Result},
internal_error,
plugin_auth_crypto::{DatabaseKey, SHA256Output},
};
use futures::future::join_all;
use http::StatusCode;
use r2d2::{Pool, PooledConnection as R2D2PolledConnection};
use r2d2_sqlite::SqliteConnectionManager;
use reqwest::StatusCode;
use rusqlite::OpenFlags;
use std::{
collections::HashMap,
@@ -52,14 +54,14 @@ pub async fn get_db_connection(
owner: &str,
init_db: &InitDb,
database_key: &DatabaseKey,
) -> Result<PooledConnection> {
) -> Result<AsyncConnection> {
let pool = get_pool(owner, init_db, database_key).await?;
let conn = get_connection_from_pool(&pool, database_key)?;
// The returned connection is Send, but not Sync, so the compiler enforces
// that only one thread at a time can use it. That means SQLite can be set to
// multi-thread mode instead of the default serialized mode.
Ok(conn)
Ok(AsyncConnection::new(conn))
}
pub async fn remove_db(owner: &str, init_db: &InitDb, database_key: &DatabaseKey) -> Result<()> {
@@ -322,14 +324,14 @@ fn get_connection_from_pool(
}
#[cfg(test)]
mod tests {
pub mod tests {
use rusqlite::{
ffi::{Error as FFIError, ErrorCode},
Error::SqliteFailure,
};
use tokio::sync::RwLock;
use crate::error::ErrorType;
use crate::error::{Error, ErrorType};
use super::*;
......
use crate::{
async_db_connection::AsyncTx as Tx,
bad_request, database_api,
database_api::{IntegersNameValue, RealsNameValue, Rowid, StringsNameValue},
db_model::ItemBase,
error::{Error, Result},
error::Result,
internal_error,
schema::{Schema, SchemaPropertyType},
};
use rusqlite::{types::ValueRef, Transaction as Tx};
use rusqlite::types::ValueRef;
use serde_json::{Map, Value};
use std::{collections::HashMap, str};
use tracing::warn;
@@ -313,144 +314,165 @@ fn add_item_base_properties(props: &mut Map<String, Value>, item: ItemBase) {
#[cfg(test)]
mod tests {
use super::{super::schema, *};
use crate::database_api::tests::{new_conn, random_id};
use crate::{
async_db_connection::AsyncTx, database_api::tests::random_id,
test_helpers::get_memory_connection,
};
use serde_json::json;
use std::ops::Not;
#[test]
fn test_one_property() -> Result<()> {
let mut conn = new_conn();
let tx = conn.transaction()?;
let mut schema = database_api::get_schema(&tx).unwrap();
schema.set_type_unchecked(
"Person".to_string(),
"age".to_string(),
SchemaPropertyType::Integer,
);
schema.set_type_unchecked(
"Person".to_string(),
"strength".to_string(),
SchemaPropertyType::Real,
);
schema.set_type_unchecked(
"Person".to_string(),
"myDescription".to_string(),
SchemaPropertyType::Text,
);
let date = schema::utc_millis();
let item: Rowid =
database_api::insert_item_base(&tx, &random_id(), "Person", date, date, date, false)?;
assert!(check_item_has_property(&tx, &schema, item, "age", &json!(20))?.not());
insert_property(&tx, &schema, item, "age", &json!(20))?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"age",
&json!(20)
)?);
assert!(check_item_has_property(&tx, &schema, item, "age", &json!(99))?.not());
// Checking a non-existing property should yield an error, not a successful "no" response
assert!(check_item_has_property(&tx, &schema, item, "antiAge", &json!(99)).is_err());
insert_property(&tx, &schema, item, "strength", &json!(13.5))?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"strength",
&json!(13.5)
)?);
insert_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person"),
)?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person")
)?);
Ok(())
#[tokio::test]
async fn test_one_property() -> Result<()> {
let mut conn = get_memory_connection()?;
conn.in_write_transaction(|tx: AsyncTx| async move {
let mut schema = database_api::get_schema(&tx).unwrap();
schema.set_type_unchecked(
"Person".to_string(),
"age".to_string(),
SchemaPropertyType::Integer,
);
schema.set_type_unchecked(
"Person".to_string(),
"strength".to_string(),
SchemaPropertyType::Real,
);
schema.set_type_unchecked(
"Person".to_string(),
"myDescription".to_string(),
SchemaPropertyType::Text,
);
let date = schema::utc_millis();
let item: Rowid = database_api::insert_item_base(
&tx,
&random_id(),
"Person",
date,
date,
date,
false,
)?;
assert!(check_item_has_property(&tx, &schema, item, "age", &json!(20))?.not());
insert_property(&tx, &schema, item, "age", &json!(20))?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"age",
&json!(20)
)?);
assert!(check_item_has_property(&tx, &schema, item, "age", &json!(99))?.not());
// Checking a non-existing property should yield an error, not a successful "no" response
assert!(check_item_has_property(&tx, &schema, item, "antiAge", &json!(99)).is_err());
insert_property(&tx, &schema, item, "strength", &json!(13.5))?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"strength",
&json!(13.5)
)?);
insert_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person"),
)?;
assert!(check_item_has_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person")
)?);
Ok(())
})
.await
}
#[test]
fn test_all_properties() -> Result<()> {
let mut conn = new_conn();
let tx = conn.transaction()?;
let mut schema = database_api::get_schema(&tx).unwrap();
schema.set_type_unchecked(
"Person".to_string(),
"age".to_string(),
SchemaPropertyType::Integer,
);
schema.set_type_unchecked(
"Person".to_string(),
"strength".to_string(),
SchemaPropertyType::Real,
);
schema.set_type_unchecked(
"Person".to_string(),
"myDescription".to_string(),
SchemaPropertyType::Text,
);
let date = schema::utc_millis();
let item: Rowid =
database_api::insert_item_base(&tx, &random_id(), "Person", date, date, date, false)?;
insert_property(&tx, &schema, item, "age", &json!(20))?;
insert_property(&tx, &schema, item, "strength", &json!(13.5))?;
insert_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person"),
)?;
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(20));
props.insert("strength".to_string(), json!(13.5));
props.insert("myDescription".to_string(), json!("Wow such person"));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
#[tokio::test]
async fn test_all_properties() -> Result<()> {
let mut conn = get_memory_connection()?;
conn.in_write_transaction(|tx: AsyncTx| async move {
let mut schema = database_api::get_schema(&tx).unwrap();
schema.set_type_unchecked(
"Person".to_string(),
"age".to_string(),
SchemaPropertyType::Integer,
);
schema.set_type_unchecked(
"Person".to_string(),
"strength".to_string(),
SchemaPropertyType::Real,
);
schema.set_type_unchecked(
"Person".to_string(),
"myDescription".to_string(),
SchemaPropertyType::Text,
);
let date = schema::utc_millis();
let item: Rowid = database_api::insert_item_base(
&tx,
&random_id(),
"Person",
date,
date,
date,
false,
)?;
insert_property(&tx, &schema, item, "age", &json!(20))?;
insert_property(&tx, &schema, item, "strength", &json!(13.5))?;
insert_property(
&tx,
&schema,
item,
"myDescription",
&json!("Wow such person"),
)?;
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(20));
props.insert("strength".to_string(), json!(13.5));
props.insert("myDescription".to_string(), json!("Wow such person"));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(20));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(20));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(99999999));
props.insert("strength".to_string(), json!(13.5));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?.not());
}
{
let mut props = HashMap::new();
props.insert("age".to_string(), json!(99999999));
props.insert("strength".to_string(), json!(13.5));
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?.not());
}
{
let mut props = HashMap::new();
props.insert("antiAge".to_string(), json!(-200000000));
assert!(check_item_has_all_properties(&tx, &schema, item, &props).is_err());
}
{
let mut props = HashMap::new();
props.insert("antiAge".to_string(), json!(-200000000));
assert!(check_item_has_all_properties(&tx, &schema, item, &props).is_err());
}
{
let props = HashMap::new();
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
{
let props = HashMap::new();
assert!(check_item_has_all_properties(&tx, &schema, item, &props)?);
}
Ok(())
Ok(())
})
.await
}
}
@@ -16,7 +16,7 @@ pub fn send_email(email: SendEmail, cli: &CliOptions) -> Result<()> {
) {
(Some(relay), Some(user), Some(password)) => {
let email = Message::builder()
.from(format!("Memri plugin <{}>", user).parse()?)
.from(format!("Memri plugin <{user}>").parse()?)
.to(email.to.parse()?)
.subject(format!("{}{}", PLUGIN_EMAIL_SUBJECT_PREFIX, email.subject))
.body(format!("{}{}", PLUGIN_EMAIL_FOOTER, email.body))?;
......
@@ -17,8 +17,8 @@
//! with meaningful error message, when type is not important.
use std::{error::Error as StdError, fmt::Display};
use http::status::StatusCode;
use thiserror::Error;
use warp::http::status::StatusCode;
use crate::api_model::PluginIdentifier;
@@ -26,7 +26,7 @@ use crate::api_model::PluginIdentifier;
#[macro_export]
macro_rules! any_error {
($err_code: expr, $($arg:tt)*) => {
Error {
$crate::error::Error {
context: None,
error: $crate::error::ErrorType::Any { code : $err_code, msg: format!($($arg)*)}
}
@@ -37,7 +37,7 @@ macro_rules! any_error {
#[macro_export]
macro_rules! bad_request {
($($arg:tt)*) => {
Error {
$crate::error::Error {
context: None,
error: $crate::error::ErrorType::BadRequest(format!($($arg)*))
}
@@ -48,7 +48,7 @@ macro_rules! bad_request {
#[macro_export]
macro_rules! internal_error {
($($arg:tt)*) => {
Error {
$crate::error::Error {
context: None,
error: $crate::error::ErrorType::Internal(format!($($arg)*))
}
@@ -64,7 +64,7 @@ macro_rules! internal_error {
pub struct Error {
pub context: Option<String>,
pub error: ErrorType,
// TODO: might want use #[backtrace] in the future, now it's unstable.
// NOTE: might want to use #[backtrace] in the future; for now it's unstable.
// Note: it's possible to use backtrace-rs on stable; it returns a pretty
// decent trace even in an async environment (tested in debug builds).
// To keep Backtrace from being recognized by thiserror, wrap it in a Box:
......
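The `$crate::error::` prefixes added above make these `#[macro_export]` macros self-contained: callers no longer need `Error` imported at the call site. A generic illustration of the same pattern (the types here are hypothetical):

pub mod error {
    #[derive(Debug)]
    pub struct MyError(pub String);
}

#[macro_export]
macro_rules! my_error {
    ($($arg:tt)*) => {
        // `$crate` resolves to the defining crate at every call site,
        // so the caller needs no `use` for MyError.
        $crate::error::MyError(format!($($arg)*))
    };
}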
use crate::{
any_error, bad_request, constants, database_api,
error::{Error, Result},
internal_error,
};
use crate::async_db_connection::AsyncTx;
use crate::{any_error, bad_request, constants, database_api, error::Result, internal_error};
use chacha20poly1305::{
aead::{Aead, NewAead},
Key, XChaCha20Poly1305, XNonce,
};
use http::status::StatusCode;
use rand::random;
use rusqlite::Transaction;
use sha2::{Digest, Sha256};
use std::{
fs::{create_dir_all, OpenOptions},
@@ -18,14 +15,8 @@ use std::{
str::FromStr,
};
use tracing::warn;
use warp::http::status::StatusCode;
pub fn upload_file(
tx: &Transaction,
owner: &str,
expected_sha256: &str,
body: &[u8],
) -> Result<()> {
pub fn upload_file(tx: &AsyncTx, owner: &str, expected_sha256: &str, body: &[u8]) -> Result<()> {
if file_exists_on_disk(owner, expected_sha256)? {
// Note that checking once for file existence here is not enough.
// To prevent a TOCTOU attack, we also need to check file existence below.
@@ -66,7 +57,7 @@ pub fn upload_file(
Ok(())
}
pub fn get_file(tx: &Transaction, owner: &str, sha256: &str) -> Result<Vec<u8>> {
pub fn get_file(tx: &AsyncTx, owner: &str, sha256: &str) -> Result<Vec<u8>> {
let file = final_path(owner, sha256)?;
let file = std::fs::read(file).map_err(|err| {
internal_error! { "Failed to read data from target file, {err}" }
@@ -119,12 +110,7 @@ fn validate_hash(expected_sha256: &str, data: &[u8]) -> Result<()> {
}
/// Update `key` and `nonce` in DB for items that have the given `sha256`
fn update_key_and_nonce(
tx: &Transaction,
key: &[u8],
nonce: &[u8],
for_sha256: &str,
) -> Result<()> {
fn update_key_and_nonce(tx: &AsyncTx, key: &[u8], nonce: &[u8], for_sha256: &str) -> Result<()> {
let item_rowids = database_api::search_strings(tx, "sha256", for_sha256)?;
if item_rowids.is_empty() {
Err(any_error! {
@@ -143,7 +129,7 @@ }
}
/// Find first `key` and `nonce` pair in the database for an item with the desired `sha256`
fn find_key_and_nonce_by_sha256(tx: &Transaction, sha256: &str) -> Result<(Vec<u8>, Vec<u8>)> {
fn find_key_and_nonce_by_sha256(tx: &AsyncTx, sha256: &str) -> Result<(Vec<u8>, Vec<u8>)> {
let item_rowids = database_api::search_strings(tx, "sha256", sha256)?;
if let Some(rowid) = item_rowids.first() {
let mut other_props = database_api::get_strings_for_item(tx, *rowid)?;
@@ -178,35 +164,38 @@ fn files_dir() -> Result<PathBuf> {
mod tests {
use super::{files_dir, final_owner_dir, get_file, remove_owner_files, upload_file};
use crate::{
command_line_interface, database_api, database_api::tests::new_conn, error::Result,
internal_api,
async_db_connection::AsyncTx, command_line_interface, database_api, error::Result,
internal_api, test_helpers::get_memory_connection,
};
use serde_json::json;
#[test]
fn test_file_upload_get() -> Result<()> {
let mut conn = new_conn();
let tx = conn.transaction().unwrap();
let mut schema = database_api::get_schema(&tx)?;
let cli = command_line_interface::tests::test_cli();
let owner = "testOwner".to_string();
let owner_dir = files_dir()?.join(&owner);
std::fs::remove_dir_all(&owner_dir).ok();
let sha = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string();
let json = json!({
"type": "File",
"sha256": &sha,
});
let sha_item = serde_json::from_value(json)?;
internal_api::create_item_tx(&tx, &mut schema, sha_item, &owner, &cli)?;
upload_file(&tx, &owner, &sha, &[])?;
let result = get_file(&tx, &owner, &sha)?;
assert_eq!(result.len(), 0, "{}:{}", file!(), line!());
std::fs::remove_dir_all(owner_dir).ok();
Ok(())
#[tokio::test]
async fn test_file_upload_get() -> Result<()> {
let mut conn = get_memory_connection()?;
conn.in_write_transaction(|tx: AsyncTx| async move {
let mut schema = database_api::get_schema(&tx)?;
let cli = command_line_interface::tests::test_cli();
let owner = "testOwner".to_string();
let owner_dir = files_dir()?.join(&owner);
std::fs::remove_dir_all(&owner_dir).ok();
let sha =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855".to_string();
let json = json!({
"type": "File",
"sha256": &sha,
});
let sha_item = serde_json::from_value(json)?;
internal_api::create_item_tx(&tx, &mut schema, sha_item, &owner, &cli).await?;
upload_file(&tx, &owner, &sha, &[])?;
let result = get_file(&tx, &owner, &sha)?;
assert_eq!(result.len(), 0, "{}:{}", file!(), line!());
std::fs::remove_dir_all(owner_dir).ok();
Ok(())
})
.await
}
#[tokio::test]
......
@@ -4,7 +4,7 @@ use graphql_parser::query::{
};
use std::convert::TryFrom;
use crate::error::{Error, Result};
use crate::error::Result;
use std::collections::HashMap;
#[derive(Debug)]
@@ -162,24 +162,24 @@ impl Filter {
),
Filter::EQ { property, value } => Filter::format_eq(property, value),
Filter::NE { property, value } => Filter::format_ne(property, value),
Filter::LT { property, value } => format!("({:} < {:})", property, value),
Filter::GT { property, value } => format!("({:} > {:})", property, value),
Filter::LTE { property, value } => format!("({:} <= {:})", property, value),
Filter::GTE { property, value } => format!("({:} >= {:})", property, value),
Filter::LT { property, value } => format!("({property:} < {value:})"),
Filter::GT { property, value } => format!("({property:} > {value:})"),
Filter::LTE { property, value } => format!("({property:} <= {value:})"),
Filter::GTE { property, value } => format!("({property:} >= {value:})"),
}
}
fn format_eq(property: &String, value: &String) -> String {
match value.to_lowercase().as_str() {
"null" => format!("({:} is {:})", property, value),
_ => format!("({:} = {:})", property, value),
"null" => format!("({property:} is {value:})"),
_ => format!("({property:} = {value:})"),
}
}
fn format_ne(property: &String, value: &String) -> String {
match value.to_lowercase().as_str() {
"null" => format!("({:} is not {:})", property, value),
_ => format!("({:} != {:})", property, value),
"null" => format!("({property:} is not {value:})"),
_ => format!("({property:} != {value:})"),
}
}
}
......
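The null special-casing above matters because in SQL `x = NULL` never matches; comparisons against NULL need `is` / `is not`. Written as if inside that module's tests (property and value are illustrative):

// Equality against NULL must use IS / IS NOT in the generated SQL.
assert_eq!(Filter::format_eq(&"age".to_string(), &"null".to_string()), "(age is null)");
assert_eq!(Filter::format_eq(&"age".to_string(), &"21".to_string()), "(age = 21)");
assert_eq!(Filter::format_ne(&"age".to_string(), &"null".to_string()), "(age is not null)");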
This diff is collapsed.
pub mod api_model;
pub mod async_db_connection;
pub mod command_line_interface;
pub mod constants;
pub mod database_api;
@@ -17,6 +18,7 @@ pub mod oauth2_api;
pub mod plugin_auth_crypto;
pub mod plugin_run;
mod plugin_trigger;
mod schema;
pub mod schema;
pub mod shared_plugins;
pub mod test_helpers;
pub mod triggers;
use std::{borrow::Cow, collections::HashMap, env};
use crate::{
bad_request,
error::{Error, Result},
};
use crate::{bad_request, error::Result};
use http;
use oauth::RequestBuilder;
pub use oauth_client as oauth;
use reqwest::{
@@ -12,7 +10,6 @@ use reqwest::{
};
use serde_json::json;
use std::convert::TryFrom;
use warp::http;
#[derive(Debug)]
pub struct AsyncRequestBuilder {
@@ -104,7 +101,7 @@ pub async fn oauth1_request_token(
callback_url: String,
service: String,
) -> Result<serde_json::Value> {
println!("authenticating {}", service);
println!("authenticating {service}");
if service == "twitter" {
let (consumer_key, consumer_secret) = twitter_api_keys();
......
use crate::async_db_connection::AsyncTx;
use crate::{
api_model::{
CreateItem, Oauth2AccessTokenRequest, Oauth2AccessTokenResponse, Oauth2AuthUrlRequest,
@@ -8,7 +9,7 @@ use crate::{
db_model::{
Oauth2Flow, ACCESS_TOKEN, OAUTH2_FLOW, PLATFORM, REDIRECT_URI, REFRESH_TOKEN, TOKEN_TYPE,
},
error::{Error, ErrorContext, Result},
error::{ErrorContext, Result},
internal_api::{self, search},
internal_error,
schema::Schema,
@@ -16,16 +17,15 @@ use crate::{
use lazy_static::lazy_static;
use oauth2::{
basic::{BasicClient, BasicTokenResponse},
reqwest::http_client,
reqwest::async_http_client,
AuthUrl, AuthorizationCode, ClientId, ClientSecret, CsrfToken, PkceCodeChallenge,
PkceCodeVerifier, RedirectUrl, RefreshToken, Scope, TokenResponse, TokenUrl,
};
use reqwest::{header::CONTENT_TYPE, StatusCode};
use rusqlite::Transaction;
use serde::Deserialize;
use serde_json::json;
use std::{collections::HashMap, env};
use tokio::task;
use tracing::debug;
pub fn auth_url(payload: Oauth2AuthUrlRequest) -> Result<Oauth2AuthUrlResponse> {
@@ -52,8 +52,8 @@ pub fn auth_url(payload: Oauth2AuthUrlRequest) -> Result<Oauth2AuthUrlResponse>
})
}
pub fn access_token(
tx: &Transaction,
pub async fn access_token(
tx: &AsyncTx,
schema: &Schema,
payload: Oauth2AccessTokenRequest,
) -> Result<Oauth2AccessTokenResponse> {
@@ -61,11 +61,11 @@ pub fn access_token(
let oauth2_item = get_oauth2_flow(tx, schema, &payload.platform)?;
if payload.platform == Platforms::Twitter
|| token_should_be_refreshed(oauth2_item.access_token.clone(), &payload.platform)?
|| token_should_be_refreshed(oauth2_item.access_token.clone(), &payload.platform).await?
{
debug!("Refreshing token...");
let token = exchange_refresh_token(&oauth2_item)?;
let token = exchange_refresh_token(&oauth2_item).await?;
internal_api::update_item_tx(
tx,
@@ -95,8 +95,8 @@ }
}
}
pub fn authorize(
tx: &Transaction,
pub async fn authorize(
tx: &AsyncTx,
schema: &mut Schema,
owner: &str,
cli: &CliOptions,
@@ -104,7 +104,7 @@ ) -> Result<Oauth2AccessTokenResponse> {
) -> Result<Oauth2AccessTokenResponse> {
debug!("Authorizing token for {:?}", payload.platform);
let token = exchange_code(&payload)?;
let token = exchange_code(&payload).await?;
// It's allowed to call authorize many times for the same platform;
// re-use the Oauth2Flow item in the database if it exists.
@@ -152,7 +152,7 @@ pub fn authorize(
..Default::default()
};
let id = internal_api::create_item_tx(tx, schema, item, owner, cli)?;
let id = internal_api::create_item_tx(tx, schema, item, owner, cli).await?;
debug!("Access token for {:?}, created {id}", payload.platform);
}
};
@@ -200,26 +200,25 @@ lazy_static! {
]);
}
fn exchange_refresh_token(oauth2_item: &Oauth2Flow) -> Result<BasicTokenResponse> {
async fn exchange_refresh_token(oauth2_item: &Oauth2Flow) -> Result<BasicTokenResponse> {
let client = get_client(&oauth2_item.platform, oauth2_item.redirect_uri.clone())?;
let res = task::block_in_place(|| {
client
.exchange_refresh_token(&RefreshToken::new(oauth2_item.refresh_token.clone()))
.request(http_client)
})?;
let res = client
.exchange_refresh_token(&RefreshToken::new(oauth2_item.refresh_token.clone()))
.request_async(async_http_client)
.await?;
Ok(res)
}
fn exchange_code(payload: &Oauth2AuthorizeTokenRequest) -> Result<BasicTokenResponse> {
async fn exchange_code(payload: &Oauth2AuthorizeTokenRequest) -> Result<BasicTokenResponse> {
let client = get_client(&payload.platform, payload.redirect_uri.clone())?;
let res = task::block_in_place(|| {
client
.exchange_code(AuthorizationCode::new(payload.auth_code.clone()))
.set_pkce_verifier(PkceCodeVerifier::new(payload.pkce_verifier.clone()))
.request(http_client)
})?;
let res = client
.exchange_code(AuthorizationCode::new(payload.auth_code.clone()))
.set_pkce_verifier(PkceCodeVerifier::new(payload.pkce_verifier.clone()))
.request_async(async_http_client)
.await?;
Ok(res)
}
@@ -247,7 +246,7 @@ fn get_client(platform: &Platforms, redirect_uri: String) -> Result<BasicClient>
.set_redirect_uri(RedirectUrl::new(redirect_uri)?))
}
fn get_oauth2_flow(tx: &Transaction, schema: &Schema, platform: &Platforms) -> Result<Oauth2Flow> {
fn get_oauth2_flow(tx: &AsyncTx, schema: &Schema, platform: &Platforms) -> Result<Oauth2Flow> {
let query = Search {
_type: Some(OAUTH2_FLOW.to_string()),
deleted: Some(false),
@@ -289,7 +288,7 @@ struct GitlabTokenInfo {
}
const EXPIRY_THRESHOLD: u32 = 60;
fn token_should_be_refreshed(access_token: String, platform: &Platforms) -> Result<bool> {
async fn token_should_be_refreshed(access_token: String, platform: &Platforms) -> Result<bool> {
let token_info_url = &PLATFORM_CONFIG
.get(platform)
.ok_or(internal_error! { "Missing configuration for {platform:?}",
@@ -299,27 +298,28 @@ fn token_should_be_refreshed(access_token: String, platform: &Platforms) -> Resu
.ok_or(internal_error! { "Missing token_info_url for {platform:?}"})?
.to_string();
task::block_in_place(move || -> Result<bool> {
let client = reqwest::blocking::Client::new();
{
let client = reqwest::Client::new();
let res = client
.get(token_info_url)
.header(CONTENT_TYPE, "application/json")
.bearer_auth(access_token)
.send()
.await
.context(|| format!("Failed to retrieve token information from {token_info_url}",))?;
if res.status() != StatusCode::OK {
debug!(
"Error GET {token_info_url} response {:#?}",
res.json::<serde_json::Value>()
res.json::<serde_json::Value>().await
);
Ok(true)
} else {
let res: GitlabTokenInfo = res.json()?;
let res: GitlabTokenInfo = res.json().await?;
debug!("Ok GET {token_info_url} response {res:#?}");
// Token is going to expire very soon
Ok(res.expires_in < EXPIRY_THRESHOLD)
}
})
}
}