From 8ef1fe27868f8bdbb1b04205a9b05f59f4df901d Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 19 Sep 2025 18:40:19 +0300 Subject: [PATCH 1/6] Fork detection script Rename network type to chain type in detector.py --- build-tools/fork-detection/README.md | 33 ++ build-tools/fork-detection/detector.py | 446 +++++++++++++++++++++++++ build-tools/fork-detection/utils.py | 397 ++++++++++++++++++++++ 3 files changed, 876 insertions(+) create mode 100644 build-tools/fork-detection/README.md create mode 100644 build-tools/fork-detection/detector.py create mode 100644 build-tools/fork-detection/utils.py diff --git a/build-tools/fork-detection/README.md b/build-tools/fork-detection/README.md new file mode 100644 index 000000000..b7461daa9 --- /dev/null +++ b/build-tools/fork-detection/README.md @@ -0,0 +1,33 @@ +## Fork detection script, for the extra peace of mind + +Here we have `detector.py`, which is a relatively crude way of detecting a permanent fork (split) +in the network if it happens. + +The script basically runs the full sync in a loop, checking the node's log output for certain errors +and comparing its mainchain block ids with those obtained from the API server.\ +If anything suspicious is detected during the full sync, the script will save the node's data +directory and log file.\ +In any case, the script will temporarily ban some of the peers that participated in the sync +(so that the next iteration has a chance to have different ones and to reduce the strain on +the network) and start the full sync all over again, reusing the peerdb from the previous iteration. + +The node is always run with checkpoints disabled, so that it has the chance to find older forks too. + +The structure of the script's working directory (specified via the command line): +- `current_attempt` - this corresponds to the current sync attempt (iteration). +- `saved_attempts` - this contains subdirectories corresponding to attempts that + are considered suspicious; each subdirectory's name is the datetime of the moment + when the attempt was finished. +- `saved_peer_dbs` - this is where peer dbs from previous attempts are stored; the script + only needs the one from the latest attempt, but, just in case, the penultimate one is + also stored. +- `log.txt` - this is the log of the script itself. + +Each attempt's directory has the following structure: +- `flags` - this directory contains flag files (which are usually zero-length) indicating + that certain problems were found during the sync. It is what determines whether the attempt's + directory will be saved in the end (i.e. if the directory is non-empty, the attempt will be saved). +- `node_data` - this is the node's data directory of this attempt. +- `node_log.txt` - the node's log. + +Note: currently the script requires Python 3.13 to run, though we may lift this requirement later. 
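As a rough illustration of the flag mechanism described above: an attempt is saved if and only if its `flags` subdirectory ends up non-empty. A minimal sketch of that rule (the function names below are illustrative only; the actual logic lives in `detector.py`):

```python
from pathlib import Path

# Illustrative sketch of the rule from the README; not the actual detector.py code.
def raise_flag(attempt_dir: Path, name: str, details: str | None = None) -> None:
    """Create (or append to) a flag file such as 'checkpoint_mismatch'."""
    flags_dir = attempt_dir / "flags"
    flags_dir.mkdir(parents=True, exist_ok=True)
    with open(flags_dir / name, "a") as f:
        if details is not None:
            f.write(details + "\n")

def attempt_is_suspicious(attempt_dir: Path) -> bool:
    """The attempt should be saved iff its 'flags' directory is non-empty."""
    flags_dir = attempt_dir / "flags"
    return flags_dir.is_dir() and any(flags_dir.iterdir())
```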
diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py new file mode 100644 index 000000000..536b3efdc --- /dev/null +++ b/build-tools/fork-detection/detector.py @@ -0,0 +1,446 @@ +import argparse +import os +import queue +import re +import shlex +import shutil +import subprocess +import sys +import time +from datetime import datetime +from pathlib import Path +from queue import Queue +from threading import Thread +from urllib.parse import urlparse + +from utils import ( + colored, dir_missing_or_empty, exhaustive_stream_line_reader, hide_cursor, show_cursor, + init_logger, pretty_print_banned_peers, + CONSOLE_PRINTER, LOGGER as log, NODE_OUTPUT_PREFIX_COLOR, NODE_RPC_USER, NODE_RPC_PWD, + Error, APIServerClient, NodeRPCClient, +) + + +DEFAULT_NODE_CMD = "cargo run --release --bin node-daemon --" +DEFAULT_NODE_RPC_BIND_ADDR = "127.0.0.1:12345" +DEFAULT_CHAIN_TYPE = "mainnet" +CHAIN_TYPE_CHOICES = ["mainnet", "testnet"] +CONTINUE_OPTION_NAME = "continue" + +CUR_ATTEMPT_SUBDIR = "current_attempt" +SAVED_ATTEMPTS_SUBDIR = "saved_attempts" + +FLAGS_SUBDIR = "flags" +NODE_DATA_SUBDIR = "node_data" +SAVED_PEER_DBS_SUBDIR = "saved_peer_dbs" + +LATEST_PEER_DB_SUBDIR = "latest" +PREV_PEER_DB_SUBDIR = "previous" + +# Note: this is defined by the node and cannot be changed. +PEER_DB_SUBDIR_IN_NODE_DATA = "peerdb-lmdb" + +# The mapping from node's output to the name of the flag that must be created as a result. +NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING = [ + (re.compile(r"\bCRITICAL\b"), "critical_error"), + (re.compile(r"Checkpoint mismatch"), "checkpoint_mismatch"), + (re.compile(r"\bERROR\b.+\bprocess_block\b"), "process_block_failure"), + (re.compile(r"\bERROR\b.+\bpreliminary_block_check\b"), "preliminary_block_check_failure"), + (re.compile(r"\bERROR\b.+\bpreliminary_headers_check\b"), "preliminary_headers_check_failure"), + (re.compile(r"Stale block received"), "stale_block_received"), +] + +# The "flag" that will be created if the node's mainchain block differs from the API server's. +# Note: technically this is not a flag, because it will contain additional data. +POTENTIAL_REORGS_FLAG_NAME = "potential_reorgs" + +NODE_OUTPUT_LINE_NEW_TIP_REGEX = re.compile( + r"NEW TIP in chainstate (?P[0-9A-Fa-f]+) with height (?P\d+), timestamp: (?P\d+)" +) +# The regex by which we determine that node's output line should be printed to the console +# (we want to avoid debug and info lines since they're both too noisy during sync and put extra +# strain on the console app). +# Note that this is not 100% reliable, because a log record can technically span multiple lines, +# only the first of which will contain the severity. But at this moment we don't seem to emit +# multi-line log records during syncing (except for the initial "Starting with the following config"). +# But even if we did, this approach is "good enough" anyway, since you can always look into the log +# file for the missing details. +NODE_OUTPUT_LINE_TO_PRINT_REGEX = re.compile(r"^\S+\s+(WARN|ERROR)\b") +# The regex by which we determine that the node is actually being started; this is mainly needed +# because by default we invoke cargo, which may have to do a lengthy compilation first. +# Also note that we use a log line indicating that p2p has already been started (instead of, say, +# an earlier log line such as "Starting mintlayer-core"). This helps catching the situation +# when the node starts and immediately exists due to the p2p port being unavailable. 
+NODE_STARTUP_OUTPUT_LINE_REGEX = re.compile(r"p2p.*Starting SyncManager") + +BAN_DURATION_SECS = 6 * 3600 # 6h + +# We use Queue.shutdown which is only available since Python v3.13 +MIN_PYTHON_VERSION_MAJOR = 3 +MIN_PYTHON_VERSION_MINOR = 13 + + +class Handler(): + def __init__(self, args): + CONSOLE_PRINTER.set_status("Initializing") + + self.entire_output_dir = Path(args.output_dir).resolve() + os.makedirs(self.entire_output_dir, exist_ok=True) + + init_logger(self.entire_output_dir.joinpath("log.txt")) + log.info("Initializing") + + self.node_cmd = shlex.split(args.node_cmd) + + self.node_rpc_client = NodeRPCClient(args.node_rpc_bind_address) + self.api_server_client = APIServerClient(args.api_server_url) + + self.saved_attempts_dir = self.entire_output_dir.joinpath(SAVED_ATTEMPTS_SUBDIR) + + self.saved_peer_dbs_dir = self.entire_output_dir.joinpath(SAVED_PEER_DBS_SUBDIR) + self.latest_peer_db_dir = self.saved_peer_dbs_dir.joinpath(LATEST_PEER_DB_SUBDIR) + self.prev_peer_db_dir = self.saved_peer_dbs_dir.joinpath(PREV_PEER_DB_SUBDIR) + + self.cur_attempt_dir = self.entire_output_dir.joinpath(CUR_ATTEMPT_SUBDIR) + if os.path.exists(self.cur_attempt_dir) and not args.can_continue: + raise Error( + (f"The directory {self.cur_attempt_dir} already exists. " + f"Either delete it or pass '--{CONTINUE_OPTION_NAME}' to continue.") + ) + + self.cur_attempt_flags_dir = self.cur_attempt_dir.joinpath(FLAGS_SUBDIR) + self.cur_attempt_node_data_dir = self.cur_attempt_dir.joinpath(NODE_DATA_SUBDIR) + self.cur_attempt_logs_file = self.cur_attempt_dir.joinpath("node_log.txt") + + self.unban_all = args.unban_all + + self.node_cmd += [ + "--datadir", self.cur_attempt_node_data_dir, + args.chain_type, + "--allow-checkpoints-mismatch", + "--rpc-bind-address", args.node_rpc_bind_address, + "--rpc-username", NODE_RPC_USER, + "--rpc-password", NODE_RPC_PWD, + ] + log.info(f"Node run command: {self.node_cmd}") + + def run(self): + try: + while True: + self.do_full_sync() + except KeyboardInterrupt: + log.info("Exiting due to Ctrl-C") + + def do_full_sync(self): + actual_tip_height = self.api_server_client.get_tip().height + log.info(f"Starting a new sync iteration, current chain height is {actual_tip_height}") + + os.makedirs(self.cur_attempt_flags_dir, exist_ok=True) + os.makedirs(self.cur_attempt_node_data_dir, exist_ok=True) + + self.restore_peer_db() + + node_proc_env = os.environ.copy() + # Note: "chainstate_verbose_block_ids=debug" allows to catch the "Stale block received" line + # and also forces certain block-processing functions in chainstate to print full block ids. + # We avoid using the "normal" debug log, because it's too noisy, e.g. even + # "info,chainstate=debug" produces hundreds of megabytes of logs during the full sync. 
+ node_proc_env["RUST_LOG"] = "info,chainstate_verbose_block_ids=debug" + + node_proc = subprocess.Popen( + self.node_cmd, + encoding="utf-8", + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + env=node_proc_env) + + last_block_arrival_time = None + last_handled_height = None + + node_proc_stdout_queue = Queue() + Thread( + target=exhaustive_stream_line_reader, + args=(node_proc.stdout, node_proc_stdout_queue, self.cur_attempt_logs_file) + ).start() + + def handle_node_output_line(line: str): + nonlocal actual_tip_height, last_block_arrival_time, last_handled_height + + for line_re, flag in NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING: + if line_re.search(line) is not None: + self.touch_flag(flag) + + new_tip_match = NODE_OUTPUT_LINE_NEW_TIP_REGEX.search(line) + if new_tip_match is not None: + cur_seconds_since_epoch = time.time() + seconds_without_blocks = ( + cur_seconds_since_epoch - last_block_arrival_time + if last_block_arrival_time is not None else 0 + ) + last_block_arrival_time = cur_seconds_since_epoch + + block_id = new_tip_match.group("block_id") + height = int(new_tip_match.group("height")) + last_handled_height = height + timestamp = int(new_tip_match.group("timestamp")) + + if height % 10 == 0: + CONSOLE_PRINTER.set_status(f"Synced to height {height}") + + # Update actual_tip_height if we've reached it. + # Note: >= is used to make sure that the check "height == actual_tip_height" + # below uses the updated height. + if height >= actual_tip_height: + actual_tip_height = self.api_server_client.get_tip().height + + if height > actual_tip_height: + log_func = log.info if height == actual_tip_height + 1 else log.warning; + log_func( + f"Tip reached, height = {height} (the API server is {height-actual_tip_height} block(s) behind)" + ) + return False + + actual_block_id = self.api_server_client.get_block_id(height) + if block_id.lower() != actual_block_id.lower(): + self.touch_flag( + POTENTIAL_REORGS_FLAG_NAME, + f"height={height}, node block id={block_id}, api srv block id={actual_block_id}" + ) + + # Check if the block is fresh enough + if timestamp >= cur_seconds_since_epoch - 120: + log.info(f"Fresh block on a stale chain reached (height = {height})") + return False + + # Check if we haven't seen a block in awhile + if seconds_without_blocks >= 120: + log.info(f"No incoming blocks while on stale chain (height = {height})") + return False + + if height == actual_tip_height: + log.info(f"Tip reached, height = {height}") + return False + + return True + + # Will be called once the first non-empty line has been received from the node's output. 
+ def on_node_started(): + self.node_rpc_client.ensure_rpc_started() + + banned_peers = self.node_rpc_client.get_banned_peers() + banned_peers_str = pretty_print_banned_peers(banned_peers) + + log.debug(f"Currently banned peers: {banned_peers_str}") + + if self.unban_all: + self.unban_all = False + if len(banned_peers) > 0: + log.info("Unbanning currently banned peers due to the command line option") + + for peer in banned_peers: + self.node_rpc_client.unban_peer(peer.ip) + + banned_peers_after_unban = self.node_rpc_client.get_banned_peers() + if len(banned_peers_after_unban) > 0: + banned_peers_after_unban_str = pretty_print_banned_peers(banned_peers_after_unban) + log.warning(f"Some peers are still banned after unban: {banned_peers_after_unban_str}") + + def on_attempt_completion(): + # When a syncing attempt has been finished, but before the node has been stopped, + # we ban some of the currently connected peers for a long-enough duration: + # a) so that the next attempt can use different peers; + # b) to reduce the strain on the network. + + peer_ips_to_ban = self.get_node_peer_ip_addrs_to_ban() + + # Before banning, force the disconnection of all peers by disabling networking, + # to avoid sending them the "scary" disconnection reason "Your address is banned" + # (though nodes may still see this reason if they try connecting to our node + # during the next attempt). + self.node_rpc_client.enable_networking(False) + # Give the node some time to actually disconnect all peers. + time.sleep(2) + + for ip_addr in peer_ips_to_ban: + log.debug(f"Banning {ip_addr}") + self.node_rpc_client.ban_peer(ip_addr, BAN_DURATION_SECS) + + try: + node_started = False + set_status_and_debug_log("Waiting for the node to start") + + while True: + try: + line = node_proc_stdout_queue.get() + + if NODE_OUTPUT_LINE_TO_PRINT_REGEX.search(line) is not None: + stdout_prefix = colored("node> ", NODE_OUTPUT_PREFIX_COLOR) + CONSOLE_PRINTER.print_to_stderr(f"{stdout_prefix} {line}", end="") + + if not node_started and NODE_STARTUP_OUTPUT_LINE_REGEX.search(line) is not None: + node_started = True + set_status_and_debug_log("Node started") + on_node_started() + + if not handle_node_output_line(line): + break + except queue.ShutDown: + # This means that the node has exited prematurely. But we check for this + # via the "poll" call below, so here it can be ignored. + pass + + exit_code = node_proc.poll() + if exit_code is not None: + raise Error(f"The node exited prematurely with exit code {exit_code}") + + # Shutdown the queue to prevent the reading thread from putting moree lines to it. + node_proc_stdout_queue.shutdown() + + on_attempt_completion() + + finally: + if last_handled_height is not None: + log.debug(f"Last handled height: {last_handled_height}") + + set_status_and_debug_log("Terminating the node") + + # Note: for some reason the node doesn't want to terminate sometimes, + # in particular this may happen when hitting Ctrl-C. Though the Ctrl-C case + # is not particularly important (since you can always hit it again), we want + # to protect against this situation during the normal script execution. + # So, we try terminating the node a few times and if it doesn't react, we kill it. 
+ for i in range(3): + node_proc.terminate() + try: + node_proc.wait(timeout=5) + break + except subprocess.TimeoutExpired: + log.warning(f"Node didn't terminate, attempt {i}") + pass + else: + log.warning("Killing the node") + node_proc.kill() + node_proc.wait() + + self.save_peer_db() + + # If the script has created some flags, save the directory + if len(os.listdir(self.cur_attempt_flags_dir)) > 0: + os.makedirs(self.saved_attempts_dir, exist_ok=True) + + backup_dir_name = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") + backup_dir = self.saved_attempts_dir.joinpath(backup_dir_name) + + log.warning( + f"Sync iteration ended with some issues, backing up the output dir to {backup_dir}" + ) + os.rename(self.cur_attempt_dir, backup_dir) + else: + log.info("Sync iteration ended without issues, removing output dir") + shutil.rmtree(self.cur_attempt_dir) + + # Return the list of ip addresses we want to ban at the end of a sync attempt, + # to prevent syncing with the same peers again and again. + def get_node_peer_ip_addrs_to_ban(self): + peers = self.node_rpc_client.get_connected_peers() + + # Note: non-null `last_tip_block_time` means that the peer has sent us a block that + # became our tip. Other peers that had the same block but sent it a bit later are not + # counted, which means that it's technically possible to have a gadzillion peers where + # only one of them has a non-null `last_tip_block_time`. In practice though most of the + # currently connected peers should have a non-null `last_tip_block_time` after a full sync. + peers_with_last_tip_block_time = [ + peer for peer in peers if peer["last_tip_block_time"] is not None + ] + + log.debug(f"Obtaining peer ips to ban; total connected peers: {len(peers)}, " + f"peers with 'last_tip_block_time': {len(peers_with_last_tip_block_time)}") + + # Note: the returned addresses have the form '{ip_addr}:{port}', which is interpreted + # as a path by urlparse; prepending "//" convinces it that it's a full address. + return [urlparse("//" + peer["address"]).hostname for peer in peers_with_last_tip_block_time] + + # After the current attempt has been completed, save the current peer db. + def save_peer_db(self): + os.makedirs(self.saved_peer_dbs_dir, exist_ok=True) + + if os.path.exists(self.prev_peer_db_dir): + shutil.rmtree(self.prev_peer_db_dir) + + if os.path.exists(self.latest_peer_db_dir): + os.rename(self.latest_peer_db_dir, self.prev_peer_db_dir) + + cur_peer_db_dir = self.cur_attempt_node_data_dir.joinpath(PEER_DB_SUBDIR_IN_NODE_DATA) + shutil.copytree(cur_peer_db_dir, self.latest_peer_db_dir) + + # Before starting the next attempt, if the node dir is missing a peer db, copy the saved + # peer db into it.
+ def restore_peer_db(self): + cur_peer_db_dir = self.cur_attempt_node_data_dir.joinpath(PEER_DB_SUBDIR_IN_NODE_DATA) + + if dir_missing_or_empty(cur_peer_db_dir) and os.path.exists(self.latest_peer_db_dir): + shutil.copytree(self.latest_peer_db_dir, cur_peer_db_dir, dirs_exist_ok=True) + + # Touch a flag, optionally appending some contents to it. + def touch_flag(self, flag: str, contents=None): + flag_file = self.cur_attempt_flags_dir.joinpath(flag) + with open(flag_file, 'a') as file: + if contents is not None: + file.write(contents) + file.write("\n") + + log.warning(f"Flag created: {flag}") + + +def set_status_and_debug_log(status): + log.debug(status) + CONSOLE_PRINTER.set_status(status) + + +def main(): + if sys.version_info < (MIN_PYTHON_VERSION_MAJOR, MIN_PYTHON_VERSION_MINOR): + print(f"This script requires Python {MIN_PYTHON_VERSION_MAJOR}.{MIN_PYTHON_VERSION_MINOR} or higher") + sys.exit(1) + + hide_cursor() + + try: + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument("--node-cmd", + help="Command to run the node", + default=DEFAULT_NODE_CMD) + parser.add_argument("--node-rpc-bind-address", + help="Node RPC bind address", + default=DEFAULT_NODE_RPC_BIND_ADDR) + parser.add_argument( + '--api-server-url', + help='API server URL', required=True) + parser.add_argument("--chain-type", + help="Chain type", + choices=CHAIN_TYPE_CHOICES, + default=DEFAULT_CHAIN_TYPE) + parser.add_argument("--output-dir", + help="Output directory", + required=True) + parser.add_argument(f"--{CONTINUE_OPTION_NAME}", + help=(f"Proceed even if the '{CUR_ATTEMPT_SUBDIR}' subdirectory " + "already exists in the output dir"), + action="store_true", + dest="can_continue") + parser.add_argument(f"--unban-all", + help=("Unban all node's peers on start"), + action="store_true") + args = parser.parse_args() + + Handler(args).run() + except Error as e: + print(f"Error: {e}") + sys.exit(1) + finally: + CONSOLE_PRINTER.set_status("") + show_cursor() + + +if __name__ == "__main__": + main() diff --git a/build-tools/fork-detection/utils.py b/build-tools/fork-detection/utils.py new file mode 100644 index 000000000..86e9121f9 --- /dev/null +++ b/build-tools/fork-detection/utils.py @@ -0,0 +1,397 @@ +import json +import logging +import os +import queue +import requests +import sys +import time +from collections import namedtuple +from pathlib import Path +from queue import Queue +from threading import Lock +from typing import TextIO +from urllib.parse import urlparse + +import termcolor # type: ignore + + +class Error(Exception): + pass + + +# 'color' can be either the name of the color, e.g. "red", or a tuple where the first element +# is the name of the color and the second one the attributes, e.g. ("red", ["dark", "bold"]), +# or simply None. +def colored(text, color): + if color is None: + return text + elif isinstance(color, tuple): + return termcolor.colored(text, color[0], attrs=color[1]) + else: + return termcolor.colored(text, color) + + +# Color constants suitable for passing to the "colored" function. +# Note we are using the color names from termcolor v1.x, which doesn't have the "light_" colors. +# The "bold" attribute makes the color brighter, "dark" makes it darker, and "dark bold" is between +# "dark" and "bold". +# Note: the colors were chosen when using a Linux terminal with a dark theme, though they +# look ok with a light theme too.
+STATUS_COLOR = ("cyan", ["bold"]) +LOG_DEBUG_COLOR = ("white", ["bold", "dark"]) +# "None" means it will be the normal foreground color, i.e. white for a dark theme and +# black for a light one. +LOG_INFO_COLOR = None +LOG_WARN_COLOR = "yellow" +LOG_ERROR_COLOR = "red" +LOG_CRITICAL_COLOR = ("red", ["bold"]) +NODE_OUTPUT_PREFIX_COLOR = "green" + +NODE_RPC_USER = "user" +NODE_RPC_PWD = "pwd" + +API_SERVER_TIMEOUT_SECS = 180 +NODE_RPC_TIMEOUT_SECS = 180 + +LOGGER = logging.getLogger("detector_logger") + + +# This class maintains a "status line" at the bottom of the terminal output, erasing and +# redrawing it when the normal output is performed. +# The status line is written to stdout while the normal output is always printed to stderr. +# Note that all printing in the app has to be done through the same object of this class +# (CONSOLE_PRINTER defined below), otherwise the output will be broken. +class ConsolePrinterWithStatus: + def __init__(self): + self.status = "" + self.mutex = Lock() + + if sys.stdout.isatty(): + # Prepare the line where the status will be shown. + sys.stdout.write("\n") + + def print_to_stderr(self, line, end = "\n"): + with self.mutex: + # If both are the same terminal, need to erase the status, print the line + # and then print the status again. + if stdout_and_stderr_are_same_terminal(): + # Note: technically we could write the line and then the required number + # of extra spaces, but that number is non-trivial to determine if the line + # or the status contain control chars. + self._erase_status() + sys.stdout.write(line) + sys.stdout.write(end) + sys.stdout.write(self.status) + sys.stdout.flush() + else: + print(line, file=sys.stderr) + + def set_status(self, status): + with self.mutex: + if sys.stdout.isatty(): + status = colored(status, STATUS_COLOR) + self._erase_status() + sys.stdout.write(status) + else: + sys.stdout.write(status) + sys.stdout.write("\n") + + sys.stdout.flush() + self.status = status + + def _erase_status(self): + sys.stdout.write("\r") + sys.stdout.write(" " * len(self.status)) + sys.stdout.write("\r") + + +CONSOLE_PRINTER = ConsolePrinterWithStatus() + + +# Log handler that prints the records via CONSOLE_PRINTER. +class LogConsoleHandler(logging.Handler): + def emit(self, record): + try: + msg = self.format(record) + CONSOLE_PRINTER.print_to_stderr(msg) + except Exception: + self.handleError(record) + + +# Log formatter that produces colored output. +class LogColoredFormatter(logging.Formatter): + def __init__(self, fmt: str): + super().__init__() + + self.formatters = { + logging.DEBUG: logging.Formatter(colored(fmt, LOG_DEBUG_COLOR)), + logging.INFO: logging.Formatter(colored(fmt, LOG_INFO_COLOR)), + logging.WARNING: logging.Formatter(colored(fmt, LOG_WARN_COLOR)), + logging.ERROR: logging.Formatter(colored(fmt, LOG_ERROR_COLOR)), + logging.CRITICAL: logging.Formatter(colored(fmt, LOG_CRITICAL_COLOR)), + } + + def format(self, record): + formatter = self.formatters.get(record.levelno) + return formatter.format(record) + + +def stdout_and_stderr_are_same_terminal(): + if not (sys.stdout.isatty() and sys.stderr.isatty()): + # At least one of them is not a terminal + return False + + if sys.platform.startswith("win"): + # On Windows, if both are terminals, then they should be the same terminal. + return True + + # On *nix, the more reliable way is to compare ttyname's. 
+ stdout_name = os.ttyname(sys.stdout.fileno()) + stderr_name = os.ttyname(sys.stderr.fileno()) + return stdout_name == stderr_name + + +def init_logger(log_file: Path): + global LOGGER + + fmt = "%(asctime)s - %(levelname)s - %(message)s" + + console_handler = LogConsoleHandler() + console_handler.setFormatter(LogColoredFormatter(fmt)) + + file_handler = logging.FileHandler(log_file) + file_handler.setFormatter(logging.Formatter(fmt)) + + LOGGER.addHandler(console_handler) + LOGGER.addHandler(file_handler) + + LOGGER.setLevel(logging.DEBUG) + + # Without this the records will be propagated to the root logger and printed twice. + LOGGER.propagate = False + + +def dir_missing_or_empty(path: Path): + return not os.path.exists(path) or len(os.listdir(path)) == 0 + + +def prettify_duration(duration_secs: int) -> str: + if duration_secs == 0: + return "0s" + + result = "" + def append(val, symbol): + nonlocal result + if val != 0: + sep = " " if len(result) > 0 else "" + result += f"{sep}{val}{symbol}" + + duration_mins = duration_secs // 60 + duration_hrs = duration_mins // 60 + duration_days = duration_hrs // 24 + + append(duration_days, "d") + append(duration_hrs % 24, "h") + append(duration_mins % 60, "m") + append(duration_secs % 60, "s") + + return result + + +# The function reads lines from the stream and puts them to the queue. +# Even if the queue has been shut down on the receiving end, the function will continue +# to read from the stream until it is closed. +# +# This is intended to be used with subprocess.Popen when its stdout/stderr are in the PIPE mode, +# (because not reading the pipes may result in the child process dead locking when the pipe +# buffer becomes full). +# +# The function will also log (append) the read lines to the specified file, if provided. +def exhaustive_stream_line_reader(stream: TextIO, queue_obj: Queue, log_file: Path | None = None): + def reader(log_stream): + queue_already_shut_down = False + + # Loop until readline returns '', which means that the other end of the stream has been closed. 
+ for line in iter(stream.readline, ''): + if log_stream is not None: + log_stream.write(line) + log_stream.flush() + + if not queue_already_shut_down: + try: + queue_obj.put(line) + except queue.ShutDown: + queue_already_shut_down = True + + queue_obj.shutdown() + + if log_file is not None: + with open(log_file, 'a') as log_stream: + reader(log_stream) + else: + reader(None) + + +BlockInfo = namedtuple("BlockInfo", ["id", "height"]) +BannedPeer = namedtuple("BannedPeer", ["ip", "banned_until_as_secs_since_epoch"]) + + +class APIServerClient: + def __init__(self, server_url): + if len(urlparse(server_url).scheme) == 0: + raise Error("The provided API server URL must contain a scheme") + + self.server_url = server_url + self.session = requests.Session() + + def _get(self, path: str, request_params): + url = f"{self.server_url}/api/v2/{path}" + try: + response = self.session.get(url, params=request_params, timeout=API_SERVER_TIMEOUT_SECS) + except requests.exceptions.Timeout: + raise Error(f"API server request to '{path}' timed out") + except requests.exceptions.ConnectionError: + raise Error("Cannot connect to the API server") + + if response.status_code == 404: + return None + response.raise_for_status() + return response.json() + + def get_tip(self): + tip_info = self._get("chain/tip", {}) + return BlockInfo(id=tip_info["block_id"], height=tip_info["block_height"]) + + def get_block_id(self, height: int): + return self._get(f"chain/{height}", {}) + + +class NodeRPCClient: + def __init__(self, server_url): + self.server_url = server_url + self.session = requests.Session() + + def _post(self, method: str, method_params, timeout=NODE_RPC_TIMEOUT_SECS, handle_exceptions=True): + headers = {"Content-Type": "application/json"} + payload = { + "jsonrpc": "2.0", + "method": method, + "params": method_params, + "id": 1, + } + url = f"http://{NODE_RPC_USER}:{NODE_RPC_PWD}@{self.server_url}" + try: + response = self.session.post(url, headers=headers, data=json.dumps(payload), timeout=timeout) + except requests.exceptions.Timeout: + if handle_exceptions: + raise Error(f"Node RPC request '{method}' timed out") + else: + raise + except requests.exceptions.ConnectionError: + if handle_exceptions: + raise Error("Cannot connect to the node via RPC") + else: + raise + + response.raise_for_status() + json_data = response.json() + + if "error" in json_data: + err_code = json_data["error"]["code"] + err_msg = json_data["error"]["message"] + raise Error( + f"Node RPC method '{method}' failed with code {err_code} and message '{err_msg}'" + ) + + return json_data["result"] + + def enable_networking(self, enable: bool): + self._post("p2p_enable_networking", [enable]) + + def get_connected_peers(self): + return self._post("p2p_get_connected_peers", []) + + def get_banned_peers(self) -> list[BannedPeer]: + raw_peers = self._post("p2p_list_banned", []) + pretty_peers = [] + for peer in raw_peers: + ip = peer[0] + banned_until_secs = peer[1]["time"]["secs"] + banned_until_nanos = peer[1]["time"]["nanos"] + # Round the seconds up. + banned_until_secs += 1 if banned_until_nanos != 0 else 0 + pretty_peers += [BannedPeer(ip=ip, banned_until_as_secs_since_epoch=banned_until_secs)] + + return pretty_peers + + def ban_peer(self, peer_addr: str, duration_secs: int): + self._post("p2p_ban", [peer_addr, {"secs":duration_secs, "nanos":0}]) + + def unban_peer(self, peer_addr: str): + self._post("p2p_unban", [peer_addr]) + + # Assuming that the node has already been started, wait until it is reachable via rpc. 
+ def ensure_rpc_started(self): + max_attempts = 10 + for i in range(max_attempts): + try: + # Note: since we're repeating this multiple times, the timeout has to be small. + self._post("p2p_get_peer_count", [], timeout=5, handle_exceptions=False) + return + except requests.exceptions.ConnectionError: + time.sleep(1) + except requests.exceptions.Timeout: + # Try again on timeout too, just don't waste extra time on sleeping. + pass + else: + raise Error("The node is expected to have been started already, but RPC requests don't work") + + +def pretty_print_banned_peers(banned_peers: list[BannedPeer], multiline = True) -> str: + cur_secs_since_epoch = int(time.time()) + + # Note: the ban time can be in the past if we're restarting the script after a delay. + # Such peers are not really banned anymore. + banned_peers = [ + peer for peer in banned_peers if peer.banned_until_as_secs_since_epoch > cur_secs_since_epoch + ] + + if len(banned_peers) == 0: + return "[]" + + if multiline: + result = "[\n" + else: + result = "[" + + for idx, peer in enumerate(banned_peers): + duration = prettify_duration(peer.banned_until_as_secs_since_epoch - cur_secs_since_epoch) + line = f"(ip: {peer.ip}, remaining duration: {duration})" + + if multiline: + opt_sep = "," if idx != len(banned_peers) - 1 else "" + result += f" {line}{opt_sep}\n" + else: + opt_sep = ", " if idx != len(banned_peers) - 1 else "" + result += f"{line}{opt_sep}" + + result += "]" + + return result + + +def hide_cursor(): + esc_seq = "\033[?25l" + if sys.stdout.isatty(): + sys.stdout.write(esc_seq) + if sys.stderr.isatty(): + sys.stderr.write(esc_seq) + + +def show_cursor(): + esc_seq = "\033[?25h" + if sys.stdout.isatty(): + sys.stdout.write(esc_seq) + if sys.stderr.isatty(): + sys.stderr.write(esc_seq) + From 4f905d508396d8d69f8541fe4c4a3084bcf807db Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Fri, 26 Sep 2025 16:54:08 +0300 Subject: [PATCH 2/6] Fork detection script: add permabanned peers list and email sending; some cleanup --- build-tools/fork-detection/README.md | 22 ++++- build-tools/fork-detection/detector.py | 130 +++++++++++++++++++------ build-tools/fork-detection/utils.py | 22 +++++ 3 files changed, 142 insertions(+), 32 deletions(-) diff --git a/build-tools/fork-detection/README.md b/build-tools/fork-detection/README.md index b7461daa9..fb3be2565 100644 --- a/build-tools/fork-detection/README.md +++ b/build-tools/fork-detection/README.md @@ -30,4 +30,24 @@ Each attempt's directory has the following structure: - `node_data` - this is the node's data directory of this attempt. - `node_log.txt` - the node's log. -Note: currently the script requires Python 3.13 to run, though we may lift this requirement later. +Some notes: +* Currently the script requires Python 3.13 to run, though we may lift this requirement later. +* The script can send an email when it detects an issue using the local SMTP server + (if you're on Linux, google for an SMTP Postfix tutorial to set it up). +* Even if the script finds a problem (e.g. a checkpoint mismatch), you're still likely + to end up being on the correct chain. To download the actual fork for further investigation + you can initiate a separate full sync while using the node's option `--custom-checkpoints-csv-file` + to override the correct checkpoints with the wrong ones. +* Once the fork has been downloaded, you'll want to examine the contents of its chainstate db. 
+ Currently we have the `chainstate-db-dumper` tool that can dump certain info about blocks + to a CSV file (the most interesting part of it being the ids of pools that continue producing + blocks on that fork). +* Once the fork has been investigated you can "permanently" ban the peers that have been sending it + to you, to prevent it from being reported again and again. To do so, you can add their ip + addresses to `permabanned_peers.txt` (one address per line) in the script's working directory + (it doesn't exist by default, so you'll have to create it first). Note that the file is checked + on every iteration, so you can update it while the script is already running and it will come + into effect when the next iteration starts. +* The script is likely to fail if a networking error occurs, e.g. if it can't query the API server. + So, run it in a loop in a shell script (with some delay after each run, to prevent it from spamming + you with warning emails). \ No newline at end of file diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py index 536b3efdc..7685686a2 100644 --- a/build-tools/fork-detection/detector.py +++ b/build-tools/fork-detection/detector.py @@ -17,7 +17,7 @@ colored, dir_missing_or_empty, exhaustive_stream_line_reader, hide_cursor, show_cursor, init_logger, pretty_print_banned_peers, CONSOLE_PRINTER, LOGGER as log, NODE_OUTPUT_PREFIX_COLOR, NODE_RPC_USER, NODE_RPC_PWD, - Error, APIServerClient, NodeRPCClient, + Error, APIServerClient, BannedPeer, EmailSender, NodeRPCClient, ) @@ -73,21 +73,26 @@ # when the node starts and immediately exists due to the p2p port being unavailable. NODE_STARTUP_OUTPUT_LINE_REGEX = re.compile(r"p2p.*Starting SyncManager") -BAN_DURATION_SECS = 6 * 3600 # 6h +DEFAULT_BAN_DURATION_HOURS = 12 # We use Queue.shutdown which is only available since Python v3.13 MIN_PYTHON_VERSION_MAJOR = 3 MIN_PYTHON_VERSION_MINOR = 13 +PERMABANNED_PEERS_FILE = "permabanned_peers.txt" +PERMABAN_DURATION_DAYS = 30 +PERMABAN_DURATION_SECS = 3600 * 24 * PERMABAN_DURATION_DAYS + class Handler(): - def __init__(self, args): + def __init__(self, args, email_sender): CONSOLE_PRINTER.set_status("Initializing") - self.entire_output_dir = Path(args.output_dir).resolve() - os.makedirs(self.entire_output_dir, exist_ok=True) + self.email_sender = email_sender + self.working_dir = Path(args.working_dir).resolve() + os.makedirs(self.working_dir, exist_ok=True) - init_logger(self.entire_output_dir.joinpath("log.txt")) + init_logger(self.working_dir.joinpath("log.txt")) log.info("Initializing") self.node_cmd = shlex.split(args.node_cmd) @@ -95,13 +100,15 @@ def __init__(self, args): self.node_rpc_client = NodeRPCClient(args.node_rpc_bind_address) self.api_server_client = APIServerClient(args.api_server_url) - self.saved_attempts_dir = self.entire_output_dir.joinpath(SAVED_ATTEMPTS_SUBDIR) + self.saved_attempts_dir = self.working_dir.joinpath(SAVED_ATTEMPTS_SUBDIR) - self.saved_peer_dbs_dir = self.entire_output_dir.joinpath(SAVED_PEER_DBS_SUBDIR) + self.saved_peer_dbs_dir = self.working_dir.joinpath(SAVED_PEER_DBS_SUBDIR) self.latest_peer_db_dir = self.saved_peer_dbs_dir.joinpath(LATEST_PEER_DB_SUBDIR) self.prev_peer_db_dir = self.saved_peer_dbs_dir.joinpath(PREV_PEER_DB_SUBDIR) - self.cur_attempt_dir = self.entire_output_dir.joinpath(CUR_ATTEMPT_SUBDIR) + self.permabanned_peers_file = self.working_dir.joinpath(PERMABANNED_PEERS_FILE) + + self.cur_attempt_dir = self.working_dir.joinpath(CUR_ATTEMPT_SUBDIR) if 
os.path.exists(self.cur_attempt_dir) and not args.can_continue: raise Error( (f"The directory {self.cur_attempt_dir} already exists. " @@ -113,6 +120,7 @@ def __init__(self, args): self.cur_attempt_logs_file = self.cur_attempt_dir.joinpath("node_log.txt") self.unban_all = args.unban_all + self.ban_duration_secs = args.ban_duration_hours * 3600 self.node_cmd += [ "--datadir", self.cur_attempt_node_data_dir, @@ -163,6 +171,8 @@ def do_full_sync(self): args=(node_proc.stdout, node_proc_stdout_queue, self.cur_attempt_logs_file) ).start() + # Handle a node's output line, return True if the current attempt should continue + # and False otherwise. def handle_node_output_line(line: str): nonlocal actual_tip_height, last_block_arrival_time, last_handled_height @@ -227,6 +237,15 @@ def handle_node_output_line(line: str): def on_node_started(): self.node_rpc_client.ensure_rpc_started() + perma_banned_peers = self.load_perma_banned_peers() + log.debug(f"Banning the following addresses for {PERMABAN_DURATION_DAYS} days: {perma_banned_peers}") + + for addr in perma_banned_peers: + self.node_rpc_client.ban_peer(addr, PERMABAN_DURATION_SECS) + + def filter_out_perma_banned_peers(peer_list: list[BannedPeer]) -> list[BannedPeer]: + return [peer for peer in peer_list if peer.ip not in perma_banned_peers] + banned_peers = self.node_rpc_client.get_banned_peers() banned_peers_str = pretty_print_banned_peers(banned_peers) @@ -234,16 +253,19 @@ def on_node_started(): if self.unban_all: self.unban_all = False - if len(banned_peers) > 0: - log.info("Unbanning currently banned peers due to the command line option") + peers_to_unban = filter_out_perma_banned_peers(banned_peers) + if len(peers_to_unban) > 0: + log.info("Unbanning currently (non-permanently) banned peers due to the command line option") - for peer in banned_peers: + for peer in peers_to_unban: self.node_rpc_client.unban_peer(peer.ip) banned_peers_after_unban = self.node_rpc_client.get_banned_peers() - if len(banned_peers_after_unban) > 0: - banned_peers_after_unban_str = pretty_print_banned_peers(banned_peers_after_unban) - log.warning(f"Some peers are still banned after unban: {banned_peers_after_unban_str}") + unexpected_banned_peers = filter_out_perma_banned_peers(banned_peers_after_unban) + + if len(unexpected_banned_peers) > 0: + unexpected_banned_peers_str = pretty_print_banned_peers(unexpected_banned_peers) + log.warning(f"Some peers are still banned after unban: {unexpected_banned_peers_str}") def on_attempt_completion(): # When a syncing attempt has been finished, but before the node has been stopped, @@ -263,7 +285,7 @@ def on_attempt_completion(): for ip_addr in peer_ips_to_ban: log.debug(f"Banning {ip_addr}") - self.node_rpc_client.ban_peer(ip_addr, BAN_DURATION_SECS) + self.node_rpc_client.ban_peer(ip_addr, self.ban_duration_secs) try: node_started = False @@ -331,12 +353,14 @@ def on_attempt_completion(): backup_dir_name = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") backup_dir = self.saved_attempts_dir.joinpath(backup_dir_name) - log.warning( - f"Sync iteration ended with some issues, backing up the output dir to {backup_dir}" - ) + warning_msg = ("Sync iteration ended with some issues, " + f"backing up the attempt's dir to {backup_dir}") + log.warning(warning_msg) + self.email_sender.send("Warning", warning_msg) + os.rename(self.cur_attempt_dir, backup_dir) else: - log.info("Sync iteration ended without issues, removing output dir") + log.info("Sync iteration ended without issues, removing the attempt's dir")
shutil.rmtree(self.cur_attempt_dir) # Return the list of ip addresses we want to ban at the end of a sync attempt, @@ -391,6 +415,24 @@ def touch_flag(self, flag: str, contents=None): log.warning(f"Flag created: {flag}") + def load_perma_banned_peers(self) -> set[str]: + def trim_line(line): + # Allow the file to have comments + return line.split("#", 1)[0].strip() + + log.debug(f"Checking {self.permabanned_peers_file} for the list of permabanned peer addresses") + + try: + with open(self.permabanned_peers_file, "r") as file: + lines = file.readlines() + lines = [ + trimmed_line for line in lines + if len(trimmed_line := trim_line(line)) > 0 + ] + return set(lines) + except FileNotFoundError: + return set() + def set_status_and_debug_log(status): log.debug(status) @@ -407,33 +449,59 @@ def main(): try: parser = argparse.ArgumentParser( formatter_class=argparse.ArgumentDefaultsHelpFormatter) - parser.add_argument("--node-cmd", + parser.add_argument( + "--node-cmd", help="Command to run the node", default=DEFAULT_NODE_CMD) - parser.add_argument("--node-rpc-bind-address", + parser.add_argument( + "--node-rpc-bind-address", help="Node RPC bind address", default=DEFAULT_NODE_RPC_BIND_ADDR) parser.add_argument( - '--api-server-url', + "--api-server-url", help='API server URL', required=True) - parser.add_argument("--chain-type", + parser.add_argument( + "--chain-type", help="Chain type", choices=CHAIN_TYPE_CHOICES, default=DEFAULT_CHAIN_TYPE) - parser.add_argument("--output-dir", - help="Output directory", + parser.add_argument( + "--working-dir", + help="Working directory, where all the output will be put", required=True) - parser.add_argument(f"--{CONTINUE_OPTION_NAME}", + parser.add_argument( + f"--{CONTINUE_OPTION_NAME}", help=(f"Proceed even if the '{CUR_ATTEMPT_SUBDIR}' subdirectory " - "already exists in the output dir"), + "already exists in the working dir"), action="store_true", dest="can_continue") - parser.add_argument(f"--unban-all", - help=("Unban all node's peers on start"), + parser.add_argument( + "--ban-duration", + help="Ban duration, in hours", + dest="ban_duration_hours", + type=int, + default=DEFAULT_BAN_DURATION_HOURS) + parser.add_argument( + "--unban-all", + help="Unban all node's peers on start", action="store_true") + parser.add_argument( + "--notification-email", + help="Send notifications to this email using the local SMTP server", + default=None) + parser.add_argument( + "--notification-email-from", + help=("The from address for the notification email. 
" + "If None, the --notification-email value will be used"), + default=None) args = parser.parse_args() - Handler(args).run() + email_sender = EmailSender(args.notification_email, args.notification_email_from) + + try: + Handler(args, email_sender).run() + except Exception as e: + email_sender.send("Error", f"Script terminated due to exception: {e}") + raise except Error as e: print(f"Error: {e}") sys.exit(1) diff --git a/build-tools/fork-detection/utils.py b/build-tools/fork-detection/utils.py index 86e9121f9..5b8863480 100644 --- a/build-tools/fork-detection/utils.py +++ b/build-tools/fork-detection/utils.py @@ -1,11 +1,14 @@ import json import logging import os +import platform import queue import requests +import smtplib import sys import time from collections import namedtuple +from email.mime.text import MIMEText from pathlib import Path from queue import Queue from threading import Lock @@ -395,3 +398,22 @@ def show_cursor(): if sys.stderr.isatty(): sys.stderr.write(esc_seq) + +# Sends notification emails to the specified address if it's not None, otherwise does nothing. +class EmailSender: + # to_addr - the address to send emails to; if None, nothing will be sent. + # from_addr - the 'from' address for the emails; if None, to_addr will be used. + def __init__(self, to_addr: str | None, from_addr: str | None): + self.to_addr = to_addr + self.from_addr = from_addr or to_addr + + def send(self, msg_subj, msg_body): + if self.to_addr is not None: + msg = MIMEText(msg_body) + msg["Subject"] = msg_subj + msg["From"] = f"Fork detection script at {platform.node()} <{self.from_addr}>" + msg["To"] = self.to_addr + + s = smtplib.SMTP('localhost') + s.sendmail(self.from_addr, [self.to_addr], msg.as_string()) + s.quit() From 30a37886ef78da362ebb0e85c9ecdbb55a48d1f0 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Mon, 29 Sep 2025 19:12:02 +0300 Subject: [PATCH 3/6] Fork detection script: don't call API server on every block; don't create a flag for each received stale block; handle the situation when the node can't receive new blocks. --- build-tools/fork-detection/detector.py | 166 +++++++++++++++++-------- build-tools/fork-detection/utils.py | 11 ++ chainstate/src/detail/mod.rs | 18 ++- 3 files changed, 136 insertions(+), 59 deletions(-) diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py index 7685686a2..bdcfe9d11 100644 --- a/build-tools/fork-detection/detector.py +++ b/build-tools/fork-detection/detector.py @@ -11,6 +11,7 @@ from pathlib import Path from queue import Queue from threading import Thread +from typing import Optional from urllib.parse import urlparse from utils import ( @@ -40,24 +41,32 @@ # Note: this is defined by the node and cannot be changed. PEER_DB_SUBDIR_IN_NODE_DATA = "peerdb-lmdb" -# The mapping from node's output to the name of the flag that must be created as a result. +# If the height difference between the current tip and a stale block is bigger than or equal to +# this value, a reorg to the stale block is no longer possible. +MAX_REORG_DEPTH = 1000 + +# The mapping from node's output to the name of the flag that must be automatically created +# as a result. 
NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING = [ (re.compile(r"\bCRITICAL\b"), "critical_error"), (re.compile(r"Checkpoint mismatch"), "checkpoint_mismatch"), (re.compile(r"\bERROR\b.+\bprocess_block\b"), "process_block_failure"), (re.compile(r"\bERROR\b.+\bpreliminary_block_check\b"), "preliminary_block_check_failure"), (re.compile(r"\bERROR\b.+\bpreliminary_headers_check\b"), "preliminary_headers_check_failure"), - (re.compile(r"Stale block received"), "stale_block_received"), ] -# The "flag" that will be created if the node's mainchain block differs from the API server's. -# Note: technically this is not a flag, because it will contain additional data. -POTENTIAL_REORGS_FLAG_NAME = "potential_reorgs" +ENDED_UP_ON_A_FORK_FLAG_NAME = "ended_up_on_a_fork" +STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME = "stale_block_below_reorg_limit" +NO_INCOMING_BLOCKS_WHILE_ON_STALE_CHAIN_FLAG_NAME = "no_incoming_blocks_while_on_stale_chain" NODE_OUTPUT_LINE_NEW_TIP_REGEX = re.compile( r"NEW TIP in chainstate (?P[0-9A-Fa-f]+) with height (?P\d+), timestamp: (?P\d+)" ) -# The regex by which we determine that node's output line should be printed to the console +NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX = re.compile( + r"Received stale block (?P[0-9A-Fa-f]+) with height (?P\d+), timestamp: (?P\d+)" +) + +# The regex used to decide whether a node's output line should be printed to the console # (we want to avoid debug and info lines since they're both too noisy during sync and put extra # strain on the console app). # Note that this is not 100% reliable, because a log record can technically span multiple lines, @@ -66,6 +75,7 @@ # But even if we did, this approach is "good enough" anyway, since you can always look into the log # file for the missing details. NODE_OUTPUT_LINE_TO_PRINT_REGEX = re.compile(r"^\S+\s+(WARN|ERROR)\b") + # The regex by which we determine that the node is actually being started; this is mainly needed # because by default we invoke cargo, which may have to do a lengthy compilation first. # Also note that we use a log line indicating that p2p has already been started (instead of, say, @@ -149,7 +159,7 @@ def do_full_sync(self): self.restore_peer_db() node_proc_env = os.environ.copy() - # Note: "chainstate_verbose_block_ids=debug" allows to catch the "Stale block received" line + # Note: "chainstate_verbose_block_ids=debug" allows to catch the "Received stale block" line # and also forces certain block-processing functions in chainstate to print full block ids. # We avoid using the "normal" debug log, because it's too noisy, e.g. even # "info,chainstate=debug" produces hundreds of megabytes of logs during the full sync. @@ -162,8 +172,8 @@ def do_full_sync(self): stderr=subprocess.STDOUT, env=node_proc_env) - last_block_arrival_time = None - last_handled_height = None + last_tip_arrival_time = None + last_tip_height = None node_proc_stdout_queue = Queue() Thread( @@ -171,65 +181,105 @@ def do_full_sync(self): args=(node_proc.stdout, node_proc_stdout_queue, self.cur_attempt_logs_file) ).start() - # Handle a node's output line, return True if the current attempt should continue - # and False otherwise. - def handle_node_output_line(line: str): - nonlocal actual_tip_height, last_block_arrival_time, last_handled_height + # This is called for each node's output line and on a timeout when reading the line + # from a queue. + # Returns True if the current attempt should continue and False otherwise. 
+ def on_node_output_line_or_timeout(line: Optional[str]): + nonlocal actual_tip_height, last_tip_arrival_time, last_tip_height + + line = line if line is not None else "" for line_re, flag in NODE_OUTPUT_LINE_REGEX_TO_FLAG_MAPPING: if line_re.search(line) is not None: self.touch_flag(flag) - new_tip_match = NODE_OUTPUT_LINE_NEW_TIP_REGEX.search(line) - if new_tip_match is not None: - cur_seconds_since_epoch = time.time() - seconds_without_blocks = ( - cur_seconds_since_epoch - last_block_arrival_time - if last_block_arrival_time is not None else 0 - ) - last_block_arrival_time = cur_seconds_since_epoch + cur_seconds_since_epoch = time.time() + if (new_tip_match := NODE_OUTPUT_LINE_NEW_TIP_REGEX.search(line)) is not None: block_id = new_tip_match.group("block_id") height = int(new_tip_match.group("height")) - last_handled_height = height timestamp = int(new_tip_match.group("timestamp")) + last_tip_arrival_time = cur_seconds_since_epoch + last_tip_height = height + if height % 10 == 0: CONSOLE_PRINTER.set_status(f"Synced to height {height}") # Update actual_tip_height if we've reached it. - # Note: >= is used to make sure that the check "height == actual_tip_height" - # below uses the updated height. if height >= actual_tip_height: actual_tip_height = self.api_server_client.get_tip().height - if height > actual_tip_height: - log_func = log.info if height == actual_tip_height + 1 else log.warning; - log_func( - f"Tip reached, height = {height} (the API server is {height-actual_tip_height} block(s) behind)" + fresh_block_reached = timestamp >= cur_seconds_since_epoch - 120 + actual_tip_height_reached = height >= actual_tip_height + + # Note: we can't query the API server on every block, because it's a costly operation + # (unless the API server is being run on the same machine). So we only query it every + # few hundred blocks or if we're near the end of the sync. + # Note: 500 was chosen because it's also the distance between our checkpoints, + # but the precise value is not essential. + if height % 500 == 0 or fresh_block_reached or actual_tip_height_reached: + actual_block_id = self.api_server_client.get_block_id(height) + if block_id.lower() != actual_block_id.lower(): + if actual_tip_height - height >= MAX_REORG_DEPTH: + self.touch_flag(ENDED_UP_ON_A_FORK_FLAG_NAME) + + if fresh_block_reached: + log.info(f"Fresh block on a stale chain reached (height = {height})") + return False + + if actual_tip_height_reached: + log_func = log.info if height <= actual_tip_height + 1 else log.warning + extra = ( + "" if height == actual_tip_height + else f" (the API server is {height-actual_tip_height} block(s) behind)" ) + log_func(f"Tip reached, height = {height}{extra}") return False - actual_block_id = self.api_server_client.get_block_id(height) - if block_id.lower() != actual_block_id.lower(): + elif (stale_block_received_match := NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX.search(line)) is not None: + block_id = stale_block_received_match.group("block_id") + height = int(stale_block_received_match.group("height")) + + log.warn(f"Stale block received at height {height}: {block_id}") + + if actual_tip_height - height >= MAX_REORG_DEPTH: + # Note: this may mean 2 things: + # a) If we're on the proper chain, then we've found a peer who is on + # a fork, in which case we definitely want to create this flag. 
+ # b) If we're on a fork, then the stale block may be from the proper + # chain, in which case the flag creation is redundant, because the code + # above or below should catch this situation; but it's not harmful either. self.touch_flag( - POTENTIAL_REORGS_FLAG_NAME, - f"height={height}, node block id={block_id}, api srv block id={actual_block_id}" + STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME, + f"height={height}, block id={block_id}" ) + else: + seconds_since_last_tip = ( + cur_seconds_since_epoch - last_tip_arrival_time + if last_tip_arrival_time is not None else 0 + ) - # Check if the block is fresh enough - if timestamp >= cur_seconds_since_epoch - 120: - log.info(f"Fresh block on a stale chain reached (height = {height})") - return False + # Note: the reason for not receiving any blocks may be that we've already banned + # all or most of the potential peers. But if we're on a stale chain, then we may + # not receive any more blocks, so we have to stop. + # We'll also stop if some flags have already been created. - # Check if we haven't seen a block in awhile - if seconds_without_blocks >= 120: - log.info(f"No incoming blocks while on stale chain (height = {height})") - return False + if seconds_since_last_tip >= 120: + chainstate_info = self.node_rpc_client.get_chainstate_info() + tip_id = chainstate_info.best_block_id + tip_height = chainstate_info.best_block_height - if height == actual_tip_height: - log.info(f"Tip reached, height = {height}") - return False + if tip_height != 0: + actual_block_id = self.api_server_client.get_block_id(tip_height) + + if tip_id.lower() != actual_block_id.lower(): + self.touch_flag(NO_INCOMING_BLOCKS_WHILE_ON_STALE_CHAIN_FLAG_NAME) + return False + + if self.have_flags(): + log.info("Exiting because we haven't received any blocks in a while, but some flags already exist") + return False return True @@ -293,18 +343,21 @@ def on_attempt_completion(): while True: try: - line = node_proc_stdout_queue.get() + try: + line = node_proc_stdout_queue.get(timeout=10) - if NODE_OUTPUT_LINE_TO_PRINT_REGEX.search(line) is not None: - stdout_prefix = colored("node> ", NODE_OUTPUT_PREFIX_COLOR) - CONSOLE_PRINTER.print_to_stderr(f"{stdout_prefix} {line}", end="") + if NODE_OUTPUT_LINE_TO_PRINT_REGEX.search(line) is not None: + stdout_prefix = colored("node> ", NODE_OUTPUT_PREFIX_COLOR) + CONSOLE_PRINTER.print_to_stderr(f"{stdout_prefix} {line}", end="") - if not node_started and NODE_STARTUP_OUTPUT_LINE_REGEX.search(line) is not None: - node_started = True - set_status_and_debug_log("Node started") - on_node_started() + if not node_started and NODE_STARTUP_OUTPUT_LINE_REGEX.search(line) is not None: + node_started = True + set_status_and_debug_log("Node started") + on_node_started() + except queue.Empty: + line = None - if not handle_node_output_line(line): + if not on_node_output_line_or_timeout(line): break except queue.ShutDown: # This means that the node has exited prematurely. 
But we check for this @@ -321,8 +374,8 @@ def on_attempt_completion(): on_attempt_completion() finally: - if last_handled_height is not None: - log.debug(f"Last handled height: {last_handled_height}") + if last_tip_height is not None: + log.debug(f"Last handled tip height: {last_tip_height}") set_status_and_debug_log("Terminating the node") @@ -347,7 +400,7 @@ def on_attempt_completion(): self.save_peer_db() # If the script has created some flags, save the directory - if len(os.listdir(self.cur_attempt_flags_dir)) > 0: + if self.have_flags(): os.makedirs(self.saved_attempts_dir, exist_ok=True) backup_dir_name = datetime.today().strftime("%Y-%m-%d_%H-%M-%S") @@ -363,6 +416,9 @@ def on_attempt_completion(): log.info("Sync iteration ended without issues, removing the attempt's dir") shutil.rmtree(self.cur_attempt_dir) + def have_flags(self): + return len(os.listdir(self.cur_attempt_flags_dir)) > 0 + # Return the list of ip addresses we want to ban and the end of a sync attempt, # to prevent syncing with the same peers again and again. def get_node_peer_ip_addrs_to_ban(self): diff --git a/build-tools/fork-detection/utils.py b/build-tools/fork-detection/utils.py index 5b8863480..2f5f6ae03 100644 --- a/build-tools/fork-detection/utils.py +++ b/build-tools/fork-detection/utils.py @@ -237,6 +237,7 @@ def reader(log_stream): BlockInfo = namedtuple("BlockInfo", ["id", "height"]) BannedPeer = namedtuple("BannedPeer", ["ip", "banned_until_as_secs_since_epoch"]) +ChainstateInfo = namedtuple("ChainstateInfo", ["best_block_height", "best_block_id", "best_block_timestamp"]) class APIServerClient: @@ -333,6 +334,16 @@ def ban_peer(self, peer_addr: str, duration_secs: int): def unban_peer(self, peer_addr: str): self._post("p2p_unban", [peer_addr]) + def get_chainstate_info(self) -> ChainstateInfo: + info = self._post("chainstate_info", []) + bb_height = int(info["best_block_height"]) + bb_timestamp = int(info["best_block_timestamp"]["timestamp"]) + bb_id = info["best_block_id"] + + return ChainstateInfo( + best_block_height=bb_height, best_block_id=bb_id, best_block_timestamp=bb_timestamp + ) + # Assuming that the node has already been started, wait until it is reachable via rpc. 
     def ensure_rpc_started(self):
         max_attempts = 10
diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs
index 33d0178e2..5009a3eeb 100644
--- a/chainstate/src/detail/mod.rs
+++ b/chainstate/src/detail/mod.rs
@@ -636,10 +636,20 @@ impl Chainstate
             self.update_initial_block_download_flag()
                 .map_err(BlockError::BestBlockIdQueryError)?;
         } else {
-            tracing::debug!(
-                target: CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS,
-                "Stale block received: {block_id}"
-            );
+            if tracing::event_enabled!(tracing::Level::DEBUG, CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS) {
+                // FIXME: return custom enum from attempt_to_process_block
+                let chainstate_ref = self.make_db_tx_ro().map_err(BlockError::from)?;
+                let bi = get_existing_block_index(&chainstate_ref, &block_id)?;
+
+                tracing::debug!(
+                    target: CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS,
+                    "Received stale block {:x} with height {}, timestamp: {} ({})",
+                    bi.block_id(),
+                    bi.block_height(),
+                    bi.block_timestamp(),
+                    bi.block_timestamp().into_time(),
+                )
+            }
         }

         Ok(result)

From 377af9d1802313bda0bd4261025c81c90a34491a Mon Sep 17 00:00:00 2001
From: Mykhailo Kremniov
Date: Fri, 3 Oct 2025 18:55:14 +0300
Subject: [PATCH 4/6] Minor fix in build-tools/block-data-plots/collect_data.py

---
 build-tools/block-data-plots/collect_data.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/build-tools/block-data-plots/collect_data.py b/build-tools/block-data-plots/collect_data.py
index 396d9dc89..d505c1d71 100644
--- a/build-tools/block-data-plots/collect_data.py
+++ b/build-tools/block-data-plots/collect_data.py
@@ -1,4 +1,5 @@
 import argparse
+import os
 import subprocess
 from pathlib import Path

@@ -14,6 +15,7 @@ def collect_data(args):

     if args.output_file is None:
+        os.makedirs(DEFAULT_OUTPUT_DIR, exist_ok=True)
         output_file = DEFAULT_OUTPUT_DIR.joinpath(
             DEFAULT_OUTPUT_FILE_NAME_FMT.format(chain_type=args.chain_type)
         )

From ff1c2ef95f67440839f90b106a2ec5f1463a428e Mon Sep 17 00:00:00 2001
From: Mykhailo Kremniov
Date: Mon, 13 Oct 2025 20:34:05 +0300
Subject: [PATCH 5/6] Minor cleanup

---
 build-tools/block-data-plots/collect_data.py | 2 +-
 build-tools/fork-detection/detector.py       | 4 +++-
 build-tools/fork-detection/utils.py          | 5 +++--
 chainstate/db-dumper/src/dumper/options.rs   | 2 +-
 chainstate/src/detail/error.rs               | 2 +-
 5 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/build-tools/block-data-plots/collect_data.py b/build-tools/block-data-plots/collect_data.py
index d505c1d71..f474c589d 100644
--- a/build-tools/block-data-plots/collect_data.py
+++ b/build-tools/block-data-plots/collect_data.py
@@ -28,7 +28,7 @@ def collect_data(args):
         "--output-file", output_file,
         "--mainchain-only=true",
         "--fields=height,timestamp,target",
-        "--from_height=0"
+        "--from-height=0"
     ]

     if args.node_data_dir is not None:
diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py
index bdcfe9d11..c4b237af2 100644
--- a/build-tools/fork-detection/detector.py
+++ b/build-tools/fork-detection/detector.py
@@ -551,7 +551,9 @@ def main():
                         default=None)
     args = parser.parse_args()

-    email_sender = EmailSender(args.notification_email, args.notification_email_from)
+    email_sender = EmailSender(
+        args.chain_type, args.notification_email, args.notification_email_from
+    )

     try:
         Handler(args, email_sender).run()
diff --git a/build-tools/fork-detection/utils.py b/build-tools/fork-detection/utils.py
index 2f5f6ae03..7169fb7d4 100644
--- a/build-tools/fork-detection/utils.py
+++ b/build-tools/fork-detection/utils.py
@@ -414,7 +414,8 @@ def show_cursor():

 class EmailSender:
     # to_addr - the address to send emails to; if None, nothing will be sent.
     # from_addr - the 'from' address for the emails; if None, to_addr will be used.
-    def __init__(self, to_addr: str | None, from_addr: str | None):
+    def __init__(self, chain_type: str, to_addr: str | None, from_addr: str | None):
+        self.chain_type = chain_type
         self.to_addr = to_addr
         self.from_addr = from_addr or to_addr
@@ -422,7 +423,7 @@ def send(self, msg_subj, msg_body):
         if self.to_addr is not None:
             msg = MIMEText(msg_body)
             msg["Subject"] = msg_subj
-            msg["From"] = f"Fork detection script at {platform.node()} <{self.from_addr}>"
+            msg["From"] = f"Fork detection script at {platform.node()} ({self.chain_type}) <{self.from_addr}>"
             msg["To"] = self.to_addr

             s = smtplib.SMTP('localhost')
diff --git a/chainstate/db-dumper/src/dumper/options.rs b/chainstate/db-dumper/src/dumper/options.rs
index 761e42d40..121d4f685 100644
--- a/chainstate/db-dumper/src/dumper/options.rs
+++ b/chainstate/db-dumper/src/dumper/options.rs
@@ -69,7 +69,7 @@ pub struct Options {
     pub mainchain_only: bool,

     /// Block height to start from
-    #[clap(long = "from_height", default_value_t = 0)]
+    #[clap(long = "from-height", default_value_t = 0)]
     pub from_height: u64,

     /// This help string
diff --git a/chainstate/src/detail/error.rs b/chainstate/src/detail/error.rs
index 4d78d9206..439d1aff4 100644
--- a/chainstate/src/detail/error.rs
+++ b/chainstate/src/detail/error.rs
@@ -168,7 +168,7 @@ pub enum CheckBlockError {
     #[error("CRITICAL: Failed to retrieve ancestor of submitted block: {0}")]
     GetAncestorError(#[from] GetAncestorError),
     #[error(
-        "Attempted to add a block before reorg limit (attempted at height: {} while current height is: {} and min allowed is: {})",
+        "Attempted to add a block before reorg limit (common ancestor height is: {} while current tip height is: {} and min allowed is: {})",
         common_ancestor_height,
         tip_block_height,
         min_allowed_height

From 4b860c6754d2f6f187b4fe91010d0d8c4735aed2 Mon Sep 17 00:00:00 2001
From: Mykhailo Kremniov
Date: Fri, 24 Oct 2025 19:11:37 +0300
Subject: [PATCH 6/6] Remove the redundant handling of stale blocks below the max reorg depth (they are rejected by the chainstate anyway)

---
 build-tools/fork-detection/detector.py | 30 ++++----------------------
 chainstate/src/detail/mod.rs           | 20 ++---------------
 2 files changed, 6 insertions(+), 44 deletions(-)

diff --git a/build-tools/fork-detection/detector.py b/build-tools/fork-detection/detector.py
index c4b237af2..0b56068d9 100644
--- a/build-tools/fork-detection/detector.py
+++ b/build-tools/fork-detection/detector.py
@@ -56,15 +56,11 @@
 ]

 ENDED_UP_ON_A_FORK_FLAG_NAME = "ended_up_on_a_fork"
-STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME = "stale_block_below_reorg_limit"
 NO_INCOMING_BLOCKS_WHILE_ON_STALE_CHAIN_FLAG_NAME = "no_incoming_blocks_while_on_stale_chain"

 NODE_OUTPUT_LINE_NEW_TIP_REGEX = re.compile(
     r"NEW TIP in chainstate (?P<block_id>[0-9A-Fa-f]+) with height (?P<height>\d+), timestamp: (?P<timestamp>\d+)"
 )
-NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX = re.compile(
-    r"Received stale block (?P<block_id>[0-9A-Fa-f]+) with height (?P<height>\d+), timestamp: (?P<timestamp>\d+)"
-)

 # The regex used to decide whether a node's output line should be printed to the console
 # (we want to avoid debug and info lines since they're both too noisy during sync and put extra
@@ -159,10 +155,10 @@ def do_full_sync(self):
         self.restore_peer_db()

         node_proc_env = os.environ.copy()
-        # Note: "chainstate_verbose_block_ids=debug" allows to catch the "Received stale block" line
-        # and also forces certain block-processing functions in chainstate to print full block ids.
-        # We avoid using the "normal" debug log, because it's too noisy, e.g. even
-        # "info,chainstate=debug" produces hundreds of megabytes of logs during the full sync.
+        # Note: "chainstate_verbose_block_ids=debug" forces certain block-processing functions
+        # in chainstate to print full block ids. We avoid using the "normal" debug log, because
+        # it's too noisy, e.g. even "info,chainstate=debug" produces hundreds of megabytes of
+        # logs during the full sync.
         node_proc_env["RUST_LOG"] = "info,chainstate_verbose_block_ids=debug"

         node_proc = subprocess.Popen(
@@ -236,24 +232,6 @@ def on_node_output_line_or_timeout(line: Optional[str]):
                 )
                 log_func(f"Tip reached, height = {height}{extra}")
                 return False
-
-            elif (stale_block_received_match := NODE_OUTPUT_LINE_STALE_BLOCK_RECEIVED_REGEX.search(line)) is not None:
-                block_id = stale_block_received_match.group("block_id")
-                height = int(stale_block_received_match.group("height"))
-
-                log.warn(f"Stale block received at height {height}: {block_id}")
-
-                if actual_tip_height - height >= MAX_REORG_DEPTH:
-                    # Note: this may mean 2 things:
-                    #    a) If we're on the proper chain, then we've found a peer who is on
-                    #       a fork, in which case we definitely want to create this flag.
-                    #    b) If we're on a fork, then the stale block may be from the proper
-                    #       chain, in which case the flag creation is redundant, because the code
-                    #       above or below should catch this situation; but it's not harmful either.
-                    self.touch_flag(
-                        STALE_BLOCK_BELOW_REORG_LIMIT_FLAG_NAME,
-                        f"height={height}, block id={block_id}"
-                    )
             else:
                 seconds_since_last_tip = (
                     cur_seconds_since_epoch - last_tip_arrival_time
diff --git a/chainstate/src/detail/mod.rs b/chainstate/src/detail/mod.rs
index 5009a3eeb..a48cdcb0a 100644
--- a/chainstate/src/detail/mod.rs
+++ b/chainstate/src/detail/mod.rs
@@ -94,9 +94,8 @@ type ChainstateEventHandler = EventHandler;

 pub type OrphanErrorHandler = dyn Fn(&BlockError) + Send + Sync;

-/// A tracing target that either forces full block ids to be printed where they're normally
-/// printed in the abbreviated form, or just makes block ids be printed where normally they won't
-/// be.
+/// A tracing target that forces full block ids to be printed in certain places where they're
+/// normally printed in the abbreviated form.
 pub const CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS: &str = "chainstate_verbose_block_ids";

 #[must_use]
@@ -635,21 +634,6 @@ impl Chainstate

             self.update_initial_block_download_flag()
                 .map_err(BlockError::BestBlockIdQueryError)?;
-        } else {
-            if tracing::event_enabled!(tracing::Level::DEBUG, CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS) {
-                // FIXME: return custom enum from attempt_to_process_block
-                let chainstate_ref = self.make_db_tx_ro().map_err(BlockError::from)?;
-                let bi = get_existing_block_index(&chainstate_ref, &block_id)?;
-
-                tracing::debug!(
-                    target: CHAINSTATE_TRACING_TARGET_VERBOSE_BLOCK_IDS,
-                    "Received stale block {:x} with height {}, timestamp: {} ({})",
-                    bi.block_id(),
-                    bi.block_height(),
-                    bi.block_timestamp(),
-                    bi.block_timestamp().into_time(),
-                )
-            }
         }

         Ok(result)
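
The core check that these patches build up in detector.py can be summarized as: if no new tip has arrived for a while, ask the node for its current tip via the chainstate_info RPC and compare that block id with the one the API server reports at the same height; a mismatch means the node has ended up on a stale chain. A minimal sketch of that comparison (illustrative only; get_chainstate_info and get_block_id are the helpers from utils.py, while the standalone wrapper function itself is hypothetical):

    # Illustrative sketch, not part of the patches above.
    def node_is_on_stale_chain(node_rpc_client, api_server_client) -> bool:
        info = node_rpc_client.get_chainstate_info()
        if info.best_block_height == 0:
            # Nothing has been synced yet, so there is nothing to compare.
            return False
        # The API server is treated as the source of truth for the mainchain.
        actual_block_id = api_server_client.get_block_id(info.best_block_height)
        return info.best_block_id.lower() != actual_block_id.lower()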
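
The reworked output loop relies on a simple pattern: a reader thread feeds the node's output lines into a queue, and the main loop polls that queue with a timeout, so the handler also gets a chance to run its periodic checks when the node stays silent. A condensed sketch of just that pattern (the function and its arguments are hypothetical; queue.ShutDown only exists since Python 3.13, which is why the script requires that version):

    # Illustrative sketch, not part of the patches above.
    import queue

    def pump_node_output(lines: queue.Queue, handle_line_or_timeout) -> None:
        while True:
            try:
                try:
                    line = lines.get(timeout=10)
                except queue.Empty:
                    # Timed out: let the handler run its periodic checks anyway.
                    line = None

                if not handle_line_or_timeout(line):
                    break
            except queue.ShutDown:
                # The producer shut the queue down, i.e. the node process exited.
                break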
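
The have_flags helper introduced above also determines the fate of an attempt's directory: if at least one flag file was created during the sync, the whole attempt is kept under a datetime-named subdirectory of saved_attempts, otherwise it is deleted. A rough sketch of that decision (the function, its arguments and the use of shutil.move are assumptions; only the directory names and the timestamp format come from the script):

    # Illustrative sketch, not part of the patches above.
    import os
    import shutil
    from datetime import datetime
    from pathlib import Path

    def finalize_attempt(cur_attempt_dir: Path, saved_attempts_dir: Path) -> None:
        flags_dir = cur_attempt_dir / "flags"
        have_flags = len(os.listdir(flags_dir)) > 0

        if have_flags:
            # Something suspicious happened; keep the whole attempt for inspection.
            saved_attempts_dir.mkdir(parents=True, exist_ok=True)
            backup_dir_name = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
            shutil.move(str(cur_attempt_dir), str(saved_attempts_dir / backup_dir_name))
        else:
            shutil.rmtree(cur_attempt_dir)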
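
Finally, the chainstate change that patch 6 reverts was an instance of a more general idea: guard expensive log-record preparation (here, a database lookup for the block index) behind a check that the verbose tracing target is actually enabled. The same idea expressed with Python's standard logging module, purely as an analogy (the logger name and helper are made up):

    # Illustrative analogy, not part of the patches above.
    import logging

    log = logging.getLogger("chainstate_verbose_block_ids")

    def log_stale_block(block_id: str, load_block_index) -> None:
        if log.isEnabledFor(logging.DEBUG):
            block_index = load_block_index(block_id)  # Potentially expensive lookup.
            log.debug(
                "Received stale block %s with height %s, timestamp: %s",
                block_id, block_index.height, block_index.timestamp,
            )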