Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/push-notification-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/common/dbsetup"
"github.com/status-im/status-go/crypto"
"github.com/status-im/status-go/internal/timesource"
"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/messaging"
"github.com/status-im/status-go/params"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/status-im/status-go/protocol/sqlite"
mailserversDB "github.com/status-im/status-go/services/mailservers"
"github.com/status-im/status-go/services/personal"
"github.com/status-im/status-go/timesource"
"github.com/status-im/status-go/walletdatabase"
)

Expand Down Expand Up @@ -141,7 +141,7 @@ func main() {
ClusterID: 16,
},
InstallationID: installationID,
TimeSource: timesource.Default(),
TimeSource: timesource.DefaultService(),
},
messaging.WithLogger(logger.Named("messaging")),
)
Expand Down
21 changes: 21 additions & 0 deletions internal/timesource/local.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package timesource

import (
"context"
"time"
)

type localTimeSource struct {
}

func (l *localTimeSource) Now() time.Time {
return time.Now()
}

func (l *localTimeSource) Start(ctx context.Context) error {
return nil
}

func (l *localTimeSource) Stop() {
return
}
31 changes: 13 additions & 18 deletions timesource/timesource.go → internal/timesource/ntp.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func computeOffset(timeQuery ntpQuery, servers []string, allowedFailures int) (t
return offsets[mid], nil
}

