Skip to content

Commit cbe7387

Browse files
igor-sirotinAYAHASSAN287
authored andcommitted
feat: local timesource (#7003)
* feat: local timesource * fix: nwaku * fix: rebase issues
1 parent 4fe3de8 commit cbe7387

File tree

17 files changed

+103
-66
lines changed

17 files changed

+103
-66
lines changed

cmd/push-notification-server/main.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/status-im/status-go/appdatabase"
1818
"github.com/status-im/status-go/common/dbsetup"
1919
"github.com/status-im/status-go/crypto"
20+
"github.com/status-im/status-go/internal/timesource"
2021
"github.com/status-im/status-go/logutils"
2122
"github.com/status-im/status-go/messaging"
2223
"github.com/status-im/status-go/params"
@@ -27,7 +28,6 @@ import (
2728
"github.com/status-im/status-go/protocol/sqlite"
2829
mailserversDB "github.com/status-im/status-go/services/mailservers"
2930
"github.com/status-im/status-go/services/personal"
30-
"github.com/status-im/status-go/timesource"
3131
"github.com/status-im/status-go/walletdatabase"
3232
)
3333

@@ -141,7 +141,7 @@ func main() {
141141
ClusterID: 16,
142142
},
143143
InstallationID: installationID,
144-
TimeSource: timesource.Default(),
144+
TimeSource: timesource.DefaultService(),
145145
},
146146
messaging.WithLogger(logger.Named("messaging")),
147147
)

internal/timesource/local.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package timesource
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
type localTimeSource struct {
9+
}
10+
11+
func (l *localTimeSource) Now() time.Time {
12+
return time.Now()
13+
}
14+
15+
func (l *localTimeSource) Start(ctx context.Context) error {
16+
return nil
17+
}
18+
19+
func (l *localTimeSource) Stop() {
20+
return
21+
}

timesource/timesource.go renamed to internal/timesource/ntp.go

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t
125125
return offsets[mid], nil
126126
}
127127

