Skip to content

Commit

Permalink
calculate transaction quantities
Browse files Browse the repository at this point in the history
* Get token name, symbol, and calculate real transaction values. But
this required adding support for the Blockscout API.
* adaptive rate limit for subscan.io depending on whether an API key is
provided.
* migrate URLs to be encapsulated within the block explorer wrappers.

contributes to ChaosDAO-org#8 decoding DEX swap transactions
  • Loading branch information
spazcoin committed Apr 12, 2022
1 parent c18e884 commit b802527
Show file tree
Hide file tree
Showing 5 changed files with 166 additions and 37 deletions.
20 changes: 12 additions & 8 deletions subscrape/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
from subscrape.scrapers.moonbeam_scraper import MoonbeamScraper
from subscrape.subscan_wrapper import SubscanWrapper
from subscrape.blockscout_wrapper import BlockscoutWrapper
from subscrape.moonscan_wrapper import MoonscanWrapper
from subscrape.scrapers.parachain_scraper import ParachainScraper
from subscrape.db.subscrape_db import SubscrapeDB
Expand All @@ -14,8 +15,11 @@ def moonscan_factory(chain):
f = open("config/moonscan-key")
moonscan_key = f.read()

endpoint = f"https://api-{chain}.moonscan.io/api"
return MoonscanWrapper(endpoint, moonscan_key)
return MoonscanWrapper(chain, moonscan_key)


def blockscout_factory(chain):
return BlockscoutWrapper(chain)


def subscan_factory(chain):
Expand All @@ -24,8 +28,7 @@ def subscan_factory(chain):
f = open("config/subscan-key")
subscan_key = f.read()

endpoint = f"https://{chain}.api.subscan.io"
return SubscanWrapper(endpoint, subscan_key)
return SubscanWrapper(chain, subscan_key)


def scraper_factory(name):
Expand All @@ -34,13 +37,14 @@ def scraper_factory(name):
if not os.path.exists(db_path):
os.makedirs(db_path)
db_path += f'/{name}_'
api = moonscan_factory(name)
scraper = MoonbeamScraper(db_path, api)
moonscan_api = moonscan_factory(name)
blockscout_api = blockscout_factory(name)
scraper = MoonbeamScraper(db_path, moonscan_api, blockscout_api)
return scraper
else:
db = SubscrapeDB(name)
api = subscan_factory(name)
scraper = ParachainScraper(db, api)
subscan_api = subscan_factory(name)
scraper = ParachainScraper(db, subscan_api)
return scraper


Expand Down
97 changes: 97 additions & 0 deletions subscrape/blockscout_wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import httpx
import json
import logging
from ratelimit import limits, sleep_and_retry


class BlockscoutWrapper:
def __init__(self, chain):
self.logger = logging.getLogger("BlockscoutWrapper")
self.endpoint = f"https://blockscout.{chain}.moonbeam.network/api"

@sleep_and_retry # be patient and sleep this thread to avoid exceeding the rate limit
@limits(calls=5, period=1) # No API limit stated on Blockscout website, so choose conservative 5 calls/sec
def query(self, params):
response = httpx.get(self.endpoint, params=params)
self.logger.debug(response)
return response.text

def iterate_pages(self, element_processor, params={}):
done = False # keep crunching until we are done
start_block = 0 # iterator for the page we want to query
previous_block = 0 # to check if the iterator actually moved forward
count = 0 # counter for how many items we queried already
limit = 0 # max amount of items to be queried. to be determined after the first call

while not done:
params["startblock"] = start_block
response = self.query(params)

# unpackage the payload
obj = json.loads(response)
if obj["status"] == "0":
self.logger.info("received empty result")
return

elements = obj["result"]

# process the elements
for element in elements:
element_processor(element)

# update counters and check if we should exit
count += len(elements)
self.logger.info(count)

start_block = element["blockNumber"]
if start_block == previous_block:
done = True
previous_block = start_block

def fetch_and_process_transactions(self, address, element_processor):
"""Fetch all transactions for an address, and then pass them to the processor for processing.
:param address: address to retrieve transactions for
:type address: str
:param element_processor: method to process each transaction as it is received
:type element_processor: function
"""
params = {"module": "account", "action": "txlist", "address": address, "startblock": "1",
"endblock": "99999999", "sort": "asc"}
self.iterate_pages(element_processor, params=params)

