Skip to content

Instantly share code, notes, and snippets.

@robmcl4

robmcl4/bug.py Secret

Created June 11, 2022 01:38
Show Gist options
  • Save robmcl4/164d02de04c8708817c5c32a9c325843 to your computer and use it in GitHub Desktop.
Save robmcl4/164d02de04c8708817c5c32a9c325843 to your computer and use it in GitHub Desktop.
import collections
import os
import web3
import web3.types
#
# Connect to both a mainnet archive node and ganache
#
# Archive node endpoint; override with the WEB3_HOST environment variable.
web3_host = os.getenv('WEB3_HOST', 'ws://127.0.0.1:8546')

# Traces can be very large, so both providers get a long timeout (60 * 5)
# and a 1 GiB cap on the websocket payload size.
_mainnet_provider = web3.WebsocketProvider(
    web3_host,
    websocket_timeout=60 * 5,
    websocket_kwargs={'max_size': 1024 * 1024 * 1024},
)
w3_mainnet = web3.Web3(_mainnet_provider)
assert w3_mainnet.isConnected()

# Second endpoint (queried as "ganache" further down) at a fixed local address.
_ganache_provider = web3.WebsocketProvider(
    'ws://localhost:34451',
    websocket_timeout=60 * 5,
    websocket_kwargs={'max_size': 1024 * 1024 * 1024},
)
w3 = web3.Web3(_ganache_provider)
assert w3.isConnected()
#
# Collect a debug_traceTransaction from both archive node + ganache
#
# Transaction whose execution trace is compared between the two nodes.
tx0 = '0xf514259f185019704004b241cb4c6e31ed5e6e474a998b99c4b89209e304f417'
_TRACE_METHOD = 'debug_traceTransaction'

# Query the archive node first, then ganache; both are raw JSON-RPC calls
# issued directly through the provider.
print('querying archive node')
resp_mainnet = w3_mainnet.provider.make_request(_TRACE_METHOD, [tx0])
print('done querying archive node, querying ganache')
resp_ganache = w3.provider.make_request(_TRACE_METHOD, [tx0])
print('done querying ganache')

# sanity check: both calls produced a result, and both traces contain the
# same number of steps (the comparison below pairs them up one-to-one)
assert 'result' in resp_mainnet
assert 'result' in resp_ganache
_archive_steps = resp_mainnet['result']['structLogs']
_ganache_steps = resp_ganache['result']['structLogs']
assert len(_archive_steps) == len(_ganache_steps)
#
# Check reported gas
#
# Compare the total gas reported by each trace. A mismatch is only reported;
# the script continues on to the per-step storage comparison either way.
gas_archive = resp_mainnet['result']['gas']
gas_ganache = resp_ganache['result']['gas']
if gas_archive != gas_ganache:
    print('Gas mismatch!')
    print(gas_archive)
    print(gas_ganache)
    # NOTE(review): the original indentation was lost; this blank separator
    # line is assumed to belong to the mismatch branch.
    print()
#
# Use the archive node's response as ground-truth to check consistency of ganache
#
# execution context stack, we push/pop as we CALL / RETURN from methods
# initialized with the `to` address of the transaction
ctx_stack = ['0x9D58779365B067D5D3fCc6e92d237aCd06F1e6a1']
# maps from address -> slot -> value
# for known values (filled in from the SSTOREs observed in ganache's trace)
known_state = collections.defaultdict(dict)
# Walk both traces in lockstep; index k of one corresponds to index k of the other.
paired_logs = list(zip(resp_mainnet['result']['structLogs'], resp_ganache['result']['structLogs']))
for i, (log_archive, log_ganache) in enumerate(paired_logs):
    # Basic consistency checks (these should not fail)
    assert log_archive['pc'] == log_ganache['pc']
    archive_op = log_archive['op']
    if archive_op == 'KECCAK256':
        # archive node returns op == KECCAK256 instead of SHA3
        archive_op = 'SHA3'
    assert archive_op == log_ganache['op'], f"{log_archive['op']} == {log_ganache['op']}"
    # Important: simplify our logic because there are no reverts (and no out-of-gas), so all SSTORE must persist
    assert log_archive['op'] != 'REVERT'

    # assert consistency in all slots: every slot ganache reports for this step
    # must agree with the last value we saw SSTOREd to it in the same contract
    context = ctx_stack[-1]
    for slot in log_ganache['storage']:
        if slot in known_state[context]:
            if known_state[context][slot] != log_ganache['storage'][slot]:
                # !!!!!!!!!!!!!!!!!!!!!!!!!!!!! THIS IS THE BUG !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
                print('!!!!!!!!!!!!!!!!!!!!!! BUG !!!!!!!!!!!!!!!!!!!!!!!!!')
                print(f"expected storage {context} slot {slot} to be {known_state[context][slot]} but got {log_ganache['storage'][slot]}")
                # print some logs surrounding this bug state
                # The start is clamped at 0: a negative slice start would be
                # counted from the END of the list, so a mismatch within the
                # first five steps would print an empty or wrong window.
                window_start = max(0, i - 5)
                for _, lg in paired_logs[window_start: i + 5]:
                    if lg['pc'] == log_ganache['pc']:
                        # star marks entries at the same pc as the failing step
                        print(str(lg['pc']) + '*', lg['op'], 'slot', lg['storage'].get(slot))
                    else:
                        print(lg['pc'], lg['op'], 'slot', lg['storage'].get(slot))
                # SystemExit is raised directly rather than via exit(), which is
                # a helper injected by the `site` module and not always present.
                raise SystemExit(1)

    # Record if the current contract changes
    if log_archive['op'] == 'DELEGATECALL':
        # the storage context is unchanged, BUT must be duplicated to account for an additional `return`
        ctx_stack.append(ctx_stack[-1])
    elif log_archive['op'] in ['CALL', 'STATICCALL']:
        # get the call target from the stack (and ensure consistency across both)
        # There's a bogus call to address '0x1' which fails, only modify
        # execution stack if that is not the case
        if log_archive['stack'][-2] != '0x1':
            # lstrip('0x') strips ANY run of leading '0'/'x' characters, not
            # just the literal prefix; that is fine here because rjust puts
            # the leading zeros back to form a 40-hex-digit address.
            target_archive = web3.Web3.toChecksumAddress(log_archive['stack'][-2].lstrip('0x').rjust(40, '0'))
            # ganache's stack entry is reduced to its last 40 hex digits
            target_ganache = web3.Web3.toChecksumAddress(log_ganache['stack'][-2][-40:])
            assert target_archive == target_ganache
            ctx_stack.append(target_archive)
    elif log_archive['op'] in ['RETURN', 'STOP']:
        # leaving the current call frame
        ctx_stack.pop()
    elif log_ganache['op'] == 'SSTORE':
        # Record SSTORE for later consistency checks
        # (last stack entry is taken as the slot, the one before it as the value)
        value = log_ganache['stack'][-2]
        slot = log_ganache['stack'][-1]
        # print('storing', slot, value, 'ctx', context)
        known_state[context][slot] = value

# Reached only if no storage mismatch terminated the script above.
print('no bug')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment