event_reader.reorganisation_monitor

Documentation for eth_defi.event_reader.reorganisation_monitor Python module.

Chain reorganisation handling during the chain data reading.

All EVM-based blockchains are subject to minor chain reorganisations, which occur when nodes around the world have not yet reached consensus on the chain tip.

Functions

create_reorganisation_monitor(web3[, ...])

Set up a chain reorganisation monitor strategy based on the APIs the node supports

Classes

ChainReorganisationResolution

How did we fare getting hashes and timestamps for the latest blocks.

GraphQLReorganisationMonitor

Watch blockchain for reorgs using GoEthereum /graphql API.

JSONRPCReorganisationMonitor

Watch blockchain for reorgs using eth_getBlockByNumber JSON-RPC API.

MockChainAndReorganisationMonitor

A dummy reorganisation monitor for unit testing.

ReorganisationMonitor

Watch blockchain for reorgs.

Exceptions

BlockNotAvailable

Tried to ask for timestamp data for a block that does not exist yet.

ChainReorganisationDetected

ReorganisationResolutionFailure

Could not figure out chain reorgs after multiple attempts.

TooLongRange

Reorg scan range is too long.

class ChainReorganisationResolution

Bases: object

How did we fare getting hashes and timestamps for the latest blocks.

last_live_block: int

What we know to be the chain tip on our node

This is the latest block at the JSON-RPC node. We can read data up to this block.

latest_block_with_good_data: int

What we know to be the last block for which we do not need to perform a rollback

This is the block number that does not need to be purged from your internal database. All previously read events with a higher block number should be purged.

reorg_detected: bool

Did we detect any reorgs in this cycle

get_read_range()

Get the range of blocks we should read on this poll cycle.

  • This range may overlap your previous event read range.

  • You should discard any previously read data from blocks at or after the start of the range

  • You should be prepared to read the same event again

Returns

(start block, end block) inclusive range

Return type

Tuple[int, int]
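
A minimal usage sketch for acting on the resolution, assuming web3, filter and reorg_mon are set up as in the ReorganisationMonitor example below; purge_events_after() and store_event() are hypothetical application helpers:

from eth_defi.event_reader.reader import read_events

resolution = reorg_mon.update_chain()
start, end = resolution.get_read_range()

if resolution.reorg_detected:
    # Anything we stored for blocks newer than the last good block may be stale
    purge_events_after(resolution.latest_block_with_good_data)

for evt in read_events(web3, start_block=start, end_block=end, filter=filter):
    # The range may overlap the previous poll, so storage must tolerate duplicates
    store_event(evt)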

__init__(last_live_block, latest_block_with_good_data, reorg_detected)
Parameters
  • last_live_block (int) –

  • latest_block_with_good_data (int) –

  • reorg_detected (bool) –

Return type

None

exception ChainReorganisationDetected

Bases: Exception

__init__(block_number, original_hash, new_hash)
Parameters
  • block_number (int) –

  • original_hash (str) –

  • new_hash (str) –

__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception TooLongRange

Bases: Exception

Reorg scan range is too long.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception ReorganisationResolutionFailure

Bases: Exception

Could not figure out chain reorgs after multiple attempts.

Node in a bad state?

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception BlockNotAvailable

Bases: Exception

Tried to ask for timestamp data for a block that does not exist yet.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class ReorganisationMonitor

Bases: abc.ABC

Watch blockchain for reorgs.

Most EVM blockchains have several minor chain reorganisations per day, when your node switches from one chain tip to another due to block propagation issues. Any application reading blockchain event data must be able to detect such reorganisations and purge incorrect data from its data feed.

  • Abstract base class for different ways to support chain reorganisations

  • Maintain the state where our blockchain read cursor is, using get_last_block_read()

  • Ingest and maintain the state of the last read blocks using update_chain()

  • Check block headers for chain reorganisations when reading events from the chain using check_block_reorg()

  • Manage block timestamp lookups using get_block_timestamp()

  • Save and load block header state to disk cache, because APIs are slow, using load_pandas() and to_pandas()

Example:

import os
import time

from web3 import HTTPProvider, Web3

from eth_defi.abi import get_contract
from eth_defi.chain import install_chain_middleware
from eth_defi.event_reader.filter import Filter
from eth_defi.event_reader.reader import read_events, LogResult
from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor


def main():

    json_rpc_url = os.environ.get("JSON_RPC_POLYGON", "https://polygon-rpc.com")
    web3 = Web3(HTTPProvider(json_rpc_url))
    web3.middleware_onion.clear()
    install_chain_middleware(web3)

    # Get contracts
    Pair = get_contract(web3, "sushi/UniswapV2Pair.json")

    filter = Filter.create_filter(
        address=None,  # Listen events from any smart contract
        event_types=[Pair.events.Swap]
    )

    reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)

    reorg_mon.load_initial_block_headers(block_count=5)

    processed_events = set()

    latest_block = None

    # Keep reading events as they land
    while True:
        chain_reorg_resolution = reorg_mon.update_chain()
        start, end = chain_reorg_resolution.get_read_range()

        if chain_reorg_resolution.reorg_detected:
            print("Chain reorg warning")

        evt: LogResult
        for evt in read_events(
            web3,
            start_block=start,
            end_block=end,
            filter=filter,
        ):
            # How to uniquely identify EVM logs
            key = evt["blockHash"] + evt["transactionHash"] + evt["logIndex"]

            # The reader may cause duplicate events as the chain tip reorganises
            if key not in processed_events:
                print(f"Swap at block {evt['blockNumber']:,} tx: {evt['transactionHash']}")
                processed_events.add(key)

        if end != latest_block:
            print(f"Latest block is {end:,}")
            latest_block = end

        time.sleep(0.5)


if __name__ == "__main__":
    main()
block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]

Internal buffer of our block data

Block number -> Block header data

last_block_read: int = 0

Last block served by update_chain() in the duty cycle

check_depth: int = 20

How many blocks we replay from the blockchain to detect any chain reorganisations

Done by figure_reorganisation_and_new_blocks(). Adjust this for your EVM chain.

max_cycle_tries = 10

How many times we try to re-read data from the blockchain in the case of a reorganisation.

If our node constantly feeds us changing data, give up.

reorg_wait_seconds = 5

How long we allow our node to catch up in case there has been a change in the chain tip.

If our node constantly feeds us changing data, give up.

has_data()

Do we have any data available yet.

Return type

bool

get_last_block_read()

Get the number of the last block served by update_chain().

Return type

int

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

Parameters

block_number (int) –

Return type

eth_defi.event_reader.block_header.BlockHeader

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

Parameters

block_number (int) –

load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)

Get the initial block buffer filled up.

You can call this during application start-up, or when you start following the chain. This interface is designed to keep the application on hold until new blocks have been served.

Parameters
  • block_count (Optional[int]) –

    How many of the latest blocks to load

    Give start_block or block_count.

  • start_block (Optional[int]) –

    What is the first block to read.

    Give start_block or block_count.

  • tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar

  • save_callable (Optional[Callable]) –

    Save after every block.

    Called after every block.

    TODO: Hack. Design a better interface.

Returns

The initial block range to start to work with

Return type

Tuple[int, int]
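
A sketch of warming up the header buffer at application start-up, assuming a connected web3 instance; the block_count and check_depth values are illustrative:

from tqdm import tqdm

from eth_defi.event_reader.reorganisation_monitor import JSONRPCReorganisationMonitor

reorg_mon = JSONRPCReorganisationMonitor(web3, check_depth=3)

# Download the last 100 block headers, showing a progress bar while we wait
start_block, end_block = reorg_mon.load_initial_block_headers(
    block_count=100,
    tqdm=tqdm,
)
print(f"Buffered headers for blocks {start_block:,} - {end_block:,}")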

add_block(record)

Add new block to header tracking.

Blocks must be added in order.

Parameters

record (eth_defi.event_reader.block_header.BlockHeader) –

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

  • Called during the event reader

  • Event reader gets the block number and hash with the event

  • We have initial block_map in memory, previously buffered in

  • We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation

If we do not have records, ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

Parameters
  • block_number (int) –

  • block_hash (str) –

Return type

Optional[int]
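
A sketch of validating incoming event data against the header buffer; evt is assumed to be a LogResult-like mapping carrying blockNumber and blockHash, reorg_mon an existing monitor instance, and handle_reorg() a hypothetical recovery helper:

from eth_defi.event_reader.reorganisation_monitor import ChainReorganisationDetected

try:
    reorg_mon.check_block_reorg(evt["blockNumber"], evt["blockHash"])
except ChainReorganisationDetected:
    # Our buffered header for this block no longer matches the chain.
    # Re-run update_chain() and re-read events from the resolved safe range.
    handle_reorg()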

truncate(latest_good_block)

Delete data after a block number because a chain reorg happened.

Parameters

latest_good_block (int) – Delete all data starting after this block (exclusive)

figure_reorganisation_and_new_blocks(max_range=1000000)

Compare the local block database against the live data from chain.

Spot the differences in (block number, block header) tuples and determine a chain reorg.

Parameters

max_range (Optional[int]) –

Abort if we need to scan more than this number of blocks.

This is because scanning an overly long block range is likely to take forever on non-GraphQL nodes.

Set None to ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

int

get_block_timestamp_as_pandas(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

pandas._libs.tslibs.timestamps.Timestamp

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

  • Do several attempts to read data (as one fork can cause other forks)

  • Give up after some time if we detect the chain to be in a doom loop

Returns

What block range the consumer application should read.

What we think about the chain state.

Return type

eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution

to_pandas(partition_size=0)

Convert the data to Pandas DataFrame format for storing.

Parameters

partition_size (int) –

To partition the outgoing data.

Set 0 to ignore.

Return type

pandas.core.frame.DataFrame

load_pandas(df)

Load block header data from Pandas data frame.

Parameters

df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().
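
A sketch of caching block headers on disk between runs; the headers.parquet path and partition_size value are illustrative:

import pandas as pd

# Export the in-memory header buffer and persist it
df = reorg_mon.to_pandas(partition_size=100_000)
df.to_parquet("headers.parquet")

# On the next start-up, restore the buffer instead of re-downloading headers
cached = pd.read_parquet("headers.parquet")
reorg_mon.load_pandas(cached)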

restore(block_map)

Restore the chain state from saved data.

Parameters

block_map (dict) – Block number -> Block header dictionary

abstract fetch_block_data(start_block, end_block)

Read the new block headers.

Parameters
  • start_block – The first block to read (inclusive)

  • end_block – The last block to read (inclusive)

Return type

Iterable[eth_defi.event_reader.block_header.BlockHeader]

abstract get_last_block_live()

Get last block number

Return type

int

__init__(block_map=<factory>, last_block_read=0, check_depth=20)
Parameters
Return type

None

class JSONRPCReorganisationMonitor

Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor

Watch blockchain for reorgs using eth_getBlockByNumber JSON-RPC API.

  • Use the expensive eth_getBlockByNumber call to download block hashes and timestamps from an Ethereum-compatible node

__init__(web3, **kwargs)
Parameters

web3 (web3.main.Web3) –

get_last_block_live()

Get last block number

fetch_block_data(start_block, end_block)

Read the new block headers.

Parameters
  • start_block – The first block to read (inclusive)

  • end_block – The last block to read (inclusive)

Return type

Iterable[eth_defi.event_reader.block_header.BlockHeader]

add_block(record)

Add new block to header tracking.

Blocks must be added in order.

Parameters

record (eth_defi.event_reader.block_header.BlockHeader) –

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

  • Called during the event reader

  • Event reader gets the block number and hash with the event

  • We have initial block_map in memory, previously buffered in

  • We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation

If we do not have records, ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

Parameters
  • block_number (int) –

  • block_hash (str) –

Return type

Optional[int]

figure_reorganisation_and_new_blocks(max_range=1000000)

Compare the local block database against the live data from chain.

Spot the differences in (block number, block header) tuples and determine a chain reorg.

Parameters

max_range (Optional[int]) –

Abort if we need to scan more than this number of blocks.

This is because scanning an overly long block range is likely to take forever on non-GraphQL nodes.

Set None to ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

Parameters

block_number (int) –

Return type

eth_defi.event_reader.block_header.BlockHeader

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

int

get_block_timestamp_as_pandas(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

pandas._libs.tslibs.timestamps.Timestamp

get_last_block_read()

Get the number of the last block served by update_chain().

Return type

int

has_data()

Do we have any data available yet.

Return type

bool

load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)

Get the initial block buffer filled up.

You can call this during application start-up, or when you start following the chain. This interface is designed to keep the application on hold until new blocks have been served.

Parameters
  • block_count (Optional[int]) –

    How many of the latest blocks to load

    Give start_block or block_count.

  • start_block (Optional[int]) –

    What is the first block to read.

    Give start_block or block_count.

  • tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar

  • save_callable (Optional[Callable]) –

    Save after every block.

    Called after every block.

    TODO: Hack. Design a better interface.

Returns

The initial block range to start to work with

Return type

Tuple[int, int]

load_pandas(df)

Load block header data from Pandas data frame.

Parameters

df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().

restore(block_map)

Restore the chain state from saved data.

Parameters

block_map (dict) – Block number -> Block header dictionary

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

Parameters

block_number (int) –

to_pandas(partition_size=0)

Convert the data to Pandas DataFrame format for storing.

Parameters

partition_size (int) –

To partition the outgoing data.

Set 0 to ignore.

Return type

pandas.core.frame.DataFrame

truncate(latest_good_block)

Delete data after a block number because a chain reorg happened.

Parameters

latest_good_block (int) – Delete all data starting after this block (exclusive)

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

  • Do several attempts to read data (as one fork can cause other forks)

  • Give up after some time if we detect the chain to be in a doom loop

Returns

What block range the consumer application should read.

What we think about the chain state.

Return type

eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution

block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]

Internal buffer of our block data

Block number -> Block header data

class GraphQLReorganisationMonitor

Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor

Watch blockchain for reorgs using GoEthereum /graphql API.

Parameters
  • graphql_url – Give this or existing HTTPProvider

  • provider – Give this or graphql_url

__init__(graphql_url=None, provider=None, **kwargs)
Parameters
  • graphql_url (Optional[str]) – Give this or existing HTTPProvider

  • provider (Optional[web3.providers.rpc.rpc.HTTPProvider]) – Give this or graphql_url
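
A construction sketch, assuming a GoEthereum node started with the GraphQL API enabled (e.g. geth --http --graphql); the endpoint URL and check_depth are illustrative:

from eth_defi.event_reader.reorganisation_monitor import GraphQLReorganisationMonitor

reorg_mon = GraphQLReorganisationMonitor(
    graphql_url="http://localhost:8545/graphql",
    check_depth=3,
)
print("Chain tip:", reorg_mon.get_last_block_live())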

get_last_block_live()

Get the chain tip using GraphQL.

Return type

int

fetch_block_data(start_block, end_block)

Read the new block headers.

Parameters
  • start_block – The first block to read (inclusive)

  • end_block – The last block to read (inclusive)

Return type

Iterable[eth_defi.event_reader.block_header.BlockHeader]

add_block(record)

Add new block to header tracking.

Blocks must be added in order.

Parameters

record (eth_defi.event_reader.block_header.BlockHeader) –

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

  • Called during the event reader

  • Event reader gets the block number and hash with the event

  • We have initial block_map in memory, previously buffered in

  • We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation

If we do not have records, ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

Parameters
  • block_number (int) –

  • block_hash (str) –

Return type

Optional[int]

figure_reorganisation_and_new_blocks(max_range=1000000)

Compare the local block database against the live data from chain.

Spot the differences in (block number, block header) tuples and determine a chain reorg.

Parameters

max_range (Optional[int]) –

Abort if we need to scan more than this number of blocks.

This is because scanning an overly long block range is likely to take forever on non-GraphQL nodes.

Set None to ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

Parameters

block_number (int) –

Return type

eth_defi.event_reader.block_header.BlockHeader

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

int

get_block_timestamp_as_pandas(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

pandas._libs.tslibs.timestamps.Timestamp

get_last_block_read()

Get the number of the last block served by update_chain().

Return type

int

has_data()

Do we have any data available yet.

Return type

bool

load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)

Get the initial block buffer filled up.

You can call this during application start-up, or when you start following the chain. This interface is designed to keep the application on hold until new blocks have been served.

Parameters
  • block_count (Optional[int]) –

    How many of the latest blocks to load

    Give start_block or block_count.

  • start_block (Optional[int]) –

    What is the first block to read.

    Give start_block or block_count.

  • tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar

  • save_callable (Optional[Callable]) –

    Save after every block.

    Called after every block.

    TODO: Hack. Design a better interface.

Returns

The initial block range to start to work with

Return type

Tuple[int, int]

load_pandas(df)

Load block header data from Pandas data frame.

Parameters

df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().

restore(block_map)

Restore the chain state from saved data.

Parameters

block_map (dict) – Block number -> Block header dictionary

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

Parameters

block_number (int) –

to_pandas(partition_size=0)

Convert the data to Pandas DataFrame format for storing.

Parameters

partition_size (int) –

To partition the outgoing data.

Set 0 to ignore.

Return type

pandas.core.frame.DataFrame

truncate(latest_good_block)

Delete data after a block number because a chain reorg happened.

Parameters

latest_good_block (int) – Delete all data starting after this block (exclusive)

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

  • Do several attempts to read data (as one fork can cause other forks)

  • Give up after some time if we detect the chain to be in a doom loop

Returns

What block range the consumer application should read.

What we think about the chain state.

Return type

eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution

block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]

Internal buffer of our block data

Block number -> Block header data

class MockChainAndReorganisationMonitor

Bases: eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor

A dummy reorganisation monitor for unit testing.

  • Simulate block production and chain reorgs caused by minor forks, like a real blockchain.

  • We get explicit control to introduce simulated forks

__init__(block_number=1, block_duration_seconds=1, **kwargs)
Parameters

block_number (int) –

simulated_block_number

Next available block number

produce_blocks(block_count=1)

Populate fake blocks in the mock chain.

These blocks will be “read” in figure_reorganisation_and_new_blocks().

produce_fork(block_number, fork_marker='0x8888')

Mock a fork in the chain.

Parameters

block_number (int) –
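
A unit-test sketch combining produce_blocks() and produce_fork(); the block counts and the fork point are illustrative:

from eth_defi.event_reader.reorganisation_monitor import MockChainAndReorganisationMonitor

mock_mon = MockChainAndReorganisationMonitor()

# Pretend the chain has produced 10 blocks and buffer their headers
mock_mon.produce_blocks(10)
mock_mon.load_initial_block_headers(block_count=10)

# Simulate a minor fork replacing block 9 near the chain tip
mock_mon.produce_fork(9)

# The next poll should notice the changed block hash
resolution = mock_mon.update_chain()
assert resolution.reorg_detected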

get_last_block_live()

Get last block number

add_block(record)

Add new block to header tracking.

Blocks must be added in order.

Parameters

record (eth_defi.event_reader.block_header.BlockHeader) –

check_block_reorg(block_number, block_hash)

Check that newly read block matches our record.

  • Called during the event reader

  • Event reader gets the block number and hash with the event

  • We have initial block_map in memory, previously buffered in

  • We check if any of the blocks in the block map have different values than what our event producer gives us -> in this case we know there has been a chain reorganisation

If we do not have records, ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

Parameters
  • block_number (int) –

  • block_hash (str) –

Return type

Optional[int]

fetch_block_data(start_block, end_block)

Read the new block headers.

Parameters
  • start_block – The first block to read (inclusive)

  • end_block – The last block to read (inclusive)

Return type

Iterable[eth_defi.event_reader.block_header.BlockHeader]

figure_reorganisation_and_new_blocks(max_range=1000000)

Compare the local block database against the live data from chain.

Spot the differences in (block number, block header) tuples and determine a chain reorg.

Parameters

max_range (Optional[int]) –

Abort if we need to scan more than this number of blocks.

This is because scanning an overly long block range is likely to take forever on non-GraphQL nodes.

Set None to ignore.

Raises

ChainReorganisationDetected – When any of the block data in our internal buffer does not match that provided by events.

get_block_by_number(block_number)

Get block header data for a specific block number from our memory buffer.

Parameters

block_number (int) –

Return type

eth_defi.event_reader.block_header.BlockHeader

get_block_timestamp(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

int

get_block_timestamp_as_pandas(block_number)

Return UNIX UTC timestamp of a block.

Parameters

block_number (int) –

Return type

pandas._libs.tslibs.timestamps.Timestamp

get_last_block_read()

Get the number of the last block served by update_chain().

Return type

int

has_data()

Do we have any data available yet.

Return type

bool

load_initial_block_headers(block_count=None, start_block=None, tqdm=None, save_callable=None)

Get the initial block buffer filled up.

You can call this during application start-up, or when you start following the chain. This interface is designed to keep the application on hold until new blocks have been served.

Parameters
  • block_count (Optional[int]) –

    How many of the latest blocks to load

    Give start_block or block_count.

  • start_block (Optional[int]) –

    What is the first block to read.

    Give start_block or block_count.

  • tqdm (Optional[Type[tqdm.std.tqdm]]) – To display a progress bar

  • save_callable (Optional[Callable]) –

    Save after every block.

    Called after every block.

    TODO: Hack. Design a better interface.

Returns

The initial block range to start to work with

Return type

Tuple[int, int]

load_pandas(df)

Load block header data from Pandas data frame.

Parameters

df (pandas.core.frame.DataFrame) – Pandas DataFrame exported with to_pandas().

restore(block_map)

Restore the chain state from saved data.

Parameters

block_map (dict) – Block number -> Block header dictionary

skip_to_block(block_number)

Skip scanning initial chain and directly start from a certain block.

Parameters

block_number (int) –

to_pandas(partition_size=0)

Convert the data to Pandas DataFrame format for storing.

Parameters

partition_size (int) –

To partition the outgoing data.

Set 0 to ignore.

Return type

pandas.core.frame.DataFrame

truncate(latest_good_block)

Delete data after a block number because a chain reorg happened.

Parameters

latest_good_block (int) – Delete all data starting after this block (exclusive)

update_chain()

Update the internal memory buffer of block headers from the blockchain node.

  • Do several attempts to read data (as one fork can cause other forks)

  • Give up after some time if we detect the chain to be in a doom loop

Returns

What block range the consumer application should read.

What we think about the chain state.

Return type

eth_defi.event_reader.reorganisation_monitor.ChainReorganisationResolution

block_map: Dict[int, eth_defi.event_reader.block_header.BlockHeader]

Internal buffer of our block data

Block number -> Block header data

create_reorganisation_monitor(web3, check_depth=250)

Set up a chain reorganisation monitor strategy based on the APIs the node supports

  • The chain reorganisation monitor detects if any of the blocks at the chain tip have changed since the last read, or during the read

  • create_reorganisation_monitor() sets up a fast /graphql API endpoint based block scanner when the endpoint is offered by the node. This is 10x - 50x faster than JSON-RPC.

  • If the /graphql endpoint is not available, we fall back to slower JSON-RPC based reorganisation monitoring

Parameters
Returns

A reorg mon instance.

Either GraphQLReorganisationMonitor or JSONRPCReorganisationMonitor

Return type

eth_defi.event_reader.reorganisation_monitor.ReorganisationMonitor
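
A usage sketch letting the factory pick the fastest supported strategy; the RPC URL and check_depth value are illustrative:

from web3 import HTTPProvider, Web3

from eth_defi.event_reader.reorganisation_monitor import create_reorganisation_monitor

web3 = Web3(HTTPProvider("https://polygon-rpc.com"))
reorg_mon = create_reorganisation_monitor(web3, check_depth=3)
print("Using monitor implementation:", reorg_mon.__class__.__name__)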