def get_contract_abi(self, contract_address):
"""Get a contract's ABI (so that its transactions can be decoded).
:param contract_address: contract address
:type contract_address: str
:returns: string representing the contract's ABI, or None if not retrievable
:rtype: str or None
"""
params = {"module": "contract", "action": "getabi", "address": contract_address}
response = self.query(params)
response_dict = json.loads(response)
if response_dict['status'] == "0" or response_dict['message'] == "NOTOK":
self.logger.info(f'ABI not retrievable for {contract_address} because "{response_dict["result"]}"')
return None
else:
# response_dict['result'] should contain a long string representation of the contract abi.
return response_dict['result']

def get_token_info(self, token_address):
"""Get a token's basic info (name, ticker symbol, decimal places)
:param token_address: token address
:type token_address: str
:returns: dictionary of values about the token, or None if not retrievable
:rtype: dict or None
"""
params = {"module": "token", "action": "getToken", "contractaddress": token_address}
response = self.query(params)
response_dict = json.loads(response)
if response_dict['status'] == "0" or response_dict['message'] == "NOTOK":
self.logger.info(f'Token info not retrievable for {token_address} because "{response_dict["result"]}"')
return None
else:
# response_dict['result'] should contain the info about the token.
return response_dict['result']
6 changes: 3 additions & 3 deletions subscrape/moonscan_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
from tracemalloc import start
import httpx
import json
import logging
from ratelimit import limits, sleep_and_retry

# "Powered by https://moonbeam.moonscan.io APIs"
#https://moonbeam.moonscan.io/apis#contracts


class MoonscanWrapper:
def __init__(self, endpoint, api_key=None):
def __init__(self, chain, api_key=None):
self.logger = logging.getLogger("MoonscanWrapper")
self.endpoint = endpoint
self.endpoint = f"https://api-{chain}.moonscan.io/api"
self.api_key = api_key

@sleep_and_retry # be patient and sleep this thread to avoid exceeding the rate limit
Expand Down
65 changes: 43 additions & 22 deletions subscrape/scrapers/moonbeam_scraper.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@


class MoonbeamScraper:
def __init__(self, db_path, api):
def __init__(self, db_path, moonscan_api, blockscout_api):
self.logger = logging.getLogger("MoonbeamScraper")
self.db_path = db_path
self.api = api
self.moonscan_api = moonscan_api
self.blockscout_api = blockscout_api
self.transactions = {}
self.abis = {}
self.abis = {} # cache of contract ABI interface definitions
self.tokens = {} # cache of token contract basic info

def scrape(self, operations, chain_config):
for operation in operations:
Expand Down Expand Up @@ -120,8 +122,8 @@ def fetch_transactions(self, address, processor, reference=None):
self.logger.warning(f"{file_path} already exists. Skipping.")
return

self.logger.info(f"Fetching transactions for {reference} from {self.api.endpoint}")
self.api.fetch_and_process_transactions(address, processor)
self.logger.info(f"Fetching transactions for {reference} from {self.moonscan_api.endpoint}")
self.moonscan_api.fetch_and_process_transactions(address, processor)

payload = json.dumps(self.transactions[reference], indent=4, sort_keys=False)
file = io.open(file_path, "w")
Expand Down Expand Up @@ -163,17 +165,24 @@ def process_transaction_on_account(transaction):

# retrieve and cache the abi for the contract
if contract_address not in self.abis and contract_address != account:
self.abis[contract_address] = self.api.get_contract_abi(contract_address)
if self.abis[contract_address] is not None:
self.logger.info(f'Contract abi found for {contract_address}.')
self.abis[contract_address] = self.moonscan_api.get_contract_abi(contract_address)
# if self.abis[contract_address] is not None:
# self.logger.info(f'Contract abi found for {contract_address}.')

if contract_address in self.abis and self.abis[contract_address] is not None:
decoded_transaction = decode_tx(contract_address, transaction['input'], self.abis[contract_address])

if decoded_transaction[0] == 'decode error':
self.logger.warning(f'Unable to decode contract interaction from transaction={transaction}\r\n'
f' abi={self.abis[contract_address]}\r\n'
f' and decoded_transaction={decoded_transaction}\r\n\r\n')
known_contracts_with_decode_errors = {
"0xbcc8a3022f69a5cdddc22c068049cd07581b1aa5", # 0xTaylor "puzzle"
"0xf48ea3bc302f6c0585eceddba70a1bc12d67e76f", # DPSDocks v1.0
"0x421bff16bba3bca1720638482c647eb832fd9de4", # DPSDocks v1.5
"0x77da4f1d66004bebfda4d5a42931388cecaf81c5" # DPSPlunderersGuild
}
if contract_address not in known_contracts_with_decode_errors:
self.logger.warning(f'Unable to decode contract interaction from transaction={transaction}\r\n'
f' abi={self.abis[contract_address]}\r\n'
f' and decoded_transaction={decoded_transaction}\r\n\r\n')
else:
# successfully decoded the input data to the contract interaction
contract_method_name = decoded_transaction[0]
Expand All @@ -182,20 +191,32 @@ def process_transaction_on_account(transaction):
print('solarbeam function called: ', contract_method_name)
print('arguments: ', json.dumps(decoded_tx, indent=2))

# todo: add support for lots of dex swap methods
# todo: interpret liquidity provisioning and other events (like SwapExactTokensForETH)
if contract_method_name == "swapExactTokensForTokens":
token_path = decoded_tx['path']
acct_tx['input_token'] = token_path[0]
acct_tx['output_token'] = token_path[len(token_path) - 1]
acct_tx['input_token_quantity'] = decoded_tx['amountIn']
acct_tx['output_token_quantity'] = decoded_tx['amountOutMin']
# We only have an estimate so far based on the inputs.
# todo: these amounts need to be converted to floats by dividing by the DECIMAL for each contract.
# todo: translate token contract address into the token's name to make it user readable in spreadsheet.

# if moonscan API key then:
# todo: find the event logs that the dex router emits, to figure out exactly how much was swapped.
# retrieve and cache the token info for all tokens
for token in token_path:
if token not in self.tokens:
self.tokens[token] = self.blockscout_api.get_token_info(token)
# if self.tokens[token] is not None:
# self.logger.info(f'Token info found for {token} = {self.tokens[token]}')

input_token = token_path[0]
input_token_info = self.tokens[input_token]
acct_tx['input_token_name'] = input_token_info['name']
acct_tx['input_symbol'] = input_token_info['symbol']
acct_tx['input_quantity'] = \
decoded_tx['amountIn'] / (10 ** int(input_token_info['decimals']))
output_token = token_path[len(token_path) - 1]
output_token_info = self.tokens[output_token]
acct_tx['output_token_name'] = output_token_info['name']
acct_tx['output_symbol'] = output_token_info['symbol']
acct_tx['output_quantity'] = \
decoded_tx['amountOutMin'] / (10 ** int(output_token_info['decimals']))

# todo: interpret liquidity provisioning and other events (like SwapExactTokensForETH)
# We only have an estimate so far based on the inputs.
# todo: find the event logs that the dex router emits, to figure out exactly how much was swapped.

self.transactions[account][timestamp] = acct_tx
return process_transaction_on_account
Expand Down
15 changes: 11 additions & 4 deletions subscrape/subscan_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,23 @@
#requests_log.setLevel(logging.DEBUG)
#requests_log.propagate = True

SUBSCAN_MAX_CALLS_PER_SEC_WITHOUT_API_KEY = 2
SUBSCAN_MAX_CALLS_PER_SEC_WITH_AN_API_KEY = 30
MAX_CALLS_PER_SEC = SUBSCAN_MAX_CALLS_PER_SEC_WITHOUT_API_KEY

class SubscanWrapper:

def __init__(self, endpoint, api_key=None):
class SubscanWrapper:
def __init__(self, chain, api_key=None):
self.logger = logging.getLogger("SubscanWrapper")
self.endpoint = f"https://{chain}.api.subscan.io"
self.api_key = api_key
self.endpoint = endpoint
global MAX_CALLS_PER_SEC
if api_key is not None:
MAX_CALLS_PER_SEC = SUBSCAN_MAX_CALLS_PER_SEC_WITH_AN_API_KEY
self.logger.info(f'Subscan rate limit set to {MAX_CALLS_PER_SEC} API calls per second.')

@sleep_and_retry # be patient and sleep this thread to avoid exceeding the rate limit
@limits(calls=29, period=1) # API limits us to 30 calls every second
@limits(calls=MAX_CALLS_PER_SEC, period=1) # API limits us to 30 calls every second
def query(self, method, headers={}, body={}):
headers["Content-Type"] = "application/json"
if self.api_key is not None:
Expand Down

0 comments on commit b802527

Please sign in to comment.