Commit d4a46191 authored by Chaitanya Pandit's avatar Chaitanya Pandit Committed by Azat Alimov
Browse files

Support Edges during sync

parent b88a5820
......@@ -350,7 +350,7 @@ struct CVUAction_AddItem: CVUAction {
// todomigrate: Make sure this passes
guard let target = template.item(key),
let targetRowId = target.rowId else { return nil }
return EdgeRecord(id: itemRowId, source: itemRowId, type: key, target: targetRowId)
return try? EdgeRecord.createFor(source: itemRowId, type: key, target: targetRowId)
}
}
try? db.write { (db) in
......@@ -577,15 +577,14 @@ struct CVUAction_Link: CVUAction {
for currentEdge in subjectItem.edges(edgeType) {
if currentEdge.name == edgeType {
guard let result = try? currentEdge.delete(), result == true else {
print("ERROR CVUAction_Unlink: item: \(subjectItem.type) with id: \(subjectItem.id) edge id: \(currentEdge.id)")
print("ERROR CVUAction_Unlink: item: \(subjectItem.type) with id: \(subjectItem.id) edge item: \(currentEdge.item)")
return
}
}
}
}
let edge = EdgeRecord(id: subjectItemRowId, source: subjectItemRowId, type: edgeType, target: currentItemRowId)
try? edge.save()
let _ = try? EdgeRecord.createFor(source: subjectItemRowId, type: edgeType, target: currentItemRowId)
sceneController.scheduleUIUpdate()
}
......@@ -621,7 +620,7 @@ struct CVUAction_Unlink: CVUAction {
}
guard let result = try? edgeToUnlink.delete(), result == true else {
print("ERROR CVUAction_Unlink: item: \(subjectItem.type) with id: \(subjectItem.id) edge id: \(edgeToUnlink.id)")
print("ERROR CVUAction_Unlink: item: \(subjectItem.type) with id: \(subjectItem.id) edge item: \(edgeToUnlink.item)")
return
}
......
......@@ -143,7 +143,7 @@ struct AddressBookController {
&& EdgeRecord.Columns.target == existingItemRowId
&& EdgeRecord.Columns.name == "hasPhoneNumber").fetchOne(db)
if (existingEdge == nil) {
try? EdgeRecord(id: personRowId, source: personRowId, type: "hasPhoneNumber", target: existingItemRowId).save(db: db)
let _ = try? EdgeRecord.createFor(source: personRowId, type: "hasPhoneNumber", target: existingItemRowId, db: db)
}
}
}
......
......@@ -134,7 +134,7 @@ class DatabaseController {
// Edges
try db.create(table: "edges") { t in
t.column("id", .integer).references("items", onDelete: .cascade)
t.column("item", .integer).primaryKey().references("items", onDelete: .cascade)
t.column("source", .integer).notNull().references("items", onDelete: .cascade)
t.column("name", .text).notNull()
t.column("target", .integer).notNull().references("items")
......
......@@ -171,8 +171,7 @@ enum DemoData {
guard let targetRowId = rowIdLookup[edge.tempTarget] else {
continue
}
let record = EdgeRecord(id: itemRowId, source: itemRowId, type: edge.type, target:targetRowId)
try record.insert(db)
let _ = try EdgeRecord.createFor(source: itemRowId, type: edge.type, target:targetRowId, db: db)
}
}
}
......
......@@ -12,6 +12,40 @@ struct SyncUploadEdge {
extension EdgeRecord {
static func fromSyncEdgeDict(item: Int64, fromSyncItemDict dict: [String: AnyDecodable], schema: Schema, db: Database) throws -> EdgeRecord? {
do {
guard let edgeName = dict["_edge"]?.value as? String,
let sourceItemId = dict["_source"]?.value as? String,
let targetItemId = dict["_target"]?.value as? String else {
return nil
}
guard let sourceItem = try ItemRecord.fetchWithID(StringUUID(uid: sourceItemId), db: db),
let sourceItemRowId = sourceItem.rowId,
let targetItem = try ItemRecord.fetchWithID(StringUUID(uid: targetItemId), db: db),
let targetItemRowId = targetItem.rowId else {
print("ERROR could not find source or target item for edge: ", dict)
return nil
}
var existingEdge = try sourceItem.request(for: ItemRecord.edges).filter(EdgeRecord.Columns.name == edgeName
&& EdgeRecord.Columns.name == edgeName
&& EdgeRecord.Columns.source == sourceItemRowId
&& EdgeRecord.Columns.target == targetItemRowId).fetchOne(db)
if (existingEdge == nil) {
let edge = try EdgeRecord.createFor(item: item, source: sourceItemRowId, type: edgeName, target: targetItemRowId, syncState: .noChanges)
existingEdge = edge
}
return existingEdge
} catch let error {
print("ERROR creating edge from sync: ", error)
}
return nil
}
static func syncEdgesWithState(_ state: SyncState, maxItems: Int = 100) throws -> [[String: AnyEncodable]] {
guard let pool = AppController.shared.databaseController.databasePool else {
throw StringError(description: "Database pool not available")
......@@ -62,14 +96,13 @@ extension EdgeRecord {
let sourceItemRowId = sourceItem.rowId,
let targetItem = try ItemRecord.filter(ItemRecord.Columns.id == targetItemId).fetchOne(db),
let targetItemRowId = targetItem.rowId,
let edgeRecord = try EdgeRecord.filter(EdgeRecord.Columns.id == sourceItemRowId
&& EdgeRecord.Columns.source == sourceItemRowId
let edgeRecord = try EdgeRecord.filter(EdgeRecord.Columns.source == sourceItemRowId
&& EdgeRecord.Columns.target == targetItemRowId
&& EdgeRecord.Columns.name == name).fetchOne(db) {
try EdgeRecord
.filter(Column(EdgeRecord.Columns.source.rawValue) == edgeRecord.source
&& Column(EdgeRecord.Columns.target.rawValue) == edgeRecord.target
&& Column(EdgeRecord.Columns.id.rawValue) == edgeRecord.id
&& Column(EdgeRecord.Columns.item.rawValue) == edgeRecord.item
&& Column(EdgeRecord.Columns.name.rawValue) == edgeRecord.name
)
.updateAll(db, [Column(EdgeRecord.Columns.syncState.rawValue).set(to: SyncState.noChanges.rawValue)])
......
......@@ -11,7 +11,7 @@ import GRDB
class EdgeRecord: BaseRecord {
static override var databaseTableName: String { "edges" }
let id: Int64
let item: Int64
let source: Int64
let name: String
let target: Int64
......@@ -22,20 +22,9 @@ class EdgeRecord: BaseRecord {
static let itemForeignKey = ForeignKey(["itemId"])
enum Columns: String, ColumnExpression {
case id, source, name, target, syncState, syncHasPriority
case item, source, name, target, syncState, syncHasPriority
}
static func fetchWithID(_ id: Int64, db: DatabaseController = AppController.shared.databaseController) -> EdgeRecord? {
do {
return try db.read { db in
try EdgeRecord.fetchOne(db, key: id)
}
} catch let error {
print("ERROR EdgeRecord: fetchWithID: \(id) error: \(error)")
return nil
}
}
static var ownerForeignKey = ForeignKey([Columns.source], to: [ItemRecord.Columns.rowId])
static var owningItem = hasOne(ItemRecord.self, using: ownerForeignKey)
......@@ -50,8 +39,42 @@ class EdgeRecord: BaseRecord {
try? ItemRecord.filter(key: target).fetchOne(db)
}
internal init(id: Int64, source: Int64, type: String, target: Int64, syncState: SyncState = .create, syncHasPriority: Bool = false) {
self.id = id
static func createFor(item: Int64? = nil, source: Int64, type: String, target: Int64, syncState: SyncState = .create, syncHasPriority: Bool = false, db: Database) throws -> EdgeRecord {
guard let existingEdge = try EdgeRecord.filter(EdgeRecord.Columns.source == source
&& EdgeRecord.Columns.target == target
&& EdgeRecord.Columns.name == type).fetchOne(db) else {
var itemRowId = item
if (item == nil) {
let edgeItem = ItemRecord(type: "Edge")
edgeItem.syncState = .create
try edgeItem.save(db: db)
itemRowId = edgeItem.rowId
}
guard let rowId = itemRowId else {
print("Error saving edge item")
throw StringError(description: "Error saving edge item")
}
let edge = EdgeRecord(item: rowId, source: source, type: type, target: target, syncState: syncState, syncHasPriority: syncHasPriority)
try edge.save()
return edge
}
return existingEdge
}
static func createFor(item: Int64? = nil, source: Int64, type: String, target: Int64, syncState: SyncState = .create, syncHasPriority: Bool = false, db: Database? = nil) throws -> EdgeRecord {
try AppController.shared.databaseController.writeSync { db in
return try createFor(source: source, type: type, target: target, db: db)
}
}
internal init(item: Int64, source: Int64, type: String, target: Int64, syncState: SyncState = .create, syncHasPriority: Bool = false) {
self.item = item
self.source = source
self.name = type
self.target = target
......@@ -62,7 +85,7 @@ class EdgeRecord: BaseRecord {
}
required init(row: Row) {
self.id = row[Columns.id]
self.item = row[Columns.item]
source = row[Columns.source]
name = row[Columns.name]
target = row[Columns.target]
......@@ -73,7 +96,7 @@ class EdgeRecord: BaseRecord {
}
override func encode(to container: inout PersistenceContainer) {
container[Columns.id] = id
container[Columns.item] = item
container[Columns.source] = source
container[Columns.name] = name
container[Columns.target] = target
......
......@@ -81,10 +81,10 @@ extension ItemRecord {
throw StringError(description: "Error saving keys, invalid rowIds")
}
try EdgeRecord(id: privateKeyItemRowId, source: privateKeyItemRowId, type: "owner", target: meRowId).save()
try EdgeRecord(id: publicKeyItemRowId, source: publicKeyItemRowId, type: "owner", target: meRowId).save()
try EdgeRecord(id: privateKeyItemRowId, source: privateKeyItemRowId, type: "publicKey", target: publicKeyItemRowId).save()
try EdgeRecord(id: publicKeyItemRowId, source: publicKeyItemRowId, type: "privateKey", target: privateKeyItemRowId).save()
try _ = EdgeRecord.createFor(source: privateKeyItemRowId, type: "owner", target: meRowId)
try _ = EdgeRecord.createFor(source: publicKeyItemRowId, type: "owner", target: meRowId)
try _ = EdgeRecord.createFor(source: privateKeyItemRowId, type: "publicKey", target: publicKeyItemRowId)
try _ = EdgeRecord.createFor(source: publicKeyItemRowId, type: "privateKey", target: privateKeyItemRowId)
} catch {
print("ERROR: setOwnerAndDBKey", error)
throw StringError(description: "Error deleting existing db keys")
......
......@@ -109,13 +109,16 @@ extension ItemRecord {
func setPropertyValue(
name: String,
value: PropertyDatabaseValue?,
addLog: Bool = true,
db dbController: DatabaseController = AppController.shared.databaseController
) throws {
try dbController.writeSync { (db) in
try setPropertyValue(name: name, value: value, db: db)
self.syncState = .update
try self.update(db)
try addChangeLog(name: name, value: value, db: dbController)
if (addLog) {
try addChangeLog(name: name, value: value, db: dbController)
}
}
}
......@@ -164,6 +167,6 @@ extension ItemRecord {
try IntegerRecord.setPropertyValue(name: "date", value: DatabaseHelperFunctions.encode(Date()), item: auditItem, db: dbController)
try StringRecord.setPropertyValue(name: "content", value: name, item: auditItem, db: dbController)
try StringRecord.setPropertyValue(name: "actionName", value: "edit", item: auditItem, db: dbController)
try EdgeRecord(id: rowId, source: rowId, type: "changelog", target: auditItemRowId).save()
try EdgeRecord.createFor(source: rowId, type: "changelog", target: auditItemRowId)
}
}
......@@ -100,10 +100,17 @@ extension ItemRecord {
try item?.save(db: db)
}
guard let createdItem = item else {
guard let createdItem = item,
let itemRowId = createdItem.rowId else {
return nil
}
if let dateServerModified = dict["dateServerModified"]?.value as? Int {
let date = DatabaseHelperFunctions.decode(from: dateServerModified)
createdItem.dateServerModified = date
try createdItem.save(db: db)
}
try dict.forEach { (propertyName, decodableValue) in
let unescapedPropertyName = unEscapeReservedKeywordsInPod(propertyName)
......@@ -124,6 +131,9 @@ extension ItemRecord {
try createdItem.setPropertyValue(name: unescapedPropertyName, value: databaseValue)
}
// Check if it has edge and update
let _ = try EdgeRecord.fromSyncEdgeDict(item: itemRowId, fromSyncItemDict: dict, schema: schema, db: db)
return createdItem
}
......@@ -239,7 +249,6 @@ extension ItemRecord {
/// Select the items to sync, giving priority to those marked as `syncHasPriority`
return try (pool.read { db in
try? ItemRecord
.filter(Column(ItemRecord.Columns.syncState.rawValue) == SyncState.noChanges)
.order(ItemRecord.Columns.dateServerModified.desc)
.fetchOne(db)
})
......
......@@ -300,7 +300,8 @@ extension ItemRecord {
return nil
}
try EdgeRecord(id: meRowId, source: meRowId, type: "me", target: meRowId).save()
try EdgeRecord.createFor(source: meRowId, type: "me", target: meRowId)
return myself
}
}
......@@ -12,11 +12,11 @@ import Combine
enum SyncControllerState: String, Codable, Hashable, DatabaseValueConvertible {
case idle
case started
case downloadedItems
case uploadedSchemaProperties
case uploadedSchemaEdges
case uploadedItems
case uploadedEdges
case downloadedItems
case done
case failed
}
......@@ -70,13 +70,42 @@ class SyncController {
self.state = .started
}
func finishSync() {
self.state = .done
self.syncing = false
self.completion?(self.lastError)
self.state = .idle
self.completion = nil
private func downloadItems() throws {
let dateServerModified = (try? ItemRecord.lastSyncedItem())?.dateServerModified ?? Date.distantPast
let timestamp = DatabaseHelperFunctions.encode(dateServerModified)
try searchAction(payload: timestamp > 0 ? ["dateServerModified>=": AnyEncodable.init(integerLiteral: timestamp)] : [:], completion: { (data, error) in
guard error == nil,
let responseData = data,
let responseObjects = try? JSONSerialization.jsonObject(with: responseData) as? [[String: Any]] else {
self.lastError = error
self.state = .failed
return
}
let responseItems = responseObjects.filter{$0["_source"] == nil || $0["_target"] == nil || $0["_edge"] == nil}
let responseEdges = responseObjects.filter{$0["_source"] != nil && $0["_target"] != nil && $0["_edge"] != nil}
try? AppController.shared.databaseController.writeSync {(db) in
let schema = AppController.shared.databaseController.schema
// We need to insert items first since edges need items to exist in db
_ = responseItems.compactMap{object -> [String : AnyDecodable]? in
return object.compactMapValues{AnyDecodable($0)}
}.compactMap { object -> ItemRecord? in
return try? ItemRecord.fromSyncItemDict(fromSyncItemDict: object, schema: schema, db: db)
}
// Edges should be inserted only after all items are created
// because currently there are scenarios where edges appear before items in server response
_ = responseEdges.compactMap{object -> [String : AnyDecodable]? in
return object.compactMapValues{AnyDecodable($0)}
}.compactMap { object -> ItemRecord? in
return try? ItemRecord.fromSyncItemDict(fromSyncItemDict: object, schema: schema, db: db)
}
}
self.state = .downloadedItems
})
}
func uploadSchemaProperties() throws {
......@@ -125,7 +154,7 @@ class SyncController {
})
}
func uploadItems() throws {
private func uploadItems() throws {
let syncPayload = try makeSyncUploadData()
guard syncPayload.createItems.count > 0
......@@ -149,7 +178,7 @@ class SyncController {
})
}
func uploadEdges(maxItems: Int = 100) throws {
private func uploadEdges(maxItems: Int = 100) throws {
let syncPayload = try makeSyncEdgesData(maxItems: maxItems)
guard syncPayload.createEdges.count > 0 else {
......@@ -171,29 +200,13 @@ class SyncController {
})
}
func downloadItems() throws {
let dateServerModified = (try? ItemRecord.lastSyncedItem())?.dateServerModified ?? Date.distantPast
try searchAction(dateServerModified: dateServerModified, completion: { (data, error) in
guard error == nil,
let responseData = data,
let responseObjects = try? JSONSerialization.jsonObject(with: responseData) as? [[String: Any]] else {
self.lastError = error
self.state = .failed
return
}
try? AppController.shared.databaseController.writeSync {(db) in
let schema = AppController.shared.databaseController.schema
_ = responseObjects.compactMap{object -> [String : AnyDecodable]? in
return object.compactMapValues{AnyDecodable($0)}
}.compactMap { object -> ItemRecord? in
return try? ItemRecord.fromSyncItemDict(fromSyncItemDict: object, schema: schema, db: db)
}
}
self.state = .downloadedItems
})
private func finishSync() {
self.state = .done
self.syncing = false
self.completion?(self.lastError)
self.state = .idle
self.completion = nil
}
func makeSyncSchemaPropertiesData() throws -> PodAPIPayload.BulkAction {
......@@ -238,7 +251,7 @@ class SyncController {
}
}
func makeSyncUploadData(maxItems: Int = 100) throws -> PodAPIPayload.BulkAction {
private func makeSyncUploadData(maxItems: Int = 100) throws -> PodAPIPayload.BulkAction {
let createItems = try ItemRecord.syncItemsWithState(.create, maxItems: maxItems)
let updateItems = try ItemRecord.syncItemsWithState(.update, maxItems: maxItems)
......@@ -251,7 +264,7 @@ class SyncController {
return bulkAction
}
func makeSyncEdgesData(maxItems: Int = 100) throws -> PodAPIPayload.BulkAction {
private func makeSyncEdgesData(maxItems: Int = 100) throws -> PodAPIPayload.BulkAction {
let createEdges = try EdgeRecord.syncEdgesWithState(.create, maxItems: maxItems)
let bulkAction = PodAPIPayload.BulkAction(
......@@ -263,7 +276,7 @@ class SyncController {
return bulkAction
}
func bulkAction(bulkPayload: PodAPIPayload.BulkAction, completion: ((Error?) -> Void)?) throws {
private func bulkAction(bulkPayload: PodAPIPayload.BulkAction, completion: ((Error?) -> Void)?) throws {
guard let connectionConfig = AppController.shared.podConnectionConfig
else { throw StringError(description: "No pod connection config") }
......@@ -278,12 +291,11 @@ class SyncController {
} .store(in: &subscriptions)
}
func searchAction(dateServerModified: Date, completion: ((Data?, Error?) -> Void)?) throws {
private func searchAction(payload: [String: AnyEncodable], completion: ((Data?, Error?) -> Void)?) throws {
guard let connectionConfig = AppController.shared.podConnectionConfig
else { throw StringError(description: "No pod connection config") }
let timestamp = DatabaseHelperFunctions.encode(dateServerModified)
let request = PodAPIStandardRequest.searchAction(payload: ["dateServerModified>=": AnyEncodable.init(integerLiteral: timestamp)])
let request = PodAPIStandardRequest.searchAction(payload: payload)
let networkCall = try request.execute(connectionConfig: connectionConfig)
networkCall.sink { (result) in
......
......@@ -117,11 +117,8 @@ class MapHelper {
// todomigrate: Make sure this passes
if let addressRowId = address.rowId,
let locationRowId = locationItem.rowId {
let locationEdge = EdgeRecord(id: addressRowId, source: addressRowId, type: "location", target: locationRowId)
try? AppController.shared.databaseController.write { db in
try locationEdge.save(db)
onCompletion()
}
let _ = try? EdgeRecord.createFor(source: addressRowId, type: "location", target: locationRowId)
onCompletion()
} else {
onCompletion()
}
......
......@@ -74,8 +74,7 @@ struct LabelAnnotationRendererView: View {
return
}
let edge = EdgeRecord(id: annotationItemRowId, source: annotationItemRowId, type: "annotatedItem", target: currentItemRowId)
try? edge.save()
let _ = try? EdgeRecord.createFor(source: annotationItemRowId, type: "annotatedItem", target: currentItemRowId)
moveToNextItem()
}
......
......@@ -115,12 +115,11 @@ extension NoteEditorRendererView: MemriTextEditorImageSelectionHandler {
return
}
let edge = EdgeRecord(id: noteRowId, source: noteRowId, type: "file", target: fileRowId)
let _ = try EdgeRecord.createFor(source: noteRowId, type: "file", target: fileRowId)
let filenameProperty = StringRecord(item: fileRowId, name: "filename", value: filename)
try viewContext.databaseController.writeSync { db in
try filenameProperty.save()
try edge.save()
}
} catch {
print(error)
......
......@@ -65,8 +65,8 @@ class MemriPodAPITests: XCTestCase {
}()
let testEdge: [String: AnyEncodable] = {
let testEdge = EdgeRecord(id: noteRowId, source: noteRowId, type: "file", target: fileRowId)
return testEdge._syncDict()
let testEdge = try? EdgeRecord.createFor(source: noteRowId, type: "file", target: fileRowId)
return testEdge?._syncDict()
}()
let bulkAction = PodAPIPayload.BulkAction(
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment