Real-time token monitoring and live data streaming
WebSocket features require authentication tokens. See the Authentication Guide to get your tokens.
import asyncio
from axiomtradeapi import AxiomTradeClient
async def handle_new_tokens(tokens):
    """Print a short summary for every token in an incoming batch.

    Args:
        tokens: Iterable of token dicts, as passed to the
            ``subscribe_new_tokens`` callback. The keys read here are
            ``tokenName``, ``tokenAddress``, ``liquiditySol`` and
            ``marketCapSol``; all of them are optional.
    """
    for token in tokens:
        # Use placeholders rather than raising KeyError so one incomplete
        # payload cannot abort the rest of the batch.
        name = token.get('tokenName', 'Unknown')
        address = token.get('tokenAddress', 'N/A')
        # ``or 0`` also covers an explicit ``None`` value, which would
        # otherwise fail the ``:.2f`` format below.
        liquidity = token.get('liquiditySol') or 0
        market_cap = token.get('marketCapSol') or 0

        print(f"🚨 NEW TOKEN: {name}")
        print(f" Address: {address}")
        print(f" Liquidity: {liquidity:.2f} SOL")
        print(f" Market Cap: {market_cap:.2f} SOL")
        print()
async def main():
    """Create an authenticated client and stream new-token events.

    Awaits ``subscribe_new_tokens`` with ``handle_new_tokens`` as the
    callback, so this coroutine runs until that call returns or the
    program is interrupted.
    """
    # Initialize with authentication (replace the placeholders with real tokens)
    client = AxiomTradeClient(
        auth_token="your-auth-token",
        refresh_token="your-refresh-token",
    )

    print("🔍 Monitoring new token launches...")
    print("Press Ctrl+C to stop\n")

    # Start WebSocket subscription
    await client.subscribe_new_tokens(callback=handle_new_tokens)
if __name__ == "__main__":
    asyncio.run(main())

Build a production-ready token sniping bot with filters and analysis:
import asyncio
import logging
from datetime import datetime
from axiomtradeapi import AxiomTradeClient
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class TokenSniperBot:
    """Advanced token sniping bot with filters.

    Subscribes to new-token events, scores each token against liquidity
    and market-cap thresholds, and hands qualifying tokens to
    ``execute_snipe``. Counters for seen/qualified tokens are kept for the
    end-of-run statistics.
    """

    def __init__(self, auth_token, refresh_token, *, min_liquidity=10.0,
                 min_market_cap=5.0, max_market_cap=100.0):
        """Create the API client and set the sniping criteria.

        Args:
            auth_token: Authentication token passed to ``AxiomTradeClient``.
            refresh_token: Refresh token passed to ``AxiomTradeClient``.
            min_liquidity: Minimum liquidity, in SOL, for a token to qualify.
            min_market_cap: Lower market-cap bound, in SOL (inclusive).
            max_market_cap: Upper market-cap bound, in SOL (inclusive).

        Raises:
            ValueError: If ``min_market_cap`` is greater than
                ``max_market_cap`` (no token could ever qualify).
        """
        if min_market_cap > max_market_cap:
            raise ValueError(
                "min_market_cap must not exceed max_market_cap "
                f"({min_market_cap} > {max_market_cap})"
            )

        self.client = AxiomTradeClient(
            auth_token=auth_token,
            refresh_token=refresh_token,
            log_level=logging.INFO,
        )

        # Sniping criteria
        self.min_liquidity = min_liquidity
        self.min_market_cap = min_market_cap
        self.max_market_cap = max_market_cap

        # Statistics
        self.tokens_seen = 0
        self.tokens_qualified = 0
        self.start_time = datetime.now()

        logger.info("✅ Token Sniper Bot initialized")

    async def analyze_token(self, token):
        """Analyze a token and decide whether it meets the criteria.

        Args:
            token: Token dict; the keys read are ``tokenName``,
                ``tokenTicker``, ``tokenAddress``, ``liquiditySol``,
                ``marketCapSol`` and ``protocol``. All are optional.

        Returns:
            dict with keys ``qualified`` (bool), ``name``, ``ticker``,
            ``address``, ``liquidity``, ``market_cap``, ``protocol`` and
            ``risk_score``. The risk score starts at 50 and gains 30 when
            liquidity is below 5 SOL and 20 when market cap is below 1 SOL
            (lower is better).
        """
        # Extract key metrics
        name = token.get('tokenName', 'Unknown')
        ticker = token.get('tokenTicker', 'N/A')
        address = token.get('tokenAddress', 'N/A')
        # ``or 0`` treats an explicit ``None`` like a missing value so the
        # numeric comparisons below cannot raise TypeError.
        liquidity = token.get('liquiditySol') or 0
        market_cap = token.get('marketCapSol') or 0
        protocol = token.get('protocol', 'Unknown')

        # Check if token meets criteria
        meets_liquidity = liquidity >= self.min_liquidity
        meets_market_cap = (
            self.min_market_cap <= market_cap <= self.max_market_cap
        )

        # Calculate risk score (0-100, lower is better)
        risk_score = 50
        if liquidity < 5:
            risk_score += 30
        if market_cap < 1:
            risk_score += 20

        return {
            'qualified': meets_liquidity and meets_market_cap,
            'name': name,
            'ticker': ticker,
            'address': address,
            'liquidity': liquidity,
            'market_cap': market_cap,
            'protocol': protocol,
            'risk_score': risk_score,
        }

    async def handle_new_tokens(self, tokens):
        """Process a batch of incoming tokens.

        Every token is counted and analyzed. Qualified tokens are printed
        in detail and passed to ``execute_snipe``; the rest are logged at
        DEBUG level.

        Args:
            tokens: Iterable of token dicts from the subscription callback.
        """
        for token in tokens:
            self.tokens_seen += 1
            analysis = await self.analyze_token(token)

            if not analysis['qualified']:
                # Log filtered tokens (optional, for debugging). Lazy
                # %-args skip the formatting when DEBUG is disabled.
                logger.debug(
                    "❌ Filtered: %s (L:%.1f, MC:%.1f)",
                    analysis['name'],
                    analysis['liquidity'],
                    analysis['market_cap'],
                )
                continue

            self.tokens_qualified += 1
            logger.info("🎯 QUALIFIED TOKEN FOUND: %s", analysis['name'])

            # Display detailed info
            separator = '=' * 60
            print(f"\n{separator}")
            print("🚨 QUALIFIED TOKEN ALERT")
            print(separator)
            print(f"Name: {analysis['name']} ({analysis['ticker']})")
            print(f"Address: {analysis['address']}")
            print(f"Liquidity: {analysis['liquidity']:.2f} SOL")
            print(f"Market Cap: {analysis['market_cap']:.2f} SOL")
            print(f"Protocol: {analysis['protocol']}")
            print(f"Risk Score: {analysis['risk_score']}/100")
            print(f"Time: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"{separator}\n")

            # Here you would add your trading logic
            await self.execute_snipe(analysis)

    async def execute_snipe(self, token_analysis):
        """Execute the snipe (buy the token).

        This is a placeholder: it only logs the token that would be bought.

        Args:
            token_analysis: Result dict produced by ``analyze_token``.
        """
        # IMPORTANT: Add your actual trading logic here
        logger.info("🎯 Would execute snipe on %s", token_analysis['name'])
        # Example: Check wallet balance first
        # balance = self.client.GetBalance(your_wallet)
        # if balance['sol'] >= required_amount:
        #     execute_trade(token_analysis['address'])

    def print_statistics(self):
        """Print runtime, token counts, throughput and hit rate."""
        runtime = (datetime.now() - self.start_time).total_seconds()
        rate = self.tokens_seen / runtime if runtime > 0 else 0
        # Guard the division: the bot may be stopped before any token arrives.
        if self.tokens_seen:
            hit_rate = self.tokens_qualified / self.tokens_seen * 100
        else:
            hit_rate = 0.0

        print("\n📊 BOT STATISTICS")
        print(f" Runtime: {runtime:.0f}s")
        print(f" Tokens Seen: {self.tokens_seen}")
        print(f" Qualified: {self.tokens_qualified}")
        print(f" Rate: {rate:.2f} tokens/sec")
        print(f" Hit Rate: {hit_rate:.1f}%")

    def _report_shutdown(self):
        """Announce shutdown and print the session statistics."""
        print("\n⏹️ Stopping bot...")
        self.print_statistics()

    async def run(self):
        """Start the sniping bot and stream tokens until stopped.

        Prints the active criteria, then awaits the new-token subscription.
        Statistics are printed when the run is interrupted.

        Raises:
            asyncio.CancelledError: Re-raised after reporting so that task
                cancellation keeps propagating to the caller.
        """
        print("🎯 Token Sniper Bot - ARMED AND READY")
        print(f" Min Liquidity: {self.min_liquidity} SOL")
        print(f" Market Cap Range: {self.min_market_cap}-{self.max_market_cap} SOL")
        print("\nPress Ctrl+C to stop\n")

        try:
            await self.client.subscribe_new_tokens(
                callback=self.handle_new_tokens
            )
        except KeyboardInterrupt:
            self._report_shutdown()
        except asyncio.CancelledError:
            # Under ``asyncio.run`` (Python 3.11+), Ctrl+C cancels the main
            # task, so the interrupt arrives here as CancelledError rather
            # than KeyboardInterrupt.
            self._report_shutdown()
            raise
# Run the bot
async def main():
    """Build a ``TokenSniperBot`` with placeholder credentials and run it."""
    credentials = {
        "auth_token": "your-auth-token",
        "refresh_token": "your-refresh-token",
    }
    sniper = TokenSniperBot(**credentials)
    await sniper.run()
if __name__ == "__main__":
    asyncio.run(main())

import asyncio
import logging
from axiomtradeapi import AxiomTradeClient
logger = logging.getLogger(__name__)


class RobustWebSocketHandler:
    """WebSocket subscriber with per-token error isolation and reconnects.

    Wraps ``AxiomTradeClient.subscribe_new_tokens`` and retries the
    subscription with exponential backoff when it raises.
    """

    def __init__(self, auth_token, refresh_token, *,
                 max_reconnect_attempts=5, reconnect_delay=5):
        """Create the API client and configure the retry policy.

        Args:
            auth_token: Authentication token passed to ``AxiomTradeClient``.
            refresh_token: Refresh token passed to ``AxiomTradeClient``.
            max_reconnect_attempts: Number of failed connection attempts
                tolerated per ``run_with_reconnect`` call.
            reconnect_delay: Base delay in seconds; it doubles after every
                failed attempt.
        """
        self.client = AxiomTradeClient(
            auth_token=auth_token,
            refresh_token=refresh_token,
        )
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = max_reconnect_attempts
        self.reconnect_delay = reconnect_delay  # seconds

    async def handle_tokens(self, tokens):
        """Handle incoming tokens with error handling.

        Each token is processed independently: an exception raised for one
        token is logged and the remaining tokens are still processed.

        Args:
            tokens: Iterable of token payloads from the subscription.
        """
        for token in tokens:
            try:
                await self.process_token(token)
            except Exception as exc:
                # Deliberately broad: this is the callback boundary, and a
                # single bad token must not drop the rest of the batch.
                logger.exception("Error processing tokens: %s", exc)

    async def handle_errors(self, error):
        """Handle WebSocket errors reported through the error callback.

        Logs the error and a hint based on whether its text mentions
        "authentication" or "connection" (case-insensitive).

        Args:
            error: Error object or message supplied by the client.
        """
        logger.error("WebSocket error: %s", error)
        message = str(error).lower()

        if "authentication" in message:
            logger.warning("Authentication error - refreshing tokens...")
            # Implement token refresh logic
        elif "connection" in message:
            logger.warning("Connection lost - will reconnect...")

    async def run_with_reconnect(self):
        """Run the WebSocket subscription with automatic reconnection.

        After a failed attempt the delay is
        ``reconnect_delay * 2 ** (attempt - 1)`` seconds. Returns once the
        subscription call completes without raising.

        Raises:
            Exception: The error from the last attempt, re-raised once
                ``max_reconnect_attempts`` failures have occurred.
        """
        # Start every run with a fresh budget; otherwise a second call after
        # an exhausted run would skip the loop and return silently.
        self.reconnect_attempts = 0

        while self.reconnect_attempts < self.max_reconnect_attempts:
            try:
                logger.info("🔌 Connecting to WebSocket...")
                await self.client.subscribe_new_tokens(
                    callback=self.handle_tokens,
                    error_callback=self.handle_errors,
                )
            except Exception as exc:
                self.reconnect_attempts += 1
                logger.error(
                    "Connection failed (attempt %s/%s): %s",
                    self.reconnect_attempts,
                    self.max_reconnect_attempts,
                    exc,
                )

                if self.reconnect_attempts >= self.max_reconnect_attempts:
                    logger.error("Max reconnection attempts reached")
                    raise

                # Exponential backoff: base, 2x base, 4x base, ...
                delay = self.reconnect_delay * (2 ** (self.reconnect_attempts - 1))
                logger.info("Reconnecting in %ss...", delay)
                await asyncio.sleep(delay)
            else:
                # If we get here, connection was closed normally
                logger.info("WebSocket connection closed normally")
                return

    async def process_token(self, token):
        """Process an individual token (placeholder hook, does nothing).

        Args:
            token: A single token payload from the batch.
        """
        # Your token processing logic here
        pass

WebSocket connections can drop. Always implement automatic reconnection with exponential backoff to ensure continuous monitoring.
WebSocket callbacks are async functions. Don't block the event loop with synchronous operations:
❌ Blocking:
async def handle(tokens):
    time.sleep(1)  # Blocks!
    process(tokens)

✅ Non-blocking:
async def handle(tokens):
    await asyncio.sleep(1)
    await process(tokens)

Authentication tokens expire. AxiomTradeAPI automatically refreshes tokens, but monitor for authentication errors and have a fallback plan.
When executing trades based on WebSocket data, implement rate limiting to avoid overwhelming your system or hitting API limits.
Detect new token launches in real-time and execute trades within milliseconds of liquidity pool creation.
Build real-time monitoring dashboards showing new tokens, prices, and market activity as they happen.
Send notifications (Discord, Telegram, email) when specific tokens or criteria are detected.
Build fully automated trading bots that react to market conditions and execute strategies in real-time.