@@ -32,6 +32,7 @@ import (
3232 "github.com/status-im/status-go/protocol/common"
3333 "github.com/status-im/status-go/protocol/protobuf"
3434 v1protocol "github.com/status-im/status-go/protocol/v1"
35+ sds "github.com/waku-org/sds-go-bindings/sds"
3536)
3637
3738// Whisper message properties.
@@ -65,6 +66,8 @@ type MessageSender struct {
6566
6667 // handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
6768 handleSharedSecrets func ([]* sharedsecret.Secret ) error
69+
70+ reliabilityManager * sds.ReliabilityManager
6871}
6972
7073func NewMessageSender (
@@ -75,16 +78,40 @@ func NewMessageSender(
7578 enc * encryption.Protocol ,
7679 logger * zap.Logger ,
7780) (* MessageSender , error ) {
81+ sds .SetLogger (logger )
82+ reliabilityManager , err := sds .NewReliabilityManager ()
83+ if err != nil {
84+ return nil , errors .Wrap (err , "SDS: failed to create reliability manager" )
85+ }
86+ callbacks := sds.EventCallbacks {
87+ OnMessageSent : func (messageId sds.MessageID , channelId string ) {
88+ logger .Debug ("SDS: message sent" , zap .String ("messageId" , string (messageId )), zap .String ("channelId" , channelId ))
89+ },
90+ OnMissingDependencies : func (messageId sds.MessageID , missingDeps []sds.MessageID , channelId string ) {
91+ logger .Debug ("SDS: missing dependencies" ,
92+ zap .String ("messageId" , string (messageId )),
93+ zap .String ("channelId" , channelId ),
94+ zap .Any ("missingDeps" , missingDeps ))
95+ },
96+ OnMessageReady : func (messageId sds.MessageID , channelId string ) {
97+ logger .Debug ("SDS: message ready" ,
98+ zap .String ("messageId" , string (messageId )),
99+ zap .String ("channelId" , channelId ))
100+ },
101+ }
102+ reliabilityManager .RegisterCallbacks (callbacks )
103+
78104 p := & MessageSender {
79- identity : identity ,
80- database : database ,
81- datasyncEnabled : true , // FIXME
82- protocol : enc ,
83- persistence : persistence ,
84- publisher : pubsub .NewPublisher (),
85- transport : transport ,
86- logger : logger ,
87- ephemeralKeys : make (map [string ]* ecdsa.PrivateKey ),
105+ identity : identity ,
106+ database : database ,
107+ datasyncEnabled : true , // FIXME
108+ protocol : enc ,
109+ persistence : persistence ,
110+ publisher : pubsub .NewPublisher (),
111+ transport : transport ,
112+ logger : logger ,
113+ ephemeralKeys : make (map [string ]* ecdsa.PrivateKey ),
114+ reliabilityManager : reliabilityManager ,
88115 }
89116
90117 return p , nil
@@ -311,6 +338,15 @@ func (s *MessageSender) sendCommunity(
311338 rawMessage .Sender = s .identity
312339 }
313340
341+ if rawMessage .CommunityID != nil && len (rawMessage .CommunityID ) > 0 && rawMessage .MessageType != protobuf .ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
342+ s .logger .Debug ("SDS: dispatchCommunityChatMessage with communityID" , zap .String ("communityID" , types .EncodeHex (rawMessage .CommunityID )))
343+ sdsWrappedPayload , err := s .wrapPayloadForSDS (rawMessage .Payload , rawMessage .CommunityID )
344+ if err != nil {
345+ return nil , errors .Wrap (err , "failed to wrap payload for SDS" )
346+ }
347+ rawMessage .Payload = sdsWrappedPayload
348+ }
349+
314350 messageID , err := s .getMessageID (rawMessage )
315351 if err != nil {
316352 return nil , err
@@ -676,6 +712,15 @@ func (s *MessageSender) SendPublic(
676712 rawMessage .Sender = s .identity
677713 }
678714
715+ if rawMessage .CommunityID != nil && len (rawMessage .CommunityID ) > 0 && rawMessage .MessageType != protobuf .ApplicationMetadataMessage_COMMUNITY_DESCRIPTION {
716+ s .logger .Debug ("SDS: SendPublic with communityID" , zap .String ("communityID" , types .EncodeHex (rawMessage .CommunityID )))
717+ sdsWrappedPayload , err := s .wrapPayloadForSDS (rawMessage .Payload , rawMessage .CommunityID )
718+ if err != nil {
719+ return nil , errors .Wrap (err , "failed to wrap payload for SDS" )
720+ }
721+ rawMessage .Payload = sdsWrappedPayload
722+ }
723+
679724 var wrappedMessage []byte
680725 var err error
681726 if rawMessage .SkipApplicationWrap {
@@ -932,6 +977,7 @@ func (s *MessageSender) handleMessage(receivedMsg *messagingtypes.ReceivedMessag
932977 }
933978
934979 // Not a critical error; message wasn't segmented, proceed with next layers.
980+ hlogger .Error ("failed to handle segmentation layer message" , zap .Error (err ))
935981 }
936982
937983 err = s .handleEncryptionLayer (context .Background (), message )
@@ -960,6 +1006,11 @@ func (s *MessageSender) handleMessage(receivedMsg *messagingtypes.ReceivedMessag
9601006 zap .String ("envelopeHash" , hexutil .Encode (msg .TransportLayer .Hash )),
9611007 zap .String ("messageId" , hexutil .Encode (msg .ApplicationLayer .ID )),
9621008 )
1009+
1010+ err = s .unwrapPayloadForSDS (msg )
1011+ if err != nil {
1012+ hlogger .Error ("failed to unwrap payload for SDS" , zap .Error (err ))
1013+ }
9631014 }
9641015
9651016 return response , nil
@@ -1386,3 +1437,36 @@ func populateMessageApplicationLayer(m *messagingtypes.Message) error {
13861437 m .ApplicationLayer .Type = message .Type
13871438 return nil
13881439}
1440+
1441+ // Wrap message with SDS protocol https://github.com/vacp2p/rfc-index/blob/main/vac/raw/sds.md
1442+ func (s * MessageSender ) wrapPayloadForSDS (payload []byte , communityID []byte ) ([]byte , error ) {
1443+ sdsMessageID := crypto .Keccak256 (payload )
1444+
1445+ s .logger .Debug ("SDS: original payload" ,
1446+ zap .String ("channelId" , types .EncodeHex (communityID )),
1447+ zap .Int ("payload-length" , len (payload )),
1448+ zap .String ("messageId" , types .EncodeHex (sdsMessageID )),
1449+ )
1450+ sdsWrappedPayload , err := s .reliabilityManager .WrapOutgoingMessage (payload , sds .MessageID (types .EncodeHex (sdsMessageID )), types .EncodeHex (communityID ))
1451+ if err != nil {
1452+ return nil , errors .Wrap (err , "SDS: failed to wrap a community message" )
1453+ }
1454+
1455+ return sdsWrappedPayload , nil
1456+ }
1457+
1458+ func (s * MessageSender ) unwrapPayloadForSDS (msg * messagingtypes.Message ) error {
1459+ if len (msg .EncryptionLayer .Payload ) > 0 {
1460+ unwrappedMessage , err := s .reliabilityManager .UnwrapReceivedMessage (msg .ApplicationLayer .Payload )
1461+ if err != nil {
1462+ s .logger .Error ("SDS: failed to unwrap received message" , zap .Error (err ))
1463+ } else {
1464+ msg .ApplicationLayer .Payload = * unwrappedMessage .Message
1465+ s .logger .Debug ("SDS: missing deps" ,
1466+ zap .Any ("missing-deps" , * unwrappedMessage .MissingDeps ),
1467+ )
1468+ }
1469+ }
1470+
1471+ return nil
1472+ }
0 commit comments