diff --git a/subscrape/__init__.py b/subscrape/__init__.py
index 651667a..7265f5c 100644
--- a/subscrape/__init__.py
+++ b/subscrape/__init__.py
@@ -2,6 +2,7 @@
 import os
 from subscrape.scrapers.moonbeam_scraper import MoonbeamScraper
 from subscrape.subscan_wrapper import SubscanWrapper
+from subscrape.blockscout_wrapper import BlockscoutWrapper
 from subscrape.moonscan_wrapper import MoonscanWrapper
 from subscrape.scrapers.parachain_scraper import ParachainScraper
 from subscrape.db.subscrape_db import SubscrapeDB
@@ -14,8 +15,11 @@ def moonscan_factory(chain):
         f = open("config/moonscan-key")
         moonscan_key = f.read()
 
-    endpoint = f"https://api-{chain}.moonscan.io/api"
-    return MoonscanWrapper(endpoint, moonscan_key)
+    return MoonscanWrapper(chain, moonscan_key)
+
+
+def blockscout_factory(chain):
+    return BlockscoutWrapper(chain)
 
 
 def subscan_factory(chain):
@@ -24,8 +28,7 @@ def subscan_factory(chain):
         f = open("config/subscan-key")
         subscan_key = f.read()
 
-    endpoint = f"https://{chain}.api.subscan.io"
-    return SubscanWrapper(endpoint, subscan_key)
+    return SubscanWrapper(chain, subscan_key)
 
 
 def scraper_factory(name):
@@ -34,13 +37,14 @@ def scraper_factory(name):
         if not os.path.exists(db_path):
             os.makedirs(db_path)
         db_path += f'/{name}_'
-        api = moonscan_factory(name)
-        scraper = MoonbeamScraper(db_path, api)
+        moonscan_api = moonscan_factory(name)
+        blockscout_api = blockscout_factory(name)
+        scraper = MoonbeamScraper(db_path, moonscan_api, blockscout_api)
         return scraper
     else:
         db = SubscrapeDB(name)
-        api = subscan_factory(name)
-        scraper = ParachainScraper(db, api)
+        subscan_api = subscan_factory(name)
+        scraper = ParachainScraper(db, subscan_api)
         return scraper
 
 
diff --git a/subscrape/blockscout_wrapper.py b/subscrape/blockscout_wrapper.py
new file mode 100644
index 0000000..c21e319
--- /dev/null
+++ b/subscrape/blockscout_wrapper.py
@@ -0,0 +1,117 @@
+import httpx
+import json
+import logging
+from ratelimit import limits, sleep_and_retry
+
+
+class BlockscoutWrapper:
+    def __init__(self, chain):
+        self.logger = logging.getLogger("BlockscoutWrapper")
+        self.endpoint = f"https://blockscout.{chain}.moonbeam.network/api"
+
+    @sleep_and_retry  # be patient and sleep this thread to avoid exceeding the rate limit
+    @limits(calls=5, period=1)  # No API limit stated on Blockscout website, so choose conservative 5 calls/sec
+    def query(self, params):
+        """Send a GET request to the Blockscout API endpoint and return the raw response body.
+
+        :param params: query string parameters for the API call
+        :type params: dict
+        :returns: response body as text
+        :rtype: str
+        """
+        response = httpx.get(self.endpoint, params=params)
+        self.logger.debug(response)
+        return response.text
+
+    def iterate_pages(self, element_processor, params=None):
+        """Query the API repeatedly and pass every returned element to the processor.
+
+        Each request sets "startblock" to the block number of the last element of the previous
+        response. Iteration ends when the API reports status "0", returns no elements, or the
+        last block number stops advancing.
+
+        :param element_processor: method to process each element as it is received
+        :type element_processor: function
+        :param params: query parameters for the API call; the dict is copied, not modified
+        :type params: dict or None
+        """
+        params = dict(params) if params else {}  # work on a copy, never on the caller's dict
+        start_block = 0  # iterator for the page we want to query
+        previous_block = 0  # to check if the iterator actually moved forward
+        count = 0  # counter for how many items we queried already
+
+        while True:
+            params["startblock"] = start_block
+            response = self.query(params)
+
+            # unpackage the payload
+            obj = json.loads(response)
+            if obj["status"] == "0":
+                self.logger.info("received empty result")
+                return
+
+            elements = obj["result"]
+            if not elements:
+                # no last element to read the next start block from
+                return
+
+            # process the elements
+            for element in elements:
+                element_processor(element)
+
+            # update counters and check if we should exit
+            count += len(elements)
+            self.logger.info(count)
+
+            start_block = elements[-1]["blockNumber"]
+            if start_block == previous_block:
+                return
+            previous_block = start_block
+
+    def fetch_and_process_transactions(self, address, element_processor):
+        """Fetch all transactions for an address, and then pass them to the processor for processing.
+
+        :param address: address to retrieve transactions for
+        :type address: str
+        :param element_processor: method to process each transaction as it is received
+        :type element_processor: function
+        """
+        params = {"module": "account", "action": "txlist", "address": address, "startblock": "1",
+                  "endblock": "99999999", "sort": "asc"}
+        self.iterate_pages(element_processor, params=params)
+
+    def get_contract_abi(self, contract_address):
+        """Get a contract's ABI (so that its transactions can be decoded).
+
+        :param contract_address: contract address
+        :type contract_address: str
+        :returns: string representing the contract's ABI, or None if not retrievable
+        :rtype: str or None
+        """
+        params = {"module": "contract", "action": "getabi", "address": contract_address}
+        response = self.query(params)
+        response_dict = json.loads(response)
+        if response_dict['status'] == "0" or response_dict['message'] == "NOTOK":
+            self.logger.info(f'ABI not retrievable for {contract_address} because "{response_dict["result"]}"')
+            return None
+        else:
+            # response_dict['result'] should contain a long string representation of the contract abi.
+            return response_dict['result']
+
+    def get_token_info(self, token_address):
+        """Get a token's basic info (name, ticker symbol, decimal places)
+
+        :param token_address: token address
+        :type token_address: str
+        :returns: dictionary of values about the token, or None if not retrievable
+        :rtype: dict or None
+        """
+        params = {"module": "token", "action": "getToken", "contractaddress": token_address}
+        response = self.query(params)
+        response_dict = json.loads(response)
+        if response_dict['status'] == "0" or response_dict['message'] == "NOTOK":
+            self.logger.info(f'Token info not retrievable for {token_address} because "{response_dict["result"]}"')
+            return None
+        else:
+            # response_dict['result'] should contain the info about the token.
+            return response_dict['result']
diff --git a/subscrape/moonscan_wrapper.py b/subscrape/moonscan_wrapper.py
index 12ab4ef..28618b1 100644
--- a/subscrape/moonscan_wrapper.py
+++ b/subscrape/moonscan_wrapper.py
@@ -1,16 +1,16 @@
-from tracemalloc import start
 import httpx
 import json
 import logging
 from ratelimit import limits, sleep_and_retry
 
+# "Powered by https://moonbeam.moonscan.io APIs"
 #https://moonbeam.moonscan.io/apis#contracts
 
 
 class MoonscanWrapper:
-    def __init__(self, endpoint, api_key=None):
+    def __init__(self, chain, api_key=None):
         self.logger = logging.getLogger("MoonscanWrapper")
-        self.endpoint = endpoint
+        self.endpoint = f"https://api-{chain}.moonscan.io/api"
         self.api_key = api_key
 
     @sleep_and_retry  # be patient and sleep this thread to avoid exceeding the rate limit
diff --git a/subscrape/scrapers/moonbeam_scraper.py b/subscrape/scrapers/moonbeam_scraper.py
index 7bea56f..b42282a 100644
--- a/subscrape/scrapers/moonbeam_scraper.py
+++ b/subscrape/scrapers/moonbeam_scraper.py
@@ -9,12 +9,14 @@
 
 
 class MoonbeamScraper:
-    def __init__(self, db_path, api):
+    def __init__(self, db_path, moonscan_api, blockscout_api):
         self.logger = logging.getLogger("MoonbeamScraper")
         self.db_path = db_path
-        self.api = api
+        self.moonscan_api = moonscan_api
+        self.blockscout_api = blockscout_api
         self.transactions = {}
-        self.abis = {}
+        self.abis = {}  # cache of contract ABI interface definitions
+        self.tokens = {}  # cache of token contract basic info
 
     def scrape(self, operations, chain_config):
         for operation in operations:
@@ -120,8 +122,8 @@ def fetch_transactions(self, address, processor, reference=None):
             self.logger.warning(f"{file_path} already exists. Skipping.")
             return
 
-        self.logger.info(f"Fetching transactions for {reference} from {self.api.endpoint}")
-        self.api.fetch_and_process_transactions(address, processor)
+        self.logger.info(f"Fetching transactions for {reference} from {self.moonscan_api.endpoint}")
+        self.moonscan_api.fetch_and_process_transactions(address, processor)
         payload = json.dumps(self.transactions[reference], indent=4, sort_keys=False)
 
         file = io.open(file_path, "w")
@@ -163,17 +165,24 @@ def process_transaction_on_account(transaction):
 
             # retrieve and cache the abi for the contract
             if contract_address not in self.abis and contract_address != account:
-                self.abis[contract_address] = self.api.get_contract_abi(contract_address)
-                if self.abis[contract_address] is not None:
-                    self.logger.info(f'Contract abi found for {contract_address}.')
+                self.abis[contract_address] = self.moonscan_api.get_contract_abi(contract_address)
+                # if self.abis[contract_address] is not None:
+                #     self.logger.info(f'Contract abi found for {contract_address}.')
 
             if contract_address in self.abis and self.abis[contract_address] is not None:
                 decoded_transaction = decode_tx(contract_address, transaction['input'], self.abis[contract_address])
 
                 if decoded_transaction[0] == 'decode error':
-                    self.logger.warning(f'Unable to decode contract interaction from transaction={transaction}\r\n'
-                                        f' abi={self.abis[contract_address]}\r\n'
-                                        f' and decoded_transaction={decoded_transaction}\r\n\r\n')
+                    known_contracts_with_decode_errors = {
+                        "0xbcc8a3022f69a5cdddc22c068049cd07581b1aa5",  # 0xTaylor "puzzle"
+                        "0xf48ea3bc302f6c0585eceddba70a1bc12d67e76f",  # DPSDocks v1.0
+                        "0x421bff16bba3bca1720638482c647eb832fd9de4",  # DPSDocks v1.5
+                        "0x77da4f1d66004bebfda4d5a42931388cecaf81c5"  # DPSPlunderersGuild
+                    }
+                    if contract_address not in known_contracts_with_decode_errors:
+                        self.logger.warning(f'Unable to decode contract interaction from transaction={transaction}\r\n'
+                                            f' abi={self.abis[contract_address]}\r\n'
+                                            f' and decoded_transaction={decoded_transaction}\r\n\r\n')
                 else:
                     # successfully decoded the input data to the contract interaction
                     contract_method_name = decoded_transaction[0]
@@ -182,20 +191,40 @@ def process_transaction_on_account(transaction):
                         print('solarbeam function called: ', contract_method_name)
                         print('arguments: ', json.dumps(decoded_tx, indent=2))
 
+                    # todo: add support for lots of dex swap methods
+                    # todo: interpret liquidity provisioning and other events (like SwapExactTokensForETH)
                     if contract_method_name == "swapExactTokensForTokens":
                         token_path = decoded_tx['path']
-                        acct_tx['input_token'] = token_path[0]
-                        acct_tx['output_token'] = token_path[len(token_path) - 1]
-                        acct_tx['input_token_quantity'] = decoded_tx['amountIn']
-                        acct_tx['output_token_quantity'] = decoded_tx['amountOutMin']
-                        # We only have an estimate so far based on the inputs.
-                        # todo: these amounts need to be converted to floats by dividing by the DECIMAL for each contract.
-                        # todo: translate token contract address into the token's name to make it user readable in spreadsheet.
-
-                        # if moonscan API key then:
-                        # todo: find the event logs that the dex router emits, to figure out exactly how much was swapped.
+                        # retrieve and cache the token info for all tokens
+                        for token in token_path:
+                            if token not in self.tokens:
+                                self.tokens[token] = self.blockscout_api.get_token_info(token)
+                                # if self.tokens[token] is not None:
+                                #     self.logger.info(f'Token info found for {token} = {self.tokens[token]}')
+
+                        input_token = token_path[0]
+                        output_token = token_path[-1]
+                        input_token_info = self.tokens[input_token]
+                        output_token_info = self.tokens[output_token]
+                        if input_token_info is None or output_token_info is None:
+                            # get_token_info() returns None when the lookup fails, so keep the raw
+                            # addresses and amounts instead of raising TypeError on None['name'].
+                            acct_tx['input_token'] = input_token
+                            acct_tx['output_token'] = output_token
+                            acct_tx['input_token_quantity'] = decoded_tx['amountIn']
+                            acct_tx['output_token_quantity'] = decoded_tx['amountOutMin']
+                        else:
+                            acct_tx['input_token_name'] = input_token_info['name']
+                            acct_tx['input_symbol'] = input_token_info['symbol']
+                            acct_tx['input_quantity'] = \
+                                decoded_tx['amountIn'] / (10 ** int(input_token_info['decimals']))
+                            acct_tx['output_token_name'] = output_token_info['name']
+                            acct_tx['output_symbol'] = output_token_info['symbol']
+                            acct_tx['output_quantity'] = \
+                                decoded_tx['amountOutMin'] / (10 ** int(output_token_info['decimals']))
 
-                    # todo: interpret liquidity provisioning and other events (like SwapExactTokensForETH)
+                        # We only have an estimate so far based on the inputs.
+                        # todo: find the event logs that the dex router emits, to figure out exactly how much was swapped.
             self.transactions[account][timestamp] = acct_tx
 
         return process_transaction_on_account
diff --git a/subscrape/subscan_wrapper.py b/subscrape/subscan_wrapper.py
index 244900b..e88e9ca 100644
--- a/subscrape/subscan_wrapper.py
+++ b/subscrape/subscan_wrapper.py
@@ -10,16 +10,24 @@
 #requests_log.setLevel(logging.DEBUG)
 #requests_log.propagate = True
 
+SUBSCAN_MAX_CALLS_PER_SEC_WITHOUT_API_KEY = 2
+SUBSCAN_MAX_CALLS_PER_SEC_WITH_AN_API_KEY = 30
 
 
-class SubscanWrapper:
-    def __init__(self, endpoint, api_key=None):
+class SubscanWrapper:
+    def __init__(self, chain, api_key=None):
         self.logger = logging.getLogger("SubscanWrapper")
+        self.endpoint = f"https://{chain}.api.subscan.io"
         self.api_key = api_key
-        self.endpoint = endpoint
+        if api_key is not None:
+            max_calls_per_sec = SUBSCAN_MAX_CALLS_PER_SEC_WITH_AN_API_KEY
+        else:
+            max_calls_per_sec = SUBSCAN_MAX_CALLS_PER_SEC_WITHOUT_API_KEY
+        self.logger.info(f'Subscan rate limit set to {max_calls_per_sec} API calls per second.')
+        # Decorator arguments are evaluated once, while the class body executes, so a limit that
+        # depends on api_key cannot be set with @limits on the method; wrap the bound method instead.
+        self.query = sleep_and_retry(limits(calls=max_calls_per_sec, period=1)(self.query))
 
-    @sleep_and_retry  # be patient and sleep this thread to avoid exceeding the rate limit
-    @limits(calls=29, period=1)  # API limits us to 30 calls every second
     def query(self, method, headers={}, body={}):
         headers["Content-Type"] = "application/json"
         if self.api_key is not None: