Unverified Commit 6bb57d86 authored by Vasili Novikov

Implement authentication, lazy DB init and DB encryption

parent 6052117a
Showing with 897 additions and 535 deletions
@@ -4,8 +4,6 @@ target/
# IDE-s
.idea/
# database file
/data/db/*.db
# User-generated files (database, files, etc)
/data/
# user files (in future)
/data/files
This diff is collapsed.
@@ -12,12 +12,13 @@ publish = false # Since the project does not have a licence yet, prevent acciden
[dependencies]
blake2 = "0.9.0"
bytes = "0.5.6"
chrono = { version = "0.4.13", features = ["serde"] }
env_logger = "0.7.1"
hex = "0.4.2"
lazy_static = "1.4.0"
log = "0.4.11"
pnet = "0.26.0"
r2d2 = "0.8.9"
r2d2_sqlite = "0.16.0"
refinery = { version = "0.3.0", features = ["rusqlite"] }
@@ -28,6 +29,15 @@ serde_json = "1.0.56"
tokio = { version = "0.2.22", features = ["macros"] }
warp = { version = "0.2.4", default-features = false, features = ["tls"] }
[dev-dependencies]
criterion = "0.3.3"
[profile.release]
lto = true
[[bench]]
name = "rusqlite_reconnection"
harness = false
@@ -4,14 +4,6 @@ Pod is the open-source backend for [Memri](https://memri.io/) project.
It's written in Rust and provides an HTTP interface for use by the clients.
## WARNING: NOT SECURE WITH PUBLIC IP!!!
The current version of Pod **DOES NOT** guarantee security yet;
**DO NOT** use it for production or run it with a public IP.
* When attempting to run with a public IP, Pod will give an error and refuse to start;
* Setting the environment variable `INSECURE_USE_PUBLIC_IP` to any value
will allow Pod to start even with a public IP (with the security implications above!).
## Run in docker
To run Pod inside docker:
@@ -44,7 +36,7 @@ If you develop Pod, you might want to have faster build turn-around.
Use this to incrementally compile the project (after installing [cargo-watch](https://github.com/passcod/cargo-watch)):
```sh
cargo watch --ignore /docs --ignore /data
cargo watch --ignore docs
```
To build (debug version):
......
use criterion::criterion_group;
use criterion::criterion_main;
use criterion::Criterion;
use rusqlite::Connection;
/// Test the performance of opening an sqlite (rusqlite) connection,
/// accessing the DB and closing the connection.
fn open_file_connection() {
let conn = Connection::open("target/criterion/deleteme.db").unwrap();
let params: &[i64] = &[];
let result = conn
.execute("UPDATE test SET uid = 91 WHERE uid = 91;", params)
.unwrap();
assert_eq!(result, 0);
}
fn criterion_benchmark(c: &mut Criterion) {
let db_address = "target/criterion/deleteme.db";
let conn = Connection::open(db_address).unwrap();
conn.execute_batch("CREATE TABLE IF NOT EXISTS test ( uid INTEGER );")
.unwrap();
conn.close().unwrap();
c.bench_function("reopening SQLite connections in a loop", |b| {
b.iter(|| open_file_connection())
});
std::fs::remove_file(db_address).unwrap();
}
criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);
@@ -15,6 +15,7 @@ services:
- /var/run/docker.sock:/var/run/docker.sock
environment:
- POD_IS_IN_DOCKER=true
# - POD_OWNER_HASHES=your-client's-key-hash
volumes:
db:
......
@@ -75,9 +75,10 @@ When you need to calculate the hash to send to Pod, you can use one of the libra
[example](https://github.com/jedisct1/libsodium.js/blob/master/test/sodium_utils.js#L113)),
[swift](https://github.com/jedisct1/swift-sodium/blob/master/Sodium/GenericHash.swift),
[rust](https://crates.io/crates/blake2),
[libsodium](https://doc.libsodium.org/hashing/generic_hashing).
[libsodium](https://doc.libsodium.org/hashing/generic_hashing),
CLI `b2sum --length=256`.
During development, you can also just send any request to the Pod and see its logs,
which will contain the owner check denial along with the expected hash.
which will contain the owner denial along with the expected hash.
Additionally, you can use the word "ANY" for the owner list in Pod, which will make Pod accept
requests from any owner -- so-called multi-tenancy.
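For illustration only, a minimal Rust sketch of the same calculation, assuming the `blake2` and `hex` crates (the same ones Pod uses internally in its `hash_hex` helper); the function name is hypothetical:

```rust
use blake2::digest::{Update, VariableOutput};
use blake2::VarBlake2b;

/// Hash a hex-encoded owner key into the 32-byte BLAKE2b digest expected in
/// POD_OWNER_HASHES (equivalent to `b2sum --length=256` over the raw key bytes).
fn owner_hash(owner_key_hex: &str) -> Result<String, hex::FromHexError> {
    let key_bytes = hex::decode(owner_key_hex)?; // hash the raw key bytes, not the hex text
    let mut hasher = VarBlake2b::new(32).expect("invalid output size");
    hasher.update(&key_bytes);
    Ok(hex::encode(hasher.finalize_boxed()))
}
```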
......
@@ -48,3 +48,33 @@ pub struct BulkAction {
#[serde(default)]
pub delete_edges: Vec<DeleteEdge>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PayloadWrapper<T> {
pub database_key: String,
pub payload: T,
}
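// Illustrative example (hypothetical values): with the camelCase renaming above, a
// `PayloadWrapper<i64>` -- as used by the `get_item` endpoint -- arrives on the wire as
// a JSON body like:
//
//     { "databaseKey": "<hex-encoded key>", "payload": 12345 }
//
// and could be deserialized with a sketch like
// `let w: PayloadWrapper<i64> = serde_json::from_value(body)?;`.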
//
// Services:
//
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunDownloader {
pub service: String,
pub data_type: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunImporter {
pub data_type: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunIndexer {
pub uid: i64,
}
// Not a real configuration, but one place that lists all environment variables with their usages.
use std::env;
pub const DATABASE_DIR: &str = "./data/db";
pub fn pod_is_in_docker() -> bool {
env::var_os("POD_IS_IN_DOCKER").is_some()
}
/// Comma-separated list of ownerKey-s (hashes) that the server should respond to.
///
/// See /docs/HTTP_API.md on how it works
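///
/// The value is either a comma-separated list of hex-encoded hashes,
/// or the single word "ANY" to accept requests from any owner.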
pub fn pod_owners() -> Option<String> {
env::var("POD_OWNER_HASHES").ok()
}
use r2d2::ManageConnection;
use r2d2_sqlite::SqliteConnectionManager;
use crate::error::Result;
use refinery::Runner;
use rusqlite::Connection;
pub mod embedded {
use refinery::embed_migrations;
embed_migrations!("./res/migrations");
}
pub fn migrate(sqlite: &SqliteConnectionManager) {
let mut refinery_connection = sqlite
.connect()
.expect("Failed to open a connection for refinery database migrations");
// Run "refinery" migrations to bring the core structure of items/edges up-to-date
embedded::migrations::runner()
.run(&mut refinery_connection)
.expect("Failed to run refinery migrations");
/// Run "refinery" migrations to bring the core structure of items/edges up-to-date
pub fn migrate(conn: &mut Connection) -> Result<()> {
let runner: Runner = embedded::migrations::runner();
runner.run(conn).map(|_report| ()).map_err(|e| e.into())
}
use crate::sql_converters::validate_property_name;
use lazy_static::lazy_static;
use log::info;
use r2d2::Pool;
use r2d2::PooledConnection;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::Connection;
use rusqlite::NO_PARAMS;
use serde::Deserialize;
use serde::Serialize;
use std::collections::HashMap;
use std::collections::HashSet;
use std::hash::Hash;
#[derive(Serialize, Deserialize)]
struct DatabaseSchema {
@@ -104,12 +101,9 @@ lazy_static! {
};
}
pub const SCHEMA_JSON_BYTES: &[u8] = include_bytes!("../res/autogenerated_database_schema.json");
const SCHEMA_JSON_BYTES: &[u8] = include_bytes!("../res/autogenerated_database_schema.json");
pub fn migrate(sqlite: &Pool<SqliteConnectionManager>) -> Result<(), String> {
let conn = sqlite
.get()
.expect("Failed to acquire SQLite connection during db initialization");
pub fn migrate(conn: &Connection) -> Result<(), String> {
info!("Initializing database schema (additional columns)");
let schema: &DatabaseSchema = &SCHEMA_STRUCT;
let (column_indexes, declared_columns) = get_column_info(schema, &conn)?;
@@ -117,10 +111,8 @@ pub fn migrate(sqlite: &Pool<SqliteConnectionManager>) -> Result<(), String> {
if !sql.is_empty() {
info!("Updating database schema with:\n{}", sql);
}
conn.execute_batch(&sql)
.unwrap_or_else(|err| panic!("Failed to execute SQL:\n{}\n{}", sql, err));
Ok(())
.map_err(|err| format!("Failed to execute SQL:\n{}\n{}", sql, err))
}
fn validate_schema(schema: &DatabaseSchema) -> Result<(), String> {
@@ -156,7 +148,7 @@ fn validate_schema(schema: &DatabaseSchema) -> Result<(), String> {
type IndexedAndDeclaredProperties = (HashMap<String, bool>, HashMap<String, SchemaPropertyType>);
fn get_column_info(
schema: &DatabaseSchema,
conn: &PooledConnection<SqliteConnectionManager>,
conn: &Connection,
) -> Result<IndexedAndDeclaredProperties, String> {
validate_schema(&schema)?;
let mut column_indexes = HashMap::new();
@@ -186,10 +178,7 @@ fn get_column_info(
//
// Note that the approach of querying `pragma_table_info`
// does not work on older sqlcipher versions (ubuntu 20.04 still uses sqlcipher 3.4).
fn get_all_columns_pragma(
table: &str,
conn: &PooledConnection<SqliteConnectionManager>,
) -> rusqlite::Result<HashSet<String>> {
fn get_all_columns_pragma(table: &str, conn: &Connection) -> rusqlite::Result<HashSet<String>> {
let sql = format!("PRAGMA table_info('{}');", table);
let mut stmt = conn.prepare_cached(&sql)?;
let mut rows = stmt.query(NO_PARAMS)?;
@@ -235,17 +224,3 @@ fn generate_sql(
result
}
pub fn group_by<T, K, F>(collection: T, grouping_func: F) -> HashMap<K, Vec<T::Item>>
where
T: IntoIterator,
F: Fn(&T::Item) -> K,
K: Eq + Hash,
{
let mut map = HashMap::new();
for item in collection {
let group = grouping_func(&item);
map.entry(group).or_insert_with(Vec::new).push(item);
}
map
}
@@ -15,6 +15,16 @@ impl core::fmt::Display for Error {
}
}
impl From<hex::FromHexError> for Error {
fn from(err: hex::FromHexError) -> Error {
let msg = format!("Error converting from hex, {}", err);
Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg,
}
}
}
impl From<r2d2::Error> for Error {
fn from(err: r2d2::Error) -> Error {
let msg = format!("Database r2d2 error {}", err);
@@ -25,6 +35,16 @@ impl From<r2d2::Error> for Error {
}
}
impl From<refinery::Error> for Error {
fn from(err: refinery::Error) -> Error {
let msg = format!("Database 'refinery' migration error, {}", err);
Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg,
}
}
}
impl From<rusqlite::Error> for Error {
fn from(err: rusqlite::Error) -> Error {
let msg = format!("Database rusqlite error {}", err);
@@ -44,3 +64,16 @@ impl From<serde_json::Error> for Error {
}
}
}
impl<T> From<std::sync::PoisonError<T>> for Error {
fn from(err: std::sync::PoisonError<T>) -> Error {
let msg = format!(
"Failed to acquire internal lock because it was poisoned {}",
err
);
Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg,
}
}
}
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::DeleteEdge;
use crate::api_model::UpdateItem;
use crate::error::Error;
use crate::error::Result;
use crate::sql_converters::borrow_sql_params;
@@ -12,8 +11,7 @@ use crate::sql_converters::sqlite_rows_to_json;
use crate::sql_converters::validate_property_name;
use chrono::Utc;
use log::debug;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::Connection;
use rusqlite::ToSql;
use rusqlite::Transaction;
use rusqlite::NO_PARAMS;
@@ -23,17 +21,14 @@ use std::collections::HashMap;
use std::str;
use warp::http::status::StatusCode;
/// Get project version as seen by Cargo.
pub fn get_project_version() -> String {
let git = env!("GIT_DESCRIBE");
let cargo = env!("CARGO_PKG_VERSION");
format!("{}-cargo{})", git, cargo)
}
/// See HTTP_API.md for details
pub fn get_item(sqlite: &Pool<SqliteConnectionManager>, uid: i64) -> Result<Vec<Value>> {
pub fn get_item(conn: &Connection, uid: i64) -> Result<Vec<Value>> {
debug!("Getting item {}", uid);
let conn = sqlite.get()?;
let mut stmt = conn.prepare_cached("SELECT * FROM items WHERE uid = :uid")?;
let rows = stmt.query_named(&[(":uid", &uid)])?;
@@ -55,10 +50,8 @@ fn check_item_exists(tx: &Transaction, uid: i64) -> Result<bool> {
Ok(result)
}
/// See HTTP_API.md for details
pub fn get_all_items(sqlite: &Pool<SqliteConnectionManager>) -> Result<Vec<Value>> {
pub fn get_all_items(conn: &Connection) -> Result<Vec<Value>> {
debug!("Getting all items");
let conn = sqlite.get()?;
let mut stmt = conn.prepare_cached("SELECT * FROM items")?;
let rows = stmt.query(NO_PARAMS)?;
let json = sqlite_rows_to_json(rows, true)?;
@@ -107,8 +100,7 @@ fn execute_sql(tx: &Transaction, sql: &str, fields: &HashMap<String, Value>) ->
Ok(())
}
/// Create an item presuming consistency checks were already done
fn create_item_tx(tx: &Transaction, fields: HashMap<String, Value>) -> Result<i64> {
pub fn create_item_tx(tx: &Transaction, fields: HashMap<String, Value>) -> Result<i64> {
let mut fields: HashMap<String, Value> = fields
.into_iter()
.filter(|(k, v)| !is_array_or_object(v) && validate_property_name(k).is_ok())
@@ -132,8 +124,7 @@ fn create_item_tx(tx: &Transaction, fields: HashMap<String, Value>) -> Result<i6
Ok(tx.last_insert_rowid())
}
/// Update an item presuming all dangerous fields were already removed, and "uid" is present
fn update_item_tx(tx: &Transaction, uid: i64, fields: HashMap<String, Value>) -> Result<()> {
pub fn update_item_tx(tx: &Transaction, uid: i64, fields: HashMap<String, Value>) -> Result<()> {
let mut fields: HashMap<String, Value> = fields
.into_iter()
.filter(|(k, v)| !is_array_or_object(v) && validate_property_name(k).is_ok())
@@ -164,7 +155,6 @@ fn update_item_tx(tx: &Transaction, uid: i64, fields: HashMap<String, Value>) ->
execute_sql(tx, &sql, &fields)
}
/// Create an edge presuming consistency checks were already done
fn create_edge(
tx: &Transaction,
_type: String,
@@ -207,9 +197,7 @@ fn create_edge(
execute_sql(tx, &sql, &fields)
}
/// Delete an edge and all its properties.
/// WARNING: Deleting an edge is irreversible!!!
fn delete_edge(tx: &Transaction, edge: DeleteEdge) -> Result<usize> {
fn delete_edge_tx(tx: &Transaction, edge: DeleteEdge) -> Result<usize> {
let sql =
"DELETE FROM edges WHERE _source = :_source AND _target = :_target AND _type = :_type;";
let sql_params = vec![
@@ -223,7 +211,7 @@ fn delete_edge(tx: &Transaction, edge: DeleteEdge) -> Result<usize> {
Ok(result)
}
fn delete_item_tx(tx: &Transaction, uid: i64) -> Result<()> {
pub fn delete_item_tx(tx: &Transaction, uid: i64) -> Result<()> {
let mut fields = HashMap::new();
let time_now = Utc::now().timestamp_millis();
fields.insert("deleted".to_string(), true.into());
@@ -231,7 +219,7 @@ fn delete_item_tx(tx: &Transaction, uid: i64) -> Result<()> {
update_item_tx(tx, uid, fields)
}
fn bulk_action_tx(tx: &Transaction, bulk_action: BulkAction) -> Result<()> {
pub fn bulk_action_tx(tx: &Transaction, bulk_action: BulkAction) -> Result<()> {
debug!("Performing bulk action {:#?}", bulk_action);
let edges_will_be_created = !bulk_action.create_edges.is_empty();
for item in bulk_action.create_items {
@@ -253,57 +241,20 @@ fn bulk_action_tx(tx: &Transaction, bulk_action: BulkAction) -> Result<()> {
delete_item_tx(tx, edge_uid)?;
}
for del_edge in bulk_action.delete_edges {
delete_edge(tx, del_edge)?;
delete_edge_tx(tx, del_edge)?;
}
Ok(())
}
/// See HTTP_API.md for details
pub fn bulk_action(sqlite: &Pool<SqliteConnectionManager>, json: Value) -> Result<()> {
debug!("Performing bulk action {}", json);
let bulk_action: BulkAction = serde_json::from_value(json)?;
let mut conn = sqlite.get()?;
let tx = conn.transaction()?;
bulk_action_tx(&tx, bulk_action)?;
tx.commit()?;
Ok(())
}
/// See HTTP_API.md for details
pub fn create_item(sqlite: &Pool<SqliteConnectionManager>, json: Value) -> Result<i64> {
debug!("Creating item {}", json);
let create_action: CreateItem = serde_json::from_value(json)?;
let mut conn = sqlite.get()?;
pub fn create_item(conn: &mut Connection, create_action: CreateItem) -> Result<i64> {
debug!("Creating item {:?}", create_action);
let tx = conn.transaction()?;
let result = create_item_tx(&tx, create_action.fields)?;
tx.commit()?;
Ok(result)
}
/// See HTTP_API.md for details
pub fn update_item(sqlite: &Pool<SqliteConnectionManager>, uid: i64, json: Value) -> Result<()> {
debug!("Updating item {} with {}", uid, json);
let update_action: UpdateItem = serde_json::from_value(json)?;
let mut conn = sqlite.get()?;
let tx = conn.transaction()?;
update_item_tx(&tx, update_action.uid, update_action.fields)?;
tx.commit()?;
Ok(())
}
/// See HTTP_API.md for details
pub fn delete_item(sqlite: &Pool<SqliteConnectionManager>, uid: i64) -> Result<()> {
debug!("Deleting item {}", uid);
let time_now = Utc::now().timestamp_millis();
let json = serde_json::json!({ "deleted": true, "dateModified": time_now, });
update_item(sqlite, uid, json)
}
/// See HTTP_API.md for details
pub fn search_by_fields(
sqlite: &Pool<SqliteConnectionManager>,
query: Value,
) -> Result<Vec<Value>> {
pub fn search_by_fields(tx: &Transaction, query: Value) -> Result<Vec<Value>> {
debug!("Searching by fields {:?}", query);
let fields_map = match query {
Object(map) => map,
@@ -335,14 +286,12 @@ pub fn search_by_fields(
let sql_params = fields_mapping_to_owned_sql_params(&fields_map)?;
let sql_params = borrow_sql_params(&sql_params);
let conn = sqlite.get()?;
let mut stmt = conn.prepare_cached(&sql_body)?;
let mut stmt = tx.prepare_cached(&sql_body)?;
let rows = stmt.query_named(sql_params.as_slice())?;
let json = sqlite_rows_to_json(rows, true)?;
Ok(json)
}
/// See HTTP_API.md for details
pub fn get_item_with_edges_tx(tx: &Transaction, uid: i64) -> Result<Value> {
let mut stmt_item = tx.prepare_cached("SELECT * FROM items WHERE uid = :uid")?;
let mut item_rows = stmt_item.query_named(&[(":uid", &uid)])?;
@@ -392,32 +341,10 @@ pub fn get_item_with_edges_tx(tx: &Transaction, uid: i64) -> Result<Value> {
Ok(item.into())
}
pub fn get_item_with_edges(sqlite: &Pool<SqliteConnectionManager>, uid: i64) -> Result<Value> {
debug!("Getting item with edges {}", uid);
let mut conn = sqlite.get()?;
let tx = conn.transaction()?;
let result = get_item_with_edges_tx(&tx, uid)?;
tx.commit()?;
Ok(result)
}
fn get_items_with_edges_tx(tx: &Transaction, uids: &[i64]) -> Result<Vec<Value>> {
pub fn get_items_with_edges_tx(tx: &Transaction, uids: &[i64]) -> Result<Vec<Value>> {
let mut result = Vec::new();
for uid in uids {
result.push(get_item_with_edges_tx(tx, *uid)?);
}
Ok(result)
}
pub fn get_items_with_edges(
sqlite: &Pool<SqliteConnectionManager>,
json: Value,
) -> Result<Vec<Value>> {
debug!("Getting items with edges {}", json);
let mut conn = sqlite.get()?;
let tx = conn.transaction()?;
let uids: Vec<i64> = serde_json::from_value(json)?;
let result = get_items_with_edges_tx(&tx, &uids)?;
tx.commit()?;
Ok(result)
}
// Minimal library-style interface, exposed so that integration tests can work
pub mod api_model;
mod api_model;
mod configuration;
pub mod database_migrate_refinery;
pub mod database_migrate_schema;
pub mod error;
pub mod internal_api;
pub mod sql_converters;
mod services_api;
mod sql_converters;
pub mod warp_endpoints;
@@ -3,6 +3,7 @@ extern crate r2d2_sqlite;
extern crate rusqlite;
mod api_model;
mod configuration;
mod database_migrate_refinery;
pub mod database_migrate_schema;
mod error;
@@ -10,20 +11,12 @@ pub mod internal_api;
pub mod services_api;
mod sql_converters;
mod warp_api;
mod warp_endpoints;
use chrono::Utc;
use env_logger::Env;
use log::info;
use log::warn;
use pnet::datalink;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use std::env;
use std::fs::create_dir_all;
use std::io::Write;
use std::net::IpAddr::V4;
use std::net::IpAddr::V6;
use std::path::PathBuf;
#[tokio::main]
async fn main() {
@@ -40,56 +33,14 @@ async fn main() {
.init();
info!(
"Starting Pod version {} (Cargo version {})",
env!("GIT_DESCRIBE"),
env!("CARGO_PKG_VERSION")
std::env!("GIT_DESCRIBE"),
std::env!("CARGO_PKG_VERSION")
);
if std::env::args().any(|a| a == "--version" || a == "--help") {
eprintln!("Done");
std::process::exit(0)
};
let sqlite_file = PathBuf::from("data/db/pod.db");
info!("Using SQLite database {:?}", sqlite_file);
let sqlite_dir = sqlite_file
.parent()
.expect("Failed to get parent directory for database");
create_dir_all(sqlite_dir).expect("Failed to create database directory");
let sqlite_manager = SqliteConnectionManager::file(&sqlite_file)
.with_init(|c| c.execute_batch("PRAGMA foreign_keys = ON;"));
database_migrate_refinery::migrate(&sqlite_manager);
let sqlite: Pool<SqliteConnectionManager> =
r2d2::Pool::new(sqlite_manager).expect("Failed to create r2d2 SQLite connection pool");
// Run "schema" migrations based on the auto-generated JSON schema.
// This creates all optional properties in items table, and adds/removes property indices.
database_migrate_schema::migrate(&sqlite)
.unwrap_or_else(|err| panic!("Failed to migrate schema, {}", err));
// Try to prevent Pod from running on a public IP
for interface in datalink::interfaces() {
for ip in interface.ips {
// https://en.wikipedia.org/wiki/Private_network
let is_private = match ip.ip() {
V4(v4) => v4.is_private() || v4.is_loopback() || v4.is_link_local(),
V6(v6) if v6.is_loopback() => true,
// https://en.wikipedia.org/wiki/Unique_local_address
// Implementation copied from `v6.is_unique_local()`,
// which is not yet stabilized in Rust
V6(v6) if (v6.segments()[0] & 0xfe00) == 0xfc00 => true,
// https://en.wikipedia.org/wiki/Link-local_address
V6(v6) if (v6.segments()[0] & 0xffc0) == 0xfe80 => true,
_ => false,
};
if !is_private && env::var_os("INSECURE_USE_PUBLIC_IP").is_some() {
warn!("USING INSECURE PUBLIC IP {}.", ip.ip());
} else if !is_private {
eprintln!("ERROR! Pod seems to be running on a public network with IP {}. THIS IS NOT SECURE! Please wait until proper authorization is implemented, or do not run Pod on a publicly available network, or set environment variable INSECURE_USE_PUBLIC_IP to any value to override the failsafe.", ip.ip());
std::process::exit(1)
}
}
}
// Start web framework
warp_api::run_server(sqlite).await;
warp_api::run_server().await;
}
use crate::configuration::pod_is_in_docker;
use crate::error::Error;
use crate::error::Result;
use crate::internal_api;
use log::info;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::Connection;
use std::ops::Deref;
use std::process::Command;
use warp::http::status::StatusCode;
fn docker_arguments() -> Vec<String> {
if std::env::var_os("POD_IS_IN_DOCKER").is_some() {
vec![
"--network=pod_memri-net".to_string(),
"--env=POD_ADDRESS=pod_pod_1".to_string(),
]
} else {
// The indexers/importers/downloaders need to have access to the host
// This is currently done differently on MacOS and Linux
// https://stackoverflow.com/questions/24319662/from-inside-of-a-docker-container-how-do-i-connect-to-the-localhost-of-the-mach
let pod_address = if cfg!(target_os = "linux") {
"localhost"
} else {
"host.docker.internal"
};
vec![
format!("--env=POD_ADDRESS={}", pod_address),
"--network=host".to_string(),
]
}
}
pub fn run_downloaders(service: String, data_type: String) -> Result<()> {
pub fn run_downloader(service: String, data_type: String) -> Result<()> {
info!("Trying to run downloader {} for {}", service, data_type);
match service.as_str() {
"evernote" => match data_type.as_str() {
@@ -59,7 +38,7 @@ pub fn run_downloaders(service: String, data_type: String) -> Result<()> {
Ok(())
}
pub fn run_importers(data_type: String) -> Result<()> {
pub fn run_importer(data_type: String) -> Result<()> {
info!("Trying to run importer for {}", data_type);
match data_type.as_str() {
"note" => {
@@ -85,9 +64,9 @@ pub fn run_importers(data_type: String) -> Result<()> {
Ok(())
}
pub fn run_indexers(sqlite: &Pool<SqliteConnectionManager>, uid: i64) -> Result<()> {
pub fn run_indexers(conn: &Connection, uid: i64) -> Result<()> {
info!("Trying to run indexer on item {}", uid);
let result = internal_api::get_item(sqlite, uid)?;
let result = internal_api::get_item(conn.deref(), uid)?;
match result.first() {
Some(_item) => {
Command::new("docker")
@@ -109,3 +88,25 @@ pub fn run_indexers(sqlite: &Pool<SqliteConnectionManager>, uid: i64) -> Result<
}),
}
}
fn docker_arguments() -> Vec<String> {
if pod_is_in_docker() {
vec![
"--network=pod_memri-net".to_string(),
"--env=POD_ADDRESS=pod_pod_1".to_string(),
]
} else {
// The indexers/importers/downloaders need to have access to the host
// This is currently done differently on MacOS and Linux
// https://stackoverflow.com/questions/24319662/from-inside-of-a-docker-container-how-do-i-connect-to-the-localhost-of-the-mach
let pod_address = if cfg!(target_os = "linux") {
"localhost"
} else {
"host.docker.internal"
};
vec![
format!("--env=POD_ADDRESS={}", pod_address),
"--network=host".to_string(),
]
}
}
@@ -42,7 +42,7 @@ pub fn sqlite_rows_to_json(mut rows: Rows, partial: bool) -> rusqlite::Result<Ve
Ok(result)
}
pub fn sqlite_value_to_json(value: ValueRef, column_name: &str) -> Option<Value> {
fn sqlite_value_to_json(value: ValueRef, column_name: &str) -> Option<Value> {
match value {
ValueRef::Null => None,
ValueRef::Integer(i) => {
@@ -153,28 +153,31 @@ pub fn json_value_to_sqlite<'a>(
}
}
/// Field name is valid only if it contains less than or equal to 18 characters and
/// characters from 'a' to 'z', 'A' to 'Z'.
pub fn validate_property_name(field: &str) -> crate::error::Result<()> {
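/// A property name is accepted only if it matches `^[_a-zA-Z]{1,30}$`
/// and is not one of the blocklisted SQLite keywords below.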
pub fn validate_property_name(property: &str) -> crate::error::Result<()> {
lazy_static! {
static ref REGEXP: Regex = Regex::new(r"^[_a-zA-Z]{1,30}$").expect("Cannot create regex");
}
if REGEXP.is_match(field) && !BLACKLIST_COLUMN_NAMES.contains(field) {
Ok(())
} else {
if BLOCKLIST_COLUMN_NAMES.contains(property) {
Err(crate::error::Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Blocklisted item property {}", property),
})
} else if !REGEXP.is_match(property) {
Err(crate::error::Error {
code: StatusCode::BAD_REQUEST,
msg: format!(
"Item property name {} does not satisfy the format {}",
field,
property,
REGEXP.as_str()
),
})
} else {
Ok(())
}
}
// Taken from the official documentation https://www.sqlite.org/lang_keywords.html
const BLACKLIST_COLUMN_NAMES_ARRAY: &[&str] = &[
const BLOCKLIST_COLUMN_NAMES_ARRAY: &[&str] = &[
"ABORT",
"ACTION",
"ADD",
@@ -323,8 +326,8 @@ const BLACKLIST_COLUMN_NAMES_ARRAY: &[&str] = &[
];
lazy_static! {
static ref BLACKLIST_COLUMN_NAMES: HashSet<String> = {
BLACKLIST_COLUMN_NAMES_ARRAY
static ref BLOCKLIST_COLUMN_NAMES: HashSet<String> = {
BLOCKLIST_COLUMN_NAMES_ARRAY
.iter()
.map(|w| w.to_string())
.collect()
......
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::UpdateItem;
use crate::internal_api;
use crate::services_api;
use bytes::Bytes;
use crate::warp_endpoints;
use log::info;
use log::warn;
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use serde_json::Value;
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::RwLock;
use warp::http;
use warp::http::header::HeaderMap;
use warp::http::header::HeaderValue;
@@ -15,7 +23,7 @@ use warp::Filter;
use warp::Reply;
/// Start web framework with specified APIs.
pub async fn run_server(sqlite_pool: Pool<SqliteConnectionManager>) {
pub async fn run_server() {
let package_name = env!("CARGO_PKG_NAME").to_uppercase();
info!("Starting {} HTTP server", package_name);
@@ -24,159 +32,135 @@ pub async fn run_server(sqlite_pool: Pool<SqliteConnectionManager>) {
headers.insert("Access-Control-Allow-Origin", HeaderValue::from_static("*"));
let headers = warp::reply::with::headers(headers);
let api_version_1 = warp::path("v1");
let api_defaults = warp::path("v2")
.and(warp::body::content_length_limit(1024 * 32))
.and(warp::post());
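// All v2 endpoints below follow the same pattern: POST /v2/<owner_key>/<endpoint_name>
// with a JSON body of the form { "databaseKey": ..., "payload": ... } (see PayloadWrapper).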
let pool_arc = Arc::new(sqlite_pool);
let initialized_databases_arc = Arc::new(RwLock::new(HashSet::<String>::new()));
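// Set of owners whose database has already been opened and migrated in this process;
// used by warp_endpoints for lazy, per-owner database initialization.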
let version = warp::path("version")
.and(warp::path::end())
.and(warp::get())
.map(internal_api::get_project_version);
let pool = pool_arc.clone();
let get_item = api_version_1
.and(warp::path!("items" / i64))
let init_db = initialized_databases_arc.clone();
let get_item = api_defaults
.and(warp::path!(String / "get_item"))
.and(warp::path::end())
.and(warp::get())
.map(move |uid: i64| {
let result = internal_api::get_item(&pool, uid);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<i64>| {
let result = warp_endpoints::get_item(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let pool = pool_arc.clone();
let get_all_items = api_version_1
.and(warp::path!("all_items"))
let init_db = initialized_databases_arc.clone();
let get_all_items = api_defaults
.and(warp::path!(String / "get_all_items"))
.and(warp::path::end())
.and(warp::get())
.map(move || {
let result = internal_api::get_all_items(&pool);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<()>| {
let result = warp_endpoints::get_all_items(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let pool = pool_arc.clone();
let create_item = api_version_1
.and(warp::path("items"))
let init_db = initialized_databases_arc.clone();
let create_item = api_defaults
.and(warp::path!(String / "create_item"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.map(move |body: serde_json::Value| {
let result = internal_api::create_item(&pool, body);
.map(move |owner: String, body: PayloadWrapper<CreateItem>| {
let result = warp_endpoints::create_item(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let pool = pool_arc.clone();
let update_item = api_version_1
.and(warp::path!("items" / i64))
let init_db = initialized_databases_arc.clone();
let update_item = api_defaults
.and(warp::path!(String / "update_item"))
.and(warp::path::end())
.and(warp::put())
.and(warp::body::json())
.map(move |uid: i64, body: serde_json::Value| {
let result = internal_api::update_item(&pool, uid, body);
.map(move |owner: String, body: PayloadWrapper<UpdateItem>| {
let result = warp_endpoints::update_item(owner, init_db.deref(), body);
let result = result.map(|()| warp::reply::json(&serde_json::json!({})));
respond_with_result(result)
});
let pool = pool_arc.clone();
let bulk_action = api_version_1
.and(warp::path!("bulk_action"))
let init_db = initialized_databases_arc.clone();
let bulk_action = api_defaults
.and(warp::path!(String / "bulk_action"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.map(move |body: serde_json::Value| {
let result = internal_api::bulk_action(&pool, body);
.map(move |owner: String, body: PayloadWrapper<BulkAction>| {
let result = warp_endpoints::bulk_action(owner, init_db.deref(), body);
let result = result.map(|()| warp::reply::json(&serde_json::json!({})));
respond_with_result(result)
});
let pool = pool_arc.clone();
let delete_item = api_version_1
.and(warp::path!("items" / i64))
let init_db = initialized_databases_arc.clone();
let delete_item = api_defaults
.and(warp::path!(String / "delete_item"))
.and(warp::path::end())
.and(warp::delete())
.map(move |uid: i64| {
let result = internal_api::delete_item(&pool, uid);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<i64>| {
let result = warp_endpoints::delete_item(owner, init_db.deref(), body);
let result = result.map(|()| warp::reply::json(&serde_json::json!({})));
respond_with_result(result)
});
let pool = pool_arc.clone();
let external_id_exists = api_version_1
.and(warp::path!("deprecated" / "uri_exists" / String))
.and(warp::path::end())
.and(warp::get())
.map(move |external_id: String| {
let body = serde_json::json!({ "uri": external_id });
let result = internal_api::search_by_fields(&pool, body);
let result = result.map(|result| warp::reply::json(&!result.is_empty()));
respond_with_result(result)
});
let pool = pool_arc.clone();
let search = api_version_1
.and(warp::path("search_by_fields"))
let init_db = initialized_databases_arc.clone();
let search = api_defaults
.and(warp::path!(String / "search_by_fields"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::bytes())
.map(move |body: Bytes| {
let body =
serde_json::from_slice(&body).expect("Failed to serialize request body to JSON");
let result = internal_api::search_by_fields(&pool, body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let pool = pool_arc.clone();
let get_item_with_edges = api_version_1
.and(warp::path!("item_with_edges" / i64))
.and(warp::path::end())
.and(warp::get())
.map(move |uid: i64| {
let result = internal_api::get_item_with_edges(&pool, uid);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<Value>| {
let result = warp_endpoints::search_by_fields(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let pool = pool_arc.clone();
let get_items_with_edges = api_version_1
.and(warp::path!("items_with_edges"))
let init_db = initialized_databases_arc.clone();
let get_items_with_edges = api_defaults
.and(warp::path!(String / "get_items_with_edges"))
.and(warp::path::end())
.and(warp::post())
.and(warp::body::json())
.map(move |body: serde_json::Value| {
let result = internal_api::get_items_with_edges(&pool, body);
.map(move |owner: String, body: PayloadWrapper<Vec<i64>>| {
let result = warp_endpoints::get_items_with_edges(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
});
let run_downloaders = api_version_1
.and(warp::path!("run_service" / "downloaders" / String / String))
let init_db = initialized_databases_arc.clone();
let run_downloaders = api_defaults
// //! In fact, any type that implements `FromStr` can be used, in any order:
// ~/.cargo/registry.cache/src/github.com-1ecc6299db9ec823/warp-0.2.4/src/filters/path.rs:45
.and(warp::path!(String / "run_downloader"))
.and(warp::path::end())
.and(warp::post())
.map(move |service: String, data_type: String| {
let result = services_api::run_downloaders(service, data_type);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<RunDownloader>| {
let result = warp_endpoints::run_downloader(owner, init_db.deref(), body);
let result = result.map(|()| warp::reply::json(&serde_json::json!({})));
respond_with_result(result)
});
let run_importers = api_version_1
.and(warp::path!("run_service" / "importers" / String))
let init_db = initialized_databases_arc.clone();
let run_importers = api_defaults
.and(warp::path!(String / "run_importer"))
.and(warp::path::end())
.and(warp::post())
.map(move |data_type: String| {
let result = services_api::run_importers(data_type);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<RunImporter>| {
let result = warp_endpoints::run_importer(owner, init_db.deref(), body);
respond_with_result(result.map(|()| warp::reply::json(&serde_json::json!({}))))
});
let pool = pool_arc.clone();
let run_indexers = api_version_1
.and(warp::path!("run_service" / "indexers" / i64))
let init_db = initialized_databases_arc.clone();
let run_indexers = api_defaults
.and(warp::path!(String / "run_indexer"))
.and(warp::path::end())
.and(warp::post())
.map(move |uid: i64| {
let result = services_api::run_indexers(&pool, uid);
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<RunIndexer>| {
let result = warp_endpoints::run_indexer(owner, init_db.deref(), body);
respond_with_result(result.map(|()| warp::reply::json(&serde_json::json!({}))))
});
@@ -208,9 +192,7 @@ pub async fn run_server(sqlite_pool: Pool<SqliteConnectionManager>) {
.or(bulk_action.with(&headers))
.or(update_item.with(&headers))
.or(delete_item.with(&headers))
.or(external_id_exists.with(&headers))
.or(search.with(&headers))
.or(get_item_with_edges.with(&headers))
.or(get_items_with_edges.with(&headers))
.or(run_downloaders.with(&headers))
.or(run_importers.with(&headers))
......
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::UpdateItem;
use crate::configuration;
use crate::database_migrate_refinery;
use crate::database_migrate_schema;
use crate::error::Error;
use crate::error::Result;
use crate::internal_api;
use crate::services_api;
use blake2::digest::Update;
use blake2::digest::VariableOutput;
use blake2::VarBlake2b;
use hex;
use lazy_static::lazy_static;
use log::error;
use log::info;
use rusqlite::Connection;
use serde_json::Value;
use std::collections::HashSet;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::RwLock;
use warp::http::status::StatusCode;
pub fn get_item(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<i64>,
) -> Result<Vec<Value>> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
internal_api::get_item(&conn, body.payload)
}
pub fn get_all_items(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<()>,
) -> Result<Vec<Value>> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
internal_api::get_all_items(&conn)
}
pub fn create_item(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<CreateItem>,
) -> Result<i64> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::create_item_tx(&tx, body.payload.fields);
tx.commit()?;
result
}
pub fn update_item(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<UpdateItem>,
) -> Result<()> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::update_item_tx(&tx, body.payload.uid, body.payload.fields);
tx.commit()?;
result
}
pub fn bulk_action(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<BulkAction>,
) -> Result<()> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::bulk_action_tx(&tx, body.payload);
tx.commit()?;
result
}
pub fn delete_item(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<i64>,
) -> Result<()> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::delete_item_tx(&tx, body.payload);
tx.commit()?;
result
}
pub fn search_by_fields(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<Value>,
) -> Result<Vec<Value>> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::search_by_fields(&tx, body.payload);
tx.commit()?;
result
}
pub fn get_items_with_edges(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<Vec<i64>>,
) -> Result<Vec<Value>> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
let tx = conn.transaction()?;
let result = internal_api::get_items_with_edges_tx(&tx, &body.payload);
tx.commit()?;
result
}
pub fn run_downloader(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<RunDownloader>,
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?; // Check DB access
services_api::run_downloader(body.payload.service, body.payload.data_type)
}
pub fn run_importer(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<RunImporter>,
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?; // Check DB access
services_api::run_importer(body.payload.data_type)
}
pub fn run_indexer(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<RunIndexer>,
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?; // Check DB access
services_api::run_indexers(&conn, body.payload.uid)
}
//
// helper functions:
//
/// Two methods combined into one to prevent creating a database connection without owner checks.
/// This is an additional failsafe on top of the fact that non-owners don't have the database key.
fn check_owner_and_initialize_db(
owner: &str,
init_db: &RwLock<HashSet<String>>,
database_key: &str,
) -> Result<Connection> {
check_owner(owner)?;
initialize_db(owner, init_db, database_key)
}
fn initialize_db(
owner: &str,
init_db: &RwLock<HashSet<String>>,
database_key: &str,
) -> Result<Connection> {
let database_path = format!("{}.db", &owner);
let database_path = PathBuf::from(configuration::DATABASE_DIR).join(database_path);
let mut conn = Connection::open(database_path).unwrap();
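// The database key is applied SQLCipher-style as a raw hex key (PRAGMA key = "x'<hex>'")
// immediately after opening, before any other statement touches the database.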
let pragma_sql = format!("PRAGMA key = \"x'{}'\";", database_key);
conn.execute_batch(&pragma_sql)?;
let mut init_db = init_db.write()?;
if !init_db.contains(owner) {
init_db.insert(owner.to_string());
database_migrate_refinery::migrate(&mut conn)?;
database_migrate_schema::migrate(&conn).map_err(|err| Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to migrate database according to schema, {}", err),
})?;
}
Ok(conn)
}
fn allowed_owner_hashes_fn() -> HashSet<Vec<u8>> {
if let Some(owners) = configuration::pod_owners() {
let mut result = HashSet::new();
for owner in owners.split(',') {
let hexed = hex::decode(owner).unwrap_or_else(|err| {
error!(
"POD_OWNER_HASHES is invalid, failed to decode {} from hex, {}",
owner, err
);
std::process::exit(1);
});
result.insert(hexed);
}
result
} else {
error!("No POD_OWNER_HASHES configured for Pod. Without owners to trust, Pod will not be able to answer any HTTP requests.");
HashSet::new()
}
}
lazy_static! {
static ref ALLOWED_OWNER_HASHES: HashSet<Vec<u8>> = allowed_owner_hashes_fn();
}
fn hash_hex(hex_string: &str) -> Result<Box<[u8]>> {
let mut possible_hash: VarBlake2b = VarBlake2b::new(32).expect("Invalid output size");
let hex_string = hex::decode(hex_string)?;
possible_hash.update(hex_string);
Ok(possible_hash.finalize_boxed())
}
fn check_owner(possible_owner: &str) -> Result<()> {
if configuration::pod_owners().iter().any(|e| e == "ANY") {
return Ok(());
};
let possible_hash = hash_hex(possible_owner)?;
if ALLOWED_OWNER_HASHES.contains(possible_hash.deref()) {
Ok(())
} else {
info!(
"Denying unexpected owner {} with hash {}",
possible_owner,
hex::encode(possible_hash)
);
Err(Error {
code: StatusCode::FORBIDDEN,
msg: "Unexpected owner".to_string(),
})
}
}
@@ -33,7 +33,7 @@ lazy_static! {
let sqlite: Pool<SqliteConnectionManager> =
r2d2::Pool::new(sqlite).expect("Failed to create r2d2 SQLite connection pool");
database_migrate_schema::migrate(&sqlite)
database_migrate_schema::migrate(&mut refinery_connection)
.unwrap_or_else(|err| panic!("Failed to migrate schema, {}", err));
sqlite
};
@@ -41,7 +41,7 @@ lazy_static! {
#[test]
fn test_bulk_action() {
let sqlite = &SQLITE;
let sqlite: &Pool<SqliteConnectionManager> = &SQLITE;
let json = json!({
"createItems": [{"uid": 1, "_type": "Person"}, {"uid": 2, "_type": "Person"}],
@@ -49,18 +49,31 @@ fn test_bulk_action() {
"createEdges": [{"_type": "friend", "_source": 1, "_target": 2, "edgeLabel": "test", "sequence": 1}]
});
let bulk = bulk_action(&sqlite, json);
let mut conn = sqlite.get().unwrap();
let edges = get_item_with_edges(&sqlite, 1);
let bulk = {
let tx = conn.transaction().unwrap();
let result = bulk_action_tx(&tx, serde_json::from_value(json).unwrap());
tx.commit().unwrap();
result
};
let with_edges = {
let tx = conn.transaction().unwrap();
get_item_with_edges_tx(&tx, 1)
};
let json = json!({"_type": "Person"});
let search = search_by_fields(&sqlite, json);
let search = {
let tx = conn.transaction().unwrap();
search_by_fields(&tx, json)
};
assert_eq!(bulk, Ok(()));
assert!(
edges.is_ok(),
with_edges.is_ok(),
"get items with edges failed with: {:?}",
edges
with_edges
);
assert!(check_has_item(search));
}
......