diff --git a/electrumx/server/block_processor.py b/electrumx/server/block_processor.py index dd4b9a590..6f1ecf54e 100644 --- a/electrumx/server/block_processor.py +++ b/electrumx/server/block_processor.py @@ -186,7 +186,7 @@ def __init__(self, env: 'Env', db: DB, daemon: Daemon, notifications: 'Notificat # Caches of unflushed items. self.headers = [] - self.tx_hashes = [] + self.tx_hashes = [] # type: List[bytes] self.undo_infos = [] # type: List[Tuple[Sequence[bytes], int]] # UTXO cache @@ -347,9 +347,16 @@ def estimate_txs_remaining(self): def flush_data(self): '''The data for a flush. The lock must be taken.''' assert self.state_lock.locked() - return FlushData(self.height, self.tx_count, self.headers, - self.tx_hashes, self.undo_infos, self.utxo_cache, - self.db_deletes, self.tip) + return FlushData( + height=self.height, + tx_count=self.tx_count, + headers=self.headers, + block_tx_hashes=self.tx_hashes, + undo_infos=self.undo_infos, + adds=self.utxo_cache, + deletes=self.db_deletes, + tip=self.tip, + ) async def flush(self, flush_utxos): def flush(): @@ -368,7 +375,7 @@ async def _maybe_flush(self): await self.flush(flush_arg) self.next_cache_check = time.monotonic() + 30 - def check_cache_size(self): + def check_cache_size(self) -> Optional[bool]: '''Flush a cache if it gets too big.''' # Good average estimates based on traversal of subobjects and # requesting size from Python (see deep_getsizeof). diff --git a/electrumx/server/db.py b/electrumx/server/db.py index fae1885d6..39bae2df1 100644 --- a/electrumx/server/db.py +++ b/electrumx/server/db.py @@ -107,13 +107,11 @@ def __init__(self, env: 'Env'): # "undo data: list of UTXOs spent at block height" self.utxo_db = None - self.utxo_flush_count = 0 self.fs_height = -1 self.fs_tx_count = 0 self.db_height = -1 self.db_tx_count = 0 self.db_tip = None # type: Optional[bytes] - self.tx_counts = None self.last_flush = time.time() self.last_flush_tx_count = 0 self.wall_time = 0 @@ -129,6 +127,7 @@ def __init__(self, env: 'Env'): # on-disk: raw block headers in chain order self.headers_file = util.LogicalFile('meta/headers', 2, 16000000) # on-disk: cumulative number of txs at the end of height N + self.tx_counts = None # type: Optional[array] self.tx_counts_file = util.LogicalFile('meta/txcounts', 2, 2000000) # on-disk: 32 byte txids in chain order, allows (tx_num -> txid) map self.hashes_file = util.LogicalFile('meta/hashes', 4, 16000000) @@ -150,7 +149,7 @@ async def _read_tx_counts(self): else: assert self.db_tx_count == 0 - async def _open_dbs(self, for_sync: bool, compacting: bool): + async def _open_dbs(self, *, for_sync: bool): assert self.utxo_db is None # First UTXO DB @@ -169,17 +168,16 @@ async def _open_dbs(self, for_sync: bool, compacting: bool): self.read_utxo_state() # Then history DB - self.utxo_flush_count = self.history.open_db(self.db_class, for_sync, - self.utxo_flush_count, - compacting) + self.history.open_db( + db_class=self.db_class, + for_sync=for_sync, + utxo_db_tx_count=self.db_tx_count, + ) self.clear_excess_undo_info() # Read TX counts (requires meta directory) await self._read_tx_counts() - async def open_for_compacting(self): - await self._open_dbs(True, True) - async def open_for_sync(self): '''Open the databases to sync to the daemon. @@ -187,7 +185,7 @@ async def open_for_sync(self): synchronization. When serving clients we want the open files for serving network connections. ''' - await self._open_dbs(True, False) + await self._open_dbs(for_sync=True) async def open_for_serving(self): '''Open the databases for serving. If they are already open they are @@ -198,7 +196,7 @@ async def open_for_serving(self): self.utxo_db.close() self.history.close_db() self.utxo_db = None - await self._open_dbs(False, False) + await self._open_dbs(for_sync=False) # Header merkle cache @@ -254,7 +252,7 @@ def flush_dbs(self, flush_data, flush_utxos, estimate_txs_remaining): self.flush_state(self.utxo_db) elapsed = self.last_flush - start_time - self.logger.info(f'flush #{self.history.flush_count:,d} took ' + self.logger.info(f'flush took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') @@ -353,7 +351,6 @@ def flush_utxo_db(self, batch, flush_data: FlushData): f'{spend_count:,d} spends in ' f'{elapsed:.1f}s, committing...') - self.utxo_flush_count = self.history.flush_count self.db_height = flush_data.height self.db_tx_count = flush_data.tx_count self.db_tip = flush_data.tip @@ -384,7 +381,7 @@ def flush_backup(self, flush_data, touched): self.flush_state(batch) elapsed = self.last_flush - start_time - self.logger.info(f'backup flush #{self.history.flush_count:,d} took ' + self.logger.info(f'backup flush took ' f'{elapsed:.1f}s. Height {flush_data.height:,d} ' f'txs: {flush_data.tx_count:,d} ({tx_delta:+,d})') @@ -595,7 +592,6 @@ def read_utxo_state(self): self.db_tx_count = 0 self.db_tip = b'\0' * 32 self.db_version = max(self.DB_VERSIONS) - self.utxo_flush_count = 0 self.wall_time = 0 self.first_sync = True else: @@ -617,7 +613,6 @@ def read_utxo_state(self): self.db_height = state['height'] self.db_tx_count = state['tx_count'] self.db_tip = state['tip'] - self.utxo_flush_count = state['utxo_flush_count'] self.wall_time = state['wall_time'] self.first_sync = state['first_sync'] @@ -735,18 +730,12 @@ def write_utxo_state(self, batch): 'height': self.db_height, 'tx_count': self.db_tx_count, 'tip': self.db_tip, - 'utxo_flush_count': self.utxo_flush_count, 'wall_time': self.wall_time, 'first_sync': self.first_sync, 'db_version': self.db_version, } batch.put(b'state', repr(state).encode()) - def set_flush_count(self, count): - self.utxo_flush_count = count - with self.utxo_db.write_batch() as batch: - self.write_utxo_state(batch) - async def all_utxos(self, hashX): '''Return all UTXOs for an address sorted in no particular order.''' def read_utxos(): diff --git a/electrumx/server/history.py b/electrumx/server/history.py index dc7dd8d54..1cd2f0008 100644 --- a/electrumx/server/history.py +++ b/electrumx/server/history.py @@ -25,46 +25,37 @@ TXNUM_LEN = 5 -FLUSHID_LEN = 2 class History: - DB_VERSIONS = (0, 1) + DB_VERSIONS = (0, 1, 2) db: Optional['Storage'] def __init__(self): self.logger = util.class_logger(__name__, self.__class__.__name__) - # For history compaction - self.max_hist_row_entries = 12500 self.unflushed = defaultdict(bytearray) self.unflushed_count = 0 - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 + self.hist_db_tx_count = 0 + self.hist_db_tx_count_next = 0 # after next flush, next value for self.hist_db_tx_count self.db_version = max(self.DB_VERSIONS) self.upgrade_cursor = -1 - # Key: address_hashX + flush_id - # Value: sorted "list" of tx_nums in history of hashX + # Key: address_hashX + tx_num + # Value: self.db = None def open_db( self, + *, db_class: Type['Storage'], for_sync: bool, - utxo_flush_count: int, - compacting: bool, - ): + utxo_db_tx_count: int, + ) -> None: self.db = db_class('hist', for_sync) self.read_state() - self.clear_excess(utxo_flush_count) - # An incomplete compaction needs to be cancelled otherwise - # restarting it will corrupt the history - if not compacting: - self._cancel_compaction() - return self.flush_count + self.clear_excess(utxo_db_tx_count) def close_db(self): if self.db: @@ -77,17 +68,10 @@ def read_state(self): state = ast.literal_eval(state.decode()) if not isinstance(state, dict): raise RuntimeError('failed reading state from history DB') - self.flush_count = state['flush_count'] - self.comp_flush_count = state.get('comp_flush_count', -1) - self.comp_cursor = state.get('comp_cursor', -1) self.db_version = state.get('db_version', 0) self.upgrade_cursor = state.get('upgrade_cursor', -1) - else: - self.flush_count = 0 - self.comp_flush_count = -1 - self.comp_cursor = -1 - self.db_version = max(self.DB_VERSIONS) - self.upgrade_cursor = -1 + self.hist_db_tx_count = state.get('hist_db_tx_count', 0) + self.hist_db_tx_count_next = self.hist_db_tx_count if self.db_version not in self.DB_VERSIONS: msg = (f'your history DB version is {self.db_version} but ' @@ -97,26 +81,37 @@ def read_state(self): if self.db_version != max(self.DB_VERSIONS): self.upgrade_db() self.logger.info(f'history DB version: {self.db_version}') - self.logger.info(f'flush count: {self.flush_count:,d}') - def clear_excess(self, utxo_flush_count): - # < might happen at end of compaction as both DBs cannot be - # updated atomically - if self.flush_count <= utxo_flush_count: + def clear_excess(self, utxo_db_tx_count: int) -> None: + # self.hist_db_tx_count != utxo_db_tx_count might happen as + # both DBs cannot be updated atomically + # FIXME when advancing blocks, hist_db is flushed first, so its count can be higher; + # but when backing up (e.g. reorg), hist_db is flushed first as well, + # so its count can be lower?! + # Shouldn't we flush utxo_db first when backing up? + if self.hist_db_tx_count <= utxo_db_tx_count: + assert self.hist_db_tx_count == utxo_db_tx_count return self.logger.info('DB shut down uncleanly. Scanning for ' 'excess history flushes...') + key_len = HASHX_LEN + TXNUM_LEN + txnum_padding = bytes(8-TXNUM_LEN) keys = [] - for key, _hist in self.db.iterator(prefix=b''): - flush_id, = unpack_be_uint16_from(key[-FLUSHID_LEN:]) - if flush_id > utxo_flush_count: - keys.append(key) + for db_key, db_val in self.db.iterator(prefix=b''): + # Ignore non-history entries + if len(db_key) != key_len: + continue + tx_numb = db_key[HASHX_LEN:] + tx_num, = unpack_le_uint64(tx_numb + txnum_padding) + if tx_num >= utxo_db_tx_count: + keys.append(db_key) self.logger.info(f'deleting {len(keys):,d} history entries') - self.flush_count = utxo_flush_count + self.hist_db_tx_count = utxo_db_tx_count + self.hist_db_tx_count_next = self.hist_db_tx_count with self.db.write_batch() as batch: for key in keys: batch.delete(key) @@ -127,19 +122,17 @@ def clear_excess(self, utxo_flush_count): def write_state(self, batch): '''Write state to the history DB.''' state = { - 'flush_count': self.flush_count, - 'comp_flush_count': self.comp_flush_count, - 'comp_cursor': self.comp_cursor, + 'hist_db_tx_count': self.hist_db_tx_count, 'db_version': self.db_version, 'upgrade_cursor': self.upgrade_cursor, } - # History entries are not prefixed; the suffix \0\0 ensures we - # look similar to other entries and aren't interfered with + # History entries are not prefixed; the suffix \0\0 is just for legacy reasons batch.put(b'state\0\0', repr(state).encode()) def add_unflushed(self, hashXs_by_tx, first_tx_num): unflushed = self.unflushed count = 0 + tx_num = None for tx_num, hashXs in enumerate(hashXs_by_tx, start=first_tx_num): tx_numb = pack_le_uint64(tx_num)[:TXNUM_LEN] hashXs = set(hashXs) @@ -147,6 +140,9 @@ def add_unflushed(self, hashXs_by_tx, first_tx_num): unflushed[hashX] += tx_numb count += len(hashXs) self.unflushed_count += count + if tx_num is not None: + assert self.hist_db_tx_count_next + len(hashXs_by_tx) == tx_num + 1 + self.hist_db_tx_count_next = tx_num + 1 def unflushed_memsize(self): return len(self.unflushed) * 180 + self.unflushed_count * TXNUM_LEN @@ -156,14 +152,15 @@ def assert_flushed(self): def flush(self): start_time = time.monotonic() - self.flush_count += 1 - flush_id = pack_be_uint16(self.flush_count) unflushed = self.unflushed + chunks = util.chunks with self.db.write_batch() as batch: for hashX in sorted(unflushed): - key = hashX + flush_id - batch.put(key, bytes(unflushed[hashX])) + for tx_num in chunks(unflushed[hashX], TXNUM_LEN): + db_key = hashX + tx_num + batch.put(db_key, b'') + self.hist_db_tx_count = self.hist_db_tx_count_next self.write_state(batch) count = len(unflushed) @@ -176,34 +173,24 @@ def flush(self): f'for {count:,d} addrs') def backup(self, hashXs, tx_count): - # Not certain this is needed, but it doesn't hurt - self.flush_count += 1 + self.assert_flushed() nremoves = 0 - bisect_left = bisect.bisect_left - chunks = util.chunks - txnum_padding = bytes(8-TXNUM_LEN) with self.db.write_batch() as batch: for hashX in sorted(hashXs): deletes = [] - puts = {} - for key, hist in self.db.iterator(prefix=hashX, reverse=True): - a = array( - 'Q', - b''.join(item + txnum_padding for item in chunks(hist, TXNUM_LEN)) - ) - # Remove all history entries >= tx_count - idx = bisect_left(a, tx_count) - nremoves += len(a) - idx - if idx > 0: - puts[key] = hist[:TXNUM_LEN * idx] + for db_key, db_val in self.db.iterator(prefix=hashX, reverse=True): + tx_numb = db_key[HASHX_LEN:] + tx_num, = unpack_le_uint64(tx_numb + txnum_padding) + if tx_num >= tx_count: + nremoves += 1 + deletes.append(db_key) + else: break - deletes.append(key) - for key in deletes: batch.delete(key) - for key, value in puts.items(): - batch.put(key, value) + self.hist_db_tx_count = tx_count + self.hist_db_tx_count_next = self.hist_db_tx_count self.write_state(batch) self.logger.info(f'backing up removed {nremoves:,d} history entries') @@ -214,191 +201,115 @@ def get_txnums(self, hashX, limit=1000): transactions. By default yields at most 1000 entries. Set limit to None to get them all. ''' limit = util.resolve_limit(limit) - chunks = util.chunks txnum_padding = bytes(8-TXNUM_LEN) - for _key, hist in self.db.iterator(prefix=hashX): - for tx_numb in chunks(hist, TXNUM_LEN): - if limit == 0: - return - tx_num, = unpack_le_uint64(tx_numb + txnum_padding) - yield tx_num - limit -= 1 - - # - # History compaction - # - - # comp_cursor is a cursor into compaction progress. - # -1: no compaction in progress - # 0-65535: Compaction in progress; all prefixes < comp_cursor have - # been compacted, and later ones have not. - # 65536: compaction complete in-memory but not flushed - # - # comp_flush_count applies during compaction, and is a flush count - # for history with prefix < comp_cursor. flush_count applies - # to still uncompacted history. It is -1 when no compaction is - # taking place. Key suffixes up to and including comp_flush_count - # are used, so a parallel history flush must first increment this - # - # When compaction is complete and the final flush takes place, - # flush_count is reset to comp_flush_count, and comp_flush_count to -1 - - def _flush_compaction(self, cursor, write_items, keys_to_delete): - '''Flush a single compaction pass as a batch.''' - # Update compaction state - if cursor == 65536: - self.flush_count = self.comp_flush_count - self.comp_cursor = -1 - self.comp_flush_count = -1 - else: - self.comp_cursor = cursor - - # History DB. Flush compacted history and updated state - with self.db.write_batch() as batch: - # Important: delete first! The keyspace may overlap. - for key in keys_to_delete: - batch.delete(key) - for key, value in write_items: - batch.put(key, value) - self.write_state(batch) - - def _compact_hashX(self, hashX, hist_map, hist_list, - write_items, keys_to_delete): - '''Compres history for a hashX. hist_list is an ordered list of - the histories to be compressed.''' - # History entries (tx numbers) are TXNUM_LEN bytes each. Distribute - # over rows of up to 50KB in size. A fixed row size means - # future compactions will not need to update the first N - 1 - # rows. - max_row_size = self.max_hist_row_entries * TXNUM_LEN - full_hist = b''.join(hist_list) - nrows = (len(full_hist) + max_row_size - 1) // max_row_size - if nrows > 4: - self.logger.info( - f'hashX {hash_to_hex_str(hashX)} is large: ' - f'{len(full_hist) // TXNUM_LEN:,d} entries across {nrows:,d} rows' - ) - - # Find what history needs to be written, and what keys need to - # be deleted. Start by assuming all keys are to be deleted, - # and then remove those that are the same on-disk as when - # compacted. - write_size = 0 - keys_to_delete.update(hist_map) - for n, chunk in enumerate(util.chunks(full_hist, max_row_size)): - key = hashX + pack_be_uint16(n) - if hist_map.get(key) == chunk: - keys_to_delete.remove(key) - else: - write_items.append((key, chunk)) - write_size += len(chunk) - - assert n + 1 == nrows - self.comp_flush_count = max(self.comp_flush_count, n) - - return write_size - - def _compact_prefix(self, prefix, write_items, keys_to_delete): - '''Compact all history entries for hashXs beginning with the - given prefix. Update keys_to_delete and write.''' - prior_hashX = None - hist_map = {} - hist_list = [] - - key_len = HASHX_LEN + FLUSHID_LEN - write_size = 0 - for key, hist in self.db.iterator(prefix=prefix): - # Ignore non-history entries - if len(key) != key_len: - continue - hashX = key[:-FLUSHID_LEN] - if hashX != prior_hashX and prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, - hist_list, write_items, - keys_to_delete) - hist_map.clear() - hist_list.clear() - prior_hashX = hashX - hist_map[key] = hist - hist_list.append(hist) - - if prior_hashX: - write_size += self._compact_hashX(prior_hashX, hist_map, hist_list, - write_items, keys_to_delete) - return write_size - - def _compact_history(self, limit): - '''Inner loop of history compaction. Loops until limit bytes have - been processed. - ''' - keys_to_delete = set() - write_items = [] # A list of (key, value) pairs - write_size = 0 - - # Loop over 2-byte prefixes - cursor = self.comp_cursor - while write_size < limit and cursor < 65536: - prefix = pack_be_uint16(cursor) - write_size += self._compact_prefix(prefix, write_items, - keys_to_delete) - cursor += 1 - - max_rows = self.comp_flush_count + 1 - self._flush_compaction(cursor, write_items, keys_to_delete) - - self.logger.info( - f'history compaction: wrote {len(write_items):,d} rows ' - f'({write_size / 1000000:.1f} MB), removed ' - f'{len(keys_to_delete):,d} rows, largest: {max_rows:,d}, ' - f'{100 * cursor / 65536:.1f}% complete' - ) - return write_size - - def _cancel_compaction(self): - if self.comp_cursor != -1: - self.logger.warning('cancelling in-progress history compaction') - self.comp_flush_count = -1 - self.comp_cursor = -1 + for db_key, db_val in self.db.iterator(prefix=hashX): + tx_numb = db_key[HASHX_LEN:] + if limit == 0: + return + tx_num, = unpack_le_uint64(tx_numb + txnum_padding) + yield tx_num + limit -= 1 # # DB upgrade # def upgrade_db(self): - self.logger.info(f'history DB version: {self.db_version}') + self.logger.info(f'history DB current version: {self.db_version}. ' + f'latest is: {max(self.DB_VERSIONS)}') self.logger.info('Upgrading your history DB; this can take some time...') - def upgrade_cursor(cursor): + def convert_version_1(): + def upgrade_cursor(cursor): + count = 0 + prefix = pack_be_uint16(cursor) + key_len = HASHX_LEN + 2 + chunks = util.chunks + with self.db.write_batch() as batch: + batch_put = batch.put + for key, hist in self.db.iterator(prefix=prefix): + # Ignore non-history entries + if len(key) != key_len: + continue + count += 1 + hist = b''.join(item + b'\0' for item in chunks(hist, 4)) + batch_put(key, hist) + self.upgrade_cursor = cursor + self.write_state(batch) + return count + + last = time.monotonic() count = 0 - prefix = pack_be_uint16(cursor) - key_len = HASHX_LEN + 2 - chunks = util.chunks + + for cursor in range(self.upgrade_cursor + 1, 65536): + count += upgrade_cursor(cursor) + now = time.monotonic() + if now > last + 10: + last = now + self.logger.info(f'history DB v0->v1: {count:,d} entries updated, ' + f'{cursor * 100 / 65536:.1f}% complete') + + self.db_version = 1 + self.upgrade_cursor = -1 with self.db.write_batch() as batch: - batch_put = batch.put - for key, hist in self.db.iterator(prefix=prefix): - # Ignore non-history entries - if len(key) != key_len: - continue - count += 1 - hist = b''.join(item + b'\0' for item in chunks(hist, 4)) - batch_put(key, hist) - self.upgrade_cursor = cursor self.write_state(batch) - return count + self.logger.info('history DB upgraded to v1 successfully') + + def convert_version_2(): + # old schema: + # Key: address_hashX + flush_id + # Value: sorted "list" of tx_nums in history of hashX + # ----- + # new schema: + # Key: address_hashX + tx_num + # Value: + + def upgrade_cursor(cursor): + count = 0 + prefix = pack_be_uint16(cursor) + key_len = HASHX_LEN + 2 + chunks = util.chunks + txnum_padding = bytes(8-TXNUM_LEN) + with self.db.write_batch() as batch: + batch_put = batch.put + batch_delete = batch.delete + max_tx_num = 0 + for db_key, db_val in self.db.iterator(prefix=prefix): + # Ignore non-history entries + if len(db_key) != key_len: + continue + count += 1 + batch_delete(db_key) + hashX = db_key[:HASHX_LEN] + for tx_numb in chunks(db_val, 5): + batch_put(hashX + tx_numb, b'') + tx_num, = unpack_le_uint64(tx_numb + txnum_padding) + max_tx_num = max(max_tx_num, tx_num) + self.upgrade_cursor = cursor + self.hist_db_tx_count = max(self.hist_db_tx_count, max_tx_num + 1) + self.hist_db_tx_count_next = self.hist_db_tx_count + self.write_state(batch) + return count + + last = time.monotonic() + count = 0 - last = time.monotonic() - count = 0 + for cursor in range(self.upgrade_cursor + 1, 65536): + count += upgrade_cursor(cursor) + now = time.monotonic() + if now > last + 10: + last = now + self.logger.info(f'history DB v1->v2: {count:,d} entries updated, ' + f'{cursor * 100 / 65536:.1f}% complete') - for cursor in range(self.upgrade_cursor + 1, 65536): - count += upgrade_cursor(cursor) - now = time.monotonic() - if now > last + 10: - last = now - self.logger.info(f'DB 3 of 3: {count:,d} entries updated, ' - f'{cursor * 100 / 65536:.1f}% complete') + self.db_version = 2 + self.upgrade_cursor = -1 + with self.db.write_batch() as batch: + self.write_state(batch) + self.logger.info('history DB upgraded to v2 successfully') + if self.db_version == 0: + convert_version_1() + if self.db_version == 1: + convert_version_2() self.db_version = max(self.DB_VERSIONS) - self.upgrade_cursor = -1 - with self.db.write_batch() as batch: - self.write_state(batch) - self.logger.info('DB 3 of 3 upgraded successfully') diff --git a/electrumx/server/session.py b/electrumx/server/session.py index 0b26eb542..26799d255 100644 --- a/electrumx/server/session.py +++ b/electrumx/server/session.py @@ -321,7 +321,6 @@ def _get_info(self): 'daemon': self.daemon.logged_url(), 'daemon height': self.daemon.cached_height(), 'db height': self.db.db_height, - 'db_flush_count': self.db.history.flush_count, 'groups': len(self.session_groups), 'history cache': cache_fmt.format( self._history_lookups, self._history_hits, len(self._history_cache)), diff --git a/electrumx_compact_history b/electrumx_compact_history deleted file mode 100755 index 35f25e726..000000000 --- a/electrumx_compact_history +++ /dev/null @@ -1,82 +0,0 @@ -#!/usr/bin/env python3 -# -# Copyright (c) 2017, Neil Booth -# -# All rights reserved. -# -# See the file "LICENCE" for information about the copyright -# and warranty status of this software. - -'''Script to compact the history database. This should save space and -will reset the flush counter to a low number, avoiding overflow when -the flush count reaches 65,536. - -This needs to lock the database so ElectrumX must not be running - -shut it down cleanly first. - -It is recommended you run this script with the same environment as -ElectrumX. However it is intended to be runnable with just -DB_DIRECTORY and COIN set (COIN defaults as for ElectrumX). - -If you use daemon tools, you might run this script like so: - - envdir /path/to/the/environment/directory ./compact_history.py - -Depending on your hardware this script may take up to 6 hours to -complete; it logs progress regularly. - -Compaction can be interrupted and restarted harmlessly and will pick -up where it left off. However, if you restart ElectrumX without -running the compaction to completion, it will not benefit and -subsequent compactions will restart from the beginning. -''' - -import asyncio -import logging -import sys -import traceback -from os import environ - -from electrumx import Env -from electrumx.server.db import DB - - -async def compact_history(): - if sys.version_info < (3, 7): - raise RuntimeError('Python >= 3.7 is required to run ElectrumX') - - environ['DAEMON_URL'] = '' # Avoid Env erroring out - env = Env() - db = DB(env) - await db.open_for_compacting() - - assert not db.first_sync - history = db.history - # Continue where we left off, if interrupted - if history.comp_cursor == -1: - history.comp_cursor = 0 - - history.comp_flush_count = max(history.comp_flush_count, 1) - limit = 8 * 1000 * 1000 - - while history.comp_cursor != -1: - history._compact_history(limit) - - # When completed also update the UTXO flush count - db.set_flush_count(history.flush_count) - -def main(): - logging.basicConfig(level=logging.INFO) - logging.info('Starting history compaction...') - loop = asyncio.get_event_loop() - try: - loop.run_until_complete(compact_history()) - except Exception: - traceback.print_exc() - logging.critical('History compaction terminated abnormally') - else: - logging.info('History compaction complete') - - -if __name__ == '__main__': - main() diff --git a/setup.py b/setup.py index fe794bede..cafdbc521 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setuptools.setup( name='e-x', version=version, - scripts=['electrumx_server', 'electrumx_rpc', 'electrumx_compact_history'], + scripts=['electrumx_server', 'electrumx_rpc'], python_requires='>=3.7', install_requires=['aiorpcX[ws]>=0.18.3,<0.19', 'attrs', 'plyvel', 'pylru', 'aiohttp>=3.3'], diff --git a/tests/server/test_compaction.py b/tests/server/test_compaction.py deleted file mode 100644 index ad6c96a43..000000000 --- a/tests/server/test_compaction.py +++ /dev/null @@ -1,133 +0,0 @@ -'''Test of compaction code in server/history.py''' -import array -import random -from os import environ, urandom - -import pytest - -from electrumx.lib.hash import HASHX_LEN -from electrumx.lib.util import pack_be_uint16, pack_le_uint64 -from electrumx.server.env import Env -from electrumx.server.db import DB - - -def create_histories(history, hashX_count=100): - '''Creates a bunch of random transaction histories, and write them - to disk in a series of small flushes.''' - hashXs = [urandom(HASHX_LEN) for n in range(hashX_count)] - mk_array = lambda : array.array('Q') - histories = {hashX : mk_array() for hashX in hashXs} - unflushed = history.unflushed - tx_num = 0 - while hashXs: - tx_numb = pack_le_uint64(tx_num)[:5] - hash_indexes = set(random.randrange(len(hashXs)) - for n in range(1 + random.randrange(4))) - for index in hash_indexes: - histories[hashXs[index]].append(tx_num) - unflushed[hashXs[index]].extend(tx_numb) - - tx_num += 1 - # Occasionally flush and drop a random hashX if non-empty - if random.random() < 0.1: - history.flush() - index = random.randrange(0, len(hashXs)) - if histories[hashXs[index]]: - del hashXs[index] - - return histories - - -def check_hashX_compaction(history): - history.max_hist_row_entries = 40 - row_size = history.max_hist_row_entries * 5 - full_hist = b''.join(pack_le_uint64(tx_num)[:5] for tx_num in range(100)) - hashX = urandom(HASHX_LEN) - pairs = ((1, 20), (26, 50), (56, 30)) - - cum = 0 - hist_list = [] - hist_map = {} - for flush_count, count in pairs: - key = hashX + pack_be_uint16(flush_count) - hist = full_hist[cum * 5: (cum+count) * 5] - hist_map[key] = hist - hist_list.append(hist) - cum += count - - write_items = [] - keys_to_delete = set() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - # Check results for sanity - assert write_size == len(full_hist) - assert len(write_items) == 3 - assert len(keys_to_delete) == 3 - assert len(hist_map) == len(pairs) - for n, item in enumerate(write_items): - assert item == (hashX + pack_be_uint16(n), - full_hist[n * row_size: (n + 1) * row_size]) - for flush_count, count in pairs: - assert hashX + pack_be_uint16(flush_count) in keys_to_delete - - # Check re-compaction is null - hist_map = {key: value for key, value in write_items} - hist_list = [value for key, value in write_items] - write_items.clear() - keys_to_delete.clear() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - assert write_size == 0 - assert len(write_items) == 0 - assert len(keys_to_delete) == 0 - assert len(hist_map) == len(pairs) - - # Check re-compaction adding a single tx writes the one row - hist_list[-1] += array.array('I', [100]).tobytes() - write_size = history._compact_hashX(hashX, hist_map, hist_list, - write_items, keys_to_delete) - assert write_size == len(hist_list[-1]) - assert write_items == [(hashX + pack_be_uint16(2), hist_list[-1])] - assert len(keys_to_delete) == 1 - assert write_items[0][0] in keys_to_delete - assert len(hist_map) == len(pairs) - - -def check_written(history, histories): - for hashX, hist in histories.items(): - db_hist = array.array('I', history.get_txnums(hashX, limit=None)) - assert hist == db_hist - - -def compact_history(history): - '''Synchronously compact the DB history.''' - history.comp_cursor = 0 - - history.comp_flush_count = max(history.comp_flush_count, 1) - limit = 5 * 1000 - - write_size = 0 - while history.comp_cursor != -1: - write_size += history._compact_history(limit) - assert write_size != 0 - - -@pytest.mark.asyncio -async def test_compaction(tmpdir): - db_dir = str(tmpdir) - print(f'Temp dir: {db_dir}') - environ.clear() - environ['DB_DIRECTORY'] = db_dir - environ['DAEMON_URL'] = '' - environ['COIN'] = 'BitcoinSV' - db = DB(Env()) - await db.open_for_serving() - history = db.history - - # Test abstract compaction - check_hashX_compaction(history) - # Now test in with random data - histories = create_histories(history) - check_written(history, histories) - compact_history(history) - check_written(history, histories)