Unverified Commit 76c447fb authored by Vasili Novikov's avatar Vasili Novikov
Browse files

Change Downloaders/Importers to be uid-based

parent 30537023
Showing with 102 additions and 103 deletions
+102 -103
......@@ -224,14 +224,16 @@ If at least one input `uid` doesn't exist, return 404 NOT_FOUND for the whole re
# Services API
Services can add or modify data on your Pod.
They can only be ever run / authorized to run by the user.
Typical examples of services are services that import emails/messages into Pod.
### POST /v2/$owner_key/run_downloader
```json
{
"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99",
"payload": {
"service": ...,
"dataType": ...,
"uid": $uid,
"servicePayload": {
"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99",
"ownerKey": $owner_key
......@@ -239,9 +241,9 @@ If at least one input `uid` doesn't exist, return 404 NOT_FOUND for the whole re
}
}
```
Run a downloader for different services with different data types,
e.g. "service=evernote" with "dataType=note".
Unsupported service or data type will yield 400 BAD_REQUEST error.
Run a downloader on an item with the given uid.
⚠️ UNSTABLE: Downloaders might be merged with importers soon.
### POST /v2/$owner_key/run_importer
......@@ -249,7 +251,7 @@ Unsupported service or data type will yield 400 BAD_REQUEST error.
{
"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99",
"payload": {
"dataType": ...,
"uid": $uid,
"servicePayload": {
"databaseKey": "2DD29CA851E7B56E4697B0E1F08507293D761A05CE4D1B628663F411A8086D99",
"ownerKey": $owner_key
......@@ -257,8 +259,7 @@ Unsupported service or data type will yield 400 BAD_REQUEST error.
}
}
```
Run an importer for a given data type, e.g. "note".
Unsupported data type will yield 400 BAD_REQUEST error.
Run an importer on an item with the given uid.
### POST /v2/$owner_key/run_indexer
......
......@@ -63,15 +63,14 @@ pub struct PayloadWrapper<T> {
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunDownloader {
pub service: String,
pub data_type: String,
pub uid: i64,
pub service_payload: Value,
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RunImporter {
pub data_type: String,
pub uid: i64,
pub service_payload: Value,
}
......
......@@ -11,100 +11,99 @@ use std::ops::Deref;
use std::process::Command;
use warp::http::status::StatusCode;
pub fn run_downloader(payload: RunDownloader) -> Result<()> {
let service = &payload.service;
let data_type = &payload.data_type;
info!("Trying to run downloader {} for {}", service, data_type);
match service.as_str() {
"evernote" => match data_type.as_str() {
"note" => {
Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&["--rm", "--name=memri-indexers_1", "-it"])
.args(&["memri-downloaders:latest"])
.spawn()
.expect("Failed to run downloader");
}
_ => {
return Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Data type {} not supported", data_type),
})
}
},
_ => {
return Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Service {} not supported", service),
})
pub fn run_downloader(conn: &Connection, payload: RunDownloader) -> Result<()> {
info!("Trying to run downloader on item {}", payload.uid);
let result = internal_api::get_item(conn.deref(), payload.uid)?;
if result.first().is_some() {
let command = Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&[
"--rm",
"--name=memri-downloaders_1",
&format!("--env=RUN_UID={}", payload.uid),
"--volume=download-volume:/usr/src/importers/data",
])
.args(&["memri-downloaders:latest"])
.spawn();
match command {
Ok(_child) => Ok(()),
Err(err) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to run importer with uid {}, {}", payload.uid, err),
}),
}
} else {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to get item with uid={}", payload.uid),
})
}
Ok(())
}
pub fn run_importer(payload: RunImporter) -> Result<()> {
let data_type = &payload.data_type;
info!("Trying to run importer for {}", data_type);
match data_type.as_str() {
"note" => {
Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&[
"--rm",
"--volume=download-volume:/usr/src/importers/data",
"--name=memri-importers_1",
])
.args(&["memri-importers:latest"])
.spawn()
.expect("Failed to run importer");
}
_ => {
return Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Data type {} not supported", data_type),
})
pub fn run_importer(conn: &Connection, payload: RunImporter) -> Result<()> {
info!("Trying to run importer on item {}", payload.uid);
let result = internal_api::get_item(conn.deref(), payload.uid)?;
if result.first().is_some() {
let command = Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&[
"--rm",
"--name=memri-importers_1",
&format!("--env=RUN_UID={}", payload.uid),
"--volume=download-volume:/usr/src/importers/data",
])
.args(&["memri-importers:latest"])
.spawn();
match command {
Ok(_child) => Ok(()),
Err(err) => Err(Error {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Failed to run importer with uid {}, {}", payload.uid, err),
}),
}
} else {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to get item {}", payload.uid),
})
}
Ok(())
}
pub fn run_indexers(conn: &Connection, payload: RunIndexer) -> Result<()> {
let uid = payload.uid;
info!("Trying to run indexer on item {}", uid);
let result = internal_api::get_item(conn.deref(), uid)?;
match result.first() {
Some(_item) => {
Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&[
"--rm",
"--name=memri-indexers_1",
&format!("--env=RUN_UID={}", uid),
])
.args(&["memri-indexers:latest"])
.spawn()
.expect("Failed to run indexer");
Ok(())
}
None => Err(Error {
info!("Trying to run indexer on item {}", payload.uid);
let result = internal_api::get_item(conn.deref(), payload.uid)?;
if result.first().is_some() {
Command::new("docker")
.arg("run")
.args(&docker_arguments())
.arg(&format!(
"--env=POD_SERVICE_PAYLOAD={}",
payload.service_payload
))
.args(&[
"--rm",
"--name=memri-indexers_1",
&format!("--env=RUN_UID={}", payload.uid),
])
.args(&["memri-indexers:latest"])
.spawn()
.expect("Failed to run indexer");
Ok(())
} else {
Err(Error {
code: StatusCode::BAD_REQUEST,
msg: format!("Failed to get item {}", uid),
}),
msg: format!("Failed to get item {}", payload.uid),
})
}
}
......
......@@ -146,7 +146,7 @@ pub async fn run_server() {
});
let init_db = initialized_databases_arc.clone();
let run_downloaders = services_api
let run_downloader = services_api
// //! In fact, any type that implements `FromStr` can be used, in any order:
// ~/.cargo/registry.cache/src/github.com-1ecc6299db9ec823/warp-0.2.4/src/filters/path.rs:45
.and(warp::path!(String / "run_downloader"))
......@@ -159,7 +159,7 @@ pub async fn run_server() {
});
let init_db = initialized_databases_arc.clone();
let run_importers = services_api
let run_importer = services_api
.and(warp::path!(String / "run_importer"))
.and(warp::path::end())
.and(warp::body::json())
......@@ -169,7 +169,7 @@ pub async fn run_server() {
});
let init_db = initialized_databases_arc.clone();
let run_indexers = services_api
let run_indexer = services_api
.and(warp::path!(String / "run_indexer"))
.and(warp::path::end())
.and(warp::body::json())
......@@ -236,9 +236,9 @@ pub async fn run_server() {
.or(delete_item.with(&headers))
.or(search.with(&headers))
.or(get_items_with_edges.with(&headers))
.or(run_downloaders.with(&headers))
.or(run_importers.with(&headers))
.or(run_indexers.with(&headers))
.or(run_downloader.with(&headers))
.or(run_importer.with(&headers))
.or(run_indexer.with(&headers))
.or(upload_file.with(&headers))
.or(get_file.with(&headers))
.or(origin_request);
......
......@@ -124,7 +124,7 @@ pub fn run_downloader(
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?; // Check DB access
services_api::run_downloader(body.payload)
services_api::run_downloader(&conn, body.payload)
}
pub fn run_importer(
......@@ -134,7 +134,7 @@ pub fn run_importer(
) -> Result<()> {
let conn: Connection = check_owner_and_initialize_db(&owner, &init_db, &body.database_key)?;
conn.execute_batch("SELECT 1 FROM items;")?; // Check DB access
services_api::run_importer(body.payload)
services_api::run_importer(&conn, body.payload)
}
pub fn run_indexer(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment