Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/flamenco/gossip/crds/fd_crds.c
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,11 @@ fd_crds_entry_hash( fd_crds_entry_t const * entry ) {
return entry->value_hash;
}

long
fd_crds_entry_wallclock( fd_crds_entry_t const * entry ) {
return entry->wallclock_nanos;
}

inline static void
make_contact_info_key( uchar const * pubkey,
fd_crds_key_t * key_out ) {
Expand Down Expand Up @@ -1158,6 +1163,11 @@ fd_crds_peer_count( fd_crds_t const * crds ){
return crds_contact_info_pool_used( crds->contact_info.pool );
}

ulong
fd_crds_staked_peer_count( fd_crds_t const * crds ) {
return crds->metrics->peer_staked_cnt;
}

static inline void
set_peer_active_status( fd_crds_t * crds,
uchar const * peer_pubkey,
Expand Down
12 changes: 9 additions & 3 deletions src/flamenco/gossip/crds/fd_crds.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ fd_crds_entry_pubkey( fd_crds_entry_t const * entry );
uchar const *
fd_crds_entry_hash( fd_crds_entry_t const * entry );

long
fd_crds_entry_wallclock( fd_crds_entry_t const * entry );

/* fd_crds_entry_is_contact_info returns 1 if entry holds a Contact
Info CRDS value. Assumes entry was populated with either
fd_crds_populate_{preflight,full} */
Expand Down Expand Up @@ -229,12 +232,15 @@ fd_contact_info_t const *
fd_crds_contact_info_lookup( fd_crds_t const * crds,
uchar const * pubkey );

/* fd_crds_peer_count returns the number of Contact Info entries
present in the sidetable. The lifetime of a Contact Info entry
tracks the lifetime of the corresponding CRDS entry. */
/* fd_crds{_staked}_peer_count returns the number of (staked) Contact
Info entries present in the sidetable. The lifetime of a Contact Info
entry tracks the lifetime of the corresponding CRDS entry. */
ulong
fd_crds_peer_count( fd_crds_t const * crds );

ulong
fd_crds_staked_peer_count( fd_crds_t const * crds );

/* The CRDS table tracks whether a peer is active or not to determine
whether it should be sampled (see sample APIs).
fd_crds_peer_{active,inactive} provide a way to manage this state
Expand Down
74 changes: 70 additions & 4 deletions src/flamenco/gossip/fd_gossip.c
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ typedef struct stake stake_t;

#include "fd_push_set_private.c"

#define SET_NAME filtered_crds
#include "../../util/tmpl/fd_set_dynamic.c"

struct fd_gossip_private {
uchar identity_pubkey[ 32UL ];
ulong identity_stake;
Expand All @@ -77,6 +80,9 @@ struct fd_gossip_private {

fd_rng_t * rng;

filtered_crds_t * filtered_crds_set;
ulong pull_resp_tx_budget;

struct {
ulong count;
stake_t * pool;
Expand All @@ -88,6 +94,7 @@ struct fd_gossip_private {
long next_active_set_refresh;
long next_contact_info_refresh;
long next_flush_push_state;
long next_data_budget_update;
} timers;

/* Callbacks */
Expand Down Expand Up @@ -130,6 +137,7 @@ fd_gossip_footprint( ulong max_values,
l = FD_LAYOUT_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
l = FD_LAYOUT_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt_est( CRDS_MAX_CONTACT_INFO ) ) );
l = FD_LAYOUT_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) );
l = FD_LAYOUT_APPEND( l, filtered_crds_align(), filtered_crds_footprint( max_values ) );
l = FD_LAYOUT_FINI( l, fd_gossip_align() );
return l;
}
Expand Down Expand Up @@ -199,6 +207,7 @@ fd_gossip_new( void * shmem,
void * stake_pool = FD_SCRATCH_ALLOC_APPEND( l, stake_pool_align(), stake_pool_footprint( CRDS_MAX_CONTACT_INFO ) );
void * stake_weights = FD_SCRATCH_ALLOC_APPEND( l, stake_map_align(), stake_map_footprint( stake_map_chain_cnt ) );
void * active_ps = FD_SCRATCH_ALLOC_APPEND( l, push_set_align(), push_set_footprint( FD_ACTIVE_SET_MAX_PEERS ) );
void * filtered_crds = FD_SCRATCH_ALLOC_APPEND( l, filtered_crds_align(), filtered_crds_footprint( max_values ) );
FD_TEST( FD_SCRATCH_ALLOC_FINI( l, fd_gossip_align() ) == (ulong)shmem + fd_gossip_footprint( max_values, entrypoints_len ) );

gossip->gossip_net_out = gossip_net_out;
Expand All @@ -225,6 +234,9 @@ fd_gossip_new( void * shmem,
gossip->active_pset = push_set_join( push_set_new( active_ps, FD_ACTIVE_SET_MAX_PEERS ) );
FD_TEST( gossip->active_pset );

gossip->filtered_crds_set = filtered_crds_join( filtered_crds_new( filtered_crds, max_values ) );
FD_TEST( gossip->filtered_crds_set );

FD_TEST( fd_sha256_join( fd_sha256_new( gossip->sha256 ) ) );
FD_TEST( fd_sha512_join( fd_sha512_new( gossip->sha512 ) ) );

Expand All @@ -234,6 +246,7 @@ fd_gossip_new( void * shmem,
gossip->timers.next_active_set_refresh = 0L;
gossip->timers.next_contact_info_refresh = 0L;
gossip->timers.next_flush_push_state = 0L;
gossip->timers.next_data_budget_update = 0L;

gossip->send_fn = send_fn;
gossip->send_ctx = send_ctx;
Expand Down Expand Up @@ -448,6 +461,10 @@ rx_pull_request( fd_gossip_t * gossip,
fd_stem_context_t * stem,
long now ) {
/* TODO: Implement data budget? Or at least limit iteration range */
if( FD_UNLIKELY( gossip->pull_resp_tx_budget==0UL ) ) return;

filtered_crds_null( gossip->filtered_crds_set );
filtered_crds_t * filtered = gossip->filtered_crds_set;

fd_bloom_t filter[1];
filter->keys_len = pr_view->bloom_keys_len;
Expand All @@ -456,30 +473,67 @@ rx_pull_request( fd_gossip_t * gossip,
filter->bits_len = pr_view->bloom_bits_cnt;
filter->bits = (ulong *)( payload + pr_view->bloom_bits_offset );

fd_gossip_txbuild_t pull_resp[1];
fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );
long contact_wallclock_nanos = pr_view->pr_ci->wallclock_nanos;

uchar iter_mem[ 16UL ];

ulong i = 0UL;
ulong filtered_cnt = 0UL;
ulong filtered_sz = 0UL;
for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->mask, pr_view->mask_bits, iter_mem );
!fd_crds_mask_iter_done( it, gossip->crds );
it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
i++;
fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );

/* TODO: Add jitter here? */
// if( FD_UNLIKELY( fd_crds_value_wallclock( candidate )>contact_info->wallclock_nanos ) ) continue;

if( FD_UNLIKELY( fd_crds_entry_wallclock( candidate )>contact_wallclock_nanos ) ) continue;
if( FD_UNLIKELY( !fd_bloom_contains( filter, fd_crds_entry_hash( candidate ), 32UL ) ) ) continue;

filtered_crds_insert( filtered, i );
filtered_cnt++;

uchar const * crds_val;
ulong crds_size;
fd_crds_entry_value( candidate, &crds_val, &crds_size );
filtered_sz += crds_size;
}

if( FD_UNLIKELY( filtered_sz>gossip->pull_resp_tx_budget ) ) {
ulong avg_size = filtered_sz / filtered_cnt;
ulong num_to_send = gossip->pull_resp_tx_budget / avg_size;

for( ulong j=0UL; j<filtered_cnt; j++ ) {
if( filtered_crds_test( filtered, j) &&
fd_rng_ulong_roll( gossip->rng, filtered_cnt ) >=num_to_send ) {
filtered_crds_remove( filtered, j );
}
}
}

fd_gossip_txbuild_t pull_resp[1];
fd_gossip_txbuild_init( pull_resp, gossip->identity_pubkey, FD_GOSSIP_MESSAGE_PULL_RESPONSE );

i=0UL;
for( fd_crds_mask_iter_t * it=fd_crds_mask_iter_init( gossip->crds, pr_view->mask, pr_view->mask_bits, iter_mem );
!fd_crds_mask_iter_done( it, gossip->crds );
it=fd_crds_mask_iter_next( it, gossip->crds ) ) {
i++;
if( !filtered_crds_test( filtered, i ) ) continue;
fd_crds_entry_t const * candidate = fd_crds_mask_iter_entry( it, gossip->crds );

uchar const * crds_val;
ulong crds_size;
fd_crds_entry_value( candidate, &crds_val, &crds_size );

if( FD_UNLIKELY( !fd_gossip_txbuild_can_fit( pull_resp, crds_size ) ) ) {
gossip->pull_resp_tx_budget = fd_ulong_sat_sub( gossip->pull_resp_tx_budget, pull_resp->bytes_len ); /* OK if overflow, should be close to 0 TODO: put budget in gossip_t */
txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
}
fd_gossip_txbuild_append( pull_resp, crds_size, crds_val );
}

gossip->pull_resp_tx_budget = fd_ulong_sat_sub( gossip->pull_resp_tx_budget, pull_resp->bytes_len );
txbuild_flush( gossip, pull_resp, stem, peer_addr, now );
}