var defaultTimeSource = &NTPTimeSource{
var defaultNTPTimeSource = &ntpTimeSource{
servers: defaultServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: FastNTPSyncPeriod,
Expand All @@ -134,14 +134,9 @@ var defaultTimeSource = &NTPTimeSource{
now: time.Now,
}

// Default initializes time source with default config values.
func Default() *NTPTimeSource {
return defaultTimeSource
}

// NTPTimeSource provides source of time that tries to be resistant to time skews.
// ntpTimeSource provides source of time that tries to be resistant to time skews.
// It does so by periodically querying time offset from ntp servers.
type NTPTimeSource struct {
type ntpTimeSource struct {
servers []string
allowedFailures int
fastNTPSyncPeriod time.Duration
Expand All @@ -160,7 +155,7 @@ type NTPTimeSource struct {

// Now returns time adjusted by latest known offset
// and detects system time changes
func (s *NTPTimeSource) Now() time.Time {
func (s *ntpTimeSource) Now() time.Time {
s.timeDataMu.RLock()

currentTime := s.now()
Expand Down Expand Up @@ -197,7 +192,7 @@ func (s *NTPTimeSource) Now() time.Time {
return adjustedTime
}

func (s *NTPTimeSource) updateOffset() error {
func (s *ntpTimeSource) updateOffset() error {
offset, err := computeOffset(s.timeQuery, s.servers, s.allowedFailures)
if err != nil {
logutils.ZapLogger().Error("failed to compute offset", zap.Error(err))
Expand All @@ -208,14 +203,14 @@ func (s *NTPTimeSource) updateOffset() error {
defer s.timeDataMu.Unlock()
s.latestOffset = offset
//TBD: if we found offset is too large, we should notify user that system time might not be accurate via emit signal,
// and because go-waku doesn't use NTPTimeSource ATM (it just use time.Now()), this might be a problem for MissingMessageVerifier work normally.
// and because go-waku doesn't use ntpTimeSource ATM (it just use time.Now()), this might be a problem for MissingMessageVerifier work normally.
// e.g. might get errInvalidTimeRange when validate StoreQueryRequest
return nil
}

// runPeriodically runs periodically the given function based on NTPTimeSource
// runPeriodically runs periodically the given function based on ntpTimeSource
// synchronization limits (fastNTPSyncPeriod / slowNTPSyncPeriod)
func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
func (s *ntpTimeSource) runPeriodically(ctx context.Context, fn func() error, starWithSlowSyncPeriod bool) {
if s.started {
return
}
Expand Down Expand Up @@ -243,7 +238,7 @@ func (s *NTPTimeSource) runPeriodically(ctx context.Context, fn func() error, st
}

// Start initializes the local offset and starts a goroutine that periodically updates the local offset.
func (s *NTPTimeSource) Start(ctx context.Context) error {
func (s *ntpTimeSource) Start(ctx context.Context) error {
s.stateMu.Lock()
defer s.stateMu.Unlock()
if s.started {
Expand Down Expand Up @@ -272,28 +267,28 @@ func (s *NTPTimeSource) Start(ctx context.Context) error {
}

// Stop goroutine that updates time source.
func (s *NTPTimeSource) Stop() {
func (s *ntpTimeSource) Stop() {
if s.cancel == nil {
return
}
s.cancel()
s.started = false
}

func (s *NTPTimeSource) GetCurrentTime() time.Time {
func (s *ntpTimeSource) GetCurrentTime() time.Time {
err := s.Start(context.Background())
if err != nil {
panic("could not obtain timesource: " + err.Error())
}
return s.Now()
}

func (s *NTPTimeSource) GetCurrentTimeInMillis() uint64 {
func (s *ntpTimeSource) GetCurrentTimeInMillis() uint64 {
return convertToMillis(s.GetCurrentTime())
}

func GetCurrentTime() time.Time {
ts := Default()
ts := DefaultService()
err := ts.Start(context.Background())
if err != nil {
panic("could not obtain timesource: " + err.Error())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestComputeOffset(t *testing.T) {
func TestNTPTimeSource(t *testing.T) {
for _, tc := range newTestCases() {
t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
source := &ntpTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestRunningPeriodically(t *testing.T) {
slowHits := 1

t.Run(tc.description, func(t *testing.T) {
source := &NTPTimeSource{
source := &ntpTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
Expand All @@ -212,7 +212,7 @@ func TestRunningPeriodically(t *testing.T) {
}
lastCall := time.Now()
// we're simulating a calls to updateOffset, testing ntp calls happens
// on NTPTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
// on ntpTimeSource specified periods (fastNTPSyncPeriod & slowNTPSyncPeriod)
wg := sync.WaitGroup{}
wg.Add(1)
source.runPeriodically(context.TODO(), func() error {
Expand Down Expand Up @@ -263,7 +263,7 @@ func TestGetCurrentTimeInMillis(t *testing.T) {
}

currentTime := time.Now()
ts := NTPTimeSource{
ts := ntpTimeSource{
servers: tc.servers,
allowedFailures: tc.allowedFailures,
timeQuery: tc.query,
Expand All @@ -289,7 +289,7 @@ func TestGetCurrentTimeInMillis(t *testing.T) {

func TestGetCurrentTimeOffline(t *testing.T) {
// covers https://github.com/status-im/status-desktop/issues/12691
ts := &NTPTimeSource{
ts := &ntpTimeSource{
servers: defaultServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: 1 * time.Millisecond,
Expand Down Expand Up @@ -322,7 +322,7 @@ func TestSystemTimeChangeDetection(t *testing.T) {
}

// Create a time source with our mocks
ts := &NTPTimeSource{
ts := &ntpTimeSource{
servers: []string{"test-server"},
allowedFailures: 0,
fastNTPSyncPeriod: 1 * time.Hour,
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestTimeTrackingInitialization(t *testing.T) {
}

// Create the time source with our controlled functions
ts := &NTPTimeSource{
ts := &ntpTimeSource{
servers: mockedServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: 1 * time.Hour, // Use long periods to avoid actual periodic updates during test
Expand Down Expand Up @@ -486,7 +486,7 @@ func TestTimeChangeDetectionSkippedWhenNotInitialized(t *testing.T) {
}

// Create the time source with our controlled functions
ts := &NTPTimeSource{
ts := &ntpTimeSource{
servers: mockedServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: 1 * time.Hour,
Expand Down Expand Up @@ -529,7 +529,7 @@ func TestTimeChangeDetectionWithUpdateFailure(t *testing.T) {
}

// Create the time source with our controlled functions
ts := &NTPTimeSource{
ts := &ntpTimeSource{
servers: mockedServers,
allowedFailures: DefaultMaxAllowedFailures,
fastNTPSyncPeriod: 1 * time.Hour,
Expand Down
22 changes: 20 additions & 2 deletions internal/timesource/timesource.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,25 @@
package timesource

import "time"
import (
"context"
"time"
)

type TimeSource interface {
type Provider interface {
Now() time.Time
}

type Service interface {
Provider
Start(ctx context.Context) error
Stop()
}

// DefaultService initializes time source with default config values.
func DefaultService() Service {
return defaultNTPTimeSource
}

func LocalService() Service {
return &localTimeSource{}
}
4 changes: 2 additions & 2 deletions messaging/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type CoreParams struct {
WakuConfig params.WakuV2Config
ClusterConfig params.ClusterConfig

TimeSource timesource.TimeSource
TimeSource timesource.Provider
}

func newCore(waku wakutypes.Waku, params CoreParams, config *config) (*Core, error) {
Expand Down Expand Up @@ -381,7 +381,7 @@ type wakuParams struct {
onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error)
onPeerStats func(wakutypes.ConnStatus)

timeSource timesource.TimeSource
timeSource timesource.Provider

logger *zap.Logger
}
Expand Down
13 changes: 6 additions & 7 deletions messaging/waku/gowaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"github.com/waku-org/go-waku/waku/v2/utils"

"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"

gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
cryptotypes "github.com/status-im/status-go/crypto/types"
Expand All @@ -90,10 +93,6 @@ import (
"github.com/status-im/status-go/messaging/waku/common"
"github.com/status-im/status-go/messaging/waku/persistence"
"github.com/status-im/status-go/messaging/waku/types"
ntptimesource "github.com/status-im/status-go/timesource"

"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

const messageQueueLimit = 1024
Expand Down Expand Up @@ -188,7 +187,7 @@ type Waku struct {

logger *zap.Logger

timesource timesource.TimeSource
timesource timesource.Provider

// seededBootnodesForDiscV5 indicates whether we manage to retrieve discovery
// bootnodes successfully
Expand Down Expand Up @@ -221,7 +220,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
}

// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.Provider, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
var err error
if logger == nil {
logger, err = zap.NewDevelopment()
Expand All @@ -231,7 +230,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
}

if ts == nil {
ts = ntptimesource.Default()
ts = timesource.DefaultService()
}

cfg = setDefaults(cfg)
Expand Down
21 changes: 10 additions & 11 deletions messaging/waku/nwaku.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

"github.com/waku-org/waku-go-bindings/waku"
bindingscommon "github.com/waku-org/waku-go-bindings/waku/common"

gocommon "github.com/status-im/status-go/common"
"github.com/status-im/status-go/connection"
cryptotypes "github.com/status-im/status-go/crypto/types"
Expand All @@ -62,12 +65,8 @@ import (
"github.com/status-im/status-go/messaging/waku/common"
"github.com/status-im/status-go/messaging/waku/persistence"
"github.com/status-im/status-go/messaging/waku/types"
ntptimesource "github.com/status-im/status-go/timesource"

"github.com/waku-org/waku-go-bindings/waku"
bindingscommon "github.com/waku-org/waku-go-bindings/waku/common"

node "github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
)

Expand Down Expand Up @@ -182,7 +181,7 @@ type Waku struct {
// timesource provided in constructor is managed by status-go; go-waku must not invoke Start or Stop.
// The adapter only fulfills the required interface; Start and Stop are no-ops.
type timesourceAdapter struct {
timesource.TimeSource
timesource.Provider
}

func (t timesourceAdapter) Start(ctx context.Context) error { return nil }
Expand All @@ -204,7 +203,7 @@ func newTTLCache() *ttlcache.Cache[gethcommon.Hash, bool] {
}

// New creates a WakuV2 client ready to communicate through the LibP2P network.
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.TimeSource, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTopicsPersistence persistence.ProtectedTopics, ts timesource.Provider, onHistoricMessagesRequestFailed func([]byte, peer.AddrInfo, error), onPeerStats func(types.ConnStatus)) (*Waku, error) {
var err error
if logger == nil {
logger, err = zap.NewDevelopment()
Expand All @@ -213,11 +212,11 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
}
}

var timesource gowakutimesource.Timesource
var wakuTimeSource gowakutimesource.Timesource
if ts != nil {
timesource = timesourceAdapter{ts}
wakuTimeSource = timesourceAdapter{ts}
} else {
timesource = ntptimesource.Default()
wakuTimeSource = timesource.DefaultService()
}

cfg = setDefaults(cfg)
Expand Down Expand Up @@ -269,7 +268,7 @@ func New(nodeKey *ecdsa.PrivateKey, cfg *Config, logger *zap.Logger, protectedTo
dnsAddressCacheLock: &sync.RWMutex{},
dnsDiscAsyncRetrievedSignal: make(chan struct{}),
storeMsgIDs: make(map[gethcommon.Hash]bool),
timesource: timesource,
timesource: wakuTimeSource,
storeMsgIDsMu: sync.RWMutex{},
logger: logger,
discV5BootstrapNodes: cfg.DiscV5BootstrapNodes,
Expand Down
Loading
Loading