128-
var defaultTimeSource = &NTPTimeSource{
128+
var defaultNTPTimeSource = &ntpTimeSource{
129129
servers: defaultServers,
130130
allowedFailures: DefaultMaxAllowedFailures,
131131
fastNTPSyncPeriod: FastNTPSyncPeriod,
@@ -134,14 +134,9 @@ var defaultTimeSource = &NTPTimeSource{
134134
now: time.Now,
135135
}
136136

137-
// Default initializes time source with default config values.
138-
func Default() *NTPTimeSource {
139-
return defaultTimeSource
140-
}
141-
142-
// NTPTimeSource provides source of time that tries to be resistant to time skews.
137+
// ntpTimeSource provides source of time that tries to be resistant to time skews.
143138
// It does so by periodically querying time offset from ntp servers.
144-
type NTPTimeSource struct {
139+
type ntpTimeSource struct {
145140
servers []string
146141
allowedFailures int
147142
fastNTPSyncPeriod time.Duration
@@ -160,7 +155,7 @@ type NTPTimeSource struct {
160155

161156
// Now returns time adjusted by latest known offset
162157
// and detects system time changes
163-
func (s *NTPTimeSource) Now() time.Time {
158+
func (s *ntpTimeSource) Now() time.Time {
164159
s.timeDataMu.RLock()
165160

166161
currentTime := s.now()
@@ -197,7 +192,7 @@ func (s *NTPTimeSource) Now() time.Time {
197192
return adjustedTime
198193
}
199194

200-
func (s *NTPTimeSource) updateOffset() error {
195+
func (s *ntpTimeSource) updateOffset() error {
201196
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
202197
if err != nil {
203198
logutils.ZapLogger().Error("failed to compute offset", zap.Error(err))
@@ -208,14 +203,14 @@ func (s *NTPTimeSource) updateOffset() error {
208203
defer s.timeDataMu.Unlock()
209204
s.latestOffset = offset
210205
//TBD: if we found offset is too large, we should notify user that system time might not be accurate via emit signal,
211-
// and because go-waku doesn't use NTPTimeSource ATM (it just use time.Now()), this might be a problem for MissingMessageVerifier work normally.
206+
// and because go-waku doesn't use ntpTimeSource ATM (it just use time.Now()), this might be a problem for MissingMessageVerifier work normally.
212207
// e.g. might get errInvalidTimeRange when validate StoreQueryRequest
213208
return nil
214209
}
215210

216-
// runPeriodically runs periodically the given function based on NTPTimeSource
211+
// runPeriodically runs periodically the given function based on ntpTimeSource
217212
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
218-
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
213+
func (s *ntpTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
219214
if s.started {
220215
return
221216
}
@@ -243,7 +238,7 @@ func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, st
243238
}
244239

245240
// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
246-
func (s *NTPTimeSource) Start(ctx context.Context) error {
241+
func (s *ntpTimeSource) Start(ctx context.Context) error {
247242
s.stateMu.Lock()
248243
defer s.stateMu.Unlock()
249244
if s.started {
@@ -272,28 +267,28 @@ func (s *NTPTimeSource) Start(ctx context.Context) error {
272267
}
273268

274269
// Stop goroutine that updates time source.
275-
func (s *NTPTimeSource) Stop() {
270+
func (s *ntpTimeSource) Stop() {
276271
if s.cancel == nil {
277272
return
278273
}
279274
s.cancel()
280275
s.started = false
281276
}
282277

283-
func (s *NTPTimeSource) GetCurrentTime() time.Time {
278+
func (s *ntpTimeSource) GetCurrentTime() time.Time {
284279
err := s.Start(context.Background())
285280
if err != nil {
286281
panic("could not obtain timesource: " + err.Error())
287282
}
288283
return s.Now()
289284
}
290285

291-
func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 {
286+
func (s *ntpTimeSource) GetCurrentTimeInMillis() uint64 {
292287
return convertToMillis(s.GetCurrentTime())
293288
}
294289

295290
func GetCurrentTime() time.Time {
296-
ts := Default()
291+
ts := DefaultService()
297292
err := ts.Start(context.Background())
298293
if err != nil {
299294
panic("could not obtain timesource: " + err.Error())

timesource/timesource_test.go renamed to internal/timesource/ntp_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func TestComputeOffset(t *testing.T) {
174174
func TestNTPTimeSource(t *testing.T) {
175175
for _, tc := range newTestCases() {
176176
t.Run(tc.description, func(t *testing.T) {
177-
source := &NTPTimeSource{
177+
source := &ntpTimeSource{
178178
servers: tc.servers,
179179
allowedFailures: tc.allowedFailures,
180180
timeQuery: tc.query,
@@ -202,7 +202,7 @@ func TestRunningPeriodically(t *testing.T) {
202202
slowHits := 1
203203

204204
t.Run(tc.description, func(t *testing.T) {
205-
source := &NTPTimeSource{
205+
source := &ntpTimeSource{
206206
servers: tc.servers,
207207
allowedFailures: tc.allowedFailures,
208208
timeQuery: tc.query,
@@ -212,7 +212,7 @@ func TestRunningPeriodically(t *testing.T) {
212212
}
213213
lastCall := time.Now()
214214
// we're simulating a calls to updateOffset, testing ntp calls happens
215-
// on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
215+
// on ntpTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
216216
wg := sync.WaitGroup{}
217217
wg.Add(1)
218218
source.runPeriodically(context.TODO(), func() error {
@@ -263,7 +263,7 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
263263
}
264264

265265
currentTime := time.Now()
266-
ts := NTPTimeSource{
266+
ts := ntpTimeSource{
267267
servers: tc.servers,
268268
allowedFailures: tc.allowedFailures,
269269
timeQuery: tc.query,
@@ -289,7 +289,7 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
289289

290290
func TestGetCurrentTimeOffline(t *testing.T) {
291291
// covers https://github.com/status-im/status-desktop/issues/12691
292-
ts := &NTPTimeSource{
292+
ts := &ntpTimeSource{
293293
servers: defaultServers,
294294
allowedFailures: DefaultMaxAllowedFailures,
295295
fastNTPSyncPeriod: 1 * time.Millisecond,
@@ -322,7 +322,7 @@ func TestSystemTimeChangeDetection(t *testing.T) {
322322
}
323323

324324
// Create a time source with our mocks
325-
ts := &NTPTimeSource{
325+
ts := &ntpTimeSource{
326326
servers: []string{"test-server"},
327327
allowedFailures: 0,
328328
fastNTPSyncPeriod: 1 * time.Hour,
@@ -438,7 +438,7 @@ func TestTimeTrackingInitialization(t *testing.T) {
438438
}
439439

440440
// Create the time source with our controlled functions
441-
ts := &NTPTimeSource{
441+
ts := &ntpTimeSource{
442442
servers: mockedServers,
443443
allowedFailures: DefaultMaxAllowedFailures,
444444
fastNTPSyncPeriod: 1 * time.Hour, // Use long periods to avoid actual periodic updates during test
@@ -486,7 +486,7 @@ func TestTimeChangeDetectionSkippedWhenNotInitialized(t *testing.T) {
486486
}
487487

488488
// Create the time source with our controlled functions
489-
ts := &NTPTimeSource{
489+
ts := &ntpTimeSource{
490490
servers: mockedServers,
491491
allowedFailures: DefaultMaxAllowedFailures,
492492
fastNTPSyncPeriod: 1 * time.Hour,
@@ -529,7 +529,7 @@ func TestTimeChangeDetectionWithUpdateFailure(t *testing.T) {
529529
}
530530

531531
// Create the time source with our controlled functions
532-
ts := &NTPTimeSource{
532+
ts := &ntpTimeSource{
533533
servers: mockedServers,
534534
allowedFailures: DefaultMaxAllowedFailures,
535535
fastNTPSyncPeriod: 1 * time.Hour,

internal/timesource/timesource.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
11
package timesource
22

3-
import "time"
3+
import (
4+
"context"
5+
"time"
6+
)
47

5-
type TimeSource interface {
8+
type Provider interface {
69
Now() time.Time
710
}
11+
12+
type Service interface {
13+
Provider
14+
Start(ctx context.Context) error
15+
Stop()
16+
}
17+
18+
// DefaultService initializes time source with default config values.
19+
func DefaultService() Service {
20+
return defaultNTPTimeSource
21+
}
22+
23+
func LocalService() Service {
24+
return &localTimeSource{}
25+
}

messaging/core.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ type CoreParams struct {
7373
WakuConfig params.WakuV2Config
7474
ClusterConfig params.ClusterConfig
7575

76-
TimeSource timesource.TimeSource
76+
TimeSource timesource.Provider
7777
}
7878

7979
func newCore(waku wakutypes.Waku, params CoreParams, config *config) (*Core, error) {
@@ -381,7 +381,7 @@ type wakuParams struct {
381381
onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error)
382382
onPeerStats func(wakutypes.ConnStatus)
383383

384-
timeSource timesource.TimeSource
384+
timeSource timesource.Provider
385385

386386
logger *zap.Logger
387387
}

messaging/waku/gowaku.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ import (
8282
"github.com/waku-org/go-waku/waku/v2/protocol/store"
8383
"github.com/waku-org/go-waku/waku/v2/utils"
8484

85+
"github.com/waku-org/go-waku/waku/v2/node"
86+
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
87+
8588
gocommon "github.com/status-im/status-go/common"
8689
"github.com/status-im/status-go/connection"
8790
cryptotypes "github.com/status-im/status-go/crypto/types"
@@ -90,10 +93,6 @@ import (
9093
"github.com/status-im/status-go/messaging/waku/common"
9194
"github.com/status-im/status-go/messaging/waku/persistence"
9295
"github.com/status-im/status-go/messaging/waku/types"
93-
ntptimesource "github.com/status-im/status-go/timesource"
94-
95-
"github.com/waku-org/go-waku/waku/v2/node"
96-
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
9796
)
9897

9998
const messageQueueLimit = 1024
@@ -188,7 +187,7 @@ type Waku struct {
188187

189188
logger *zap.Logger
190189

191-
timesource timesource.TimeSource
190+
timesource timesource.Provider
192191

193192
// seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery
194193
// bootnodes successfully
@@ -221,7 +220,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
221220
}
222221

223222
// New creates a WakuV2 client ready to communicate through the LibP2P network.
224-
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
223+
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.Provider, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
225224
var err error
226225
if logger == nil {
227226
logger, err = zap.NewDevelopment()
@@ -231,7 +230,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
231230
}
232231

233232
if ts == nil {
234-
ts = ntptimesource.Default()
233+
ts = timesource.DefaultService()
235234
}
236235

237236
cfg = setDefaults(cfg)

messaging/waku/nwaku.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ import (
5454
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
5555
"github.com/waku-org/go-waku/waku/v2/protocol/store"
5656

57+
"github.com/waku-org/waku-go-bindings/waku"
58+
bindingscommon "github.com/waku-org/waku-go-bindings/waku/common"
59+
5760
gocommon "github.com/status-im/status-go/common"
5861
"github.com/status-im/status-go/connection"
5962
cryptotypes "github.com/status-im/status-go/crypto/types"
@@ -62,12 +65,8 @@ import (
6265
"github.com/status-im/status-go/messaging/waku/common"
6366
"github.com/status-im/status-go/messaging/waku/persistence"
6467
"github.com/status-im/status-go/messaging/waku/types"
65-
ntptimesource "github.com/status-im/status-go/timesource"
66-
67-
"github.com/waku-org/waku-go-bindings/waku"
68-
bindingscommon "github.com/waku-org/waku-go-bindings/waku/common"
6968

70-
node "github.com/waku-org/go-waku/waku/v2/node"
69+
"github.com/waku-org/go-waku/waku/v2/node"
7170
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
7271
)
7372

@@ -182,7 +181,7 @@ type Waku struct {
182181
// timesource provided in constructor is managed by status-go; go-waku must not invoke Start or Stop.
183182
// The adapter only fulfills the required interface; Start and Stop are no-ops.
184183
type timesourceAdapter struct {
185-
timesource.TimeSource
184+
timesource.Provider
186185
}
187186

188187
func (t timesourceAdapter) Start(ctx context.Context) error { return nil }
@@ -204,7 +203,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
204203
}
205204

206205
// New creates a WakuV2 client ready to communicate through the LibP2P network.
207-
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
206+
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.Provider, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
208207
var err error
209208
if logger == nil {
210209
logger, err = zap.NewDevelopment()
@@ -213,11 +212,11 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
213212
}
214213
}
215214

216-
var timesource gowakutimesource.Timesource
215+
var wakuTimeSource gowakutimesource.Timesource
217216
if ts != nil {
218-
timesource = timesourceAdapter{ts}
217+
wakuTimeSource = timesourceAdapter{ts}
219218
} else {
220-
timesource = ntptimesource.Default()
219+
wakuTimeSource = timesource.DefaultService()
221220
}
222221

223222
cfg = setDefaults(cfg)
@@ -269,7 +268,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
269268
dnsAddressCacheLock: &sync.RWMutex{},
270269
dnsDiscAsyncRetrievedSignal: make(chan struct{}),
271270
storeMsgIDs: make(map[gethcommon.Hash]bool),
272-
timesource: timesource,
271+
timesource: wakuTimeSource,
273272
storeMsgIDsMu: sync.RWMutex{},
274273
logger: logger,
275274
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,

0 commit comments

Comments
 (0)