Unverified Commit f1b1f6b6 authored by Vasili Novikov's avatar Vasili Novikov
Browse files

Implement date filtering (efficient sync)

parent 5d6dad38
Showing with 70 additions and 64 deletions
+70 -64
......@@ -200,10 +200,13 @@ Mark an item as deleted:
```json
{
"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99",
"payload": { "_type": "Label", "color": "#CCFF00", ... }
"payload": { "_type": "Label", "color": "#CCFF00", "_dateServerModifiedAfter": 123456789, ... }
}
```
Search items by their fields.
Field `_dateServerModifiedAfter` is not treated in the standard way, and instead, it filters
items by their `_dateServerModified` field using the `>` operator.
The endpoint will return an array of all items with exactly the same properties.
......
......@@ -49,6 +49,15 @@ pub struct BulkAction {
pub delete_edges: Vec<DeleteEdge>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SearchByFields {
#[serde(rename = "_dateServerModifiedAfter")]
pub _date_server_modified_after: Option<i64>,
#[serde(flatten)]
pub fields: HashMap<String, Value>,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct PayloadWrapper<T> {
......
use crate::api_model::BulkAction;
use crate::api_model::CreateItem;
use crate::api_model::DeleteEdge;
use crate::api_model::SearchByFields;
use crate::error::Error;
use crate::error::Result;
use crate::sql_converters::borrow_sql_params;
use crate::sql_converters::fields_mapping_to_owned_sql_params;
use crate::sql_converters::json_value_to_sqlite;
use crate::sql_converters::sqlite_row_to_map;
use crate::sql_converters::sqlite_rows_to_json;
......@@ -15,7 +15,6 @@ use rusqlite::Connection;
use rusqlite::ToSql;
use rusqlite::Transaction;
use rusqlite::NO_PARAMS;
use serde_json::value::Value::Object;
use serde_json::Value;
use std::collections::HashMap;
use std::collections::HashSet;
......@@ -259,20 +258,11 @@ pub fn create_item(conn: &mut Connection, create_action: CreateItem) -> Result<i
Ok(result)
}
pub fn search_by_fields(tx: &Transaction, query: Value) -> Result<Vec<Value>> {
pub fn search_by_fields(tx: &Transaction, query: SearchByFields) -> Result<Vec<Value>> {
debug!("Searching by fields {:?}", query);
let fields_map = match query {
Object(map) => map,
_ => {
return Err(Error {
code: StatusCode::BAD_REQUEST,
msg: "Expected JSON object".to_string(),
})
}
};
let mut sql_body = "SELECT * FROM items WHERE ".to_string();
let mut first_parameter = true;
for (field, value) in &fields_map {
for (field, value) in &query.fields {
validate_property_name(field)?;
match value {
Value::Array(_) => continue,
......@@ -287,10 +277,20 @@ pub fn search_by_fields(tx: &Transaction, query: Value) -> Result<Vec<Value>> {
sql_body.push_str(" = :");
sql_body.push_str(field);
}
if query._date_server_modified_after.is_some() {
sql_body.push_str(" AND _dateServerModified > :_dateServerModified");
};
sql_body.push_str(";");
let sql_params = fields_mapping_to_owned_sql_params(&fields_map)?;
let sql_params = borrow_sql_params(&sql_params);
let mut sql_params = Vec::new();
for (key, value) in &query.fields {
sql_params.push((format!(":{}", key), json_value_to_sqlite(&value, &key)?));
}
if let Some(date) = query._date_server_modified_after {
let key = ":_dateServerModified".to_string();
sql_params.push((key, date.into()));
};
let sql_params = borrow_sql_params(sql_params.as_slice());
let mut stmt = tx.prepare_cached(&sql_body)?;
let rows = stmt.query_named(sql_params.as_slice())?;
let json = sqlite_rows_to_json(rows, true)?;
......
// Fake simple library interface to allow integration tests to work
mod api_model;
pub mod api_model;
mod command_line_interface;
pub mod constants;
pub mod database_migrate_refinery;
......
......@@ -64,21 +64,6 @@ fn sqlite_value_to_json(value: ValueRef, column_name: &str) -> Option<Value> {
}
}
pub fn fields_mapping_to_owned_sql_params(
fields_map: &Map<String, serde_json::Value>,
) -> crate::error::Result<Vec<(String, ToSqlOutput)>> {
let mut sql_params = Vec::new();
for (key, value) in fields_map {
if value.is_array() || value.is_object() {
continue;
};
let value = json_value_to_sqlite(value, key)?;
let key = format!(":{}", key);
sql_params.push((key, value));
}
Ok(sql_params)
}
pub fn borrow_sql_params<'a>(
sql_params: &'a [(String, ToSqlOutput)],
) -> Vec<(&'a str, &'a dyn ToSql)> {
......
......@@ -5,6 +5,7 @@ use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::SearchByFields;
use crate::api_model::UpdateItem;
use crate::command_line_interface::CLIOptions;
use crate::internal_api;
......@@ -13,7 +14,6 @@ use bytes::Bytes;
use log::error;
use log::info;
use log::warn;
use serde_json::Value;
use std::collections::HashSet;
use std::net::IpAddr;
use std::net::SocketAddr;
......@@ -130,7 +130,7 @@ pub async fn run_server(cli_options: &CLIOptions) {
.and(warp::path!(String / "search_by_fields"))
.and(warp::path::end())
.and(warp::body::json())
.map(move |owner: String, body: PayloadWrapper<Value>| {
.map(move |owner: String, body: PayloadWrapper<SearchByFields>| {
let result = warp_endpoints::search_by_fields(owner, init_db.deref(), body);
let result = result.map(|result| warp::reply::json(&result));
respond_with_result(result)
......
......@@ -5,6 +5,7 @@ use crate::api_model::PayloadWrapper;
use crate::api_model::RunDownloader;
use crate::api_model::RunImporter;
use crate::api_model::RunIndexer;
use crate::api_model::SearchByFields;
use crate::api_model::UpdateItem;
use crate::command_line_interface;
use crate::command_line_interface::CLIOptions;
......@@ -96,7 +97,7 @@ pub fn delete_item(
pub fn search_by_fields(
owner: String,
init_db: &RwLock<HashSet<String>>,
body: PayloadWrapper<Value>,
body: PayloadWrapper<SearchByFields>,
) -> Result<Vec<Value>> {
let mut conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
in_transaction(&mut conn, |tx| {
......
......@@ -3,7 +3,6 @@ extern crate pod;
use lazy_static::lazy_static;
use pod::database_migrate_refinery;
use pod::database_migrate_schema;
use pod::error::Error;
use pod::internal_api;
use r2d2::ManageConnection;
use r2d2::Pool;
......@@ -42,42 +41,51 @@ lazy_static! {
#[test]
fn test_bulk_action() {
let sqlite: &Pool<SqliteConnectionManager> = &SQLITE;
let json = json!({
"createItems": [{"uid": 1, "_type": "Person"}, {"uid": 2, "_type": "Person"}],
"updateItems": [{"uid": 1, "_type": "Person1"}],
"createEdges": [{"_type": "friend", "_source": 1, "_target": 2, "edgeLabel": "test", "sequence": 1}]
});
let mut conn = sqlite.get().unwrap();
let bulk = {
{
let tx = conn.transaction().unwrap();
let result = internal_api::bulk_action_tx(&tx, serde_json::from_value(json).unwrap());
let json = json!({
"createItems": [{"uid": 1, "_type": "Person"}, {"uid": 2, "_type": "Person"}],
"updateItems": [{"uid": 1, "_type": "Person1"}],
"createEdges": [{"_type": "friend", "_source": 1, "_target": 2, "edgeLabel": "test", "sequence": 1}]
});
let bulk = internal_api::bulk_action_tx(&tx, serde_json::from_value(json).unwrap());
tx.commit().unwrap();
result
};
assert_eq!(bulk, Ok(()));
}
let with_edges = {
{
let tx = conn.transaction().unwrap();
internal_api::get_item_with_edges_tx(&tx, 1)
};
let with_edges = internal_api::get_item_with_edges_tx(&tx, 1);
assert!(
with_edges.is_ok(),
"get items with edges failed with: {:?}",
with_edges
);
}
let json = json!({"_type": "Person"});
let search = {
{
let tx = conn.transaction().unwrap();
internal_api::search_by_fields(&tx, json)
};
let json = json!({"_type": "Person"});
let search = internal_api::search_by_fields(&tx, serde_json::from_value(json).unwrap());
let search = search.expect("Search request failed");
assert!(!search.is_empty());
}
assert_eq!(bulk, Ok(()));
assert!(
with_edges.is_ok(),
"get items with edges failed with: {:?}",
with_edges
);
assert!(check_has_item(search));
}
{
let tx = conn.transaction().unwrap();
let json = json!({"_type": "Person", "_dateServerModifiedAfter": 1_000_000_000_000_i64 });
let search = internal_api::search_by_fields(&tx, serde_json::from_value(json).unwrap());
let search = search.expect("Search request failed");
assert!(!search.is_empty());
}
fn check_has_item(result: Result<Vec<Value>, Error>) -> bool {
result.iter().flatten().next().is_some()
{
let tx = conn.transaction().unwrap();
let json = json!({"_type": "Person", "_dateServerModifiedAfter": 999_000_000_000_000_i64 });
let search = internal_api::search_by_fields(&tx, serde_json::from_value(json).unwrap());
let search = search.expect("Search request failed");
assert_eq!(search, Vec::<Value>::new());
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment