Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions electrumx/server/block_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat

# Caches of unflushed items.
self.headers = []
self.tx_hashes = []
self.tx_hashes = [] # type: List[bytes]
self.undo_infos = [] # type: List[Tuple[Sequence[bytes], int]]

# UTXO cache
Expand Down Expand Up @@ -347,9 +347,16 @@ def estimate_txs_remaining(self):
def flush_data(self):
'''The data for a flush. The lock must be taken.'''
assert self.state_lock.locked()
return FlushData(self.height, self.tx_count, self.headers,
self.tx_hashes, self.undo_infos, self.utxo_cache,
self.db_deletes, self.tip)
return FlushData(
height=self.height,
tx_count=self.tx_count,
headers=self.headers,
block_tx_hashes=self.tx_hashes,
undo_infos=self.undo_infos,
adds=self.utxo_cache,
deletes=self.db_deletes,
tip=self.tip,
)

async def flush(self, flush_utxos):
def flush():
Expand All @@ -368,7 +375,7 @@ async def _maybe_flush(self):
await self.flush(flush_arg)
self.next_cache_check = time.monotonic() + 30

def check_cache_size(self):
def check_cache_size(self) -> Optional[bool]:
'''Flush a cache if it gets too big.'''
# Good average estimates based on traversal of subobjects and
# requesting size from Python (see deep_getsizeof).
Expand Down
33 changes: 11 additions & 22 deletions electrumx/server/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,11 @@ def __init__(self, env: 'Env'):
# "undo data: list of UTXOs spent at block height"
self.utxo_db = None

self.utxo_flush_count = 0
self.fs_height = -1
self.fs_tx_count = 0
self.db_height = -1
self.db_tx_count = 0
self.db_tip = None # type: Optional[bytes]
self.tx_counts = None
self.last_flush = time.time()
self.last_flush_tx_count = 0
self.wall_time = 0
Expand All @@ -129,6 +127,7 @@ def __init__(self, env: 'Env'):
# on-disk: raw block headers in chain order
self.headers_file = util.LogicalFile('meta/headers', 2, 16000000)
# on-disk: cumulative number of txs at the end of height N
self.tx_counts = None # type: Optional[array]
self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000)
# on-disk: 32 byte txids in chain order, allows (tx_num -> txid) map
self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000)
Expand All @@ -150,7 +149,7 @@ async def _read_tx_counts(self):
else:
assert self.db_tx_count == 0

async def _open_dbs(self, for_sync: bool, compacting: bool):
async def _open_dbs(self, *, for_sync: bool):
assert self.utxo_db is None

# First UTXO DB
Expand All @@ -169,25 +168,24 @@ async def _open_dbs(self, for_sync: bool, compacting: bool):
self.read_utxo_state()

# Then history DB
self.utxo_flush_count = self.history.open_db(self.db_class, for_sync,
self.utxo_flush_count,
compacting)
self.history.open_db(
db_class=self.db_class,
for_sync=for_sync,
utxo_db_tx_count=self.db_tx_count,
)
self.clear_excess_undo_info()

# Read TX counts (requires meta directory)
await self._read_tx_counts()

async def open_for_compacting(self):
await self._open_dbs(True, True)

async def open_for_sync(self):
'''Open the databases to sync to the daemon.

When syncing we want to reserve a lot of open files for the
synchronization. When serving clients we want the open files for
serving network connections.
'''
await self._open_dbs(True, False)
await self._open_dbs(for_sync=True)

async def open_for_serving(self):
'''Open the databases for serving. If they are already open they are
Expand All @@ -198,7 +196,7 @@ async def open_for_serving(self):
self.utxo_db.close()
self.history.close_db()
self.utxo_db = None
await self._open_dbs(False, False)
await self._open_dbs(for_sync=False)

# Header merkle cache

Expand Down Expand Up @@ -254,7 +252,7 @@ def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining):
self.flush_state(self.utxo_db)

elapsed = self.last_flush - start_time
self.logger.info(f'flush #{self.history.flush_count:,d} took '
self.logger.info(f'flush took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')

Expand Down Expand Up @@ -353,7 +351,6 @@ def flush_utxo_db(self, batch, flush_data: FlushData):
f'{spend_count:,d} spends in '
f'{elapsed:.1f}s, committing...')

self.utxo_flush_count = self.history.flush_count
self.db_height = flush_data.height
self.db_tx_count = flush_data.tx_count
self.db_tip = flush_data.tip
Expand Down Expand Up @@ -384,7 +381,7 @@ def flush_backup(self, flush_data, touched):
self.flush_state(batch)

elapsed = self.last_flush - start_time
self.logger.info(f'backup flush #{self.history.flush_count:,d} took '
self.logger.info(f'backup flush took '
f'{elapsed:.1f}s. Height {flush_data.height:,d} '
f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})')

Expand Down Expand Up @@ -595,7 +592,6 @@ def read_utxo_state(self):
self.db_tx_count = 0
self.db_tip = b'\0' * 32
self.db_version = max(self.DB_VERSIONS)
self.utxo_flush_count = 0
self.wall_time = 0
self.first_sync = True
else:
Expand All @@ -617,7 +613,6 @@ def read_utxo_state(self):
self.db_height = state['height']
self.db_tx_count = state['tx_count']
self.db_tip = state['tip']
self.utxo_flush_count = state['utxo_flush_count']
self.wall_time = state['wall_time']
self.first_sync = state['first_sync']

Expand Down Expand Up @@ -735,18 +730,12 @@ def write_utxo_state(self, batch):
'height': self.db_height,
'tx_count': self.db_tx_count,
'tip': self.db_tip,
'utxo_flush_count': self.utxo_flush_count,
'wall_time': self.wall_time,
'first_sync': self.first_sync,
'db_version': self.db_version,
}
batch.put(b'state', repr(state).encode())

def set_flush_count(self, count):
self.utxo_flush_count = count
with self.utxo_db.write_batch() as batch:
self.write_utxo_state(batch)

async def all_utxos(self, hashX):
'''Return all UTXOs for an address sorted in no particular order.'''
def read_utxos():
Expand Down
Loading