Skip to content

Commit 9b7fe8d

Browse files
committed
feat: feature parity between gill version ws acct sub and reg one + optional passing into driftClient
1 parent 540088c commit 9b7fe8d

File tree

6 files changed

+501
-79
lines changed

6 files changed

+501
-79
lines changed

sdk/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
"@switchboard-xyz/on-demand": "2.4.1",
5454
"@triton-one/yellowstone-grpc": "1.3.0",
5555
"anchor-bankrun": "0.3.0",
56+
"gill": "^0.10.2",
5657
"nanoid": "3.3.4",
5758
"node-cache": "5.1.2",
5859
"rpc-websockets": "7.5.1",

sdk/src/accounts/driftClientAccount/webSocketDriftClientAccountSubscriber.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import { findAllMarketAndOracles } from '../../config';
3232
import { findDelistedPerpMarketsAndOracles } from '../utils';
3333
import { getOracleId } from '../../oracles/oracleId';
3434
import { OracleSource } from '../../types';
35+
import { WebSocketAccountSubscriberV2Gill } from '../webSocketAccountSubscriberV2Gill';
3536

3637
const ORACLE_DEFAULT_ID = getOracleId(
3738
PublicKey.default,
@@ -101,7 +102,7 @@ export class WebSocketDriftClientAccountSubscriber
101102
decodeBuffer?: (buffer: Buffer) => any,
102103
resubOpts?: ResubOpts,
103104
commitment?: Commitment
104-
) => AccountSubscriber<any>
105+
) => WebSocketAccountSubscriberV2Gill<any> | WebSocketAccountSubscriber<any>
105106
) {
106107
this.isSubscribed = false;
107108
this.program = program;

sdk/src/accounts/webSocketAccountSubscriberV2Gill.ts

Lines changed: 78 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,15 @@
1-
import { DataAndSlot, AccountSubscriber, ResubOpts } from './types';
1+
import {
2+
DataAndSlot,
3+
AccountSubscriber,
4+
ResubOpts,
5+
BufferAndSlot,
6+
} from './types';
27
import { AnchorProvider, Program } from '@coral-xyz/anchor';
38
import { capitalize } from './utils';
49
import {
510
AccountInfoBase,
611
AccountInfoWithBase58EncodedData,
12+
AccountInfoWithBase64EncodedData,
713
createSolanaClient,
814
isAddress,
915
type Address,
@@ -16,6 +22,7 @@ export class WebSocketAccountSubscriberV2Gill<T>
1622
implements AccountSubscriber<T>
1723
{
1824
dataAndSlot?: DataAndSlot<T>;
25+
bufferAndSlot?: BufferAndSlot;
1926
accountName: string;
2027
logAccountName: string;
2128
program: Program;
@@ -38,6 +45,7 @@ export class WebSocketAccountSubscriberV2Gill<T>
3845
private rpcSubscriptions: ReturnType<
3946
typeof createSolanaClient
4047
>['rpcSubscriptions'];
48+
private abortController?: AbortController;
4149

4250
public constructor(
4351
accountName: string,
@@ -99,38 +107,35 @@ export class WebSocketAccountSubscriberV2Gill<T>
99107
await this.fetch();
100108
}
101109

110+
// Create abort controller for proper cleanup
111+
const abortController = new AbortController();
112+
this.abortController = abortController;
113+
102114
// Subscribe to account changes using gill's rpcSubscriptions
103115
const pubkey = this.accountPublicKey.toBase58();
104116
if (isAddress(pubkey)) {
105117
const subscription = await this.rpcSubscriptions
106118
.accountNotifications(pubkey, {
107119
commitment: this.commitment,
108-
encoding: 'base58',
120+
encoding: 'base64',
109121
})
110122
.subscribe({
111-
abortSignal: new AbortController().signal,
123+
abortSignal: abortController.signal,
112124
});
113125

114126
for await (const notification of subscription) {
115-
this.handleRpcResponse(notification.context, notification.value);
127+
if (this.resubOpts?.resubTimeoutMs) {
128+
this.receivingData = true;
129+
clearTimeout(this.timeoutId);
130+
this.handleRpcResponse(notification.context, notification.value);
131+
this.setTimeout();
132+
} else {
133+
this.handleRpcResponse(notification.context, notification.value);
134+
}
116135
}
117136
}
118137

119-
this.listenerId = Math.random(); // Unique ID for tracking subscription
120-
121-
// Set up polling for account changes
122-
const pollInterval = setInterval(async () => {
123-
if (this.isUnsubscribing) {
124-
clearInterval(pollInterval);
125-
return;
126-
}
127-
128-
try {
129-
await this.fetch();
130-
} catch (error) {
131-
console.error(`[${this.logAccountName}] Polling error:`, error);
132-
}
133-
}, 1000); // Poll every second
138+
this.listenerId = Math.random(); // Unique ID for logging purposes
134139

135140
if (this.resubOpts?.resubTimeoutMs) {
136141
this.receivingData = true;
@@ -198,7 +203,7 @@ export class WebSocketAccountSubscriberV2Gill<T>
198203
const rpcResponse = await this.rpc
199204
.getAccountInfo(accountAddress, {
200205
commitment: this.commitment,
201-
encoding: 'base58',
206+
encoding: 'base64',
202207
})
203208
.send();
204209

@@ -214,33 +219,58 @@ export class WebSocketAccountSubscriberV2Gill<T>
214219

215220
handleRpcResponse(
216221
context: { slot: bigint },
217-
accountInfo?: AccountInfoBase & AccountInfoWithBase58EncodedData
222+
accountInfo?: AccountInfoBase &
223+
(AccountInfoWithBase58EncodedData | AccountInfoWithBase64EncodedData)
218224
): void {
219225
const newSlot = context.slot;
226+
let newBuffer: Buffer | undefined = undefined;
227+
228+
if (accountInfo) {
229+
// Extract data from gill response
230+
if (accountInfo.data) {
231+
// Handle different data formats from gill
232+
if (Array.isArray(accountInfo.data)) {
233+
// If it's a tuple [data, encoding]
234+
const [data, encoding] = accountInfo.data;
235+
236+
if (encoding === 'base58') {
237+
// we know encoding will be base58
238+
// Convert base58 to buffer using bs58
239+
newBuffer = Buffer.from(bs58.decode(data));
240+
} else {
241+
newBuffer = Buffer.from(data, 'base64');
242+
}
243+
}
244+
}
245+
}
220246

221-
if (!accountInfo) {
247+
if (!this.bufferAndSlot) {
248+
this.bufferAndSlot = {
249+
buffer: newBuffer,
250+
slot: Number(newSlot),
251+
};
252+
if (newBuffer) {
253+
const account = this.decodeBuffer(newBuffer);
254+
this.dataAndSlot = {
255+
data: account,
256+
slot: Number(newSlot),
257+
};
258+
this.onChange(account);
259+
}
222260
return;
223261
}
224262

225-
// Extract data from gill response
226-
let buffer: Buffer | undefined;
227-
if (accountInfo.data) {
228-
// Handle different data formats from gill
229-
if (typeof accountInfo.data === 'string') {
230-
// If it's a base64 string
231-
buffer = Buffer.from(accountInfo.data, 'base64');
232-
} else if (Array.isArray(accountInfo.data)) {
233-
// If it's a tuple [data, encoding]
234-
const [data] = accountInfo.data;
235-
236-
// we know encoding will be base58
237-
// Convert base58 to buffer using bs58
238-
buffer = Buffer.from(bs58.decode(data));
239-
}
263+
if (Number(newSlot) < this.bufferAndSlot.slot) {
264+
return;
240265
}
241266

242-
if (buffer) {
243-
const account = this.decodeBuffer(buffer);
267+
const oldBuffer = this.bufferAndSlot.buffer;
268+
if (newBuffer && (!oldBuffer || !newBuffer.equals(oldBuffer))) {
269+
this.bufferAndSlot = {
270+
buffer: newBuffer,
271+
slot: Number(newSlot),
272+
};
273+
const account = this.decodeBuffer(newBuffer);
244274
this.dataAndSlot = {
245275
data: account,
246276
slot: Number(newSlot),
@@ -249,21 +279,6 @@ export class WebSocketAccountSubscriberV2Gill<T>
249279
}
250280
}
251281

252-
// Helper method to convert base58 to base64
253-
private base58ToBase64(base58String: string): string {
254-
try {
255-
// Decode base58 to buffer then encode to base64
256-
const buffer = new Buffer(bs58.decode(base58String));
257-
return buffer.toString('base64');
258-
} catch (error) {
259-
console.error(
260-
`[${this.logAccountName}] Base58 conversion failed:`,
261-
error
262-
);
263-
throw error;
264-
}
265-
}
266-
267282
decodeBuffer(buffer: Buffer): T {
268283
if (this.decodeBufferFn) {
269284
return this.decodeBufferFn(buffer);
@@ -283,25 +298,15 @@ export class WebSocketAccountSubscriberV2Gill<T>
283298
clearTimeout(this.timeoutId);
284299
this.timeoutId = undefined;
285300

286-
if (this.listenerId != null) {
287-
// For gill subscriptions, we need to handle cleanup differently
288-
// Since we don't have a direct unsubscribe method, we'll just mark as unsubscribed
289-
const promise = Promise.resolve()
290-
.then(() => {
291-
this.listenerId = undefined;
292-
this.isUnsubscribing = false;
293-
})
294-
.catch((error) => {
295-
console.error(
296-
`[${this.logAccountName}] Unsubscribe failed, forcing cleanup - listenerId=${this.listenerId}, isUnsubscribing=${this.isUnsubscribing}`,
297-
error
298-
);
299-
this.listenerId = undefined;
300-
this.isUnsubscribing = false;
301-
});
302-
return promise;
303-
} else {
304-
this.isUnsubscribing = false;
301+
// Abort the WebSocket subscription
302+
if (this.abortController) {
303+
this.abortController.abort('unsubscribing');
304+
this.abortController = undefined;
305305
}
306+
307+
this.listenerId = undefined;
308+
this.isUnsubscribing = false;
309+
310+
return Promise.resolve();
306311
}
307312
}

sdk/src/driftClient/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ export class DriftClient implements IDriftClient {
450450
logResubMessages: config.accountSubscription?.logResubMessages,
451451
},
452452
config.accountSubscription?.commitment,
453-
WebSocketAccountSubscriberV2Gill
453+
config.accountSubscription?.accountSubscriber
454454
);
455455
}
456456
this.eventEmitter = this.accountSubscriber.eventEmitter;

sdk/src/driftClientConfig.ts

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,14 @@ import { BulkAccountLoader } from './accounts/bulkAccountLoader/bulkAccountLoade
1111
import { DriftEnv } from './config/types';
1212
import { TxSender } from './tx/types';
1313
import { TxHandler, TxHandlerConfig } from './tx/txHandler';
14-
import { DelistedMarketSetting, GrpcConfigs } from './accounts/types';
15-
import { Coder } from '@coral-xyz/anchor';
14+
import {
15+
GrpcConfigs,
16+
ResubOpts,
17+
DelistedMarketSetting,
18+
} from './accounts/types';
19+
import { Coder, Program } from '@coral-xyz/anchor';
20+
import { WebSocketAccountSubscriber } from './accounts/webSocketAccountSubscriber';
21+
import { WebSocketAccountSubscriberV2Gill } from './accounts/webSocketAccountSubscriberV2Gill';
1622

1723
export type DriftClientConfig = {
1824
connection: Connection;
@@ -57,6 +63,16 @@ export type DriftClientSubscriptionConfig =
5763
resubTimeoutMs?: number;
5864
logResubMessages?: boolean;
5965
commitment?: Commitment;
66+
accountSubscriber?: new (
67+
accountName: string,
68+
program: Program,
69+
accountPublicKey: PublicKey,
70+
decodeBuffer?: (buffer: Buffer) => any,
71+
resubOpts?: ResubOpts,
72+
commitment?: Commitment
73+
) =>
74+
| WebSocketAccountSubscriberV2Gill<any>
75+
| WebSocketAccountSubscriber<any>;
6076
}
6177
| {
6278
type: 'polling';

0 commit comments

Comments
 (0)