Expand Down Expand Up @@ -868,6 +922,14 @@ next_pull_request( fd_gossip_t const * gossip,
return now+1600L*1000L;
}

static inline void
update_data_budget( fd_gossip_t * gossip ) {
ulong const BYTES_PER_100MS = 1UL << 9; /* 512B per 100ms ~= 5KB/s = 40Kb/s */

ulong x = fd_ulong_if( fd_crds_has_staked_node( gossip->crds ), fd_crds_staked_peer_count( gossip->crds )*BYTES_PER_100MS, BYTES_PER_100MS );
gossip->pull_resp_tx_budget = fd_ulong_min( gossip->pull_resp_tx_budget + x, 5*x );
}

static inline void
rotate_active_set( fd_gossip_t * gossip,
fd_stem_context_t * stem,
Expand Down Expand Up @@ -926,6 +988,10 @@ fd_gossip_advance( fd_gossip_t * gossip,
rotate_active_set( gossip, stem, now );
gossip->timers.next_active_set_refresh = now+300L*1000L*1000L; /* TODO: Jitter */
}
if( FD_UNLIKELY( now - gossip->timers.next_data_budget_update >= 0L ) ) {
update_data_budget( gossip );
gossip->timers.next_data_budget_update = now+100L*1000L*1000L;
}
}

void
Expand Down
Loading