From 91ee04e9d8ceeb5bad8d0053bc6f9ec7ee71ec67 Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Fri, 10 Jan 2025 15:27:37 -0800 Subject: [PATCH 1/4] new example for sub Raydium new pool --- 10-example-subRaydiumNewPool/README.md | 36 +++ 10-example-subRaydiumNewPool/index.ts | 331 +++++++++++++++++++++++++ 2 files changed, 367 insertions(+) create mode 100644 10-example-subRaydiumNewPool/README.md create mode 100644 10-example-subRaydiumNewPool/index.ts diff --git a/10-example-subRaydiumNewPool/README.md b/10-example-subRaydiumNewPool/README.md new file mode 100644 index 0000000..516c11f --- /dev/null +++ b/10-example-subRaydiumNewPool/README.md @@ -0,0 +1,36 @@ +subscribes Raydium new pool + +This example subscribes to raydiumLiquidityPoolv4 new pool based on transactions filter conditions. + +Run using “npm start”, the output should be as follows:: + +```bash +{ + id: '7RvCz29ADuNRVhfKaz3bEqCGQU883fBz73zDfPjAKxJp', + baseMint: '558ELUVzEwiyaP81cNCbdr2xCM4GeYqwKLmPxZr93FF', + quoteMint: 'So11111111111111111111111111111111111111112', + lpMint: 'ApyQukcZxFeEGFHrAjogvUC7VGAzhFHVaqaaCVLv8smK', + baseDecimals: 9, + quoteDecimals: 9, + lpDecimals: 9, + version: 4, + programId: '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8', + authority: '5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1', + openOrders: '81WGTTjWW7m5JzCVtT89bGv73b3c2M4arg7PFqSEJinp', + targetOrders: '9KBQPPNmSSRRVfVppuGTHCpnH3yJ9wqmuYtU76z4q4ov', + baseVault: 'EJn65uzLh5hJ3UNZ7itrCc8CG1kgPmmf1FBot4hEbHBQ', + quoteVault: 'CfCEDB6Tr6tHZ28wfyYf4UmdQ45SFWZLvuN6t9RwRGtE', + withdrawQueue: '11111111111111111111111111111111', + lpVault: '11111111111111111111111111111111', + marketVersion: 3, + marketProgramId: 'srmqPvymJeFKQ4zGQed1GFppgkRHL9kaELCbyksJtPX', + marketId: 'bW3feQyNjjBvK1GnYg5MwKskZVMGfv6qsg2m5q7TmeD', + marketAuthority: 'FMHeesi6g72GA6mju247RTwQBmyTgjs7SCoW4Cavwvnb', + marketBaseVault: '4ZVNYCmWF7LmxNRPibfdZ8ZcX6Bhn5Q6s9eZ36J9VMFY', + marketQuoteVault: 'HjwE6ibPEUSEqJAeduYqzzXH8XsT4Cwmz5wL45ywFFob', + marketBids: 'FJkPv9yJGdiDb65EEY64ZH2vaeKwqvy9TUA2tUbooNVr', + marketAsks: '8pfZ6wSP5PbkFhFfU5DMcYKPBtAEFmEjQ7c6zCVVKW9C', + marketEventQueue: 'H4TCk4yeBWPjuQxCTwNi7JB2mRuBZE4qXFDtZrTMWQhQ', + lookupTableAccount: '11111111111111111111111111111111' + } +``` diff --git a/10-example-subRaydiumNewPool/index.ts b/10-example-subRaydiumNewPool/index.ts new file mode 100644 index 0000000..429ec2d --- /dev/null +++ b/10-example-subRaydiumNewPool/index.ts @@ -0,0 +1,331 @@ +import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; +import bs58 from "bs58"; +import net from 'net'; +import { web3,AnchorProvider, Program, Wallet } from "@coral-xyz/anchor"; +import {PublicKey, Connection} from "@solana/web3.js"; +import {getKeypairFromEnvironment} from "@solana-developers/helpers"; +import * as fs from "fs"; +import * as path from "path"; +import "dotenv/config"; +import { ApiPoolInfoV4, Market, MARKET_STATE_LAYOUT_V3, SPL_MINT_LAYOUT } from "@raydium-io/raydium-sdk-v2"; + + +const RAYDIUM_PROGRAM_ID = '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8'; +const RAYDIUM_CREATEPOOL_FEE_ACCOUNT = '7YttLkHDoNj9wyDur5pM1ejNaAvT9X4eqaYcHQqtj2G5' + +const GRPC_URL = "https://testgrpc.chainbuff.com"; +const MAINNET_URL = "https://api.mainnet-beta.solana.com"; +const connection = new Connection('https://api.mainnet-beta.solana.com', 'confirmed'); +const SOL_MINT = "So11111111111111111111111111111111111111112"; + + +let latestBlockHash: string = ""; + +class RadiumSwapSubscriber { + private bondingCurveSet: Set; + private stream: any; + private client: Client | null = null; + private pingInterval: NodeJS.Timeout | null = null; + private isCleaningUp = false; // 添加清理状态标志 + private isReconnecting = false; // 添加重连状态标志 + private processingQueue: Array = []; + private isProcessing: boolean = false; + + + + constructor() { + } + + + public async cleanup() { + if (this.isCleaningUp) return; + try { + this.isCleaningUp = true; + console.log('Starting cleanup process...'); + + if (this.pingInterval) { + clearInterval(this.pingInterval); + this.pingInterval = null; + console.log('1. Ping interval cleared'); + } + + if (this.stream) { + try { + this.stream.on('error', (error) => { + if (error.code === 1 && error.details === 'Cancelled on client') { + console.log('Expected cancellation error, safely ignored'); + } else { + console.error('Stream error during cleanup:', error); + } + }); + + this.stream.removeAllListeners('data'); + this.stream.removeAllListeners('end'); + this.stream.removeAllListeners('close'); + console.log('2. Stream listeners removed'); + + this.stream.cancel(); + await new Promise(resolve => setTimeout(resolve, 1000)); + } catch (error) { + console.error('Error during stream cleanup:', error); + } finally { + this.stream = null; + console.log('3. Stream cancelled and cleared'); + } + } + + if (this.client) { + this.client = null; + console.log('4. Client reference cleared'); + } + + console.log('Cleanup completed successfully'); + } finally { + this.isCleaningUp = false; + } + } + + async listen() { + try { + console.log("Subscribing to event stream with new mint"); + + this.client = new Client(GRPC_URL, undefined, { + "grpc.max_receive_message_length": 100 * 1024 * 1024, // 10MiB + }); + + this.stream = await this.client.subscribe(); + + this.setupStreamListeners(); + + const request: SubscribeRequest = { + slots: {}, + accounts: {}, + transactions: { + transactionsSubKey: { + accountInclude: [RAYDIUM_CREATEPOOL_FEE_ACCOUNT], + accountExclude: [], + accountRequired: [] + } + }, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + entry: {}, + commitment: CommitmentLevel.CONFIRMED + }; + + await new Promise((resolve, reject) => { + this.stream.write(request, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }); + + const pingRequest: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: {}, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + entry: {}, + accountsDataSlice: [], + commitment: undefined, + ping: { id: 1 }, + }; + + this.pingInterval = setInterval(async () => { + try { + if (this.stream) { + await new Promise((resolve, reject) => { + this.stream.write(pingRequest, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }); + } + } catch (error) { + console.error('Ping error:', error); + this.handleStreamError(); + } + }, 5000); + + } catch (error) { + console.error('Error in listen:', error); + throw error; + } + } + + private setupStreamListeners() { + if (!this.stream) return; + + this.stream.on('error', (error) => { + console.error('Stream error:', error); + this.handleStreamError(); + }); + + this.stream.on('end', () => { + console.log('Stream ended'); + this.handleStreamEnd(); + }); + + this.stream.on('close', () => { + console.log('Stream closed'); + this.handleStreamClose(); + }); + + this.stream.on("data", async (data) => { + if (data) { + this.processSaveNewMint(data); + } + }); + } + + private async handleStreamError() { + if (this.isCleaningUp || this.isReconnecting) return; + + try { + this.isReconnecting = true; + console.log('Attempting to reconnect due to stream error...'); + await this.cleanup(); + + setTimeout(async () => { + try { + await this.listen(); + console.log('Successfully reconnected'); + } catch (error) { + console.error('Failed to reconnect:', error); + // 重置状态并重试 + this.isReconnecting = false; + this.handleStreamError(); + } + }, 5000); + } finally { + this.isReconnecting = false; + } + } + + private async handleStreamEnd() { + if (this.isCleaningUp || this.isReconnecting) return; + console.log('Stream ended, checking if reconnection needed...'); + + if (this.client) { + await this.handleStreamError(); + } + } + + private async handleStreamClose() { + if (this.isCleaningUp || this.isReconnecting) return; + console.log('Stream closed'); + + if (this.client) { + this.client = null; + } + } + + + private slotExists(slot: number): boolean { + //return leaderSchedule.has(slot); + return true + } + + private async processSaveNewMint(data: any){ + if (!data.filters.includes('transactionsSubKey')) return undefined + + const info = data.transaction + if (info.transaction.meta.err !== undefined) return undefined + + const formatData: { + updateTime: number, slot: number, txid: string, poolInfos: ApiPoolInfoV4[] + } = { + updateTime: new Date().getTime(), + slot: info.slot, + txid: bs58.encode(info.transaction.signature), + poolInfos: [] + } + + const accounts = info.transaction.transaction.message.accountKeys.map((i: Buffer) => bs58.encode(i)) + for (const item of [...info.transaction.transaction.message.instructions, ...info.transaction.meta.innerInstructions.map((i: any) => i.instructions).flat()]) { + if (accounts[item.programIdIndex] !== RAYDIUM_PROGRAM_ID) continue + + //if ([...(item.data as Buffer).values()][0] != 1) continue + if (Array.from(item.data as Buffer)[0] !== 1) continue; + + //const keyIndex = [...(item.accounts as Buffer).values()] + const keyIndex = Buffer.from(item.accounts as Buffer); + + const startTime = new Date().getTime() + console.info(new Date().toJSON(), 'new pool Id: ', accounts[keyIndex[4]]); + + const [baseMintAccount, quoteMintAccount, marketAccount] = await connection.getMultipleAccountsInfo([ + new PublicKey(accounts[keyIndex[8]]), + new PublicKey(accounts[keyIndex[9]]), + new PublicKey(accounts[keyIndex[16]]), + ]) + + if (baseMintAccount === null || quoteMintAccount === null || marketAccount === null) throw Error('get account info error') + + const baseMintInfo = SPL_MINT_LAYOUT.decode(baseMintAccount.data) + const quoteMintInfo = SPL_MINT_LAYOUT.decode(quoteMintAccount.data) + const marketInfo = MARKET_STATE_LAYOUT_V3.decode(marketAccount.data) + + formatData.poolInfos.push({ + id: accounts[keyIndex[4]], + baseMint: accounts[keyIndex[8]], + quoteMint: accounts[keyIndex[9]], + lpMint: accounts[keyIndex[7]], + baseDecimals: baseMintInfo.decimals, + quoteDecimals: quoteMintInfo.decimals, + lpDecimals: baseMintInfo.decimals, + version: 4, + programId: RAYDIUM_PROGRAM_ID, + authority: accounts[keyIndex[5]], + openOrders: accounts[keyIndex[6]], + targetOrders: accounts[keyIndex[12]], + baseVault: accounts[keyIndex[10]], + quoteVault: accounts[keyIndex[11]], + withdrawQueue: PublicKey.default.toString(), + lpVault: PublicKey.default.toString(), + marketVersion: 3, + marketProgramId: marketAccount.owner.toString(), + marketId: accounts[keyIndex[16]], + marketAuthority: Market.getAssociatedAuthority({ programId: marketAccount.owner, marketId: new PublicKey(accounts[keyIndex[16]]) }).publicKey.toString(), + marketBaseVault: marketInfo.baseVault.toString(), + marketQuoteVault: marketInfo.quoteVault.toString(), + marketBids: marketInfo.bids.toString(), + marketAsks: marketInfo.asks.toString(), + marketEventQueue: marketInfo.eventQueue.toString(), + lookupTableAccount: PublicKey.default.toString() + }) + } + + + const poolInfo = formatData.poolInfos[0]; + + console.info(poolInfo) + + + return poolInfo + } + } + +async function main() { + const subscriber = new RadiumSwapSubscriber(); + + process.on('SIGINT', async () => { + console.log('Received SIGINT. Cleaning up...'); + await subscriber.cleanup(); + process.exit(0); + }); + + await subscriber.listen(); +} + +main().catch(console.error); \ No newline at end of file From b687c91ebd58b3663a7d40c3c3141b912f5f55d6 Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Mon, 27 Jan 2025 10:47:34 -0800 Subject: [PATCH 2/4] Modify the code to simple --- 10-example-subRaydiumNewPool/index.ts | 462 +++++++++----------------- 1 file changed, 153 insertions(+), 309 deletions(-) diff --git a/10-example-subRaydiumNewPool/index.ts b/10-example-subRaydiumNewPool/index.ts index 429ec2d..73fac78 100644 --- a/10-example-subRaydiumNewPool/index.ts +++ b/10-example-subRaydiumNewPool/index.ts @@ -1,331 +1,175 @@ import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; -import bs58 from "bs58"; -import net from 'net'; -import { web3,AnchorProvider, Program, Wallet } from "@coral-xyz/anchor"; -import {PublicKey, Connection} from "@solana/web3.js"; -import {getKeypairFromEnvironment} from "@solana-developers/helpers"; -import * as fs from "fs"; -import * as path from "path"; +import { PublicKey, Connection } from "@solana/web3.js"; +import { ApiPoolInfoV4, Market, MARKET_STATE_LAYOUT_V3, SPL_MINT_LAYOUT } from "@raydium-io/raydium-sdk"; import "dotenv/config"; -import { ApiPoolInfoV4, Market, MARKET_STATE_LAYOUT_V3, SPL_MINT_LAYOUT } from "@raydium-io/raydium-sdk-v2"; - +import bs58 from "bs58"; const RAYDIUM_PROGRAM_ID = '675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8'; -const RAYDIUM_CREATEPOOL_FEE_ACCOUNT = '7YttLkHDoNj9wyDur5pM1ejNaAvT9X4eqaYcHQqtj2G5' - -const GRPC_URL = "https://testgrpc.chainbuff.com"; -const MAINNET_URL = "https://api.mainnet-beta.solana.com"; +const RAYDIUM_POOL_FEE_ID = '7YttLkHDoNj9wyDur5pM1ejNaAvT9X4eqaYcHQqtj2G5'; const connection = new Connection('https://api.mainnet-beta.solana.com', 'confirmed'); -const SOL_MINT = "So11111111111111111111111111111111111111112"; - - -let latestBlockHash: string = ""; - -class RadiumSwapSubscriber { - private bondingCurveSet: Set; - private stream: any; - private client: Client | null = null; - private pingInterval: NodeJS.Timeout | null = null; - private isCleaningUp = false; // 添加清理状态标志 - private isReconnecting = false; // 添加重连状态标志 - private processingQueue: Array = []; - private isProcessing: boolean = false; - - - - constructor() { - } - - - public async cleanup() { - if (this.isCleaningUp) return; - try { - this.isCleaningUp = true; - console.log('Starting cleanup process...'); - - if (this.pingInterval) { - clearInterval(this.pingInterval); - this.pingInterval = null; - console.log('1. Ping interval cleared'); - } - - if (this.stream) { - try { - this.stream.on('error', (error) => { - if (error.code === 1 && error.details === 'Cancelled on client') { - console.log('Expected cancellation error, safely ignored'); - } else { - console.error('Stream error during cleanup:', error); - } - }); - this.stream.removeAllListeners('data'); - this.stream.removeAllListeners('end'); - this.stream.removeAllListeners('close'); - console.log('2. Stream listeners removed'); +async function main() { - this.stream.cancel(); - await new Promise(resolve => setTimeout(resolve, 1000)); - } catch (error) { - console.error('Error during stream cleanup:', error); - } finally { - this.stream = null; - console.log('3. Stream cancelled and cleared'); - } + // 创建订阅客户端 + // const client = new Client( + // 如遇到TypeError: Client is not a constructor错误 + // 请使用以下方式创建 + // 见 https://github.com/rpcpool/yellowstone-grpc/issues/428 + // @ts-ignore + const client = new Client.default( + "https://test-grpc.chainbuff.com", + undefined, + { + "grpc.max_receive_message_length": 16 * 1024 * 1024, // 16MB + } + ); + + // 创建订阅数据流 + const stream = await client.subscribe(); + + // 创建订阅请求 + const request: SubscribeRequest = { + slots: {}, + accounts: {}, + transactions: { + transactionsSubKey: { + accountInclude: [RAYDIUM_POOL_FEE_ID], + accountExclude: [], + accountRequired: [] } - - if (this.client) { - this.client = null; - console.log('4. Client reference cleared'); + }, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + entry: {}, + commitment: CommitmentLevel.CONFIRMED + }; + + // 发送订阅请求 + await new Promise((resolve, reject) => { + stream.write(request, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); } - - console.log('Cleanup completed successfully'); - } finally { - this.isCleaningUp = false; - } - } - - async listen() { - try { - console.log("Subscribing to event stream with new mint"); - - this.client = new Client(GRPC_URL, undefined, { - "grpc.max_receive_message_length": 100 * 1024 * 1024, // 10MiB - }); - - this.stream = await this.client.subscribe(); - - this.setupStreamListeners(); - - const request: SubscribeRequest = { - slots: {}, - accounts: {}, - transactions: { - transactionsSubKey: { - accountInclude: [RAYDIUM_CREATEPOOL_FEE_ACCOUNT], - accountExclude: [], - accountRequired: [] - } - }, - transactionsStatus: {}, - blocks: {}, - blocksMeta: {}, - accountsDataSlice: [], - entry: {}, - commitment: CommitmentLevel.CONFIRMED - }; - - await new Promise((resolve, reject) => { - this.stream.write(request, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }); - - const pingRequest: SubscribeRequest = { - accounts: {}, - slots: {}, - transactions: {}, - transactionsStatus: {}, - blocks: {}, - blocksMeta: {}, - entry: {}, - accountsDataSlice: [], - commitment: undefined, - ping: { id: 1 }, - }; - - this.pingInterval = setInterval(async () => { - try { - if (this.stream) { - await new Promise((resolve, reject) => { - this.stream.write(pingRequest, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }); - } - } catch (error) { - console.error('Ping error:', error); - this.handleStreamError(); - } - }, 5000); - - } catch (error) { - console.error('Error in listen:', error); - throw error; - } - } - - private setupStreamListeners() { - if (!this.stream) return; - - this.stream.on('error', (error) => { - console.error('Stream error:', error); - this.handleStreamError(); - }); - - this.stream.on('end', () => { - console.log('Stream ended'); - this.handleStreamEnd(); - }); - - this.stream.on('close', () => { - console.log('Stream closed'); - this.handleStreamClose(); }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); - this.stream.on("data", async (data) => { - if (data) { - this.processSaveNewMint(data); + // 获取订阅数据 + stream.on("data", async (data) => { + if (data?.transaction) { + if (!data.filters.includes('transactionsSubKey')) return undefined + + const info = data.transaction + if (info.transaction.meta.err !== undefined) return undefined + + const formatData: { + updateTime: number, slot: number, txid: string, poolInfos: ApiPoolInfoV4[] + } = { + updateTime: new Date().getTime(), + slot: info.slot, + txid: bs58.encode(info.transaction.signature), + poolInfos: [] } - }); - } - private async handleStreamError() { - if (this.isCleaningUp || this.isReconnecting) return; - - try { - this.isReconnecting = true; - console.log('Attempting to reconnect due to stream error...'); - await this.cleanup(); - - setTimeout(async () => { - try { - await this.listen(); - console.log('Successfully reconnected'); - } catch (error) { - console.error('Failed to reconnect:', error); - // 重置状态并重试 - this.isReconnecting = false; - this.handleStreamError(); - } - }, 5000); - } finally { - this.isReconnecting = false; - } - } + const accounts = info.transaction.transaction.message.accountKeys.map((i: Buffer) => bs58.encode(i)) + for (const item of [...info.transaction.transaction.message.instructions, ...info.transaction.meta.innerInstructions.map((i: any) => i.instructions).flat()]) { + if (accounts[item.programIdIndex] !== RAYDIUM_PROGRAM_ID) continue + + //if ([...(item.data as Buffer).values()][0] != 1) continue + if (Array.from(item.data as Buffer)[0] !== 1) continue; + + //const keyIndex = [...(item.accounts as Buffer).values()] + const keyIndex = Buffer.from(item.accounts as Buffer); + + const startTime = new Date().getTime() + console.info(new Date().toJSON(), 'new pool Id: ', accounts[keyIndex[4]]); + + const [baseMintAccount, quoteMintAccount, marketAccount] = await connection.getMultipleAccountsInfo([ + new PublicKey(accounts[keyIndex[8]]), + new PublicKey(accounts[keyIndex[9]]), + new PublicKey(accounts[keyIndex[16]]), + ]) + + if (baseMintAccount === null || quoteMintAccount === null || marketAccount === null) throw Error('get account info error') + + const baseMintInfo = SPL_MINT_LAYOUT.decode(baseMintAccount.data) + const quoteMintInfo = SPL_MINT_LAYOUT.decode(quoteMintAccount.data) + const marketInfo = MARKET_STATE_LAYOUT_V3.decode(marketAccount.data) + + formatData.poolInfos.push({ + id: accounts[keyIndex[4]], + baseMint: accounts[keyIndex[8]], + quoteMint: accounts[keyIndex[9]], + lpMint: accounts[keyIndex[7]], + baseDecimals: baseMintInfo.decimals, + quoteDecimals: quoteMintInfo.decimals, + lpDecimals: baseMintInfo.decimals, + version: 4, + programId: RAYDIUM_PROGRAM_ID, + authority: accounts[keyIndex[5]], + openOrders: accounts[keyIndex[6]], + targetOrders: accounts[keyIndex[12]], + baseVault: accounts[keyIndex[10]], + quoteVault: accounts[keyIndex[11]], + withdrawQueue: PublicKey.default.toString(), + lpVault: PublicKey.default.toString(), + marketVersion: 3, + marketProgramId: marketAccount.owner.toString(), + marketId: accounts[keyIndex[16]], + marketAuthority: Market.getAssociatedAuthority({ programId: marketAccount.owner, marketId: new PublicKey(accounts[keyIndex[16]]) }).publicKey.toString(), + marketBaseVault: marketInfo.baseVault.toString(), + marketQuoteVault: marketInfo.quoteVault.toString(), + marketBids: marketInfo.bids.toString(), + marketAsks: marketInfo.asks.toString(), + marketEventQueue: marketInfo.eventQueue.toString(), + lookupTableAccount: PublicKey.default.toString() + }) + } - private async handleStreamEnd() { - if (this.isCleaningUp || this.isReconnecting) return; - console.log('Stream ended, checking if reconnection needed...'); - - if (this.client) { - await this.handleStreamError(); - } - } - private async handleStreamClose() { - if (this.isCleaningUp || this.isReconnecting) return; - console.log('Stream closed'); - - if (this.client) { - this.client = null; - } - } - - private slotExists(slot: number): boolean { - //return leaderSchedule.has(slot); - return true - } - - private async processSaveNewMint(data: any){ - if (!data.filters.includes('transactionsSubKey')) return undefined + const poolInfo = formatData.poolInfos[0]; - const info = data.transaction - if (info.transaction.meta.err !== undefined) return undefined - - const formatData: { - updateTime: number, slot: number, txid: string, poolInfos: ApiPoolInfoV4[] - } = { - updateTime: new Date().getTime(), - slot: info.slot, - txid: bs58.encode(info.transaction.signature), - poolInfos: [] - } - - const accounts = info.transaction.transaction.message.accountKeys.map((i: Buffer) => bs58.encode(i)) - for (const item of [...info.transaction.transaction.message.instructions, ...info.transaction.meta.innerInstructions.map((i: any) => i.instructions).flat()]) { - if (accounts[item.programIdIndex] !== RAYDIUM_PROGRAM_ID) continue - - //if ([...(item.data as Buffer).values()][0] != 1) continue - if (Array.from(item.data as Buffer)[0] !== 1) continue; - - //const keyIndex = [...(item.accounts as Buffer).values()] - const keyIndex = Buffer.from(item.accounts as Buffer); - - const startTime = new Date().getTime() - console.info(new Date().toJSON(), 'new pool Id: ', accounts[keyIndex[4]]); - - const [baseMintAccount, quoteMintAccount, marketAccount] = await connection.getMultipleAccountsInfo([ - new PublicKey(accounts[keyIndex[8]]), - new PublicKey(accounts[keyIndex[9]]), - new PublicKey(accounts[keyIndex[16]]), - ]) - - if (baseMintAccount === null || quoteMintAccount === null || marketAccount === null) throw Error('get account info error') - - const baseMintInfo = SPL_MINT_LAYOUT.decode(baseMintAccount.data) - const quoteMintInfo = SPL_MINT_LAYOUT.decode(quoteMintAccount.data) - const marketInfo = MARKET_STATE_LAYOUT_V3.decode(marketAccount.data) - - formatData.poolInfos.push({ - id: accounts[keyIndex[4]], - baseMint: accounts[keyIndex[8]], - quoteMint: accounts[keyIndex[9]], - lpMint: accounts[keyIndex[7]], - baseDecimals: baseMintInfo.decimals, - quoteDecimals: quoteMintInfo.decimals, - lpDecimals: baseMintInfo.decimals, - version: 4, - programId: RAYDIUM_PROGRAM_ID, - authority: accounts[keyIndex[5]], - openOrders: accounts[keyIndex[6]], - targetOrders: accounts[keyIndex[12]], - baseVault: accounts[keyIndex[10]], - quoteVault: accounts[keyIndex[11]], - withdrawQueue: PublicKey.default.toString(), - lpVault: PublicKey.default.toString(), - marketVersion: 3, - marketProgramId: marketAccount.owner.toString(), - marketId: accounts[keyIndex[16]], - marketAuthority: Market.getAssociatedAuthority({ programId: marketAccount.owner, marketId: new PublicKey(accounts[keyIndex[16]]) }).publicKey.toString(), - marketBaseVault: marketInfo.baseVault.toString(), - marketQuoteVault: marketInfo.quoteVault.toString(), - marketBids: marketInfo.bids.toString(), - marketAsks: marketInfo.asks.toString(), - marketEventQueue: marketInfo.eventQueue.toString(), - lookupTableAccount: PublicKey.default.toString() - }) + console.info(poolInfo) } + }); - - const poolInfo = formatData.poolInfos[0]; + // 为保证连接稳定,需要定期向服务端发送ping请求以维持连接 + const pingRequest: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: {}, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + entry: {}, + accountsDataSlice: [], + commitment: undefined, + ping: { id: 1 }, + }; + // 每5秒发送一次ping请求 + setInterval(async () => { + await new Promise((resolve, reject) => { + stream.write(pingRequest, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); + }, 5000); +} - console.info(poolInfo) - - - return poolInfo - } - } -async function main() { - const subscriber = new RadiumSwapSubscriber(); - - process.on('SIGINT', async () => { - console.log('Received SIGINT. Cleaning up...'); - await subscriber.cleanup(); - process.exit(0); - }); - - await subscriber.listen(); -} -main().catch(console.error); \ No newline at end of file +main(); \ No newline at end of file From 2386268aef903c4b11373279e3e76fc3274e4cae Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Mon, 27 Jan 2025 11:00:13 -0800 Subject: [PATCH 3/4] Modify the code to simple --- 09-example-subRaydiumPrice/README.md | 21 +++++ 09-example-subRaydiumPrice/index.ts | 132 +++++++++++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 09-example-subRaydiumPrice/README.md create mode 100644 09-example-subRaydiumPrice/index.ts diff --git a/09-example-subRaydiumPrice/README.md b/09-example-subRaydiumPrice/README.md new file mode 100644 index 0000000..4b25a25 --- /dev/null +++ b/09-example-subRaydiumPrice/README.md @@ -0,0 +1,21 @@ +# subscribes Raydium price + +This example subscribes to raydiumLiquidityPoolv4 pool transactions based on transactions filter conditions. By monitoring transactions of the raydiumLiquidityPoolv4 account, it obtains price change information for Raydium trading pairs. + +Run using “npm start”, the output should be as follows:: + +```bash +[20:50:09.323] INFO: 3ef48CZQ7nAKKL6WGckFw9mhgXqA3Y9GvfY4qqsZHgKhD4BkchMx3KHL9ygbpJrM8mSi65u4hcfuMWBtDiFNgyjv : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.323] INFO: 31zV68Gnt67ap1fC1SGnv9n7xFTEETwsPCEqfAXJYxmpBYajLVSG1mP1uUTnMDoS2hPJsVbvFdLBXNPc87gtirPx : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.324] INFO: 66aZkhCvs47mVsjmQPKS9TLd81ZKKupyazPGsUz4gUqdzvtCkhyVZ4HH3LGxb51fxqaVnzjMTrdpWzEuRCYaCtqF : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.325] INFO: 2x98ttkvLtaWUyTBZdUFgpkMYFJ7W9AuF6YBrTuB6SG9YMiLS9LoYA1iTV5G8ZWB3LYstJJhRMHGK6tGBM9DEvpZ : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.325] INFO: 5AdiGuDXyzbc6bDXburrvufrch3Q3qEjmSTZNVhCtLJMDZD4tL2H17N5Lbn3Cie482ZD1AynhMwHS5km2RRunriE : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.329] INFO: 5CnHXDcUr6VYu9LDmYQsdfext5LFm53UjgBBvHcxbuFosT3HKVqy8oM4x8yt3g4re7DzPjqAFPe99W8yYke9uiUf : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +[20:50:09.329] INFO: 3QtbbN9ts5TcstXgqCmKDe4mD1225CTLdNLc5G6dKVBZGDyQNBduY4jfveqWK3QmmfcWhnMwct72zQxQdypGSjXR : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +^C[20:50:09.330] INFO: 65fE5d83qKJ4sZ9PYeMnerHUTekfF5kTqGoUGn6nWB2NTt3Yb4LggKGLSh2mAQCuchgPgGAAwbMpoKfJUvrKfVp5 : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 +``` + +Output data explanation: +First column: signature - Transaction signature +Second column: mint - Trading pair mint +Third column: price for sol \ No newline at end of file diff --git a/09-example-subRaydiumPrice/index.ts b/09-example-subRaydiumPrice/index.ts new file mode 100644 index 0000000..26f5379 --- /dev/null +++ b/09-example-subRaydiumPrice/index.ts @@ -0,0 +1,132 @@ +import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; +import "dotenv/config"; +import bs58 from "bs58"; + +const RAYDIUM_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; +const SOL_MINT = "So11111111111111111111111111111111111111112"; +const RAYDIUM_AUTHORITY = "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1"; + +async function main() { + + // 创建订阅客户端 + // const client = new Client( + // 如遇到TypeError: Client is not a constructor错误 + // 请使用以下方式创建 + // 见 https://github.com/rpcpool/yellowstone-grpc/issues/428 + // @ts-ignore + const client = new Client.default( + "https://test-grpc.chainbuff.com", + undefined, + { + "grpc.max_receive_message_length": 16 * 1024 * 1024, // 16MB + } + ); + + // 创建订阅数据流 + const stream = await client.subscribe(); + + // 创建订阅请求 + const request: SubscribeRequest = { + slots: {}, + accounts: {}, + transactions: { + transactionsSubKey: { + accountInclude: [RAYDIUM_PROGRAM_ID], + accountExclude: [], + accountRequired: [] + } + }, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + entry: {}, + commitment: CommitmentLevel.CONFIRMED + }; + + // 发送订阅请求 + await new Promise((resolve, reject) => { + stream.write(request, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); + + // 获取订阅数据 + stream.on("data", async (data) => { + if (data?.transaction) { + + const transaction = data.transaction.transaction; + if (!transaction) { + return; + } + const signature = bs58.encode(transaction.signature); + + const preTokenBalances = transaction.meta.preTokenBalances; + const postTokenBalances = transaction.meta.postTokenBalances; + let targetToken = "", postPoolSOL = 0, postPoolToken = 0, prePoolSOL = 0, prePoolToken = 0, side = ""; + for (const account of preTokenBalances) { + if (targetToken !== "" && prePoolSOL !== 0 && prePoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + prePoolSOL = account.uiTokenAmount.uiAmount; + } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + prePoolToken = account.uiTokenAmount.uiAmount; + } + } + for (const account of postTokenBalances) { + if (postPoolSOL !== 0 && postPoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; + if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { + postPoolSOL = account.uiTokenAmount.uiAmount; + } + if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { + postPoolToken = account.uiTokenAmount.uiAmount; + } + } + if (targetToken === "") return; + console.info(`${signature} : ${targetToken} : ${postPoolSOL / postPoolToken}`) + + } + }); + + // 为保证连接稳定,需要定期向服务端发送ping请求以维持连接 + const pingRequest: SubscribeRequest = { + accounts: {}, + slots: {}, + transactions: {}, + transactionsStatus: {}, + blocks: {}, + blocksMeta: {}, + entry: {}, + accountsDataSlice: [], + commitment: undefined, + ping: { id: 1 }, + }; + // 每5秒发送一次ping请求 + setInterval(async () => { + await new Promise((resolve, reject) => { + stream.write(pingRequest, (err) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }).catch((reason) => { + console.error(reason); + throw reason; + }); + }, 5000); +} + + + +main(); \ No newline at end of file From 8dd60433e6074782495e2fcad61256c6429fb51e Mon Sep 17 00:00:00 2001 From: jamesrealweb3 Date: Mon, 27 Jan 2025 11:05:56 -0800 Subject: [PATCH 4/4] change to the other version --- 09-example-subRaydiumPrice/README.md | 21 ----- 09-example-subRaydiumPrice/index.ts | 132 --------------------------- 2 files changed, 153 deletions(-) delete mode 100644 09-example-subRaydiumPrice/README.md delete mode 100644 09-example-subRaydiumPrice/index.ts diff --git a/09-example-subRaydiumPrice/README.md b/09-example-subRaydiumPrice/README.md deleted file mode 100644 index 4b25a25..0000000 --- a/09-example-subRaydiumPrice/README.md +++ /dev/null @@ -1,21 +0,0 @@ -# subscribes Raydium price - -This example subscribes to raydiumLiquidityPoolv4 pool transactions based on transactions filter conditions. By monitoring transactions of the raydiumLiquidityPoolv4 account, it obtains price change information for Raydium trading pairs. - -Run using “npm start”, the output should be as follows:: - -```bash -[20:50:09.323] INFO: 3ef48CZQ7nAKKL6WGckFw9mhgXqA3Y9GvfY4qqsZHgKhD4BkchMx3KHL9ygbpJrM8mSi65u4hcfuMWBtDiFNgyjv : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.323] INFO: 31zV68Gnt67ap1fC1SGnv9n7xFTEETwsPCEqfAXJYxmpBYajLVSG1mP1uUTnMDoS2hPJsVbvFdLBXNPc87gtirPx : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.324] INFO: 66aZkhCvs47mVsjmQPKS9TLd81ZKKupyazPGsUz4gUqdzvtCkhyVZ4HH3LGxb51fxqaVnzjMTrdpWzEuRCYaCtqF : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.325] INFO: 2x98ttkvLtaWUyTBZdUFgpkMYFJ7W9AuF6YBrTuB6SG9YMiLS9LoYA1iTV5G8ZWB3LYstJJhRMHGK6tGBM9DEvpZ : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.325] INFO: 5AdiGuDXyzbc6bDXburrvufrch3Q3qEjmSTZNVhCtLJMDZD4tL2H17N5Lbn3Cie482ZD1AynhMwHS5km2RRunriE : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.329] INFO: 5CnHXDcUr6VYu9LDmYQsdfext5LFm53UjgBBvHcxbuFosT3HKVqy8oM4x8yt3g4re7DzPjqAFPe99W8yYke9uiUf : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -[20:50:09.329] INFO: 3QtbbN9ts5TcstXgqCmKDe4mD1225CTLdNLc5G6dKVBZGDyQNBduY4jfveqWK3QmmfcWhnMwct72zQxQdypGSjXR : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -^C[20:50:09.330] INFO: 65fE5d83qKJ4sZ9PYeMnerHUTekfF5kTqGoUGn6nWB2NTt3Yb4LggKGLSh2mAQCuchgPgGAAwbMpoKfJUvrKfVp5 : 3TDdSCw5xgBa4g6jk5oaQFGAaBcsP1Md7vEovrWzi2dE : 0.000019119343005368142 -``` - -Output data explanation: -First column: signature - Transaction signature -Second column: mint - Trading pair mint -Third column: price for sol \ No newline at end of file diff --git a/09-example-subRaydiumPrice/index.ts b/09-example-subRaydiumPrice/index.ts deleted file mode 100644 index 26f5379..0000000 --- a/09-example-subRaydiumPrice/index.ts +++ /dev/null @@ -1,132 +0,0 @@ -import Client, { CommitmentLevel, SubscribeRequest } from "@triton-one/yellowstone-grpc"; -import "dotenv/config"; -import bs58 from "bs58"; - -const RAYDIUM_PROGRAM_ID = "675kPX9MHTjS2zt1qfr1NYHuzeLXfQM9H24wFSUt1Mp8"; -const SOL_MINT = "So11111111111111111111111111111111111111112"; -const RAYDIUM_AUTHORITY = "5Q544fKrFoe6tsEbD7S8EmxGTJYAKtTVhAW5Q5pge4j1"; - -async function main() { - - // 创建订阅客户端 - // const client = new Client( - // 如遇到TypeError: Client is not a constructor错误 - // 请使用以下方式创建 - // 见 https://github.com/rpcpool/yellowstone-grpc/issues/428 - // @ts-ignore - const client = new Client.default( - "https://test-grpc.chainbuff.com", - undefined, - { - "grpc.max_receive_message_length": 16 * 1024 * 1024, // 16MB - } - ); - - // 创建订阅数据流 - const stream = await client.subscribe(); - - // 创建订阅请求 - const request: SubscribeRequest = { - slots: {}, - accounts: {}, - transactions: { - transactionsSubKey: { - accountInclude: [RAYDIUM_PROGRAM_ID], - accountExclude: [], - accountRequired: [] - } - }, - transactionsStatus: {}, - blocks: {}, - blocksMeta: {}, - accountsDataSlice: [], - entry: {}, - commitment: CommitmentLevel.CONFIRMED - }; - - // 发送订阅请求 - await new Promise((resolve, reject) => { - stream.write(request, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }).catch((reason) => { - console.error(reason); - throw reason; - }); - - // 获取订阅数据 - stream.on("data", async (data) => { - if (data?.transaction) { - - const transaction = data.transaction.transaction; - if (!transaction) { - return; - } - const signature = bs58.encode(transaction.signature); - - const preTokenBalances = transaction.meta.preTokenBalances; - const postTokenBalances = transaction.meta.postTokenBalances; - let targetToken = "", postPoolSOL = 0, postPoolToken = 0, prePoolSOL = 0, prePoolToken = 0, side = ""; - for (const account of preTokenBalances) { - if (targetToken !== "" && prePoolSOL !== 0 && prePoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; - if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { - prePoolSOL = account.uiTokenAmount.uiAmount; - } - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { - prePoolToken = account.uiTokenAmount.uiAmount; - } - } - for (const account of postTokenBalances) { - if (postPoolSOL !== 0 && postPoolToken !== 0) break; // make sure we get the target token and pool sol balances and trader address only - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) targetToken = account.mint; - if (account.owner === RAYDIUM_AUTHORITY && account.mint === SOL_MINT) { - postPoolSOL = account.uiTokenAmount.uiAmount; - } - if (account.owner === RAYDIUM_AUTHORITY && account.mint !== SOL_MINT) { - postPoolToken = account.uiTokenAmount.uiAmount; - } - } - if (targetToken === "") return; - console.info(`${signature} : ${targetToken} : ${postPoolSOL / postPoolToken}`) - - } - }); - - // 为保证连接稳定,需要定期向服务端发送ping请求以维持连接 - const pingRequest: SubscribeRequest = { - accounts: {}, - slots: {}, - transactions: {}, - transactionsStatus: {}, - blocks: {}, - blocksMeta: {}, - entry: {}, - accountsDataSlice: [], - commitment: undefined, - ping: { id: 1 }, - }; - // 每5秒发送一次ping请求 - setInterval(async () => { - await new Promise((resolve, reject) => { - stream.write(pingRequest, (err) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }).catch((reason) => { - console.error(reason); - throw reason; - }); - }, 5000); -} - - - -main(); \ No newline at end of file