price_oracle.oracle

Documentation for eth_defi.price_oracle.oracle Python module.

Price oracle core functionality.

This core mechanism is used by outside event feeders, like eth_defi.uniswap_v2.oracle.

Functions

time_weighted_average_price(events)

Calculate TWAP price over all entries in the buffer.

Classes

BasePriceOracle

Base class for price oracles.

FixedPriceOracle

Always use the same hardcoded exchange rate.

PriceEntry

A single source entry for price calculations.

PriceFunction

A callable for calcualte

PriceOracle

Price oracle core.

PriceSource

Different price entry sources.

TrustedStablecoinOracle

Return a price for a token we trust we can always redeem for 1 USD.

Exceptions

DataPeriodTooShort

We do not have enough events for a longer period of time.

DataTooOld

The price buffer data does not have recent enough entries..

NotEnoughData

The price buffer does not have enough data.

PriceCalculationError

Something wrong with price calculation.

class PriceSource

Bases: enum.Enum

Different price entry sources.

uniswap_v2_like_pool_sync_event = 'uniswap_v2_like_pool_sync_event'

Uniswap v2 pool and sync event

uniswap_v3_like_pool = 'uniswap_v3_like_pool'

Uniswap v3 pool

unknown = 'unknown'

Not specified

class PriceEntry

Bases: object

A single source entry for price calculations.

PriceOracle maintains a buffer of these to calculate a smoothed out price, like py:func:time_weighted_average_price.

Price entry can be sourced from:

  • Manually entered price

  • Price from Uniswap v2 sync events

  • Price from some other event

timestamp: datetime.datetime

When price entry was booked. All timestamps must be UTC, Python naive datetimes.

price: decimal.Decimal

When price entry was booked. This should be base token / quote token, in its human readable format, all decimals converted correctly.

source: eth_defi.price_oracle.oracle.PriceSource

What was the source of this price entry

volume: Optional[decimal.Decimal] = None

How much volume this trade carried (if available) Expressed in the quote token.

pool_contract_address: Optional[str] = None

Uni v2 pair contract address or similar

block_number: Optional[int] = None

Block number where this transaction happened

tx_hash: Optional[str] = None

Transaction where did we pick the event logs

block_hash: Optional[str] = None

Hash of the block where this price was picked in. Can be used to remove data for blocks in unstable chain tip.

first_seen_at_block_number: Optional[int] = None

Chain reorganisation helper. This is set on the old event when we detect duplicate entry. We never remove items from heap, but mark them deprecated. Items are eventually cleaned up when they expire.

update_chain_reorg(new_entry)

Update entry data in the case of chain reorganisation.

TODO: We are not yet dealing with the situation if the transaction gets reorganisated and rejected.

Parameters

new_entry (eth_defi.price_oracle.oracle.PriceEntry) –

__init__(timestamp, price, source, volume=None, pool_contract_address=None, block_number=None, tx_hash=None, block_hash=None, first_seen_at_block_number=None)
Parameters
Return type

None

class PriceFunction

Bases: Protocol

A callable for calcualte

You can give different function for

  • Volume weighted average

  • Time weighted average

__init__(*args, **kwargs)
exception PriceCalculationError

Bases: Exception

Something wrong with price calculation.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception NotEnoughData

Bases: eth_defi.price_oracle.oracle.PriceCalculationError

The price buffer does not have enough data.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception DataTooOld

Bases: eth_defi.price_oracle.oracle.PriceCalculationError

The price buffer data does not have recent enough entries..

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception DataPeriodTooShort

Bases: eth_defi.price_oracle.oracle.PriceCalculationError

We do not have enough events for a longer period of time.

__init__(*args, **kwargs)
__new__(**kwargs)
add_note()

Exception.add_note(note) – add a note to the exception

with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class BasePriceOracle

Bases: object

Base class for price oracles.

abstract calculate_price(block_number=None)

Get a price for the current block.

Parameters

block_number (Optional[int]) – Hint of what is the current block. We do not support prices for historical blocks, but we may cache the result of the previous block calculation for speedups.

Return type

decimal.Decimal

class PriceOracle

Bases: eth_defi.price_oracle.oracle.BasePriceOracle

Price oracle core.

  • Suitable for real-time price calculation for data coming over WebSockets

  • Suitable for point of time calculation using historical data

  • Sample data over multiple events

  • Rotate ring buffer of events when new data comes in. Uses Python heapq for this.

Example:

# Randomly chosen block range.
# 100 blocks * 3 sec / block = ~300 seconds
start_block = 14_000_000
end_block = 14_000_100

pair_details = fetch_pair_details(web3, bnb_busd_address)
assert pair_details.token0.symbol == "WBNB"
assert pair_details.token1.symbol == "BUSD"

oracle = PriceOracle(
    time_weighted_average_price,
    max_age=PriceOracle.ANY_AGE,  # We are dealing with historical data
    min_duration=datetime.timedelta(minutes=1),
)

update_price_oracle_with_sync_events_single_thread(
    oracle,
    web3,
    bnb_busd_address,
    start_block,
    end_block,
)

assert oracle.calculate_price() == pytest.approx(Decimal("523.8243566658033237353702655"))

Create a new price oracle.

The security parameters are set for a simple defaults.

Parameters
  • price_function – What function we use to calculate the price based on the events. Defaults to time-weighted average price.

  • target_time_window – What is the target time window for us to calculate the time function. Truncation will discard older data. Only relevant for real-time price oracles.

  • exchange_rate_oracle – If we depend on the secondary price data to calculate the price. E.g. converting AAVE/ETH rate to AAVE/USD using ETH/USDC pool price oracle.

  • max_age – A trip wire to detect corruption in real time data feeds. If the most recent entry in the buffer is older than this, throw an exception. This usually means we have stale data in our buffer and some price source pool has stopped working.

  • min_entries – The minimum number of entries we want to have to calculate the price reliably.

ANY_AGE = datetime.timedelta(days=36500)

An “infinite” place holder for max age

__init__(price_function, target_time_window=datetime.timedelta(seconds=300), min_duration=datetime.timedelta(seconds=3600), max_age=datetime.timedelta(seconds=14400), min_entries=8)

Create a new price oracle.

The security parameters are set for a simple defaults.

Parameters
  • price_function (eth_defi.price_oracle.oracle.PriceFunction) – What function we use to calculate the price based on the events. Defaults to time-weighted average price.

  • target_time_window (datetime.timedelta) – What is the target time window for us to calculate the time function. Truncation will discard older data. Only relevant for real-time price oracles.

  • exchange_rate_oracle – If we depend on the secondary price data to calculate the price. E.g. converting AAVE/ETH rate to AAVE/USD using ETH/USDC pool price oracle.

  • max_age (datetime.timedelta) – A trip wire to detect corruption in real time data feeds. If the most recent entry in the buffer is older than this, throw an exception. This usually means we have stale data in our buffer and some price source pool has stopped working.

  • min_entries (int) – The minimum number of entries we want to have to calculate the price reliably.

  • min_duration (datetime.timedelta) –

get_last_refreshed()

When the oracle data was refreshed last time.

To figure out max age in real time tracking mode.

Return type

datetime.datetime

update_last_refresh(block_number, timestamp)

Update the last seen block.

Parameters
check_data_quality(now_=None)

Raises one of PriceCalculationError subclasses if our data is not good enough to calculate the oracle price.

See PriceCalculationError

Parameters
Raises

PriceCalculationError – If we have data quality issues

calculate_price(block_number=None)

Calculate the price based on the data in the price data buffer.

Raises

PriceCalculationError – If we have data quality issues.

Parameters

block_number (Optional[int]) –

Return type

decimal.Decimal

add_price_entry(evt)

Add price entry to the ring buffer.

Note

It is not safe to call this function multiple times for the same event.

Further reading

Parameters

evt (eth_defi.price_oracle.oracle.PriceEntry) –

add_price_entry_reorg_safe(evt)

Add price entry to the ring buffer with support for fixing chain reorganisations.

Transactions may hop between different blocks when the chain tip reorganises, getting a new timestamp. In this case, we update the

Note

It is safe to call this function multiple times for the same event.

Returns

True if the transaction hopped to a different block

Parameters

evt (eth_defi.price_oracle.oracle.PriceEntry) –

Return type

bool

get_by_transaction_hash(tx_hash)

Get an event by transaction hash.

Parameters

tx_hash (str) –

Return type

Optional[eth_defi.price_oracle.oracle.PriceEntry]

get_newest()

Return the newest price entry.

Return type

Optional[eth_defi.price_oracle.oracle.PriceEntry]

get_oldest()

Return the oldest price entry.

Return type

Optional[eth_defi.price_oracle.oracle.PriceEntry]

get_buffer_duration()

How long time is the time we have price events in the buffer for.

Return type

datetime.timedelta

feed_simple_data(data, source=PriceSource.unknown)

Feed sample data to the price oracle from a Python dict.

This method is mostly for testing: for actual implementation construct your PriceEntry instances yourself.

Example:

price_data = {
    datetime.datetime(2021, 1, 3): Decimal(100),
    datetime.datetime(2021, 1, 2): Decimal(150),
    datetime.datetime(2021, 1, 1): Decimal(120),
}

oracle = PriceOracle(
    time_weighted_average_price,
)

oracle.feed_simple_data(price_data)
Parameters

data (Dict[datetime.datetime, decimal.Decimal]) –

truncate_buffer(current_timestamp)

Delete old data in the buffer that is no longer relevant for our price calculation.

Returns

Numbers of items that where discared

Parameters

current_timestamp (datetime.datetime) –

Return type

int

time_weighted_average_price(events)

Calculate TWAP price over all entries in the buffer.

Calculates the price using statistics.mean().

Further reading:

Parameters

events (List[eth_defi.price_oracle.oracle.PriceEntry]) –

Return type

decimal.Decimal

class TrustedStablecoinOracle

Bases: eth_defi.price_oracle.oracle.BasePriceOracle

Return a price for a token we trust we can always redeem for 1 USD.

calculate_price(block_number=None)

Get a price for the current block.

Parameters

block_number (Optional[int]) – Hint of what is the current block. We do not support prices for historical blocks, but we may cache the result of the previous block calculation for speedups.

Return type

decimal.Decimal

class FixedPriceOracle

Bases: eth_defi.price_oracle.oracle.BasePriceOracle

Always use the same hardcoded exchange rate.

Most useful for unit testing.

__init__(exchange_rate)
Parameters

exchange_rate (decimal.Decimal) –

calculate_price(block_number=None)

Get a price for the current block.

Parameters

block_number (Optional[int]) – Hint of what is the current block. We do not support prices for historical blocks, but we may cache the result of the previous block calculation for speedups.

Return type

decimal.Decimal