-
Notifications
You must be signed in to change notification settings - Fork 26
feat: eigensync #480
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
feat: eigensync #480
Conversation
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the ✨ Finishing Touches🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
SupportNeed help? Create a ticket on our support page for assistance with any issues or questions. CodeRabbit Commands (Invoked using PR/Issue comments)Type Other keywords and placeholders
CodeRabbit Configuration File (
|
eigensync/src/client/database.rs
Outdated
} | ||
|
||
/// Load an Automerge document | ||
pub async fn load_document(&self, document_id: &str) -> Result<Option<Vec<u8>>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should return an already parsed AutoCommit
(see AutoCommit::load
)
eigensync/src/client/document.rs
Outdated
self.documents.insert(document_id.to_string(), doc); | ||
} | ||
|
||
Ok(self.documents.get_mut(document_id).unwrap()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for testing unwrap
is ok, but when we merge this should be .expect("doc to exist because we just inserted it")
eigensync/src/client/document.rs
Outdated
|
||
let doc = self.get_or_create_document(document_id)?; | ||
|
||
for change in changes { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AutoCommit::apply_changes
does this for us
eigensync/src/client/document.rs
Outdated
|
||
/// Manager for Automerge documents | ||
pub struct DocumentManager { | ||
documents: HashMap<String, AutoCommit>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We probably want to use a more specific type as our document key. pub struct DocumentId(pub String)
for example. This ensures we don't accidentally mistake a values that's not a documentId as an id. Any reason we wouldn't use Uuid
?
eigensync/src/client/document.rs
Outdated
} | ||
|
||
/// Get document heads | ||
pub fn get_heads(&mut self, document_id: &str) -> Vec<ChangeHash> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should return Option<Vec<ChangeHash>>
and not an empty vector but None
if there's no such document
eigensync/src/server/database.rs
Outdated
&self, | ||
peer_id: PeerId, | ||
actor_id: ActorId, | ||
document_id: &str, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here, we want a more specific Id type, possibly uuid
eigensync/src/server/database.rs
Outdated
peer_id: PeerId, | ||
actor_id: ActorId, | ||
document_id: &str, | ||
patch_data: &[u8], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
parse the patch before passing it to this function -> this function should take a Change
type (I think, I don't think we're supposed to use Patch
)
eigensync/src/server/database.rs
Outdated
peer_id: PeerId, | ||
document_id: &str, | ||
since_sequence: Option<u64>, | ||
) -> Result<Vec<(u64, Vec<u8>)>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here too, parse the bytes before returning it from the function
I haven't looked at this thoroughly yet but in case this wasn't clear. The data storage flow should be like this: Server <> Eigensync Client <> GUI The Eigensync Client stores the entire document in a database. The GUI also stores all the states (but not the entire document) in its own database. The Eigensync Client should function entirely without depending on the GUI (to avoid circular dependencies). The GUI then polls states from the Eigensync Client and pushes new states into the document. The Eigensync Client takes care of syncing that with the server. |
eigensync/src/metrics.rs
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should ignore this for now
eigensync/src/protocol.rs
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Most of this is not necessary. Have a look at our implementation of the cooperative redeem protocol (link). Basically, only keep the Request and Response struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably you should delete everything except the request and response structs (and of those, only keep the GetChanges SubmitChanges (and Error for the Response) variants). Then use the same pattern we use for cooperative redeem.
Still to-do:
Rough sketch for the enum ChannelRequest {
Sync {
current_changes: Vec<SerializedChange>,
response_channel: OneshotChannel<Result<Vec<SerializedChanges>, String>>
}
}
struct Handle<T> {
channel: UnboundedSender<ChannelRequest>
_maker: PhantomData<T>
}
impl<T> Handle<T> {
pub async fn sync(&mut self, state: &T) -> Result<T> {
let (sender, receiver) = oneshot::channel();
self.channel.send(ChannelRequest::Sync { current_changes: self.document.changes(), sender }).await;
let new_changes = receiver.recv().await.context("channel didn't respond")??;
self.document.apply(new_changes).context("couldn't apply changes we got from server - are they corrupted?")?;
return hydrate(&self.document).context("couldn't hydrate specified type from document after updating - did we get corrupted?");
}
}
After that, there's of course the encryption we should implement:
|
dispatch(setEigensyncServer(newServer)); | ||
}; | ||
|
||
const isValidMultiaddr = (addr: string) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do not implement this validation logic here like this.
We have the isValidMultiAddressWithPeerId
function in src-gui/src/utils/parseUtils.ts
which properly parses and checks the validity of multi addresses.
eigensync/src/database.rs
Outdated
} | ||
|
||
let db_path = data_dir.join("changes"); | ||
let database_url = format!("sqlite:{}?mode=rwc", db_path.display()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use SqlitePoolOptions
instead of encoding them into a string
swap/src/cli/api/request.rs
Outdated
pub punish_timelock: PunishTimelock, | ||
pub timelock: Option<ExpiredTimelocks>, | ||
pub monero_receive_pool: MoneroAddressPool, | ||
pub monero_receive_pool: Option<MoneroAddressPool>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Convert this back to MoneroAddressPool
from Option<MoneroAddressPool>
We only changed this for testing purposes.
eigensync/src/lib.rs
Outdated
|
||
pub fn sync_with_server(&mut self, request: ChannelRequest, server_id: PeerId) { | ||
let server_request = ServerRequest::UploadChangesToServer { encrypted_changes: request.encrypted_changes }; | ||
match &server_request { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This match isn't necessary, you just created the server_request.
swap/src/cli/api/tauri_bindings.rs
Outdated
/// Whether to route Monero wallet traffic through Tor | ||
pub enable_monero_tor: bool, | ||
/// Eigensync server Multiaddr | ||
pub eigensync_server_multiaddr: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change to Option<String>
which translates to string | null
in typescript (auto generated by typeshare)
swap/src/cli/api.rs
Outdated
tor: false, | ||
enable_monero_tor: false, | ||
tauri_handle: None, | ||
eigensync_server_multiaddr: "/ip4/127.0.0.1/tcp/3333".to_string(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default to None
eigensync/src/lib.rs
Outdated
pub async fn run(&mut self, server_id: PeerId) -> anyhow::Result<()> { | ||
loop { | ||
tokio::select! { | ||
event = self.swarm.select_next_some() => handle_event(event, server_id, &mut self.swarm, &mut self.response_map, self.connection_established.take()).await?, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't propagate the error here. Instead, match on the result, log the error if there is one, and then just continue the loop. we want the loop to continue running. This also enables much more succinct error handling in handle_event
swap/src/cli/api.rs
Outdated
|
||
/// Configures the Eigensync server multiaddr | ||
pub fn with_eigensync_server(mut self, eigensync_server: impl Into<Option<String>>) -> Self { | ||
self.eigensync_server_multiaddr = eigensync_server.into().unwrap().to_string(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Never call .unwrap
unless we can be entirely sure the String will be a valid Multiaddress
eigensync/src/lib.rs
Outdated
response, | ||
} => match response { | ||
Response::NewChanges { changes } => { | ||
let sender = match response_map.remove(&request_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Start using the ?
operator in handle_event
. This whole match statement could be: .context(format!("No sender for request id {:?}", request_id))?
swap/src/cli/api.rs
Outdated
); | ||
|
||
let multiaddr = Multiaddr::from_str(self.eigensync_server_multiaddr.as_ref()).context("Failed to parse Eigensync server multiaddr")?; | ||
let server_peer_id = PeerId::from_str("12D3KooWQsAFHUm32ThqfQRJhtcc57qqkYckSu8JkMsbGKkwTS6p")?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The PeerId
has to be part of the multiaddr, not hardcoded
eigensync/src/lib.rs
Outdated
let _ = sender.send(Ok(changes)); | ||
}, | ||
Response::Error { reason } => { | ||
let sender = match response_map.remove(&request_id) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here, too
eigensync/src/lib.rs
Outdated
} | ||
}; | ||
|
||
let _ = sender.send(Ok(changes)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't ignore the result
swap/src/cli/api.rs
Outdated
|
||
let mut db_adapter = EigensyncDatabaseAdapter::new(eigensync_handle.clone(), db.clone()); | ||
|
||
tracing::info!("opened db"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ambigious log message, better to remove
eigensync/src/lib.rs
Outdated
channel, | ||
request_id, | ||
} => { | ||
tracing::error!("Received request of id {:?}", request_id); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
change this to something like "Received the request when we're the client"
eigensync/src/lib.rs
Outdated
let sender = match response_map.remove(&request_id) { | ||
Some(sender) => sender, | ||
None => { | ||
tracing::error!("No sender for request id {:?}", request_id); | ||
return Ok(()); | ||
} | ||
}; | ||
|
||
let _ = sender.send(Err(error.to_string())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Apply the better error handling here too
swap/src/database/eigensync.rs
Outdated
#[derive(Debug, Clone, Reconcile, Hydrate, PartialEq, Default)] | ||
struct EigensyncWire { | ||
states: HashMap<StateKey, String>, | ||
// encode (peer_id, addr) -> "peer_id|addr", unit value as bool true |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these comments are not up to date anymore?
We are not serialiazing as string anymore
swap/src/database/eigensync.rs
Outdated
peer_addresses: HashMap<PeerAddressesKey, bool>, | ||
// swap_id -> peer_id | ||
peers: HashMap<String, String>, | ||
// encode (swap_id, address?) -> "swap_id|address_or_-"; store (Decimal, String) as (String, String) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
these comments are not up to date anymore?
We are not serialiazing as string anymore
eigensync/src/lib.rs
Outdated
let _ = sender.send(()); | ||
} | ||
}, | ||
other => tracing::error!("Received event: {:?}", other), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Change this to debug
swap/src/database/eigensync.rs
Outdated
let timestamp = k.0.1; | ||
let swap: Swap = serde_json::from_str(&v)?; | ||
let state: State = swap.into(); | ||
// convert to utc date time from string like "2025-07-28 15:23:12.0 +00" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this comments are not up to date anymore?
we aren't using timestamps like this anymore
swap/src/database/eigensync.rs
Outdated
use eigensync::EigensyncHandle; | ||
use libp2p::{Multiaddr, PeerId}; | ||
use serde::{de::DeserializeOwned, Deserialize, Serialize}; | ||
// serde kept via Cargo features; no direct derives used here |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove claude / gpt comments
swap/src/database/eigensync.rs
Outdated
pub async fn run(&mut self) -> anyhow::Result<()> { | ||
loop { | ||
tokio::time::sleep(Duration::from_secs(1)).await; | ||
tracing::info!("running eigensync sync"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: capitalize
swap/src/database/eigensync.rs
Outdated
let mut new_peer_addresses = HashMap::new(); | ||
|
||
let mut document_lock = self.eigensync_handle.write().await; | ||
let document_state = document_lock.get_document_state().expect("Eigensync document should be present"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fail properly, do not unwrap
eigensync/src/lib.rs
Outdated
|
||
// Initial pull from server. If it fails, continue (we may be offline). | ||
if let Err(e) = handle.sync_with_server().await { | ||
tracing::error!("Initial eigensync pull failed: {:?}", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also say that we're continuing anyway
eigensync/src/lib.rs
Outdated
|
||
// Seed default only if the document is still empty after the pull. | ||
if handle.document.get_changes(&[]).is_empty() { | ||
let state = T::default(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This means that changing the default value of a type may break compatibility with the automerge history. That's not good. We will have to come up with a better solution to this.
eigensync/src/protocol.rs
Outdated
} | ||
|
||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] | ||
pub struct SignedEncryptedSerializedChange(Vec<u8>); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shorten this name to EncryptedChange
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Split the eigensync
crate into the core protocol and a crate for the server/client each:
eigensync-protocol
which contains the protocol /ServerRequest
/Response
types and other common types/functions when they're used by server + clienteigensync-client
which contains theSyncLoop
,ChannelRequest
types andEigensyncHandle
and everything that goes along with that. This crates useseigensync-protocol
as a dependency.eigensync-server
which also useseigensync-protocol
and contains mostly whats in thebin/server.rs
file currently.
This is an eigensync prototype