diff --git a/.sqlx/query-081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6.json b/.sqlx/query-081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6.json index 8f1a96994..94b77425c 100644 --- a/.sqlx/query-081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6.json +++ b/.sqlx/query-081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6.json @@ -12,7 +12,9 @@ "parameters": { "Right": 1 }, - "nullable": [false] + "nullable": [ + false + ] }, "hash": "081c729a0f1ad6e4ff3e13d6702c946bc4d37d50f40670b4f51d2efcce595aa6" } diff --git a/.sqlx/query-0d465a17ebbb5761421def759c73cad023c30705d5b41a1399ef79d8d2571d7c.json b/.sqlx/query-0d465a17ebbb5761421def759c73cad023c30705d5b41a1399ef79d8d2571d7c.json deleted file mode 100644 index e746a4c0e..000000000 --- a/.sqlx/query-0d465a17ebbb5761421def759c73cad023c30705d5b41a1399ef79d8d2571d7c.json +++ /dev/null @@ -1,18 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT min(entered_at) as start_date\n FROM swap_states\n WHERE swap_id = ?\n ", - "describe": { - "columns": [ - { - "name": "start_date", - "ordinal": 0, - "type_info": "Text" - } - ], - "parameters": { - "Right": 1 - }, - "nullable": [true] - }, - "hash": "0d465a17ebbb5761421def759c73cad023c30705d5b41a1399ef79d8d2571d7c" -} diff --git a/.sqlx/query-1f332be08a5426f3fbcadea4e755d82ff1cdc2690eb464ccc607d3a613fa76a1.json b/.sqlx/query-1f332be08a5426f3fbcadea4e755d82ff1cdc2690eb464ccc607d3a613fa76a1.json index 52e81ff83..4c1f9abe8 100644 --- a/.sqlx/query-1f332be08a5426f3fbcadea4e755d82ff1cdc2690eb464ccc607d3a613fa76a1.json +++ b/.sqlx/query-1f332be08a5426f3fbcadea4e755d82ff1cdc2690eb464ccc607d3a613fa76a1.json @@ -12,7 +12,9 @@ "parameters": { "Right": 0 }, - "nullable": [true] + "nullable": [ + true + ] }, "hash": "1f332be08a5426f3fbcadea4e755d82ff1cdc2690eb464ccc607d3a613fa76a1" } diff --git a/.sqlx/query-3a286c86edc7fd3579fdfca41e385aa91e334f76f1d47be8ec9e87b39e37458b.json b/.sqlx/query-3a286c86edc7fd3579fdfca41e385aa91e334f76f1d47be8ec9e87b39e37458b.json new file mode 100644 index 000000000..469ea79d6 --- /dev/null +++ b/.sqlx/query-3a286c86edc7fd3579fdfca41e385aa91e334f76f1d47be8ec9e87b39e37458b.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT swap_id, address, percentage, label\n FROM monero_addresses\n ORDER BY swap_id\n ", + "describe": { + "columns": [ + { + "name": "swap_id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "address", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "percentage", + "ordinal": 2, + "type_info": "Float" + }, + { + "name": "label", + "ordinal": 3, + "type_info": "Text" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + true, + false, + false + ] + }, + "hash": "3a286c86edc7fd3579fdfca41e385aa91e334f76f1d47be8ec9e87b39e37458b" +} diff --git a/.sqlx/query-494867a1323557716de6399e6b936598479cb46f8807a7378064cb6d05712494.json b/.sqlx/query-494867a1323557716de6399e6b936598479cb46f8807a7378064cb6d05712494.json new file mode 100644 index 000000000..1a4a86077 --- /dev/null +++ b/.sqlx/query-494867a1323557716de6399e6b936598479cb46f8807a7378064cb6d05712494.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT change\n FROM change\n WHERE peer_id = ?\n ORDER BY id DESC\n ", + "describe": { + "columns": [ + { + "name": "change", + "ordinal": 0, + "type_info": "Blob" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false + ] + }, + "hash": "494867a1323557716de6399e6b936598479cb46f8807a7378064cb6d05712494" +} diff --git 
a/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json b/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json index c35d06f74..8a964df56 100644 --- a/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json +++ b/.sqlx/query-4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab.json @@ -22,7 +22,11 @@ "parameters": { "Right": 2 }, - "nullable": [false, false, false] + "nullable": [ + false, + false, + false + ] }, "hash": "4ce7c42906ba69e0c8e1c0dad952956edd582a0edecd45710e22dcb28785eeab" } diff --git a/.sqlx/query-4f7358fe6db39990bab54a240066ed13ba4c410229f42ab52f762335f8c7eaf2.json b/.sqlx/query-4f7358fe6db39990bab54a240066ed13ba4c410229f42ab52f762335f8c7eaf2.json new file mode 100644 index 000000000..3e6683603 --- /dev/null +++ b/.sqlx/query-4f7358fe6db39990bab54a240066ed13ba4c410229f42ab52f762335f8c7eaf2.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT INTO swap_states (\n swap_id,\n entered_at,\n state,\n hlc_counter\n ) VALUES (?, ?, ?, ?);\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "4f7358fe6db39990bab54a240066ed13ba4c410229f42ab52f762335f8c7eaf2" +} diff --git a/.sqlx/query-5cc61dd0315571bc198401a354cd9431ee68360941f341386cbacf44ea598de8.json b/.sqlx/query-5cc61dd0315571bc198401a354cd9431ee68360941f341386cbacf44ea598de8.json deleted file mode 100644 index 7fc74b316..000000000 --- a/.sqlx/query-5cc61dd0315571bc198401a354cd9431ee68360941f341386cbacf44ea598de8.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n SELECT swap_id, state\n FROM (\n SELECT max(id), swap_id, state\n FROM swap_states\n GROUP BY swap_id\n )\n ", - "describe": { - "columns": [ - { - "name": "swap_id", - "ordinal": 0, - "type_info": "Text" - }, - { - "name": "state", - "ordinal": 1, - "type_info": "Text" - } - ], - "parameters": { - "Right": 0 - }, - "nullable": [true, true] - }, - "hash": "5cc61dd0315571bc198401a354cd9431ee68360941f341386cbacf44ea598de8" -} diff --git a/.sqlx/query-6130b6cdd184181f890964eb460741f5cf23b5237fb676faed009106627a4ca6.json b/.sqlx/query-6130b6cdd184181f890964eb460741f5cf23b5237fb676faed009106627a4ca6.json index a3d9d364d..2dfabcfb6 100644 --- a/.sqlx/query-6130b6cdd184181f890964eb460741f5cf23b5237fb676faed009106627a4ca6.json +++ b/.sqlx/query-6130b6cdd184181f890964eb460741f5cf23b5237fb676faed009106627a4ca6.json @@ -17,7 +17,10 @@ "parameters": { "Right": 0 }, - "nullable": [false, false] + "nullable": [ + false, + false + ] }, "hash": "6130b6cdd184181f890964eb460741f5cf23b5237fb676faed009106627a4ca6" } diff --git a/.sqlx/query-70e12cb1a4bebc4cc2a274d8f954ff2e0fb33525f7b17597eb853367aa1f4c84.json b/.sqlx/query-70e12cb1a4bebc4cc2a274d8f954ff2e0fb33525f7b17597eb853367aa1f4c84.json new file mode 100644 index 000000000..9f75aa403 --- /dev/null +++ b/.sqlx/query-70e12cb1a4bebc4cc2a274d8f954ff2e0fb33525f7b17597eb853367aa1f4c84.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT OR IGNORE INTO swap_states (\n swap_id,\n entered_at,\n state,\n hlc_counter\n ) VALUES (?, ?, ?, ?);\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "70e12cb1a4bebc4cc2a274d8f954ff2e0fb33525f7b17597eb853367aa1f4c84" +} diff --git a/.sqlx/query-738b7c4e86b9910320c90a727feabdeeefd2716f0bc66e75182674e4627018c8.json b/.sqlx/query-738b7c4e86b9910320c90a727feabdeeefd2716f0bc66e75182674e4627018c8.json new file mode 100644 index 000000000..be927bbb3 
--- /dev/null +++ b/.sqlx/query-738b7c4e86b9910320c90a727feabdeeefd2716f0bc66e75182674e4627018c8.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT min(entered_at) as \"start_date: i64\"\n FROM swap_states\n WHERE swap_id = ?\n ", + "describe": { + "columns": [ + { + "name": "start_date: i64", + "ordinal": 0, + "type_info": "Null" + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + true + ] + }, + "hash": "738b7c4e86b9910320c90a727feabdeeefd2716f0bc66e75182674e4627018c8" +} diff --git a/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json b/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json index 7e94ecbb1..ea77f24f2 100644 --- a/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json +++ b/.sqlx/query-7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77.json @@ -22,7 +22,11 @@ "parameters": { "Right": 1 }, - "nullable": [false, true, true] + "nullable": [ + false, + true, + true + ] }, "hash": "7bc8d637e7cf020bff58d05109ae88e56672e95e0e638af99e82df5b00595e77" } diff --git a/.sqlx/query-7c37de52b3bb2ccd0868ccb861127416848d85eaebe8245c58d5beac7d537087.json b/.sqlx/query-7c37de52b3bb2ccd0868ccb861127416848d85eaebe8245c58d5beac7d537087.json deleted file mode 100644 index e4073119f..000000000 --- a/.sqlx/query-7c37de52b3bb2ccd0868ccb861127416848d85eaebe8245c58d5beac7d537087.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "db_name": "SQLite", - "query": "\n insert into monero_addresses (\n swap_id,\n address,\n percentage,\n label\n ) values (?, ?, ?, ?);\n ", - "describe": { - "columns": [], - "parameters": { - "Right": 4 - }, - "nullable": [] - }, - "hash": "7c37de52b3bb2ccd0868ccb861127416848d85eaebe8245c58d5beac7d537087" -} diff --git a/.sqlx/query-7e58428584d28a238ab37a83662b88afcef6fc5246f11c85a35869f79da61c34.json b/.sqlx/query-7e58428584d28a238ab37a83662b88afcef6fc5246f11c85a35869f79da61c34.json index 6be7d85eb..d9f7da1d8 100644 --- a/.sqlx/query-7e58428584d28a238ab37a83662b88afcef6fc5246f11c85a35869f79da61c34.json +++ b/.sqlx/query-7e58428584d28a238ab37a83662b88afcef6fc5246f11c85a35869f79da61c34.json @@ -17,7 +17,10 @@ "parameters": { "Right": 1 }, - "nullable": [false, false] + "nullable": [ + false, + false + ] }, "hash": "7e58428584d28a238ab37a83662b88afcef6fc5246f11c85a35869f79da61c34" } diff --git a/.sqlx/query-888b6068a36501c66d1f9ff0cfe93ad93c1d30ffb6963009e6e7923dcabf571c.json b/.sqlx/query-888b6068a36501c66d1f9ff0cfe93ad93c1d30ffb6963009e6e7923dcabf571c.json new file mode 100644 index 000000000..61905e7ac --- /dev/null +++ b/.sqlx/query-888b6068a36501c66d1f9ff0cfe93ad93c1d30ffb6963009e6e7923dcabf571c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert or ignore into monero_addresses (\n swap_id,\n address,\n percentage,\n label\n ) values (?, ?, ?, ?);\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "888b6068a36501c66d1f9ff0cfe93ad93c1d30ffb6963009e6e7923dcabf571c" +} diff --git a/.sqlx/query-88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf.json b/.sqlx/query-88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf.json index d13a0de08..b7429dcdd 100644 --- a/.sqlx/query-88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf.json +++ b/.sqlx/query-88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf.json @@ -12,7 +12,9 @@ "parameters": { "Right": 1 }, - "nullable": [false] + "nullable": [ + false + ] }, "hash": 
"88f761a4f7a0429cad1df0b1bebb1c0a27b2a45656549b23076d7542cfa21ecf" } diff --git a/.sqlx/query-b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0.json b/.sqlx/query-89d69d65db6f42b60f38a42e4a75d5be61125d6710b2c4e677673b291d4f8d41.json similarity index 57% rename from .sqlx/query-b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0.json rename to .sqlx/query-89d69d65db6f42b60f38a42e4a75d5be61125d6710b2c4e677673b291d4f8d41.json index 79ade14e0..1eb979d0a 100644 --- a/.sqlx/query-b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0.json +++ b/.sqlx/query-89d69d65db6f42b60f38a42e4a75d5be61125d6710b2c4e677673b291d4f8d41.json @@ -1,12 +1,12 @@ { "db_name": "SQLite", - "query": "\n insert into swap_states (\n swap_id,\n entered_at,\n state\n ) values (?, ?, ?);\n ", + "query": "\n insert into swap_states (\n swap_id,\n entered_at,\n state,\n hlc_counter\n ) values (?, ?, ?, ?);\n ", "describe": { "columns": [], "parameters": { - "Right": 3 + "Right": 4 }, "nullable": [] }, - "hash": "b703032b4ddc627a1124817477e7a8e5014bdc694c36a14053ef3bb2fc0c69b0" + "hash": "89d69d65db6f42b60f38a42e4a75d5be61125d6710b2c4e677673b291d4f8d41" } diff --git a/.sqlx/query-8df497a0bedfad6330e819626af499bc681c8be50e9bf6574dbadfedca115c9f.json b/.sqlx/query-8df497a0bedfad6330e819626af499bc681c8be50e9bf6574dbadfedca115c9f.json new file mode 100644 index 000000000..c9acb255b --- /dev/null +++ b/.sqlx/query-8df497a0bedfad6330e819626af499bc681c8be50e9bf6574dbadfedca115c9f.json @@ -0,0 +1,32 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT swap_id, state, entered_at as \"entered_at: i64\"\n FROM (\n SELECT max(id), swap_id, state, entered_at\n FROM swap_states\n GROUP BY swap_id\n )\n ", + "describe": { + "columns": [ + { + "name": "swap_id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "state", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "entered_at: i64", + "ordinal": 2, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + true, + true, + true + ] + }, + "hash": "8df497a0bedfad6330e819626af499bc681c8be50e9bf6574dbadfedca115c9f" +} diff --git a/.sqlx/query-ab71b2a112e8df38ccd07767bfb2526690f707d1ab899f7c562f067c66bb1676.json b/.sqlx/query-ab71b2a112e8df38ccd07767bfb2526690f707d1ab899f7c562f067c66bb1676.json new file mode 100644 index 000000000..94bb3152a --- /dev/null +++ b/.sqlx/query-ab71b2a112e8df38ccd07767bfb2526690f707d1ab899f7c562f067c66bb1676.json @@ -0,0 +1,38 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT swap_id, state, entered_at as \"entered_at: i64\", hlc_counter as \"hlc_counter: i64\"\n FROM swap_states\n ORDER BY entered_at DESC, hlc_counter DESC\n ", + "describe": { + "columns": [ + { + "name": "swap_id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "state", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "entered_at: i64", + "ordinal": 2, + "type_info": "Integer" + }, + { + "name": "hlc_counter: i64", + "ordinal": 3, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false, + false + ] + }, + "hash": "ab71b2a112e8df38ccd07767bfb2526690f707d1ab899f7c562f067c66bb1676" +} diff --git a/.sqlx/query-b8daa32597ddf3a7bf1367cd9a154022149797ace30b1bed48459a2162a6a3ea.json b/.sqlx/query-b8daa32597ddf3a7bf1367cd9a154022149797ace30b1bed48459a2162a6a3ea.json new file mode 100644 index 000000000..ca28ec94b --- /dev/null +++ b/.sqlx/query-b8daa32597ddf3a7bf1367cd9a154022149797ace30b1bed48459a2162a6a3ea.json @@ -0,0 +1,32 @@ +{ + "db_name": 
"SQLite", + "query": "\n SELECT swap_id, state, entered_at as \"entered_at: i64\"\n FROM swap_states\n ORDER BY entered_at DESC\n ", + "describe": { + "columns": [ + { + "name": "swap_id", + "ordinal": 0, + "type_info": "Text" + }, + { + "name": "state", + "ordinal": 1, + "type_info": "Text" + }, + { + "name": "entered_at: i64", + "ordinal": 2, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 0 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "b8daa32597ddf3a7bf1367cd9a154022149797ace30b1bed48459a2162a6a3ea" +} diff --git a/.sqlx/query-d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9.json b/.sqlx/query-d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9.json index b7cd990e4..f1dcd349c 100644 --- a/.sqlx/query-d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9.json +++ b/.sqlx/query-d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9.json @@ -17,7 +17,10 @@ "parameters": { "Right": 1 }, - "nullable": [true, true] + "nullable": [ + true, + true + ] }, "hash": "d32d91ca2debc4212841282533482b2ff081234c7f9f848a7223ae04234995d9" } diff --git a/.sqlx/query-d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2.json b/.sqlx/query-d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2.json index 4b61e1a59..cf0105e21 100644 --- a/.sqlx/query-d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2.json +++ b/.sqlx/query-d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2.json @@ -12,7 +12,9 @@ "parameters": { "Right": 1 }, - "nullable": [false] + "nullable": [ + false + ] }, "hash": "d78acba5eb8563826dd190e0886aa665aae3c6f1e312ee444e65df1c95afe8b2" } diff --git a/.sqlx/query-dff8b986c3dde27b8121775e48a58564fa346b038866699210a63f8a33b03f0b.json b/.sqlx/query-dff8b986c3dde27b8121775e48a58564fa346b038866699210a63f8a33b03f0b.json index 180b4d0c9..ec1acabcd 100644 --- a/.sqlx/query-dff8b986c3dde27b8121775e48a58564fa346b038866699210a63f8a33b03f0b.json +++ b/.sqlx/query-dff8b986c3dde27b8121775e48a58564fa346b038866699210a63f8a33b03f0b.json @@ -22,7 +22,11 @@ "parameters": { "Right": 1 }, - "nullable": [true, false, false] + "nullable": [ + true, + false, + false + ] }, "hash": "dff8b986c3dde27b8121775e48a58564fa346b038866699210a63f8a33b03f0b" } diff --git a/.sqlx/query-e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646.json b/.sqlx/query-e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646.json index 1835133ff..5eb6fef36 100644 --- a/.sqlx/query-e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646.json +++ b/.sqlx/query-e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646.json @@ -12,7 +12,9 @@ "parameters": { "Right": 1 }, - "nullable": [false] + "nullable": [ + false + ] }, "hash": "e05620f420f8c1022971eeb66a803323a8cf258cbebb2834e3f7cf8f812fa646" } diff --git a/.sqlx/query-e7de07925e4ce1a8dd44a9790c6645b8ed9079c8b5aa7583b59e894e5e7b3e3c.json b/.sqlx/query-e7de07925e4ce1a8dd44a9790c6645b8ed9079c8b5aa7583b59e894e5e7b3e3c.json new file mode 100644 index 000000000..eb19d21a4 --- /dev/null +++ b/.sqlx/query-e7de07925e4ce1a8dd44a9790c6645b8ed9079c8b5aa7583b59e894e5e7b3e3c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n INSERT or IGNORE INTO change (peer_id, change)\n VALUES (?, ?)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "e7de07925e4ce1a8dd44a9790c6645b8ed9079c8b5aa7583b59e894e5e7b3e3c" +} diff --git 
a/.sqlx/query-e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae.json b/.sqlx/query-e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae.json index 952f282c2..f375b6b68 100644 --- a/.sqlx/query-e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae.json +++ b/.sqlx/query-e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae.json @@ -12,7 +12,9 @@ "parameters": { "Right": 1 }, - "nullable": [false] + "nullable": [ + false + ] }, "hash": "e9d422daf774d099fcbde6c4cda35821da948bd86cc57798b4d8375baf0b51ae" } diff --git a/.sqlx/query-ec65e362e0c292da360cd00a751564d5fa224c242232361f6890f522f38e2934.json b/.sqlx/query-ec65e362e0c292da360cd00a751564d5fa224c242232361f6890f522f38e2934.json new file mode 100644 index 000000000..e63294283 --- /dev/null +++ b/.sqlx/query-ec65e362e0c292da360cd00a751564d5fa224c242232361f6890f522f38e2934.json @@ -0,0 +1,20 @@ +{ + "db_name": "SQLite", + "query": "\n SELECT COALESCE(MAX(hlc_counter), -1) + 1 as \"next_counter: i64\"\n FROM swap_states\n WHERE swap_id = ? AND entered_at = ?\n ", + "describe": { + "columns": [ + { + "name": "next_counter: i64", + "ordinal": 0, + "type_info": "Integer" + } + ], + "parameters": { + "Right": 2 + }, + "nullable": [ + false + ] + }, + "hash": "ec65e362e0c292da360cd00a751564d5fa224c242232361f6890f522f38e2934" +} diff --git a/Cargo.lock b/Cargo.lock index d9f06ff3c..faf3537e3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -787,6 +787,54 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "automerge" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02fba3b76f952e13270300d8a1aca3476942bba013de5e64ef538ef5607b40df" +dependencies = [ + "cfg-if", + "flate2", + "fxhash", + "hex", + "im", + "itertools 0.13.0", + "leb128", + "serde", + "sha2 0.10.9", + "smol_str", + "thiserror 1.0.69", + "tinyvec", + "tracing", + "unicode-segmentation", + "uuid", +] + +[[package]] +name = "autosurgeon" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb09e782087199bfbea2848b7dc808a93eaaf9a185d2706a6efabc7e606fcb04" +dependencies = [ + "automerge", + "autosurgeon-derive", + "similar", + "thiserror 1.0.69", + "uuid", +] + +[[package]] +name = "autosurgeon-derive" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbf16fd191f2b254f0cc64783196f818ebe02c9c918deee82fe96fad07a72aa6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.104", + "thiserror 1.0.69", +] + [[package]] name = "aws-lc-rs" version = "1.13.3" @@ -1264,6 +1312,15 @@ dependencies = [ "serde", ] +[[package]] +name = "bitmaps" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "031043d04099746d8db04daf1fa424b2bc8bd69d92b25962dcde24da39ab64a2" +dependencies = [ + "typenum", +] + [[package]] name = "bitvec" version = "1.0.1" @@ -1285,6 +1342,19 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "blake3" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + [[package]] name = "blanket" version = "0.3.0" @@ -3175,6 +3245,47 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = 
"eigensync-client" +version = "0.1.0" +dependencies = [ + "anyhow", + "automerge", + "autosurgeon", + "eigensync-protocol", + "libp2p", + "tokio", + "tokio-util", + "tracing", +] + +[[package]] +name = "eigensync-protocol" +version = "0.1.0" +dependencies = [ + "anyhow", + "automerge", + "blake3", + "chacha20poly1305", + "libp2p", + "serde", +] + +[[package]] +name = "eigensync-server" +version = "0.1.0" +dependencies = [ + "anyhow", + "clap 4.5.42", + "eigensync-protocol", + "hex", + "libp2p", + "sqlx", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "either" version = "1.15.0" @@ -4924,6 +5035,20 @@ dependencies = [ "xmltree", ] +[[package]] +name = "im" +version = "15.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0acd33ff0285af998aaf9b57342af478078f53492322fafc47450e09397e0e9" +dependencies = [ + "bitmaps", + "rand_core 0.6.4", + "rand_xoshiro", + "sized-chunks", + "typenum", + "version_check", +] + [[package]] name = "image" version = "0.25.6" @@ -5111,6 +5236,15 @@ dependencies = [ "either", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.14.0" @@ -5454,6 +5588,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "leb128" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884e2677b40cc8c339eaefcb701c32ef1fd2493d71118dc0ca4b6a736c93bd67" + [[package]] name = "libappindicator" version = "0.9.0" @@ -8492,6 +8632,15 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand_xoshiro" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f97cdb2a36ed4183de61b2f824cc45c9f1037f28afe0a322e9fff4c108b5aaa" +dependencies = [ + "rand_core 0.6.4", +] + [[package]] name = "raw-window-handle" version = "0.6.2" @@ -10007,6 +10156,10 @@ name = "similar" version = "2.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbbb5d9659141646ae647b42fe094daf6c6192d1620870b449d9557f748b2daa" +dependencies = [ + "bstr", + "unicode-segmentation", +] [[package]] name = "siphasher" @@ -10020,6 +10173,16 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" +[[package]] +name = "sized-chunks" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16d69225bde7a69b235da73377861095455d298f2b970996eec25ddbb42b3d1e" +dependencies = [ + "bitmaps", + "typenum", +] + [[package]] name = "slab" version = "0.4.10" @@ -10073,6 +10236,15 @@ dependencies = [ "serde", ] +[[package]] +name = "smol_str" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd538fb6910ac1099850255cf94a94df6551fbdd602454387d0adb2d1ca6dead" +dependencies = [ + "serde", +] + [[package]] name = "snow" version = "0.9.6" @@ -10615,6 +10787,8 @@ dependencies = [ "async-trait", "asynchronous-codec 0.7.0", "atty", + "automerge", + "autosurgeon", "backoff", "base64 0.22.1", "bdk", @@ -10636,6 +10810,7 @@ dependencies = [ "dialoguer", "ecdsa_fun", "ed25519-dalek 1.0.1", + 
"eigensync-client", "electrum-pool", "fns", "futures", diff --git a/Cargo.toml b/Cargo.toml index 1a670d804..a1cbc8bb0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "2" -members = [ +members = [ "eigensync-client", "eigensync-protocol", "eigensync-server", "electrum-pool", "libp2p-rendezvous-server", "libp2p-tor", @@ -41,17 +41,21 @@ once_cell = "1.19" rand = "0.8" reqwest = { version = "0.12", default-features = false, features = ["json"] } rust_decimal = { version = "1", features = ["serde-float"] } -rust_decimal_macros = "1" serde = { version = "1.0", features = ["derive"] } serde_json = "1" thiserror = "1" tokio = { version = "1", features = ["rt-multi-thread", "time", "macros", "sync"] } toml = "0.9.5" tracing = { version = "0.1", features = ["attributes"] } +clap = { version = "4", features = ["derive"] } tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt", "ansi", "env-filter", "time", "tracing-log", "json"] } typeshare = "1.0" url = { version = "2", features = ["serde"] } uuid = { version = "1", features = ["v4"] } +sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio-rustls", "macros", "chrono"] } +automerge = "0.6.1" +autosurgeon = { version = "0.8.7", features = ["uuid"] } +tokio-util = { version = "0.7.16", features = ["rt"] } # Tor/Arti crates arti-client = { git = "https://github.com/eigenwallet/arti", rev = "18111286b5830cda88af5df1950b5e24ee5a8841", default-features = false } diff --git a/eigensync-client/Cargo.toml b/eigensync-client/Cargo.toml new file mode 100644 index 000000000..3f122c270 --- /dev/null +++ b/eigensync-client/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "eigensync-client" +version = "0.1.0" +edition = "2024" + +[dependencies] +anyhow = { workspace = true } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "macros", + "sync", + "time", +] } +tokio-util = { workspace = true } +tracing = { workspace = true } + +libp2p = { workspace = true, features = [ + "request-response", + "tcp", + "noise", + "yamux", + "macros", + "cbor", + "tokio", + "ping", +] } +automerge = { workspace = true } +autosurgeon = { workspace = true } +eigensync-protocol = { version = "0.1.0", path = "../eigensync-protocol" } + +[lints] +workspace = true diff --git a/eigensync-client/src/lib.rs b/eigensync-client/src/lib.rs new file mode 100644 index 000000000..9590ae489 --- /dev/null +++ b/eigensync-client/src/lib.rs @@ -0,0 +1,358 @@ +use std::{collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; + +use anyhow::{Context}; +use libp2p::{ + futures::StreamExt, + identity, noise, request_response::{self, OutboundRequestId}, + swarm::SwarmEvent, + tcp, yamux, Multiaddr, PeerId, Swarm, SwarmBuilder +}; +use tokio::{sync::{mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, oneshot, RwLock}, task}; +use automerge::{ActorId, AutoCommit, Change}; +use autosurgeon::{hydrate, reconcile, Hydrate, Reconcile}; +use tokio_util::task::AbortOnDropHandle; +use eigensync_protocol::{client, Behaviour, BehaviourEvent, Response, SerializedChange, EncryptedChange, ServerRequest}; + +/// High-level handle for synchronizing a typed application state with an +/// Eigensync server using Automerge and libp2p. +/// +/// This handle owns an Automerge `AutoCommit` document and a client-side +/// networking loop. 
It provides:
+/// - hydration of the CRDT into a typed `T` via `autosurgeon::hydrate`
+/// - reconciliation of edits back into the CRDT via `autosurgeon::reconcile`
+/// - signing, encrypting, and uploading local changes to the server
+/// - downloading, verifying, decrypting, and applying remote changes
+///
+/// Usage outline:
+/// 1) Construct with `new(server_addr, server_id, keypair, encryption_key)`. It starts networking
+///    without blocking for connectivity.
+/// 2) Read the current typed state with `get_document_state()`.
+/// 3) Modify state with `modify(|state| { /* mutate */ Ok(()) })`, which reconciles into the CRDT.
+/// 4) Call `sync_with_server().await` to push/pull changes, or wrap the handle in `Arc<RwLock<EigensyncHandle<T>>>`
+///    and use `EigensyncHandleBackgroundSync::background_sync()` for periodic syncing.
+///
+/// Security notes:
+/// - Changes are encrypted client-side with XChaCha20-Poly1305 using a 32-byte key.
+/// - A deterministic nonce is derived from the plaintext plus key using BLAKE3 to make
+///   re-encryption idempotent; do not reuse the same key across unrelated datasets.
+///
+/// Type parameter:
+/// - `T: Reconcile + Hydrate + Default + Debug` is your typed view over the CRDT document.
+///
+/// Example (simplified):
+/// ```ignore
+/// use autosurgeon::{Hydrate, Reconcile};
+/// use eigensync_client::EigensyncHandle;
+/// use libp2p::{identity, Multiaddr, PeerId};
+///
+/// #[derive(Debug, Default, Hydrate, Reconcile)]
+/// struct AppState {
+///     // your fields
+/// }
+///
+/// # async fn demo() -> anyhow::Result<()> {
+/// let server_addr: Multiaddr = "/ip4/127.0.0.1/tcp/3333".parse()?;
+/// let server_id: PeerId = "12D3KooW...".parse()?; // server's PeerId
+/// let keypair = identity::Keypair::generate_ed25519();
+/// let encryption_key = [0u8; 32];
+///
+/// let mut handle = EigensyncHandle::<AppState>::new(
+///     server_addr, server_id, keypair, encryption_key,
+/// ).await?;
+///
+/// // Read current state
+/// let _state = handle.get_document_state()?;
+///
+/// // Make changes
+/// handle.modify(|s| {
+///     // mutate s
+///     Ok(())
+/// })?;
+///
+/// // Push/pull
+/// handle.sync_with_server().await?;
+/// # Ok(()) }
+/// ```
+pub struct EigensyncHandle<T> {
+    pub document: AutoCommit,
+    sender: UnboundedSender<ChannelRequest>,
+    encryption_key: [u8; 32],
+    _marker: PhantomData<T>,
+    connection_ready_rx: Option<oneshot::Receiver<()>>,
+}
+
+impl<T: Reconcile + Hydrate + Default + Debug> EigensyncHandle<T> {
+    pub async fn new(server_addr: Multiaddr, server_id: PeerId, keypair: identity::Keypair, encryption_key: [u8; 32]) -> anyhow::Result<Self> {
+
+        let mut swarm = SwarmBuilder::with_existing_identity(keypair)
+            .with_tokio()
+            .with_tcp(
+                tcp::Config::default(),
+                noise::Config::new,
+                yamux::Config::default,
+            )
+            .context("Failed to create TCP transport")?
+            .with_behaviour(|_| Ok(client()))
+            .context("Failed to create behaviour")?
+            .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::MAX))
+            .build();
+
+        swarm.add_peer_address(server_id.clone(), server_addr.clone());
+
+        swarm.dial(server_id).context("Failed to dial")?;
+
+        let (sender, receiver) = unbounded_channel();
+        let (connection_ready_tx, connection_ready_rx) = oneshot::channel();
+
+        task::spawn(async move {
+            SyncLoop::new(receiver, swarm, connection_ready_tx).await.run(server_id).await.unwrap();
+        });
+
+        let document = AutoCommit::new().with_actor(ActorId::random());
+
+        let handle = Self {
+            document,
+            _marker: PhantomData,
+            sender,
+            encryption_key: encryption_key.clone(),
+            connection_ready_rx: Some(connection_ready_rx),
+        };
+
+        Ok(handle)
+    }
+
+    pub fn get_changes(&mut self) -> Vec<Change> {
+        self.document
+            .get_changes(&[])
+            .iter()
+            .map(|c| (*c).clone())
+            .collect()
+    }
+
+    pub fn get_document_state(&mut self) -> anyhow::Result<T> {
+        hydrate(&self.document).context("Failed to hydrate document")
+    }
+
+    pub async fn sync_with_server(&mut self) -> anyhow::Result<()> {
+        let (sender, receiver) = oneshot::channel();
+
+        let changes: Vec<SerializedChange> = self.get_changes().into_iter().map(SerializedChange::from).collect();
+
+        // Encrypt each change (deterministic nonce)
+        let encrypted_changes: Vec<EncryptedChange> = changes
+            .iter()
+            .map(|c| c.sign_and_encrypt(&self.encryption_key))
+            .collect::<anyhow::Result<Vec<_>>>()?;
+
+        self.sender
+            .send(ChannelRequest { encrypted_changes, response_channel: sender })
+            .context("Failed to send changes to server")?;
+
+        let new_changes_serialized = receiver.await?.map_err(|e| anyhow::anyhow!(e))?;
+
+        // Decrypt as early as possible; warn and skip on failure
+        let decrypted_serialized: Vec<SerializedChange> = new_changes_serialized
+            .into_iter()
+            .enumerate()
+            .filter_map(|(i, c)| {
+                match c.decrypt_and_verify(&self.encryption_key) {
+                    Ok(pt) => Some(pt),
+                    Err(e) => {
+                        tracing::warn!("Ignoring invalid change #{}: {}", i, e);
+                        None
+                    }
+                }
+            })
+            .collect();
+
+        // Try to deserialize; warn and skip on failure
+        let new_changes: Vec<Change> = decrypted_serialized
+            .into_iter()
+            .enumerate()
+            .filter_map(|(i, sc)| {
+                match Change::from_bytes(sc.to_bytes()) {
+                    Ok(ch) => Some(ch),
+                    Err(e) => {
+                        tracing::warn!("Ignoring undecodable change #{}: {}", i, e);
+                        None
+                    }
+                }
+            })
+            .collect();
+
+        self.document.apply_changes(new_changes)?;
+
+        Ok(())
+    }
+
+    pub fn modify(&mut self, f: impl FnOnce(&mut T) -> anyhow::Result<()>) -> anyhow::Result<()> {
+        let mut state = hydrate(&self.document).context("Failed to hydrate document")?;
+        f(&mut state)?;
+        reconcile(&mut self.document, state)
+            .context("Failed to reconcile")?;
+
+        Ok(())
+    }
+}
+
+#[derive(Debug)]
+pub struct ChannelRequest {
+    pub encrypted_changes: Vec<EncryptedChange>,
+    pub response_channel: oneshot::Sender<anyhow::Result<Vec<EncryptedChange>>>,
+}
+
+pub struct SyncLoop {
+    receiver: UnboundedReceiver<ChannelRequest>,
+    response_map: HashMap<OutboundRequestId, oneshot::Sender<anyhow::Result<Vec<EncryptedChange>>>>,
+    swarm: Swarm<Behaviour>,
+    connection_established: Option<oneshot::Sender<()>>,
+}
+
+impl SyncLoop {
+    pub async fn new(receiver: UnboundedReceiver<ChannelRequest>, swarm: Swarm<Behaviour>, connection_established: oneshot::Sender<()>) -> Self {
+
+        Self { receiver, response_map: HashMap::new(), swarm, connection_established: Some(connection_established) }
+    }
+
+    pub fn sync_with_server(&mut self, request: ChannelRequest, server_id: PeerId) {
+        let server_request = ServerRequest::UploadChangesToServer { encrypted_changes: request.encrypted_changes };
+        let request_id = self.swarm.behaviour_mut().send_request(&server_id, server_request);
+        self.response_map.insert(request_id, request.response_channel);
+    }
+
+    pub async fn run(&mut self, server_id: PeerId) -> anyhow::Result<()> {
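+        // Event loop: drive the libp2p swarm and, concurrently, forward upload
+        // requests arriving from `EigensyncHandle::sync_with_server`.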
+        loop {
+            tokio::select! {
+                event = self.swarm.select_next_some() => {
+                    if let Err(e) = handle_event(
+                        event,
+                        server_id,
+                        &mut self.swarm,
+                        &mut self.response_map,
+                        self.connection_established.take()
+                    ).await {
+                        tracing::error!(%e, "Eigensync event handling failed");
+                    }
+                },
+                request_from_handle = self.receiver.recv() => {
+                    if let Some(request) = request_from_handle {
+                        self.sync_with_server(request, server_id);
+                    }
+                }
+            }
+        }
+    }
+}
+
+pub async fn handle_event(
+    event: SwarmEvent<BehaviourEvent>,
+    _server_id: PeerId,
+    _swarm: &mut Swarm<Behaviour>,
+    response_map: &mut HashMap<OutboundRequestId, oneshot::Sender<anyhow::Result<Vec<EncryptedChange>>>>,
+    mut connection_established: Option<oneshot::Sender<()>>,
+) -> anyhow::Result<()> {
+    Ok(match event {
+        SwarmEvent::Behaviour(BehaviourEvent::Sync(request_response::Event::Message {
+            peer: _,
+            message,
+        })) => match message {
+            request_response::Message::Response {
+                request_id,
+                response,
+            } => match response {
+                Response::NewChanges { changes } => {
+                    let sender = response_map.remove(&request_id).context(format!("No sender for request id {:?}", request_id))?;
+
+                    if let Err(e) = sender.send(Ok(changes)) {
+                        tracing::error!("Failed to send changes to client: {:?}", e);
+                    }
+                },
+                Response::Error { reason } => {
+                    let sender = response_map.remove(&request_id).context(format!("No sender for request id {:?}", request_id))?;
+
+                    if let Err(e) = sender.send(Err(anyhow::anyhow!(reason.clone()))) {
+                        tracing::error!("Failed to send error to client: {:?}", e);
+                    }
+                },
+            },
+            request_response::Message::Request {
+                request: _,
+                channel: _,
+                request_id: _,
+            } => {
+                tracing::error!("Received a request even though we are the client");
+            }
+        },
+        SwarmEvent::Behaviour(BehaviourEvent::Sync(request_response::Event::OutboundFailure {
+            peer: _,
+            request_id,
+            error,
+        })) => {
+            let sender = response_map.remove(&request_id).context(format!("No sender for request id {:?}", request_id))?;
+
+            if let Err(e) = sender.send(Err(anyhow::anyhow!(error.to_string()))) {
+                tracing::error!("Failed to send error to client: {:?}", e);
+            }
+        }
+        SwarmEvent::ConnectionEstablished { peer_id: _peer_id, .. } => {
+            // Send the connection-established signal
+            if let Some(sender) = connection_established.take() {
+                if let Err(e) = sender.send(()) {
+                    tracing::error!("Failed to send connection established signal to client: {:?}", e);
+                }
+            }
+        },
+        other => tracing::debug!("Received event: {:?}", other),
+    })
+}
+
+pub trait EigensyncHandleBackgroundSync {
+    fn background_sync(&mut self) -> AbortOnDropHandle<()>;
+}
+
+impl<T> EigensyncHandleBackgroundSync for Arc<RwLock<EigensyncHandle<T>>>
+where
+    T: Reconcile + Hydrate + Default + Debug + Send + Sync + 'static,
+{
+    fn background_sync(&mut self) -> AbortOnDropHandle<()> {
+        let handle = self.clone();
+        AbortOnDropHandle::new(tokio::task::spawn(async move {
+            let mut seeded_default = false;
+            let connection_ready_rx = {
+                let mut guard = handle.write().await;
+                guard.connection_ready_rx.take()
+            };
+            if let Some(rx) = connection_ready_rx {
+                if let Err(e) = rx.await {
+                    tracing::error!(%e, "Connection ready signal was dropped; stopping background sync");
+                    return;
+                }
+            }
+            loop {
+                tracing::debug!("Background sync loop");
+                // Try sync; on failure (e.g. offline), back off and retry
+                if let Err(e) = handle.write().await.sync_with_server().await {
+                    tracing::error!(%e, "Background sync failed, retrying, seeded_default: {}", seeded_default);
+                    tokio::time::sleep(Duration::from_secs(2)).await;
+                    continue;
+                }
+
+                if !seeded_default {
+                    let mut guard = handle.write().await;
+                    if guard.document.get_changes(&[]).is_empty() {
+                        let state = T::default();
+                        tracing::debug!("Seeding default eigensync state, document is empty");
+                        if let Err(e) = reconcile(&mut guard.document, &state) {
+                            tracing::error!(error = ?e, "Failed to seed default eigensync state, continuing");
+                        }
+                    }
+                    seeded_default = true;
+                }
+
+                tokio::time::sleep(Duration::from_secs(2)).await;
+            }
+        }))
+    }
+}
\ No newline at end of file
diff --git a/eigensync-protocol/Cargo.toml b/eigensync-protocol/Cargo.toml
new file mode 100644
index 000000000..768226503
--- /dev/null
+++ b/eigensync-protocol/Cargo.toml
@@ -0,0 +1,25 @@
+[package]
+name = "eigensync-protocol"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
+serde = { workspace = true }
+anyhow = { workspace = true }
+
+automerge = { workspace = true }
+chacha20poly1305 = { version = "0.10", features = ["alloc", "std"] }
+libp2p = { workspace = true, features = [
+    "request-response",
+    "tcp",
+    "noise",
+    "yamux",
+    "macros",
+    "cbor",
+    "tokio",
+    "ping",
+] }
+blake3 = "1"
+
+[lints]
+workspace = true
diff --git a/eigensync-protocol/src/lib.rs b/eigensync-protocol/src/lib.rs
new file mode 100644
index 000000000..8ad4a753e
--- /dev/null
+++ b/eigensync-protocol/src/lib.rs
@@ -0,0 +1,171 @@
+use automerge::Change;
+use chacha20poly1305::aead::{Aead, Payload};
+use chacha20poly1305::{KeyInit, XChaCha20Poly1305, XNonce};
+use libp2p::request_response::ProtocolSupport;
+use libp2p::swarm::NetworkBehaviour;
+use libp2p::{ping, request_response, StreamProtocol};
+use serde::{Deserialize, Serialize};
+use std::ops::{Deref, DerefMut};
+use std::time::Duration;
+
+const PROTOCOL: &str = "/eigensync/1.0.0";
+
+#[derive(NetworkBehaviour)]
+pub struct Behaviour {
+    ping: ping::Behaviour,
+    sync: SyncBehaviour,
+}
+
+pub type SyncBehaviour = request_response::cbor::Behaviour<ServerRequest, Response>;
+
+#[derive(Debug, Clone, Copy, Default)]
+pub struct EigensyncProtocol;
+
+impl AsRef<str> for EigensyncProtocol {
+    fn as_ref(&self) -> &str {
+        PROTOCOL
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct SerializedChange(Vec<u8>);
+
+impl SerializedChange {
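+    /// Wraps raw serialized Automerge change bytes (as produced by `Change::bytes`).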
+    pub fn new(data: Vec<u8>) -> Self {
+        SerializedChange(data)
+    }
+
+    pub fn to_bytes(&self) -> Vec<u8> {
+        self.0.clone()
+    }
+
+    pub fn sign_and_encrypt(&self, enc_key: &[u8; 32]) -> anyhow::Result<EncryptedChange> {
+        let key32 = key32(enc_key)?;
+        let aead = XChaCha20Poly1305::new((&key32).into());
+        let pt = self.to_bytes();
+        let nonce = nonce_from_plaintext(&key32, &pt);
+        let ct = aead.encrypt(
+            XNonce::from_slice(&nonce),
+            Payload { msg: &pt, aad: b"eigensync.v1" },
+        )?;
+        let mut out = Vec::with_capacity(24 + ct.len());
+        out.extend_from_slice(&nonce);
+        out.extend_from_slice(&ct);
+        Ok(EncryptedChange::new(out))
+    }
+}
+
+impl From<Change> for SerializedChange {
+    fn from(mut change: Change) -> Self {
+        SerializedChange(change.bytes().to_vec())
+    }
+}
+
+impl From<SerializedChange> for Change {
+    fn from(change: SerializedChange) -> Self {
+        Change::from_bytes(change.0).unwrap()
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
+pub struct EncryptedChange(Vec<u8>);
+
+impl EncryptedChange {
+    pub fn new(data: Vec<u8>) -> Self {
+        EncryptedChange(data)
+    }
+
+    pub fn to_bytes(&self) -> Vec<u8> {
+        self.0.clone()
+    }
+
+    pub fn decrypt_and_verify(self, enc_key: &[u8; 32]) -> anyhow::Result<SerializedChange> {
+        let key32 = key32(enc_key)?;
+        let aead = XChaCha20Poly1305::new((&key32).into());
+        let buf = self.to_bytes();
+        anyhow::ensure!(buf.len() >= 24, "ciphertext too short");
+        let (nonce, ct) = buf.split_at(24);
+        let pt = aead.decrypt(
+            XNonce::from_slice(nonce),
+            Payload { msg: ct, aad: b"eigensync.v1" },
+        )?;
+        Ok(SerializedChange::new(pt))
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum ServerRequest {
+    UploadChangesToServer {
+        encrypted_changes: Vec<EncryptedChange>
+    }
+}
+
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub enum Response {
+    /// When the server has changes the device hasn't seen yet
+    NewChanges {
+        changes: Vec<EncryptedChange>,
+    },
+    Error {
+        reason: String,
+    },
+}
+
+pub fn server() -> Behaviour {
+    Behaviour {
+        ping: ping::Behaviour::new(ping::Config::default()),
+        sync: SyncBehaviour::new(
+            vec![(
+                StreamProtocol::new(EigensyncProtocol.as_ref()),
+                ProtocolSupport::Inbound,
+            )],
+            request_response::Config::default()
+                .with_request_timeout(Duration::from_secs(30))
+                .with_max_concurrent_streams(10),
+        ),
+    }
+}
+
+pub fn client() -> Behaviour {
+    Behaviour {
+        ping: ping::Behaviour::new(ping::Config::default()),
+        sync: SyncBehaviour::new(
+            vec![(
+                StreamProtocol::new(EigensyncProtocol.as_ref()),
+                ProtocolSupport::Outbound,
+            )],
+            request_response::Config::default()
+                .with_request_timeout(Duration::from_secs(30))
+                .with_max_concurrent_streams(10),
+        ),
+    }
+}
+
+impl Deref for Behaviour {
+    type Target = SyncBehaviour;
+
+    fn deref(&self) -> &Self::Target {
+        &self.sync
+    }
+}
+
+impl DerefMut for Behaviour {
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.sync
+    }
+}
+
+fn key32(key: &[u8]) -> anyhow::Result<[u8; 32]> {
+    key.try_into().map_err(|_| anyhow::anyhow!("encryption key must be 32 bytes"))
+}
+
+fn nonce_from_plaintext(key32: &[u8; 32], pt: &[u8]) -> [u8; 24] {
+    let mut h = blake3::Hasher::new_keyed(key32);
+    h.update(b"eigensync.v1.nonce");
+    h.update(pt);
+    let out = h.finalize();
+    let mut nonce = [0u8; 24];
+    nonce.copy_from_slice(&out.as_bytes()[..24]);
+    nonce
}
\ No newline at end of file
diff --git a/eigensync-server/Cargo.toml b/eigensync-server/Cargo.toml
new file mode 100644
index 000000000..1ad049f11
--- /dev/null
+++ b/eigensync-server/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "eigensync-server"
+version = "0.1.0"
+edition = "2024"
+
+[dependencies]
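+# Standalone sync server: persists each peer's encrypted Automerge changes in SQLite.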
+anyhow = { workspace = true }
+libp2p = { workspace = true }
+eigensync-protocol = { version = "0.1.0", path = "../eigensync-protocol" }
+sqlx = { workspace = true }
+tracing = { workspace = true }
+tracing-subscriber = { workspace = true }
+clap = { workspace = true }
+tokio = { workspace = true }
+hex = { workspace = true }
+
+[lints]
+workspace = true
\ No newline at end of file
diff --git a/eigensync-server/migrations/20250829103047_create_initial_table.sql b/eigensync-server/migrations/20250829103047_create_initial_table.sql
new file mode 100644
index 000000000..a22708e25
--- /dev/null
+++ b/eigensync-server/migrations/20250829103047_create_initial_table.sql
@@ -0,0 +1,9 @@
+-- Add migration script here
+
+CREATE TABLE change (
+    peer_id TEXT NOT NULL,
+    id INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,
+    change BLOB NOT NULL
+);
+
+CREATE UNIQUE INDEX IF NOT EXISTS idx_change_unique ON change(peer_id, change);
\ No newline at end of file
diff --git a/eigensync-server/src/main.rs b/eigensync-server/src/main.rs
new file mode 100644
index 000000000..329538a24
--- /dev/null
+++ b/eigensync-server/src/main.rs
@@ -0,0 +1,191 @@
+use std::{fs::{self, File}, io::Write, path::{Path, PathBuf}, str::FromStr, time::Duration};
+
+use anyhow::Context;
+use eigensync_protocol::{server, Behaviour, BehaviourEvent, Response, ServerRequest};
+use libp2p::{
+    futures::StreamExt, identity::{self, ed25519}, noise, request_response, swarm::SwarmEvent, tcp, yamux, Multiaddr, Swarm, SwarmBuilder
+};
+use tracing_subscriber::EnvFilter;
+
+use anyhow::Result;
+
+use libp2p::PeerId;
+use sqlx::{sqlite::{SqliteConnectOptions, SqlitePoolOptions}, SqlitePool};
+use tracing::info;
+
+use eigensync_protocol::EncryptedChange;
+
+#[derive(Clone)]
+pub struct Database {
+    pub pool: SqlitePool,
+}
+
+impl Database {
+    pub async fn new(data_dir: PathBuf) -> Result<Self> {
+        if !data_dir.exists() {
+            std::fs::create_dir_all(&data_dir)?;
+            info!(data_dir = %data_dir.display(), "Created server database directory");
+        }
+
+        let db_path = data_dir.join("changes");
+        let connect_options = SqliteConnectOptions::new()
+            .filename(&db_path)
+            .create_if_missing(true);
+
+        let pool = SqlitePoolOptions::new()
+            .connect_with(connect_options)
+            .await?;
+
+        let db = Self { pool };
+        db.migrate().await?;
+
+        Ok(db)
+    }
+
+    async fn migrate(&self) -> Result<()> {
+        sqlx::migrate!("./migrations").run(&self.pool).await?;
+        info!("Server database migration completed");
+        Ok(())
+    }
+
+    pub async fn get_peer_changes(&self, peer_id: PeerId) -> Result<Vec<EncryptedChange>> {
+        let peer_id = peer_id.to_string();
+
+        let rows = sqlx::query!(
+            r#"
+            SELECT change
+            FROM change
+            WHERE peer_id = ?
+            ORDER BY id DESC
+            "#,
+            peer_id
+        )
+        .fetch_all(&self.pool)
+        .await?;
+
+        let changes = rows.iter().map(|row| EncryptedChange::new(row.change.clone())).collect();
+
+        Ok(changes)
+    }
+
+    pub async fn insert_peer_changes(&self, peer_id: PeerId, changes: Vec<EncryptedChange>) -> Result<()> {
+        let peer_id = peer_id.to_string();
+
+        for change in changes {
+            let serialized = change.to_bytes();
+            sqlx::query!(
+                r#"
+            INSERT or IGNORE INTO change (peer_id, change)
+            VALUES (?, ?)
+ "#, + peer_id, + serialized + ) + .execute(&self.pool) + .await?; + } + + Ok(()) + } +} + +use clap::Parser; + +#[derive(Parser)] +struct Cli { + #[arg(long)] + pub data_dir: PathBuf +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + // Initialize tracing with info level + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::new("info")) + .init(); + + let data_dir = Cli::parse().data_dir; + + let db = Database::new(data_dir.clone()).await?; + + let multiaddr = Multiaddr::from_str("/ip4/127.0.0.1/tcp/3333")?; + + let file_path_buf = data_dir.join("seed.hex"); + let file_path = Path::new(&file_path_buf); + let keypair = if file_path.exists() { + let contents = fs::read_to_string(file_path)?; + identity::Keypair::ed25519_from_bytes(hex::decode(contents)?).unwrap() + } else { + let secret_key = ed25519::SecretKey::generate(); + let mut file = File::create(file_path)?; + file.write_all(hex::encode(secret_key.as_ref()).as_bytes())?; + identity::Keypair::from(ed25519::Keypair::from(secret_key)) + }; + + let mut swarm = SwarmBuilder::with_existing_identity(keypair) + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default, + ) + .context("Failed to create TCP transport")? + .with_behaviour(|_| Ok(server())) + .context("Failed to create behaviour")? + .with_swarm_config(|cfg| cfg.with_idle_connection_timeout(Duration::MAX)) + .build(); + + swarm.listen_on(multiaddr.clone())?; + + tracing::info!( + "Listening on {}/p2p/{}", + multiaddr, + swarm.local_peer_id() + ); + + loop { + tokio::select! { + event = swarm.select_next_some() => handle_event(&mut swarm, event, &db).await? + } + } +} + +async fn handle_event(swarm: &mut Swarm, event: SwarmEvent, db: &Database) -> anyhow::Result<()> { + + match event { + SwarmEvent::Behaviour(BehaviourEvent::Sync(request_response::Event::Message { + peer, + message, + })) => { + match message { + request_response::Message::Request { request, channel, .. } => { + match request { + ServerRequest::UploadChangesToServer { encrypted_changes: changes } => { + let saved_changed_of_peer = db.get_peer_changes(peer).await?; + let changes_clone = changes.clone(); + tracing::info!("Received {} changes from client", changes.len()); + let uploaded_new_changes: Vec<_> = changes.into_iter().filter(|c| !saved_changed_of_peer.contains(c)).collect(); + db.insert_peer_changes(peer, uploaded_new_changes).await?; + + let changes_client_is_missing: Vec<_> = db.get_peer_changes(peer).await?.iter().filter(|c| !changes_clone.contains(c)).cloned().collect(); + tracing::info!("Sending {} changes to client", changes_client_is_missing.len()); + let response = Response::NewChanges { changes: changes_client_is_missing }; + swarm.behaviour_mut().send_response(channel, response).expect("Failed to send response"); + } + } + } + request_response::Message::Response { request_id, .. } => tracing::info!("Received response for request of id {:?}", request_id), + } + } + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + tracing::info!("Connection established with peer: {:?}", peer_id); + } + SwarmEvent::ConnectionClosed { peer_id, .. } => { + tracing::info!("Connection closed with peer: {:?}", peer_id); + } + other => { + tracing::info!("Received event: {:?}", other); + }, + }; + Ok(()) +} diff --git a/regenerate_sqlx_cache.sh b/regenerate_sqlx_cache.sh index 87130bc4e..299fd198f 100755 --- a/regenerate_sqlx_cache.sh +++ b/regenerate_sqlx_cache.sh @@ -41,7 +41,7 @@ export DATABASE_URL echo "🗄️ Creating database..." 
 cargo sqlx database create
 
-for dir in swap monero-sys monero-rpc-pool; do
+for dir in swap monero-sys monero-rpc-pool eigensync; do
   echo "🔄 Running migrations in $dir..."
   (cd "$WORKSPACE_ROOT/$dir" && rm -rf .sqlx && cargo sqlx migrate run --ignore-missing)
 done
diff --git a/src-gui/src/renderer/components/pages/help/SettingsBox.tsx b/src-gui/src/renderer/components/pages/help/SettingsBox.tsx
index b4ddd8aa1..e94422eca 100644
--- a/src-gui/src/renderer/components/pages/help/SettingsBox.tsx
+++ b/src-gui/src/renderer/components/pages/help/SettingsBox.tsx
@@ -41,6 +41,7 @@ import {
   setEnableMoneroTor,
   setUseMoneroRpcPool,
   setDonateToDevelopment,
+  setEigensyncServer,
 } from "store/features/settingsSlice";
 import { useAppDispatch, useNodes, useSettings } from "store/hooks";
 import ValidatedTextField from "renderer/components/other/ValidatedTextField";
@@ -100,6 +101,7 @@ export default function SettingsBox() {
+          <EigensyncServerSetting />
@@ -994,3 +996,37 @@ function DonationTipSetting() {
   );
 }
+
+/**
+ * A setting that allows you to configure the Eigensync server multiaddr
+ */
+function EigensyncServerSetting() {
+  const eigensyncServer = useSettings((s) => s.eigensyncServer);
+  const dispatch = useAppDispatch();
+
+  const handleServerChange = (newServer: string) => {
+    dispatch(setEigensyncServer(newServer));
+  };
+
+  return (
+    // Multiaddr input; the exact ValidatedTextField props are assumed here
+    <ValidatedTextField
+      label="Eigensync server multiaddr"
+      value={eigensyncServer}
+      onValidatedChange={handleServerChange}
+      fullWidth
+    />
+  );
+}
diff --git a/src-gui/src/renderer/components/pages/history/table/HistoryRowExpanded.tsx b/src-gui/src/renderer/components/pages/history/table/HistoryRowExpanded.tsx
index 6dd131615..4191e63c4 100644
--- a/src-gui/src/renderer/components/pages/history/table/HistoryRowExpanded.tsx
+++ b/src-gui/src/renderer/components/pages/history/table/HistoryRowExpanded.tsx
@@ -20,6 +20,7 @@ import { isTestnet } from "store/config";
 import { getBitcoinTxExplorerUrl } from "utils/conversionUtils";
 import SwapLogFileOpenButton from "./SwapLogFileOpenButton";
 import ExportLogsButton from "./ExportLogsButton";
+import { parseDateString } from "utils/parseUtils";
 
 export default function HistoryRowExpanded({
   swap,
@@ -39,7 +40,9 @@
           Started on
-          {swap.start_date}
+
+            {new Date(parseDateString(swap.start_date)).toLocaleString()}
+
           Swap ID
@@ -112,7 +115,7 @@
           Monero receive pool
-          {swap.monero_receive_pool.map((pool, index) => (
+          {swap.monero_receive_pool?.map((pool, index) => (
diff --git a/src-gui/src/store/features/settingsSlice.ts b/src-gui/src/store/features/settingsSlice.ts
     setEnableMoneroTor(slice, action: PayloadAction<boolean>) {
      slice.enableMoneroTor = action.payload;
    },
+    setEigensyncServer(slice, action: PayloadAction<string>) {
+      slice.eigensyncServer = action.payload;
+    },
     setUseMoneroRpcPool(slice, action: PayloadAction<boolean>) {
       slice.useMoneroRpcPool = action.payload;
     },
@@ -250,6 +256,7 @@ export const {
   setFiatCurrency,
   setTorEnabled,
   setEnableMoneroTor,
+  setEigensyncServer,
   setUseMoneroRpcPool,
   setUserHasSeenIntroduction,
   addRendezvousPoint,
diff --git a/src-gui/src/utils/parseUtils.ts b/src-gui/src/utils/parseUtils.ts
index 99e332a2c..f12e28059 100644
--- a/src-gui/src/utils/parseUtils.ts
+++ b/src-gui/src/utils/parseUtils.ts
@@ -19,28 +19,8 @@ export function extractAmountFromUnitString(text: string): number | null {
 }
 
 // E.g: 2024-08-19 6:11:37.475038 +00:00:00
-export function parseDateString(str: string): number {
-  // Split the string and take only the date and time parts
-  const [datePart, timePart] = str.split(" ");
-
-  if (!datePart || !timePart) {
-    throw new Error(`Invalid date string format: ${str}`);
-  }
-
-  // Parse time part
-  const [hours, minutes, seconds] = timePart.split(":");
-  const paddedHours = hours.padStart(2, "0"); // Ensure
two-digit hours - - // Combine date and time parts, ensuring two-digit hours - const dateTimeString = `${datePart}T${paddedHours}:${minutes}:${seconds.split(".")[0]}Z`; - - const date = Date.parse(dateTimeString); - - if (Number.isNaN(date)) { - throw new Error(`Date string could not be parsed: ${str}`); - } - - return date; +export function parseDateString(unix_epoch_seconds: number): number { + return unix_epoch_seconds * 1000; } export function getLinesOfString(data: string): string[] { diff --git a/src-tauri/src/lib.rs b/src-tauri/src/lib.rs index 2e6227daa..9087b9a00 100644 --- a/src-tauri/src/lib.rs +++ b/src-tauri/src/lib.rs @@ -445,6 +445,7 @@ async fn initialize_context( .with_debug(true) .with_tor(settings.use_tor) .with_enable_monero_tor(settings.enable_monero_tor) + .with_eigensync_server(settings.eigensync_server_multiaddr.clone()) .with_tauri(tauri_handle.clone()) .build() .await; diff --git a/swap-asb/src/main.rs b/swap-asb/src/main.rs index f9d3e2fea..2850f37fc 100644 --- a/swap-asb/src/main.rs +++ b/swap-asb/src/main.rs @@ -330,7 +330,7 @@ pub async fn main() -> Result<()> { ]); let all_swaps = db.all().await?; - for (swap_id, state) in all_swaps { + for (swap_id, state, _) in all_swaps { let state: AliceState = state .try_into() .expect("Alice database only has Alice states"); diff --git a/swap-controller/Cargo.toml b/swap-controller/Cargo.toml index 5439f10d0..4d0a2ad5b 100644 --- a/swap-controller/Cargo.toml +++ b/swap-controller/Cargo.toml @@ -9,7 +9,7 @@ path = "src/main.rs" [dependencies] anyhow = { workspace = true } -clap = { version = "4", features = ["derive"] } +clap = { workspace = true } jsonrpsee = { workspace = true, features = ["client-core", "http-client"] } monero = { workspace = true } rustyline = "17.0.0" diff --git a/swap/Cargo.toml b/swap/Cargo.toml index c41db2d1a..1ef8cc903 100644 --- a/swap/Cargo.toml +++ b/swap/Cargo.toml @@ -81,7 +81,7 @@ swap-serde = { path = "../swap-serde" } tauri = { version = "2.0", features = ["config-json5"], optional = true, default-features = false } thiserror = { workspace = true } throttle = { path = "../throttle" } -time = "0.3" +time = { version = "0.3", features = ["serde"] } tokio = { workspace = true, features = ["process", "fs", "net", "parking_lot", "rt"] } tokio-tungstenite = { version = "0.15", features = ["rustls-tls"] } tokio-util = { version = "0.7", features = ["io", "codec", "rt"] } @@ -97,6 +97,9 @@ url = { workspace = true } uuid = { workspace = true, features = ["serde"] } void = "1" zeroize = "1.8.1" +autosurgeon = { version = "0.8.7", features = ["uuid"] } +automerge = { workspace = true } +eigensync-client = { version = "0.1.0", path = "../eigensync-client" } [target.'cfg(not(windows))'.dependencies] tokio-tar = "0.3" diff --git a/swap/migrations/20250827161545_add_unique_constraint_monero_addresses.sql b/swap/migrations/20250827161545_add_unique_constraint_monero_addresses.sql new file mode 100644 index 000000000..8c2a68150 --- /dev/null +++ b/swap/migrations/20250827161545_add_unique_constraint_monero_addresses.sql @@ -0,0 +1,12 @@ +-- Ensure uniqueness of (swap_id, address) including NULL-address rows + +-- 3) Create null-safe unique indexes +-- Prevent duplicates when address IS NOT NULL +CREATE UNIQUE INDEX IF NOT EXISTS idx_monero_addresses_unique_nonnull +ON monero_addresses(swap_id, address) +WHERE address IS NOT NULL; + +-- Prevent duplicates when address IS NULL (treat one NULL per swap_id as unique) +CREATE UNIQUE INDEX IF NOT EXISTS idx_monero_addresses_unique_null +ON 
monero_addresses(swap_id) +WHERE address IS NULL; diff --git a/swap/migrations/20250915123000_add_hlc_to_swap_states.sql b/swap/migrations/20250915123000_add_hlc_to_swap_states.sql new file mode 100644 index 000000000..db90dff9b --- /dev/null +++ b/swap/migrations/20250915123000_add_hlc_to_swap_states.sql @@ -0,0 +1,37 @@ +-- Add HLC counter (using entered_at as logical seconds) to swap_states and backfill + +-- Create new table with HLC columns +CREATE TABLE swap_states_new ( + id INTEGER PRIMARY KEY autoincrement NOT NULL, + swap_id TEXT NOT NULL, + entered_at INTEGER NOT NULL, + state TEXT NOT NULL, + hlc_counter INTEGER NOT NULL +); + +-- Backfill with deterministic counters per (swap_id, entered_at) +INSERT INTO swap_states_new (id, swap_id, entered_at, state, hlc_counter) +SELECT + id, + swap_id, + CAST(strftime('%s', substr(entered_at, 1, 19)) AS INTEGER) AS entered_at_seconds, + state, + ( + ROW_NUMBER() OVER ( + PARTITION BY swap_id, + CAST(strftime('%s', substr(entered_at, 1, 19)) AS INTEGER) + ORDER BY id + ) - 1 + ) AS hlc_counter +FROM swap_states; + +-- Replace old table +DROP TABLE swap_states; +ALTER TABLE swap_states_new RENAME TO swap_states; + +-- Indices for HLC-based uniqueness and performance +CREATE UNIQUE INDEX IF NOT EXISTS swap_states_unique_hlc ON swap_states(swap_id, entered_at, hlc_counter); +CREATE INDEX IF NOT EXISTS idx_swap_states_entered_at ON swap_states(entered_at); +CREATE INDEX IF NOT EXISTS idx_swap_states_swap_id ON swap_states(swap_id); + + diff --git a/swap/src/asb/event_loop.rs b/swap/src/asb/event_loop.rs index 6ae0b3b7e..a84419887 100644 --- a/swap/src/asb/event_loop.rs +++ b/swap/src/asb/event_loop.rs @@ -19,6 +19,7 @@ use libp2p::request_response::{OutboundFailure, OutboundRequestId, ResponseChann use libp2p::swarm::SwarmEvent; use libp2p::{PeerId, Swarm}; use moka::future::Cache; +use time::UtcDateTime; use std::collections::HashMap; use std::convert::TryInto; use std::fmt::Debug; @@ -193,10 +194,10 @@ where let unfinished_swaps = swaps .into_iter() - .filter(|(_swap_id, state)| !state.swap_finished()) - .collect::>(); + .filter(|(_swap_id, state, _entered_at)| !state.swap_finished()) + .collect::>(); - for (swap_id, state) in unfinished_swaps { + for (swap_id, state, _) in unfinished_swaps { let peer_id = match self.db.get_peer_id(swap_id).await { Ok(peer_id) => peer_id, Err(_) => { @@ -539,7 +540,7 @@ where let all_swaps = self.db.all().await?; let alice_states: Vec<_> = all_swaps .into_iter() - .filter_map(|(_, state)| match state { + .filter_map(|(_, state, _)| match state { State::Alice(state) => Some(state), _ => None, }) @@ -1017,7 +1018,7 @@ mod tests { let balance = monero::Amount::from_monero(5.0).unwrap(); let reserved_amounts: Vec = vec![]; - let result = unreserved_monero_balance(balance, reserved_amounts.into_iter()); + let result = unreserved_monero_balance(balance, reserved_amounts.into_iter().map(|item| item.reserved_monero())); assert_eq!(result, balance); } diff --git a/swap/src/asb/rpc/server.rs b/swap/src/asb/rpc/server.rs index 2b48d000e..b55e1402a 100644 --- a/swap/src/asb/rpc/server.rs +++ b/swap/src/asb/rpc/server.rs @@ -143,7 +143,7 @@ impl AsbApiServer for RpcImpl { let swaps = swaps .into_iter() - .map(|(swap_id, state)| { + .map(|(swap_id, state, _)| { let state_str = match state { crate::protocol::State::Alice(state) => format!("{state}"), crate::protocol::State::Bob(state) => format!("{state}"), diff --git a/swap/src/cli/api.rs b/swap/src/cli/api.rs index def481304..ca2a975a9 100644 --- a/swap/src/cli/api.rs +++ 
b/swap/src/cli/api.rs @@ -5,17 +5,23 @@ use crate::cli::api::tauri_bindings::SeedChoice; use crate::cli::command::{Bitcoin, Monero}; use crate::common::tor::{bootstrap_tor_client, create_tor_client}; use crate::common::tracing_util::Format; +use crate::database::eigensync::{EigensyncDatabaseAdapter, EigensyncDocument}; use crate::database::{open_db, AccessMode}; +use crate::libp2p_ext::MultiAddrExt; use crate::network::rendezvous::XmrBtcNamespace; use crate::protocol::Database; -use crate::seed::Seed; +use crate::seed::{self, Seed}; use crate::{bitcoin, common, monero}; use anyhow::{bail, Context as AnyContext, Error, Result}; use arti_client::TorClient; +use eigensync_client::{EigensyncHandle, EigensyncHandleBackgroundSync}; use futures::future::try_join_all; +use libp2p::{Multiaddr, PeerId}; +use tokio_util::task::AbortOnDropHandle; use std::fmt; use std::future::Future; use std::path::{Path, PathBuf}; +use std::str::FromStr; use std::sync::{Arc, Once}; use swap_env::env::{Config as EnvConfig, GetConfig, Mainnet, Testnet}; use swap_fs::system_data_dir; @@ -24,7 +30,6 @@ use tauri_bindings::{ }; use tokio::sync::{broadcast, broadcast::Sender, Mutex as TokioMutex, RwLock}; use tokio::task::JoinHandle; -use tokio_util::task::AbortOnDropHandle; use tor_rtcompat::tokio::TokioRustlsRuntime; use tracing::level_filters::LevelFilter; use tracing::Level; @@ -181,7 +186,6 @@ impl Default for SwapLock { /// For example, the `history` command doesn't require wallet initialization. /// /// Many fields are wrapped in `Arc` for thread-safe sharing. -#[derive(Clone)] pub struct Context { pub db: Arc, pub swap_lock: Arc, @@ -193,6 +197,9 @@ pub struct Context { tor_client: Option>>, #[allow(dead_code)] monero_rpc_pool_handle: Option>, + #[allow(dead_code)] + background_sync_task: Option>, + eigensync: Option>, } /// A conveniant builder struct for [`Context`]. @@ -207,6 +214,7 @@ pub struct ContextBuilder { tor: bool, enable_monero_tor: bool, tauri_handle: Option, + eigensync_server_multiaddr: Option, } impl ContextBuilder { @@ -231,6 +239,7 @@ impl ContextBuilder { tor: false, enable_monero_tor: false, tauri_handle: None, + eigensync_server_multiaddr: None, } } @@ -289,6 +298,12 @@ impl ContextBuilder { self } + /// Configures the Eigensync server multiaddr + pub fn with_eigensync_server(mut self, eigensync_server: impl Into>) -> Self { + self.eigensync_server_multiaddr = eigensync_server.into(); + self + } + /// Takes the builder, initializes the context by initializing the wallets and other components and returns the Context. pub async fn build(self) -> Result { // This is the data directory for the eigenwallet (wallet files) @@ -504,13 +519,43 @@ impl ContextBuilder { (), ); + // Initialize eigensync only if a server address is provided + let mut eigensync_handle: Option>>> = None; + let mut background_sync_task: Option> = None; + + if let Some(addr_str) = self.eigensync_server_multiaddr.as_deref() { + let multiaddr = Multiaddr::from_str(addr_str) + .context("Failed to parse Eigensync server multiaddr")?; + if let Some(peer_id) = multiaddr.extract_peer_id() { + let server_peer_id = PeerId::from_str(peer_id.to_string().as_str())?; + let enc_key = seed.derive_eigensync_secret_key(); + let handle = Arc::new(RwLock::new( + EigensyncHandle::new(multiaddr, server_peer_id, seed.derive_eigensync_identity(), enc_key).await? 
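+                // One shared handle: the background sync task and the EigensyncDatabaseAdapter below both hold clones of this Arc<RwLock<...>>.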
+ )); + background_sync_task = Some(handle.clone().background_sync()); + eigensync_handle = Some(handle); + } else { + tracing::warn!("Eigensync server multiaddr does not contain a peer id"); + }; + } + let db = open_db( data_dir.join("sqlite"), AccessMode::ReadWrite, - self.tauri_handle.clone(), + self.tauri_handle.clone() ) .await?; + let eigensync = if let Some(handle) = eigensync_handle.clone() { + let mut db_adapter = EigensyncDatabaseAdapter::new(handle, db.clone()); + Some(AbortOnDropHandle::new(tokio::task::spawn(async move { + db_adapter.run().await.context("Failed to run eigensync").unwrap(); + }))) + } else { + tracing::info!("Eigensync disabled: no server address provided"); + None + }; + database_progress_handle.finish(); let tauri_handle = &self.tauri_handle.clone(); @@ -582,6 +627,8 @@ impl ContextBuilder { tauri_handle: self.tauri_handle, tor_client: unbootstrapped_tor_client, monero_rpc_pool_handle, + background_sync_task, + eigensync, }; tauri_handle.emit_context_init_progress_event(TauriContextStatusEvent::Available); @@ -618,6 +665,8 @@ impl Context { tauri_handle: None, tor_client: None, monero_rpc_pool_handle: None, + background_sync_task: None, + eigensync: None, } } diff --git a/swap/src/cli/api/request.rs b/swap/src/cli/api/request.rs index 8cb5fb012..f90925e35 100644 --- a/swap/src/cli/api/request.rs +++ b/swap/src/cli/api/request.rs @@ -219,7 +219,8 @@ pub struct GetSwapInfoResponse { pub swap_id: Uuid, pub seller: AliceAddress, pub completed: bool, - pub start_date: String, + #[typeshare(serialized_as = "number")] + pub start_date: i64, #[typeshare(serialized_as = "string")] pub state_name: String, #[typeshare(serialized_as = "number")] @@ -796,7 +797,7 @@ pub async fn get_swap_infos_all(context: Arc) -> Result swap_infos.push(swap_info), Err(error) => { @@ -825,13 +826,13 @@ pub async fn get_swap_info( .db .get_peer_id(args.swap_id) .await - .with_context(|| "Could not get PeerID")?; + .with_context(|| format!("Could not get PeerID for swap {}", args.swap_id))?; let addresses = context .db .get_addresses(peer_id) .await - .with_context(|| "Could not get addressess")?; + .unwrap_or(vec![]); let start_date = context.db.get_swap_start_date(args.swap_id).await?; @@ -884,7 +885,10 @@ pub async fn get_swap_info( let timelock = swap_state.expired_timelocks(bitcoin_wallet.clone()).await?; - let monero_receive_pool = context.db.get_monero_address_pool(args.swap_id).await?; + let monero_receive_pool = context + .db + .get_monero_address_pool(args.swap_id) + .await?; Ok(GetSwapInfoResponse { swap_id: args.swap_id, @@ -893,7 +897,7 @@ pub async fn get_swap_info( addresses: addresses.iter().map(|a| a.to_string()).collect(), }, completed: is_completed, - start_date, + start_date: start_date.unix_timestamp(), state_name: format!("{}", swap_state), xmr_amount, btc_amount, @@ -1321,7 +1325,7 @@ pub async fn cancel_and_refund( pub async fn get_history(context: Arc) -> Result { let swaps = context.db.all().await?; let mut vec: Vec = Vec::new(); - for (swap_id, state) in swaps { + for (swap_id, state, _) in swaps { let state: BobState = state.try_into()?; vec.push(GetHistoryEntry { swap_id, diff --git a/swap/src/cli/api/tauri_bindings.rs b/swap/src/cli/api/tauri_bindings.rs index 6de1f3a7e..293a23a1b 100644 --- a/swap/src/cli/api/tauri_bindings.rs +++ b/swap/src/cli/api/tauri_bindings.rs @@ -1010,6 +1010,8 @@ pub struct TauriSettings { pub use_tor: bool, /// Whether to route Monero wallet traffic through Tor pub enable_monero_tor: bool, + /// Eigensync server Multiaddr + pub 
eigensync_server_multiaddr: Option<String>, } #[typeshare] diff --git a/swap/src/cli/watcher.rs b/swap/src/cli/watcher.rs index 7374c6229..3f7d7de58 100644 --- a/swap/src/cli/watcher.rs +++ b/swap/src/cli/watcher.rs @@ -160,7 +160,7 @@ impl Watcher { .await? .into_iter() // Filter for BobState - .filter_map(|(uuid, state)| match state { + .filter_map(|(uuid, state, _)| match state { State::Bob(bob_state) => Some((uuid, bob_state)), _ => None, }) diff --git a/swap/src/database.rs b/swap/src/database.rs index 48c60bcbd..ea926bdd9 100644 --- a/swap/src/database.rs +++ b/swap/src/database.rs @@ -13,7 +13,8 @@ use swap_fs::ensure_directory_exists; mod alice; mod bob; -mod sqlite; +pub mod sqlite; +pub mod eigensync; #[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] pub enum Swap { diff --git a/swap/src/database/eigensync.rs b/swap/src/database/eigensync.rs new file mode 100644 index 000000000..e01723dfd --- /dev/null +++ b/swap/src/database/eigensync.rs @@ -0,0 +1,427 @@ +use std::{cmp::Ordering, collections::{HashMap, HashSet}, str::FromStr, sync::Arc, time::Duration}; + +use autosurgeon::{Hydrate, HydrateError, Reconcile, Reconciler}; +use autosurgeon::reconcile::NoKey; +use eigensync_client::EigensyncHandle; +use std::collections::HashSet as StdHashSet; +use libp2p::{Multiaddr, PeerId}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; +use tokio::sync::RwLock; +use uuid::Uuid; +use rust_decimal::Decimal; +use anyhow::anyhow; + +use crate::{database::Swap, monero::{LabeledMoneroAddress, MoneroAddressPool, TransferProof}, protocol::{Database, State}}; + +/// Hybrid Logical Clock timestamp +/// l: logical time in UNIX seconds +/// c: counter to capture causality when logical times are equal +#[derive(Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq, Hash)] +pub struct HlcTimestamp { + l: i64, + c: u32, +} + +impl HlcTimestamp { + pub fn new(logical_seconds: i64, counter: u32) -> Self { + Self { l: logical_seconds, c: counter } + } + + pub fn logical_seconds(&self) -> i64 { + self.l + } + + pub fn counter(&self) -> u32 { + self.c + } +} + +impl Ord for HlcTimestamp { + fn cmp(&self, other: &Self) -> Ordering { + match self.l.cmp(&other.l) { + Ordering::Equal => self.c.cmp(&other.c), + non_eq => non_eq, + } + } +} + +impl PartialOrd for HlcTimestamp { + fn partial_cmp(&self, other: &Self) -> Option<Ordering> { + Some(self.cmp(other)) + } +} + + +#[derive(Serialize, Deserialize, Clone, Eq, Hash, PartialEq, Hydrate, Reconcile, Debug)] +struct KeyWrapper<T>(T, String); + +impl<T: Serialize> KeyWrapper<T> { + fn new(key: T) -> Self { + let json = serde_json::to_string(&key).unwrap(); + Self(key, json) + } +} + +impl<T> AsRef<str> for KeyWrapper<T> { + fn as_ref(&self) -> &str { + &self.1 + } +} + +impl<T: DeserializeOwned> From<String> for KeyWrapper<T> { + fn from(s: String) -> Self { + Self(serde_json::from_str(&s).unwrap(), s) + } +} + +#[derive(Debug, Clone, PartialEq, Default)] +pub struct EigensyncDocument { + // swap_id, swap + states: HashMap<InnerStateKey, State>, // swap_states table + // peer_addresses table + peer_addresses: HashMap<InnerPeerAddressesKey, ()>, // (peer_id, address) + // peers table + peers: HashMap<Uuid, PeerId>, // (swap_id, peer_id) + // monero_addresses table + monero_addresses: HashMap<InnerMoneroAddressKey, MoneroAddressValue>, // (swap_id, address) -> (percentage, label) + // buffered_transfer_proofs table + buffered_transfer_proofs: HashMap<Uuid, TransferProof>, // (swap_id, proof) +} + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +struct MoneroAddressValue(Decimal, String); + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +struct InnerMoneroAddressKey(Uuid, Option<String>); + +type MoneroAddressKey = KeyWrapper<InnerMoneroAddressKey>;
 + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +struct InnerStateKey(Uuid, HlcTimestamp); + +type StateKey = KeyWrapper<InnerStateKey>; + +#[derive(Debug, Clone, Eq, Hash, PartialEq, Serialize, Deserialize)] +struct InnerPeerAddressesKey(PeerId, Multiaddr); + +type PeerAddressesKey = KeyWrapper<InnerPeerAddressesKey>; + +#[derive(Debug, Clone, Reconcile, Hydrate, PartialEq, Default)] +struct EigensyncWire { + states: HashMap<StateKey, String>, + peer_addresses: HashMap<PeerAddressesKey, bool>, + peers: HashMap<String, String>, + monero_addresses: HashMap<MoneroAddressKey, String>, + buffered_transfer_proofs: HashMap<String, String>, +} + +impl From<&EigensyncDocument> for EigensyncWire { + fn from(src: &EigensyncDocument) -> Self { + let peer_addresses = src.peer_addresses.iter().map(|(key, _)| + (KeyWrapper::new(key.clone()), true)) + .collect(); + + let monero_addresses = src.monero_addresses.iter().map(|(key, value)| { + (KeyWrapper::new(key.clone()), serde_json::to_string(value).unwrap()) + }).collect(); + + let states = src.states.iter().map(|(key, state)| { + let state_json = serde_json::to_string(&Swap::from(state.clone())).unwrap(); + (KeyWrapper::new(key.clone()), state_json) + }).collect(); + + let peers = src.peers.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect(); + + let buffered_transfer_proofs = src.buffered_transfer_proofs.iter().map(|(k, v)| (k.to_string(), serde_json::to_string(&v).unwrap())).collect(); + + EigensyncWire { + states, + peer_addresses, + peers, + monero_addresses, + buffered_transfer_proofs, + } + } +} + +impl TryFrom<EigensyncWire> for EigensyncDocument { + type Error = anyhow::Error; + fn try_from(w: EigensyncWire) -> anyhow::Result<Self> { + let peer_addresses = w.peer_addresses.into_iter().map(|(k, _)| { + let (peer_id, addr) = (k.0.0, k.0.1); + Ok((InnerPeerAddressesKey(peer_id, addr), ())) + }).collect::<anyhow::Result<HashMap<_, _>>>()?; + + let monero_addresses = w.monero_addresses.into_iter().map(|(k, v)| { + let value: MoneroAddressValue = serde_json::from_str(&v)?; + Ok((k.0, value)) + }).collect::<anyhow::Result<HashMap<_, _>>>()?; + + let states = w + .states + .into_iter() + .map(|(k, v)| { + let swap_id = k.0.0; + let timestamp = k.0.1; + let swap: Swap = serde_json::from_str(&v)?; + let state: State = swap.into(); + + Ok((InnerStateKey(swap_id, timestamp), state)) + }) + .collect::<anyhow::Result<HashMap<_, _>>>()?; + + let peers = w.peers.into_iter().map(|(k, v)| { + let uuid = Uuid::parse_str(&k)?; + let peer_id = PeerId::from_str(&v)?; + Ok((uuid, peer_id)) + }).collect::<anyhow::Result<HashMap<_, _>>>()?; + + let buffered_transfer_proofs = w.buffered_transfer_proofs.into_iter().map(|(k, v)| { + let uuid = Uuid::parse_str(&k)?; + let proof: TransferProof = serde_json::from_str(&v)?; + Ok((uuid, proof)) + }).collect::<anyhow::Result<HashMap<_, _>>>()?; + + Ok(EigensyncDocument { + states, + peer_addresses, + peers, + monero_addresses, + buffered_transfer_proofs, + }) + } +} + +impl Reconcile for EigensyncDocument { + type Key<'a> = NoKey; + + fn reconcile<R: Reconciler>(&self, reconciler: R) -> Result<(), R::Error> { + let wire = EigensyncWire::from(self); + wire.reconcile(reconciler) + } +} + +impl Hydrate for EigensyncDocument { + fn hydrate_map<D: automerge::ReadDoc>( + doc: &D, + obj: &automerge::ObjId, + ) -> Result<Self, HydrateError> { + let wire: EigensyncWire = <EigensyncWire as Hydrate>::hydrate_map(doc, obj)?; + EigensyncDocument::try_from(wire) + .map_err(|e| HydrateError::unexpected("EigensyncDocument", e.to_string())) + } +} + +pub struct EigensyncDatabaseAdapter { + eigensync_handle: Arc<RwLock<EigensyncHandle<EigensyncDocument>>>, + db: Arc<dyn Database + Send + Sync>, +} + +impl EigensyncDatabaseAdapter { + pub fn new(eigensync_handle: Arc<RwLock<EigensyncHandle<EigensyncDocument>>>, sqlite_database: Arc<dyn Database + Send + Sync>) -> Self { + Self { eigensync_handle, db: sqlite_database } + } + + pub async fn run(&mut self) -> anyhow::Result<()> { + loop {
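+            // Every second: push local sqlite rows into the CRDT document, then fold remote entries back into sqlite.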
+            tokio::time::sleep(Duration::from_secs(1)).await; + tracing::info!("Eigensync sync running"); + + if let Err(e) = self.upload_states().await { + tracing::error!("Error uploading states: {:?}", e); + } + + if let Err(e) = self.download_states().await { + tracing::error!("Error downloading states: {:?}", e); + } + } + } + + pub async fn upload_states(&self) -> anyhow::Result<()> { + // get from db -> write into document + let mut new_states = HashMap::new(); + let mut new_peers = HashMap::new(); + let mut new_addresses = HashMap::new(); + let mut new_proof = HashMap::new(); + let mut new_peer_addresses = HashMap::new(); + + let mut document_lock = self.eigensync_handle.write().await; + let document_state = document_lock.get_document_state()?; + let swap_states = document_state.states; + let db_address_pools = self.db.get_monero_address_pools().await?; + + for (swap_id, address_pool) in db_address_pools { + let mut temp_monero_addresses = HashMap::new(); + for labeled in address_pool.iter() { + let address_opt_str = labeled.address().map(|a| a.to_string()); + let percentage = labeled.percentage(); + let label = labeled.label().to_string(); + temp_monero_addresses.insert(InnerMoneroAddressKey(swap_id, address_opt_str), MoneroAddressValue(percentage, label)); + } + + new_addresses.extend(temp_monero_addresses); + } + + // Use stored HLCs from DB + let db_states_all = self.db.get_all_states_with_hlc().await?; + for (swap_id, state, l_seconds, counter) in db_states_all.into_iter() { + let hlc = HlcTimestamp::new(l_seconds, counter); + if swap_states.contains_key(&InnerStateKey(swap_id, hlc)) { + continue; + } + + let peer_id = self.db.get_peer_id(swap_id).await?; + let proof = self.db.get_buffered_transfer_proof(swap_id).await?; + + new_states.insert(InnerStateKey(swap_id, hlc), state); + new_peers.insert(swap_id, peer_id); + if let Some(proof) = proof { + new_proof.insert(swap_id, proof); + } + } + + let document_peer_addresses = document_state.peer_addresses; + for (peer_id, addresses) in self.db.get_all_peer_addresses().await? { + for address in addresses { + let key = InnerPeerAddressesKey(peer_id, address); + if !document_peer_addresses.contains_key(&key) { + new_peer_addresses.insert(key, ()); + } + } + } + + document_lock.modify(|document| { + document.peers.extend(new_peers.clone()); + document.states.extend(new_states.clone()); + document.monero_addresses.extend(new_addresses.clone()); + document.buffered_transfer_proofs.extend(new_proof.clone()); + document.peer_addresses.extend(new_peer_addresses.clone()); + Ok(()) + })?; + + Ok(()) + } + + pub async fn download_states(&self) -> anyhow::Result<()> { + // get from document -> write into db + let document = self.eigensync_handle.write().await.get_document_state().expect("Eigensync document should be present"); + + // States table + let document_states: HashSet<InnerStateKey> = document.states.keys().cloned().collect(); + let db_states_hlc = self.db.get_all_states_with_hlc().await?; + let db_hlc_keys: StdHashSet<(Uuid, i64, u32)> = db_states_hlc + .iter() + .map(|(id, _state, l_seconds, c)| (*id, *l_seconds, *c)) + .collect(); + let mut document_states = document_states.into_iter().collect::<Vec<_>>(); + document_states.sort_by_key(|state_key| state_key.1); + + for state_key in document_states { + let (swap_id, hlc_ts) = (state_key.0, state_key.1); + + let document_state: State = document + .states + .get(&state_key) + .ok_or_else(|| anyhow!("State not found for key"))?
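+                // take an owned State; insert_existing_state_with_hlc below needs ownership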
+                .clone(); + + let l_seconds = hlc_ts.logical_seconds(); + let counter = hlc_ts.counter(); + if db_hlc_keys.contains(&(swap_id, l_seconds, counter)) { + continue; + } + + let swap_uuid = swap_id; + + if let Err(e) = self.db.insert_existing_state_with_hlc(swap_uuid, document_state, l_seconds, counter).await { + tracing::error!("Error inserting existing state: {:?}", e); + } + } + + // peer_addresses table + let document_peer_addresses: HashSet<InnerPeerAddressesKey> = document.peer_addresses.keys().cloned().collect(); + let db_peer_addresses = self.db.get_all_peer_addresses().await?; + for peer_address_key in document_peer_addresses { + let (peer_id, address) = (peer_address_key.0, peer_address_key.1); + + if db_peer_addresses.iter().any(|(p, a)| p == &peer_id && a.contains(&address)) { + continue; + } + + self.db.insert_address(peer_id, address).await?; + } + + // peers table + let document_peers: HashSet<Uuid> = document.peers.keys().cloned().collect(); + for swap_id in document_peers { + let document_peer = document + .peers + .get(&swap_id) + .ok_or_else(|| anyhow!("Peer not found for key"))? + .clone(); + + if let Ok(peer_id) = self.db.get_peer_id(swap_id).await { + if peer_id == document_peer { + continue; + } + } + + self.db.insert_peer_id(swap_id, document_peer).await?; + } + + // monero_addresses table + let document_monero_addresses: HashSet<InnerMoneroAddressKey> = document.monero_addresses.keys().cloned().collect(); + let db_monero_addresses = self.db.get_monero_address_pools().await?; + for monero_address_key in document_monero_addresses { + let (swap_id, address) = (monero_address_key.0.clone(), monero_address_key.1.clone()); + + if db_monero_addresses.iter().any(|(s, pool)| { + s == &swap_id && pool.addresses().iter().any(|addr_opt| { + match (&address, addr_opt) { + (Some(addr_str), Some(addr)) => addr_str == &addr.to_string(), + (None, None) => true, // Both are internal addresses + _ => false, + } + }) + }) { + continue; + } + + // Get the percentage and label from the document + let MoneroAddressValue(percentage, label) = document.monero_addresses.get(&monero_address_key) + .ok_or_else(|| anyhow!("Monero address data not found"))?; + + // Create a MoneroAddressPool with the address data + let labeled = match &address { + Some(addr_str) => { + let addr = monero::Address::from_str(addr_str)?; + LabeledMoneroAddress::with_address(addr, *percentage, label.clone())? + } + None => { + LabeledMoneroAddress::with_internal_address(*percentage, label.clone())? + } + }; + + let address_pool = MoneroAddressPool::new(vec![labeled]); + + self.db.insert_monero_address_pool(swap_id, address_pool).await?; + } + + // buffered_transfer_proofs table + let document_buffered_transfer_proofs: HashSet<Uuid> = document.buffered_transfer_proofs.keys().cloned().collect(); + for swap_id in document_buffered_transfer_proofs { + let db_buffered_transfer_proof = self.db.get_buffered_transfer_proof(swap_id).await?; + let document_buffered_transfer_proof = document.buffered_transfer_proofs.get(&swap_id) + .ok_or_else(|| anyhow!("Buffered transfer proof not found for key"))?
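+                // owned copy: compared with the DB row below and inserted only if it differs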
+                .clone(); + + if db_buffered_transfer_proof == Some(document_buffered_transfer_proof.clone()) { + continue; + } + + self.db.insert_buffered_transfer_proof(swap_id, document_buffered_transfer_proof).await?; + } + + Ok(()) + } +} \ No newline at end of file diff --git a/swap/src/database/sqlite.rs b/swap/src/database/sqlite.rs index ec2aa4790..3a56c2997 100644 --- a/swap/src/database/sqlite.rs +++ b/swap/src/database/sqlite.rs @@ -7,11 +7,14 @@ use crate::monero::TransferProof; use crate::protocol::{Database, State}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; + use libp2p::{Multiaddr, PeerId}; use rust_decimal::prelude::{FromPrimitive, ToPrimitive}; use rust_decimal::Decimal; use sqlx::sqlite::{Sqlite, SqliteConnectOptions}; use sqlx::{ConnectOptions, Pool, SqlitePool}; +use time::UtcDateTime; +use std::collections::HashMap; use std::path::Path; use std::str::FromStr; use time::OffsetDateTime; @@ -120,7 +123,7 @@ impl Database for SqliteDatabase { sqlx::query!( r#" - insert into monero_addresses ( + insert or ignore into monero_addresses ( swap_id, address, percentage, @@ -154,10 +157,8 @@ impl Database for SqliteDatabase { .await?; if row.is_empty() { - return Err(anyhow!( - "No Monero address pool found for swap ID: {}", - swap_id - )); + return Err(anyhow::anyhow!("No monero address pool found for swap: {}", swap_id)); } let addresses = row @@ -183,6 +184,46 @@ impl Database for SqliteDatabase { Ok(MoneroAddressPool::new(addresses)) } + async fn get_monero_address_pools(&self) -> Result<Vec<(Uuid, MoneroAddressPool)>> { + let rows = sqlx::query!( + r#" + SELECT swap_id, address, percentage, label + FROM monero_addresses + ORDER BY swap_id + "# + ) + .fetch_all(&self.pool) + .await?; + + let mut pools: HashMap<Uuid, Vec<LabeledMoneroAddress>> = HashMap::new(); + + for row in rows.iter() { + let swap_id = Uuid::from_str(&row.swap_id)?; + let address: Option<monero::Address> = row + .address + .clone() + .map(|address| address.parse()) + .transpose()?; + let percentage = Decimal::from_f64(row.percentage).expect("Invalid percentage"); + let label = row.label.clone(); + let labeled_address = match address { + Some(address) => LabeledMoneroAddress::with_address(address, percentage, label) + .map_err(|e| anyhow::anyhow!("Invalid percentage in database: {}", e))?, + None => LabeledMoneroAddress::with_internal_address(percentage, label) + .map_err(|e| anyhow::anyhow!("Invalid percentage in database: {}", e))?, + }; + + pools.entry(swap_id).or_insert_with(Vec::new).push(labeled_address); + } + + let result = pools + .into_iter() + .map(|(swap_id, addresses)| (swap_id, MoneroAddressPool::new(addresses))) + .collect(); + + Ok(result) + } + async fn get_monero_addresses(&self) -> Result<Vec<monero::Address>> { let rows = sqlx::query!("SELECT DISTINCT address FROM monero_addresses WHERE address IS NOT NULL") @@ -278,12 +319,12 @@ impl Database for SqliteDatabase { Ok(peer_map.into_iter().collect()) } - async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<String> { + async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<UtcDateTime> { let swap_id = swap_id.to_string(); let row = sqlx::query!( r#" - SELECT min(entered_at) as start_date + SELECT min(entered_at) as "start_date: i64" FROM swap_states WHERE swap_id = ?
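+            -- entered_at holds UNIX seconds since the HLC migration, so min() is the first state transition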
"#, @@ -292,28 +333,45 @@ impl Database for SqliteDatabase { .fetch_one(&self.pool) .await?; - row.start_date - .ok_or_else(|| anyhow!("Could not get swap start date")) + let start_date = row.start_date.ok_or_else(|| anyhow!("Could not get swap start date"))?; + let start_date = UtcDateTime::from_unix_timestamp(start_date)?; + Ok(start_date) } async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()> { - let entered_at = OffsetDateTime::now_utc(); - let swap = serde_json::to_string(&Swap::from(state))?; - let entered_at = entered_at.to_string(); + let now_seconds = OffsetDateTime::now_utc().unix_timestamp(); let swap_id_str = swap_id.to_string(); + // Compute next counter for this (swap_id, entered_at) + let row = sqlx::query!( + r#" + SELECT COALESCE(MAX(hlc_counter), -1) + 1 as "next_counter: i64" + FROM swap_states + WHERE swap_id = ? AND entered_at = ? + "#, + swap_id_str, + now_seconds + ) + .fetch_one(&self.pool) + .await?; + + let next_counter = row.next_counter; + + let entered_at = now_seconds; sqlx::query!( r#" - insert into swap_states ( + INSERT INTO swap_states ( swap_id, entered_at, - state - ) values (?, ?, ?); - "#, + state, + hlc_counter + ) VALUES (?, ?, ?, ?); + "#, swap_id_str, entered_at, - swap + swap, + next_counter ) .execute(&self.pool) .await?; @@ -325,6 +383,48 @@ impl Database for SqliteDatabase { Ok(()) } + async fn insert_existing_state(&self, swap_id: Uuid, state: State, entered_at: UtcDateTime) -> Result<()> { + let swap = serde_json::to_string(&Swap::from(state))?; + let swap_id_str = swap_id.to_string(); + + let entered_at = entered_at.unix_timestamp(); + let row = sqlx::query!( + r#" + SELECT COALESCE(MAX(hlc_counter), -1) + 1 as "next_counter: i64" + FROM swap_states + WHERE swap_id = ? AND entered_at = ? 
+ "#, + swap_id_str, + entered_at + ) + .fetch_one(&self.pool) + .await?; + let next_counter = row.next_counter; + + sqlx::query!( + r#" + insert into swap_states ( + swap_id, + entered_at, + state, + hlc_counter + ) values (?, ?, ?, ?); + "#, + swap_id_str, + entered_at, + swap, + next_counter + ) + .execute(&self.pool) + .await?; + + tracing::info!("inserted existing state"); + + self.tauri_handle.emit_swap_state_change_event(swap_id); + + Ok(()) + } + async fn get_state(&self, swap_id: Uuid) -> Result { let swap_id = swap_id.to_string(); let row = sqlx::query!( @@ -349,12 +449,99 @@ impl Database for SqliteDatabase { Ok(swap.into()) } - async fn all(&self) -> Result> { + async fn get_all_states(&self) -> Result> { + let rows = sqlx::query!( + r#" + SELECT swap_id, state, entered_at as "entered_at: i64" + FROM swap_states + ORDER BY entered_at DESC + "# + ) + .fetch_all(&self.pool) + .await?; + + let result = rows + .iter() + .filter_map(|row| { + let (swap_id, state, entered_at) = (&row.swap_id, &row.state, &row.entered_at); + + let swap_id = Uuid::from_str(swap_id).ok().expect("Failed to parse swap_id"); + let state = serde_json::from_str::(state).ok().expect("Failed to parse state"); + let entered_at = UtcDateTime::from_unix_timestamp(*entered_at).ok().expect("Failed to parse entered_at"); + + Some((swap_id, State::from(state), entered_at)) + }) + .collect::>(); + + Ok(result) + } + + async fn get_all_states_with_hlc(&self) -> Result> { let rows = sqlx::query!( r#" - SELECT swap_id, state + SELECT swap_id, state, entered_at as "entered_at: i64", hlc_counter as "hlc_counter: i64" + FROM swap_states + ORDER BY entered_at DESC, hlc_counter DESC + "# + ) + .fetch_all(&self.pool) + .await?; + + let result = rows + .iter() + .filter_map(|row| { + let swap_id = Uuid::from_str(&row.swap_id).ok()?; + let state = serde_json::from_str::(&row.state).ok()?; + let counter_u32: u32 = row.hlc_counter.try_into().ok()?; + Some((swap_id, State::from(state), row.entered_at, counter_u32)) + }) + .collect::>(); + + Ok(result) + } + + async fn insert_existing_state_with_hlc( + &self, + swap_id: Uuid, + state: State, + hlc_l_seconds: i64, + hlc_counter: u32, + ) -> Result<()> { + let swap = serde_json::to_string(&Swap::from(state))?; + let swap_id_str = swap_id.to_string(); + // Store the provided remote event HLC unchanged; local clock will advance implicitly + // because we derive it from DB max when generating the next local event. 
+ + sqlx::query!( + r#" + INSERT OR IGNORE INTO swap_states ( + swap_id, + entered_at, + state, + hlc_counter + ) VALUES (?, ?, ?, ?); + "#, + swap_id_str, + hlc_l_seconds, + swap, + hlc_counter + ) + .execute(&self.pool) + .await?; + + + tracing::info!("inserted existing state"); + + + Ok(()) + } + + async fn all(&self) -> Result> { + let rows = sqlx::query!( + r#" + SELECT swap_id, state, entered_at as "entered_at: i64" FROM ( - SELECT max(id), swap_id, state + SELECT max(id), swap_id, state, entered_at FROM swap_states GROUP BY swap_id ) @@ -366,8 +553,8 @@ impl Database for SqliteDatabase { let result = rows .iter() .filter_map(|row| { - let (Some(swap_id), Some(state)) = (&row.swap_id, &row.state) else { - tracing::error!("Row didn't contain state or swap_id when it should have"); + let (Some(swap_id), Some(state), Some(entered_at)) = (&row.swap_id, &row.state, &row.entered_at) else { + tracing::error!("Row didn't contain state, swap_id or entered_at when it should have"); return None; }; @@ -386,9 +573,18 @@ impl Database for SqliteDatabase { } }; - Some((swap_id, state)) + let entered_at = UtcDateTime::from_unix_timestamp(*entered_at).ok().expect("Failed to parse entered_at"); + + Some((swap_id, state, entered_at)) }) - .collect::>(); + .collect::>(); + + // if let Some(eigensync_handle) = &self.eigensync_handle { + // eigensync_handle.write().await.save_updates_local(|document| { + // document.states.insert(swap_id.clone(), swap.to_string()); + // Ok(()) + // })?; + // } Ok(result) } @@ -396,8 +592,6 @@ impl Database for SqliteDatabase { async fn get_states(&self, swap_id: Uuid) -> Result> { let swap_id = swap_id.to_string(); - // TODO: We should use query! instead of query here to allow for at-compile-time validation - // I didn't manage to generate the mappings for the query! 
macro because of problems with sqlx-cli let rows = sqlx::query!( r#" SELECT state @@ -421,7 +615,6 @@ impl Database for SqliteDatabase { Ok(state) }) .collect::>>(); - result } @@ -526,10 +719,17 @@ mod tests { assert_eq!(latest_loaded.len(), 2); - assert!(latest_loaded.contains(&(swap_id_1, state_2))); - assert!(latest_loaded.contains(&(swap_id_2, state_3))); - - assert!(!latest_loaded.contains(&(swap_id_1, state_1))); + // Check that the correct states are present for each swap_id + let swap_1_states: Vec<_> = latest_loaded.iter().filter(|(id, _, _)| *id == swap_id_1).collect(); + let swap_2_states: Vec<_> = latest_loaded.iter().filter(|(id, _, _)| *id == swap_id_2).collect(); + + assert_eq!(swap_1_states.len(), 1); + assert_eq!(swap_2_states.len(), 1); + assert_eq!(swap_1_states[0].1, state_2); + assert_eq!(swap_2_states[0].1, state_3); + + // Verify that state_1 is not the latest state for swap_id_1 + assert_ne!(swap_1_states[0].1, state_1); } #[tokio::test] @@ -634,6 +834,64 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_hlc_same_second_sequencing_existing() -> Result<()> { + let db = setup_test_db().await?; + + let swap_id = Uuid::new_v4(); + let entered_at_seconds: i64 = 1_700_000_000; + let entered_at = UtcDateTime::from_unix_timestamp(entered_at_seconds) + .ok() + .expect("valid timestamp"); + + let state_a = State::Alice(AliceState::BtcRedeemed); + let state_b = State::Alice(AliceState::SafelyAborted); + + db.insert_existing_state(swap_id, state_a, entered_at).await?; + db.insert_existing_state(swap_id, state_b, entered_at).await?; + + let all = db.get_all_states_with_hlc().await?; + let mut counters: Vec = all + .into_iter() + .filter(|(id, _s, l_seconds, _c)| *id == swap_id && *l_seconds == entered_at_seconds) + .map(|(_id, _s, _l_seconds, c)| c) + .collect(); + counters.sort_unstable(); + + assert_eq!(counters, vec![0, 1]); + Ok(()) + } + + #[tokio::test] + async fn test_hlc_idempotent_replay_existing_state_with_hlc() -> Result<()> { + let db = setup_test_db().await?; + + let swap_id = Uuid::new_v4(); + let l_seconds: i64 = 1_700_000_123; + let counter: u32 = 0; + + let first_state = State::Alice(AliceState::BtcRedeemed); + let second_state = State::Alice(AliceState::SafelyAborted); + + // First insert should create the row + db.insert_existing_state_with_hlc(swap_id, first_state.clone(), l_seconds, counter) + .await?; + // Second insert with same (swap_id, l_seconds, counter) should be ignored + db.insert_existing_state_with_hlc(swap_id, second_state, l_seconds, counter) + .await?; + + let all = db.get_all_states_with_hlc().await?; + let matching: Vec<(Uuid, State, i64, u32)> = all + .into_iter() + .filter(|(id, _s, l, c)| *id == swap_id && *l == l_seconds && *c == counter) + .collect(); + + assert_eq!(matching.len(), 1, "duplicate insert should be ignored"); + // Ensure the stored state is the first one + assert_eq!(matching[0].1, first_state); + Ok(()) + } + async fn setup_test_db() -> Result { let dir: TempDir = tempdir().unwrap(); let temp_db = dir.path().join("tempdb"); diff --git a/swap/src/monero.rs b/swap/src/monero.rs index 60086a9c7..9f6bff391 100644 --- a/swap/src/monero.rs +++ b/swap/src/monero.rs @@ -666,7 +666,7 @@ mod tests { use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] - pub struct MoneroPrivateKey(#[serde(with = "monero_private_key")] crate::monero::PrivateKey); + pub struct MoneroPrivateKey(#[serde(with = "swap_serde::monero::private_key")] crate::monero::PrivateKey); #[derive(Debug, Serialize, 
Deserialize, PartialEq, Eq)] pub struct MoneroAmount(#[serde(with = "monero_amount")] crate::monero::Amount); diff --git a/swap/src/protocol.rs b/swap/src/protocol.rs index ae0ca37e8..8ea8fd2de 100644 --- a/swap/src/protocol.rs +++ b/swap/src/protocol.rs @@ -12,6 +12,7 @@ use serde::{Deserialize, Serialize}; use sha2::Sha256; use sigma_fun::ext::dl_secp256k1_ed25519_eq::{CrossCurveDLEQ, CrossCurveDLEQProof}; use sigma_fun::HashTranscript; +use time::UtcDateTime; use std::convert::TryInto; use uuid::Uuid; @@ -146,15 +147,19 @@ pub trait Database { address: MoneroAddressPool, ) -> Result<()>; async fn get_monero_address_pool(&self, swap_id: Uuid) -> Result<MoneroAddressPool>; + async fn get_monero_address_pools(&self) -> Result<Vec<(Uuid, MoneroAddressPool)>>; async fn get_monero_addresses(&self) -> Result<Vec<monero::Address>>; async fn insert_address(&self, peer_id: PeerId, address: Multiaddr) -> Result<()>; async fn get_addresses(&self, peer_id: PeerId) -> Result<Vec<Multiaddr>>; async fn get_all_peer_addresses(&self) -> Result<Vec<(PeerId, Vec<Multiaddr>)>>; - async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<String>; + async fn get_swap_start_date(&self, swap_id: Uuid) -> Result<UtcDateTime>; async fn insert_latest_state(&self, swap_id: Uuid, state: State) -> Result<()>; + async fn insert_existing_state(&self, swap_id: Uuid, state: State, entered_at: UtcDateTime) -> Result<()>; async fn get_state(&self, swap_id: Uuid) -> Result<State>; async fn get_states(&self, swap_id: Uuid) -> Result<Vec<State>>; - async fn all(&self) -> Result<Vec<(Uuid, State)>>; + async fn all(&self) -> Result<Vec<(Uuid, State, UtcDateTime)>>; + async fn get_all_states(&self) -> Result<Vec<(Uuid, State, UtcDateTime)>>; + async fn get_all_states_with_hlc(&self) -> Result<Vec<(Uuid, State, i64, u32)>>; async fn insert_buffered_transfer_proof( &self, swap_id: Uuid, @@ -164,4 +169,12 @@ pub trait Database { &self, swap_id: Uuid, ) -> Result<Option<TransferProof>>; + + async fn insert_existing_state_with_hlc( + &self, + swap_id: Uuid, + state: State, + hlc_l_seconds: i64, + hlc_counter: u32, + ) -> Result<()>; } diff --git a/swap/src/seed.rs b/swap/src/seed.rs index 95fba6390..9a9bd3248 100644 --- a/swap/src/seed.rs +++ b/swap/src/seed.rs @@ -74,6 +74,11 @@ impl Seed { identity::Keypair::ed25519_from_bytes(bytes).expect("we always pass 32 bytes") } + pub fn derive_eigensync_identity(&self) -> identity::Keypair { + let bytes = self.derive(b"NETWORK").derive(b"EIGENSYNC_IDENTITY").bytes(); + identity::Keypair::ed25519_from_bytes(bytes).expect("we always pass 32 bytes") + } + /// Create seed from a Monero wallet mnemonic string pub fn from_mnemonic(mnemonic: String) -> Result<Self> { let monero_seed = MoneroSeed::from_string(Language::English, Zeroizing::new(mnemonic)) @@ -157,6 +162,11 @@ impl Seed { Ok(()) } + + pub fn derive_eigensync_secret_key(&self) -> [u8; 32] { + // Distinct derivation path from the network identity; used as the XChaCha20Poly1305 encryption key + self.derive(b"NETWORK").derive(b"EIGENSYNC_ENCRYPTION_KEY").bytes() + } } impl fmt::Debug for Seed {
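
For reference, the ordering contract of the HLC pair used throughout this change (logical seconds first, counter as tie-breaker) can be exercised standalone. A minimal sketch mirroring the `HlcTimestamp` type in swap/src/database/eigensync.rs; the names here are illustrative, not the crate's API:

```rust
use std::cmp::Ordering;

// Standalone mirror of HlcTimestamp: order by logical UNIX seconds first,
// then by the causality counter within the same second.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct Hlc {
    l: i64, // UNIX seconds
    c: u32, // counter within the same second
}

impl Ord for Hlc {
    fn cmp(&self, other: &Self) -> Ordering {
        match self.l.cmp(&other.l) {
            Ordering::Equal => self.c.cmp(&other.c),
            non_eq => non_eq,
        }
    }
}

impl PartialOrd for Hlc {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // Two events in the same second are disambiguated by the counter,
    // just like the migration's ROW_NUMBER() - 1 backfill.
    let mut events = vec![
        Hlc { l: 1_700_000_000, c: 1 },
        Hlc { l: 1_700_000_000, c: 0 },
        Hlc { l: 1_699_999_999, c: 5 },
    ];
    events.sort();
    assert_eq!(events[0], Hlc { l: 1_699_999_999, c: 5 });
    assert_eq!(events[1], Hlc { l: 1_700_000_000, c: 0 });
    println!("{events:?}");
}
```

Sorting by this key is what `ORDER BY entered_at DESC, hlc_counter DESC` and the download path's `sort_by_key(|state_key| state_key.1)` both rely on.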
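The `KeyWrapper` pattern in the new eigensync module exists because automerge maps are keyed by strings: each composite key is carried alongside a JSON rendering of itself, and the string form is what gets reconciled. A sketch of that round-trip, assuming serde_json and uuid (with the serde and v4 features, both already dependencies of the swap crate); `StateKeyDemo` is a simplified, hypothetical stand-in for `InnerStateKey`:

```rust
use serde::{Deserialize, Serialize};
use uuid::Uuid;

// Simplified stand-in for InnerStateKey: (swap_id, hlc seconds, hlc counter).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
struct StateKeyDemo(Uuid, i64, u32);

fn main() -> anyhow::Result<()> {
    let key = StateKeyDemo(Uuid::new_v4(), 1_700_000_000, 0);

    // Flatten the composite key into a string, as KeyWrapper::new does,
    // so it can key an automerge map (automerge map keys are strings).
    let flat = serde_json::to_string(&key)?;

    // Recover the typed key on the way back in, as KeyWrapper's
    // From<String> impl does.
    let back: StateKeyDemo = serde_json::from_str(&flat)?;
    assert_eq!(back, key);
    println!("{flat}");
    Ok(())
}
```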