diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 84fc509462f..fd219f101ed 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -13,7 +13,7 @@ jobs: 1.30.0, # 1.34.2 is Debian stable 1.34.2, - # 1.45.2 is MSRV for lightning-net-tokio and generates coverage + # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, and coverage generation 1.45.2] include: - toolchain: stable @@ -48,6 +48,24 @@ jobs: - name: Build on Rust ${{ matrix.toolchain }} if: "! matrix.build-net-tokio" run: cargo build --verbose --color always -p lightning + - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features + if: "matrix.build-net-tokio && !matrix.coverage" + run: | + cd lightning-block-sync + cargo build --verbose --color always --features rest-client + cargo build --verbose --color always --features rpc-client + cargo build --verbose --color always --features rpc-client,rest-client + cargo build --verbose --color always --features rpc-client,rest-client,tokio + cd .. + - name: Build Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation + if: matrix.coverage + run: | + cd lightning-block-sync + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rest-client + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client + RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio + cd .. - name: Test on Rust ${{ matrix.toolchain }} with net-tokio if: "matrix.build-net-tokio && !matrix.coverage" run: cargo test --verbose --color always @@ -57,6 +75,24 @@ jobs: - name: Test on Rust ${{ matrix.toolchain }} if: "! matrix.build-net-tokio" run: cargo test --verbose --color always -p lightning + - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features + if: "matrix.build-net-tokio && !matrix.coverage" + run: | + cd lightning-block-sync + cargo test --verbose --color always --features rest-client + cargo test --verbose --color always --features rpc-client + cargo test --verbose --color always --features rpc-client,rest-client + cargo test --verbose --color always --features rpc-client,rest-client,tokio + cd .. + - name: Test Block Sync Clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation + if: matrix.coverage + run: | + cd lightning-block-sync + RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rest-client + RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client + RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client + RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always --features rpc-client,rest-client,tokio + cd .. - name: Install deps for kcov if: matrix.coverage run: | diff --git a/Cargo.toml b/Cargo.toml index c43e7927581..96f4b1d1770 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ members = [ "lightning", + "lightning-block-sync", "lightning-net-tokio", "lightning-persister", ] diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml new file mode 100644 index 00000000000..aec6d1404c0 --- /dev/null +++ b/lightning-block-sync/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "lightning-block-sync" +version = "0.0.1" +authors = ["Jeffrey Czyz", "Matt Corallo"] +license = "Apache-2.0" +edition = "2018" +description = """ +Utilities to fetch the chain data from a block source and feed them into Rust Lightning. +""" + +[features] +rest-client = [ "serde", "serde_json", "chunked_transfer" ] +rpc-client = [ "serde", "serde_json", "chunked_transfer" ] + +[dependencies] +bitcoin = "0.24" +lightning = { version = "0.0.12", path = "../lightning" } +tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true } +serde = { version = "1.0", features = ["derive"], optional = true } +serde_json = { version = "1.0", optional = true } +chunked_transfer = { version = "1.4", optional = true } +futures = { version = "0.3" } + +[dev-dependencies] +tokio = { version = "1.0", features = [ "macros", "rt" ] } diff --git a/lightning-block-sync/src/convert.rs b/lightning-block-sync/src/convert.rs new file mode 100644 index 00000000000..37b2c432397 --- /dev/null +++ b/lightning-block-sync/src/convert.rs @@ -0,0 +1,472 @@ +use crate::{BlockHeaderData, BlockSourceError}; +use crate::http::{BinaryResponse, JsonResponse}; +use crate::utils::hex_to_uint256; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::consensus::encode; +use bitcoin::hash_types::{BlockHash, TxMerkleNode}; +use bitcoin::hashes::hex::{ToHex, FromHex}; + +use serde::Deserialize; + +use serde_json; + +use std::convert::From; +use std::convert::TryFrom; +use std::convert::TryInto; + +/// Conversion from `std::io::Error` into `BlockSourceError`. +impl From for BlockSourceError { + fn from(e: std::io::Error) -> BlockSourceError { + match e.kind() { + std::io::ErrorKind::InvalidData => BlockSourceError::persistent(e), + std::io::ErrorKind::InvalidInput => BlockSourceError::persistent(e), + _ => BlockSourceError::transient(e), + } + } +} + +/// Parses binary data as a block. +impl TryInto for BinaryResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + match encode::deserialize(&self.0) { + Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")), + Ok(block) => Ok(block), + } + } +} + +/// Converts a JSON value into block header data. The JSON value may be an object representing a +/// block header or an array of such objects. In the latter case, the first object is converted. +impl TryInto for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + let mut header = match self.0 { + serde_json::Value::Array(mut array) if !array.is_empty() => array.drain(..).next().unwrap(), + serde_json::Value::Object(_) => self.0, + _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "unexpected JSON type")), + }; + + if !header.is_object() { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + } + + // Add an empty previousblockhash for the genesis block. + if let None = header.get("previousblockhash") { + let hash: BlockHash = Default::default(); + header.as_object_mut().unwrap().insert("previousblockhash".to_string(), serde_json::json!(hash.to_hex())); + } + + match serde_json::from_value::(header) { + Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header response")), + Ok(response) => match response.try_into() { + Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid header data")), + Ok(header) => Ok(header), + }, + } + } +} + +/// Response data from `getblockheader` RPC and `headers` REST requests. +#[derive(Deserialize)] +struct GetHeaderResponse { + pub version: i32, + pub merkleroot: String, + pub time: u32, + pub nonce: u32, + pub bits: String, + pub previousblockhash: String, + + pub chainwork: String, + pub height: u32, +} + +/// Converts from `GetHeaderResponse` to `BlockHeaderData`. +impl TryFrom for BlockHeaderData { + type Error = bitcoin::hashes::hex::Error; + + fn try_from(response: GetHeaderResponse) -> Result { + Ok(BlockHeaderData { + header: BlockHeader { + version: response.version, + prev_blockhash: BlockHash::from_hex(&response.previousblockhash)?, + merkle_root: TxMerkleNode::from_hex(&response.merkleroot)?, + time: response.time, + bits: u32::from_be_bytes(<[u8; 4]>::from_hex(&response.bits)?), + nonce: response.nonce, + }, + chainwork: hex_to_uint256(&response.chainwork)?, + height: response.height, + }) + } +} + + +/// Converts a JSON value into a block. Assumes the block is hex-encoded in a JSON string. +impl TryInto for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + match self.0.as_str() { + None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), + Some(hex_data) => match Vec::::from_hex(hex_data) { + Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + Ok(block_data) => match encode::deserialize(&block_data) { + Err(_) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid block data")), + Ok(block) => Ok(block), + }, + }, + } + } +} + +/// Converts a JSON value into the best block hash and optional height. +impl TryInto<(BlockHash, Option)> for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result<(BlockHash, Option)> { + if !self.0.is_object() { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + } + + let hash = match &self.0["bestblockhash"] { + serde_json::Value::String(hex_data) => match BlockHash::from_hex(&hex_data) { + Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid hex data")), + Ok(block_hash) => block_hash, + }, + _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON string")), + }; + + let height = match &self.0["blocks"] { + serde_json::Value::Null => None, + serde_json::Value::Number(height) => match height.as_u64() { + None => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")), + Some(height) => match height.try_into() { + Err(_) => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid height")), + Ok(height) => Some(height), + } + }, + _ => return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON number")), + }; + + Ok((hash, height)) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::consensus::encode; + use bitcoin::network::constants::Network; + + /// Converts from `BlockHeaderData` into a `GetHeaderResponse` JSON value. + impl From for serde_json::Value { + fn from(data: BlockHeaderData) -> Self { + let BlockHeaderData { chainwork, height, header } = data; + serde_json::json!({ + "chainwork": chainwork.to_string()["0x".len()..], + "height": height, + "version": header.version, + "merkleroot": header.merkle_root.to_hex(), + "time": header.time, + "nonce": header.nonce, + "bits": header.bits.to_hex(), + "previousblockhash": header.prev_blockhash.to_hex(), + }) + } + } + + #[test] + fn into_block_header_from_json_response_with_unexpected_type() { + let response = JsonResponse(serde_json::json!(42)); + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "unexpected JSON type"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_header_from_json_response_with_unexpected_header_type() { + let response = JsonResponse(serde_json::json!([42])); + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_header_from_json_response_with_invalid_header_response() { + let block = genesis_block(Network::Bitcoin); + let mut response = JsonResponse(BlockHeaderData { + chainwork: block.header.work(), + height: 0, + header: block.header + }.into()); + response.0["chainwork"].take(); + + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid header response"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_header_from_json_response_with_invalid_header_data() { + let block = genesis_block(Network::Bitcoin); + let mut response = JsonResponse(BlockHeaderData { + chainwork: block.header.work(), + height: 0, + header: block.header + }.into()); + response.0["chainwork"] = serde_json::json!("foobar"); + + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid header data"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_header_from_json_response_with_valid_header() { + let block = genesis_block(Network::Bitcoin); + let response = JsonResponse(BlockHeaderData { + chainwork: block.header.work(), + height: 0, + header: block.header + }.into()); + + match TryInto::::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(data) => { + assert_eq!(data.chainwork, block.header.work()); + assert_eq!(data.height, 0); + assert_eq!(data.header, block.header); + }, + } + } + + #[test] + fn into_block_header_from_json_response_with_valid_header_array() { + let genesis_block = genesis_block(Network::Bitcoin); + let best_block_header = BlockHeader { + prev_blockhash: genesis_block.block_hash(), + ..genesis_block.header + }; + let chainwork = genesis_block.header.work() + best_block_header.work(); + let response = JsonResponse(serde_json::json!([ + serde_json::Value::from(BlockHeaderData { + chainwork, height: 1, header: best_block_header, + }), + serde_json::Value::from(BlockHeaderData { + chainwork: genesis_block.header.work(), height: 0, header: genesis_block.header, + }), + ])); + + match TryInto::::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(data) => { + assert_eq!(data.chainwork, chainwork); + assert_eq!(data.height, 1); + assert_eq!(data.header, best_block_header); + }, + } + } + + #[test] + fn into_block_header_from_json_response_without_previous_block_hash() { + let block = genesis_block(Network::Bitcoin); + let mut response = JsonResponse(BlockHeaderData { + chainwork: block.header.work(), + height: 0, + header: block.header + }.into()); + response.0.as_object_mut().unwrap().remove("previousblockhash"); + + match TryInto::::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(BlockHeaderData { chainwork: _, height: _, header }) => { + assert_eq!(header, block.header); + }, + } + } + + #[test] + fn into_block_from_invalid_binary_response() { + let response = BinaryResponse(b"foo".to_vec()); + match TryInto::::try_into(response) { + Err(_) => {}, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_from_valid_binary_response() { + let genesis_block = genesis_block(Network::Bitcoin); + let response = BinaryResponse(encode::serialize(&genesis_block)); + match TryInto::::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(block) => assert_eq!(block, genesis_block), + } + } + + #[test] + fn into_block_from_json_response_with_unexpected_type() { + let response = JsonResponse(serde_json::json!({ "result": "foo" })); + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_from_json_response_with_invalid_hex_data() { + let response = JsonResponse(serde_json::json!("foobar")); + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_from_json_response_with_invalid_block_data() { + let response = JsonResponse(serde_json::json!("abcd")); + match TryInto::::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid block data"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_from_json_response_with_valid_block_data() { + let genesis_block = genesis_block(Network::Bitcoin); + let response = JsonResponse(serde_json::json!(encode::serialize_hex(&genesis_block))); + match TryInto::::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(block) => assert_eq!(block, genesis_block), + } + } + + #[test] + fn into_block_hash_from_json_response_with_unexpected_type() { + let response = JsonResponse(serde_json::json!("foo")); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_hash_from_json_response_with_unexpected_bestblockhash_type() { + let response = JsonResponse(serde_json::json!({ "bestblockhash": 42 })); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON string"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_hash_from_json_response_with_invalid_hex_data() { + let response = JsonResponse(serde_json::json!({ "bestblockhash": "foobar"} )); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid hex data"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_hash_from_json_response_without_height() { + let block = genesis_block(Network::Bitcoin); + let response = JsonResponse(serde_json::json!({ + "bestblockhash": block.block_hash().to_hex(), + })); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((hash, height)) => { + assert_eq!(hash, block.block_hash()); + assert!(height.is_none()); + }, + } + } + + #[test] + fn into_block_hash_from_json_response_with_unexpected_blocks_type() { + let block = genesis_block(Network::Bitcoin); + let response = JsonResponse(serde_json::json!({ + "bestblockhash": block.block_hash().to_hex(), + "blocks": "foo", + })); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON number"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_hash_from_json_response_with_invalid_height() { + let block = genesis_block(Network::Bitcoin); + let response = JsonResponse(serde_json::json!({ + "bestblockhash": block.block_hash().to_hex(), + "blocks": std::u64::MAX, + })); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid height"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn into_block_hash_from_json_response_with_height() { + let block = genesis_block(Network::Bitcoin); + let response = JsonResponse(serde_json::json!({ + "bestblockhash": block.block_hash().to_hex(), + "blocks": 1, + })); + match TryInto::<(BlockHash, Option)>::try_into(response) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((hash, height)) => { + assert_eq!(hash, block.block_hash()); + assert_eq!(height.unwrap(), 1); + }, + } + } +} diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs new file mode 100644 index 00000000000..0788bc989f5 --- /dev/null +++ b/lightning-block-sync/src/http.rs @@ -0,0 +1,767 @@ +use chunked_transfer; +use serde_json; + +use std::convert::TryFrom; +#[cfg(not(feature = "tokio"))] +use std::io::Write; +use std::net::ToSocketAddrs; +use std::time::Duration; + +#[cfg(feature = "tokio")] +use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt}; +#[cfg(feature = "tokio")] +use tokio::net::TcpStream; + +#[cfg(not(feature = "tokio"))] +use std::io::BufRead; +use std::io::Read; +#[cfg(not(feature = "tokio"))] +use std::net::TcpStream; + +/// Timeout for operations on TCP streams. +const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5); + +/// Maximum HTTP message header size in bytes. +const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192; + +/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any +/// overhead for HTTP chunked transfer encoding. +const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000; + +/// Endpoint for interacting with an HTTP-based API. +#[derive(Debug)] +pub struct HttpEndpoint { + host: String, + port: Option, + path: String, +} + +impl HttpEndpoint { + /// Creates an endpoint for the given host and default HTTP port. + pub fn for_host(host: String) -> Self { + Self { + host, + port: None, + path: String::from("/"), + } + } + + /// Specifies a port to use with the endpoint. + pub fn with_port(mut self, port: u16) -> Self { + self.port = Some(port); + self + } + + /// Specifies a path to use with the endpoint. + pub fn with_path(mut self, path: String) -> Self { + self.path = path; + self + } + + /// Returns the endpoint host. + pub fn host(&self) -> &str { + &self.host + } + + /// Returns the endpoint port. + pub fn port(&self) -> u16 { + match self.port { + None => 80, + Some(port) => port, + } + } + + /// Returns the endpoint path. + pub fn path(&self) -> &str { + &self.path + } +} + +impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint { + type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter; + + fn to_socket_addrs(&self) -> std::io::Result { + (self.host(), self.port()).to_socket_addrs() + } +} + +/// Client for making HTTP requests. +pub(crate) struct HttpClient { + stream: TcpStream, +} + +impl HttpClient { + /// Opens a connection to an HTTP endpoint. + pub fn connect(endpoint: E) -> std::io::Result { + let address = match endpoint.to_socket_addrs()?.next() { + None => { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses")); + }, + Some(address) => address, + }; + let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?; + stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?; + stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?; + + #[cfg(feature = "tokio")] + let stream = { + stream.set_nonblocking(true)?; + TcpStream::from_std(stream)? + }; + + Ok(Self { stream }) + } + + /// Sends a `GET` request for a resource identified by `uri` at the `host`. + /// + /// Returns the response body in `F` format. + #[allow(dead_code)] + pub async fn get(&mut self, uri: &str, host: &str) -> std::io::Result + where F: TryFrom, Error = std::io::Error> { + let request = format!( + "GET {} HTTP/1.1\r\n\ + Host: {}\r\n\ + Connection: keep-alive\r\n\ + \r\n", uri, host); + let response_body = self.send_request_with_retry(&request).await?; + F::try_from(response_body) + } + + /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP + /// authentication credentials. + /// + /// The request body consists of the provided JSON `content`. Returns the response body in `F` + /// format. + #[allow(dead_code)] + pub async fn post(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result + where F: TryFrom, Error = std::io::Error> { + let content = content.to_string(); + let request = format!( + "POST {} HTTP/1.1\r\n\ + Host: {}\r\n\ + Authorization: {}\r\n\ + Connection: keep-alive\r\n\ + Content-Type: application/json\r\n\ + Content-Length: {}\r\n\ + \r\n\ + {}", uri, host, auth, content.len(), content); + let response_body = self.send_request_with_retry(&request).await?; + F::try_from(response_body) + } + + /// Sends an HTTP request message and reads the response, returning its body. Attempts to + /// reconnect and retry if the connection has been closed. + async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result> { + let endpoint = self.stream.peer_addr().unwrap(); + match self.send_request(request).await { + Ok(bytes) => Ok(bytes), + Err(e) => match e.kind() { + std::io::ErrorKind::ConnectionReset | + std::io::ErrorKind::ConnectionAborted | + std::io::ErrorKind::UnexpectedEof => { + // Reconnect if the connection was closed. This may happen if the server's + // keep-alive limits are reached. + *self = Self::connect(endpoint)?; + self.send_request(request).await + }, + _ => Err(e), + }, + } + } + + /// Sends an HTTP request message and reads the response, returning its body. + async fn send_request(&mut self, request: &str) -> std::io::Result> { + self.write_request(request).await?; + self.read_response().await + } + + /// Writes an HTTP request message. + async fn write_request(&mut self, request: &str) -> std::io::Result<()> { + #[cfg(feature = "tokio")] + { + self.stream.write_all(request.as_bytes()).await?; + self.stream.flush().await + } + #[cfg(not(feature = "tokio"))] + { + self.stream.write_all(request.as_bytes())?; + self.stream.flush() + } + } + + /// Reads an HTTP response message. + async fn read_response(&mut self) -> std::io::Result> { + #[cfg(feature = "tokio")] + let stream = self.stream.split().0; + #[cfg(not(feature = "tokio"))] + let stream = std::io::Read::by_ref(&mut self.stream); + + let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64); + + #[cfg(feature = "tokio")] + let mut reader = tokio::io::BufReader::new(limited_stream); + #[cfg(not(feature = "tokio"))] + let mut reader = std::io::BufReader::new(limited_stream); + + macro_rules! read_line { () => { { + let mut line = String::new(); + #[cfg(feature = "tokio")] + let bytes_read = reader.read_line(&mut line).await?; + #[cfg(not(feature = "tokio"))] + let bytes_read = reader.read_line(&mut line)?; + + match bytes_read { + 0 => None, + _ => { + // Remove trailing CRLF + if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } + Some(line) + }, + } + } } } + + // Read and parse status line + let status_line = read_line!() + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; + let status = HttpStatus::parse(&status_line)?; + + // Read and parse relevant headers + let mut message_length = HttpMessageLength::Empty; + loop { + let line = read_line!() + .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?; + if line.is_empty() { break; } + + let header = HttpHeader::parse(&line)?; + if header.has_name("Content-Length") { + let length = header.value.parse() + .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; + if let HttpMessageLength::Empty = message_length { + message_length = HttpMessageLength::ContentLength(length); + } + continue; + } + + if header.has_name("Transfer-Encoding") { + message_length = HttpMessageLength::TransferEncoding(header.value.into()); + continue; + } + } + + if !status.is_ok() { + // TODO: Handle 3xx redirection responses. + return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found")); + } + + // Read message body + let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len(); + reader.get_mut().set_limit(read_limit as u64); + match message_length { + HttpMessageLength::Empty => { Ok(Vec::new()) }, + HttpMessageLength::ContentLength(length) => { + if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE { + Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range")) + } else { + let mut content = vec![0; length]; + #[cfg(feature = "tokio")] + reader.read_exact(&mut content[..]).await?; + #[cfg(not(feature = "tokio"))] + reader.read_exact(&mut content[..])?; + Ok(content) + } + }, + HttpMessageLength::TransferEncoding(coding) => { + if !coding.eq_ignore_ascii_case("chunked") { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, "unsupported transfer coding")) + } else { + let mut content = Vec::new(); + #[cfg(feature = "tokio")] + { + // Since chunked_transfer doesn't have an async interface, only use it to + // determine the size of each chunk to read. + // + // TODO: Replace with an async interface when available. + // https://github.com/frewsxcv/rust-chunked-transfer/issues/7 + loop { + // Read the chunk header which contains the chunk size. + let mut chunk_header = String::new(); + reader.read_line(&mut chunk_header).await?; + if chunk_header == "0\r\n" { + // Read the terminator chunk since the decoder consumes the CRLF + // immediately when this chunk is encountered. + reader.read_line(&mut chunk_header).await?; + } + + // Decode the chunk header to obtain the chunk size. + let mut buffer = Vec::new(); + let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes()); + decoder.read_to_end(&mut buffer)?; + + // Read the chunk body. + let chunk_size = match decoder.remaining_chunks_size() { + None => break, + Some(chunk_size) => chunk_size, + }; + let chunk_offset = content.len(); + content.resize(chunk_offset + chunk_size + "\r\n".len(), 0); + reader.read_exact(&mut content[chunk_offset..]).await?; + content.resize(chunk_offset + chunk_size, 0); + } + Ok(content) + } + #[cfg(not(feature = "tokio"))] + { + let mut decoder = chunked_transfer::Decoder::new(reader); + decoder.read_to_end(&mut content)?; + Ok(content) + } + } + }, + } + } +} + +/// HTTP response status code as defined by [RFC 7231]. +/// +/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6 +struct HttpStatus<'a> { + code: &'a str, +} + +impl<'a> HttpStatus<'a> { + /// Parses an HTTP status line as defined by [RFC 7230]. + /// + /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2 + fn parse(line: &'a String) -> std::io::Result> { + let mut tokens = line.splitn(3, ' '); + + let http_version = tokens.next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?; + if !http_version.eq_ignore_ascii_case("HTTP/1.1") && + !http_version.eq_ignore_ascii_case("HTTP/1.0") { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version")); + } + + let code = tokens.next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?; + if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code")); + } + + let _reason = tokens.next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?; + + Ok(Self { code }) + } + + /// Returns whether the status is successful (i.e., 2xx status class). + fn is_ok(&self) -> bool { + self.code.starts_with('2') + } +} + +/// HTTP response header as defined by [RFC 7231]. +/// +/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7 +struct HttpHeader<'a> { + name: &'a str, + value: &'a str, +} + +impl<'a> HttpHeader<'a> { + /// Parses an HTTP header field as defined by [RFC 7230]. + /// + /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2 + fn parse(line: &'a String) -> std::io::Result> { + let mut tokens = line.splitn(2, ':'); + let name = tokens.next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?; + let value = tokens.next() + .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))? + .trim_start(); + Ok(Self { name, value }) + } + + /// Returns whether the header field has the given name. + fn has_name(&self, name: &str) -> bool { + self.name.eq_ignore_ascii_case(name) + } +} + +/// HTTP message body length as defined by [RFC 7230]. +/// +/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3 +enum HttpMessageLength { + Empty, + ContentLength(usize), + TransferEncoding(String), +} + +/// An HTTP response body in binary format. +pub(crate) struct BinaryResponse(pub(crate) Vec); + +/// An HTTP response body in JSON format. +pub(crate) struct JsonResponse(pub(crate) serde_json::Value); + +/// Interprets bytes from an HTTP response body as binary data. +impl TryFrom> for BinaryResponse { + type Error = std::io::Error; + + fn try_from(bytes: Vec) -> std::io::Result { + Ok(BinaryResponse(bytes)) + } +} + +/// Interprets bytes from an HTTP response body as a JSON value. +impl TryFrom> for JsonResponse { + type Error = std::io::Error; + + fn try_from(bytes: Vec) -> std::io::Result { + Ok(JsonResponse(serde_json::from_slice(&bytes)?)) + } +} + +#[cfg(test)] +mod endpoint_tests { + use super::HttpEndpoint; + + #[test] + fn with_default_port() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.port(), 80); + } + + #[test] + fn with_custom_port() { + let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.port(), 8080); + } + + #[test] + fn with_uri_path() { + let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.path(), "/path"); + } + + #[test] + fn without_uri_path() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + assert_eq!(endpoint.host(), "foo.com"); + assert_eq!(endpoint.path(), "/"); + } + + #[test] + fn convert_to_socket_addrs() { + let endpoint = HttpEndpoint::for_host("foo.com".into()); + let host = endpoint.host(); + let port = endpoint.port(); + + use std::net::ToSocketAddrs; + match (&endpoint).to_socket_addrs() { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(mut socket_addrs) => { + match socket_addrs.next() { + None => panic!("Expected socket address"), + Some(addr) => { + assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap()); + assert!(socket_addrs.next().is_none()); + } + } + } + } + } +} + +#[cfg(test)] +pub(crate) mod client_tests { + use super::*; + use std::io::BufRead; + use std::io::Write; + + /// Server for handling HTTP client requests with a stock response. + pub struct HttpServer { + address: std::net::SocketAddr, + handler: std::thread::JoinHandle<()>, + shutdown: std::sync::Arc, + } + + /// Body of HTTP response messages. + pub enum MessageBody { + Empty, + Content(T), + ChunkedContent(T), + } + + impl HttpServer { + pub fn responding_with_ok(body: MessageBody) -> Self { + let response = match body { + MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(), + MessageBody::Content(body) => { + let body = body.to_string(); + format!( + "HTTP/1.1 200 OK\r\n\ + Content-Length: {}\r\n\ + \r\n\ + {}", body.len(), body) + }, + MessageBody::ChunkedContent(body) => { + let mut chuncked_body = Vec::new(); + { + use chunked_transfer::Encoder; + let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8); + encoder.write_all(body.to_string().as_bytes()).unwrap(); + } + format!( + "HTTP/1.1 200 OK\r\n\ + Transfer-Encoding: chunked\r\n\ + \r\n\ + {}", String::from_utf8(chuncked_body).unwrap()) + }, + }; + HttpServer::responding_with(response) + } + + pub fn responding_with_not_found() -> Self { + let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string(); + HttpServer::responding_with(response) + } + + fn responding_with(response: String) -> Self { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + let address = listener.local_addr().unwrap(); + + let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)); + let shutdown_signaled = std::sync::Arc::clone(&shutdown); + let handler = std::thread::spawn(move || { + for stream in listener.incoming() { + let mut stream = stream.unwrap(); + stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap(); + + let lines_read = std::io::BufReader::new(&stream) + .lines() + .take_while(|line| !line.as_ref().unwrap().is_empty()) + .count(); + if lines_read == 0 { continue; } + + for chunk in response.as_bytes().chunks(16) { + if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) { + return; + } else { + if let Err(_) = stream.write(chunk) { break; } + if let Err(_) = stream.flush() { break; } + } + } + } + }); + + Self { address, handler, shutdown } + } + + fn shutdown(self) { + self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst); + self.handler.join().unwrap(); + } + + pub fn endpoint(&self) -> HttpEndpoint { + HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port()) + } + } + + #[test] + fn connect_to_unresolvable_host() { + match HttpClient::connect(("example.invalid", 80)) { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn connect_with_no_socket_address() { + match HttpClient::connect(&vec![][..]) { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput), + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn connect_with_unknown_server() { + match HttpClient::connect(("::", 80)) { + #[cfg(target_os = "windows")] + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable), + #[cfg(not(target_os = "windows"))] + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn connect_with_valid_endpoint() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + match HttpClient::connect(&server.endpoint()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + + #[tokio::test] + async fn read_empty_message() { + let server = HttpServer::responding_with("".to_string()); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no status line"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_incomplete_message() { + let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string()); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_too_large_message_headers() { + let response = format!( + "HTTP/1.1 302 Found\r\n\ + Location: {}\r\n\ + \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE)); + let server = HttpServer::responding_with(response); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof); + assert_eq!(e.get_ref().unwrap().to_string(), "no headers"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_too_large_message_body() { + let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1); + let server = HttpServer::responding_with_ok::(MessageBody::Content(body)); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "out of range"); + }, + Ok(_) => panic!("Expected error"), + } + server.shutdown(); + } + + #[tokio::test] + async fn read_message_with_unsupported_transfer_coding() { + let response = String::from( + "HTTP/1.1 200 OK\r\n\ + Transfer-Encoding: gzip\r\n\ + \r\n\ + foobar"); + let server = HttpServer::responding_with(response); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput); + assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn read_empty_message_body() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), + } + } + + #[tokio::test] + async fn read_message_body_with_length() { + let body = "foo bar baz qux".repeat(32); + let content = MessageBody::Content(body.clone()); + let server = HttpServer::responding_with_ok::(content); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), + } + } + + #[tokio::test] + async fn read_chunked_message_body() { + let body = "foo bar baz qux".repeat(32); + let chunked_content = MessageBody::ChunkedContent(body.clone()); + let server = HttpServer::responding_with_ok::(chunked_content); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()), + } + } + + #[tokio::test] + async fn reconnect_closed_connection() { + let server = HttpServer::responding_with_ok::(MessageBody::Empty); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + assert!(client.get::("/foo", "foo.com").await.is_ok()); + match client.get::("/foo", "foo.com").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(bytes) => assert_eq!(bytes.0, Vec::::new()), + } + } + + #[test] + fn from_bytes_into_binary_response() { + let bytes = b"foo"; + match BinaryResponse::try_from(bytes.to_vec()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(response) => assert_eq!(&response.0, bytes), + } + } + + #[test] + fn from_invalid_bytes_into_json_response() { + let json = serde_json::json!({ "result": 42 }); + match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) { + Err(_) => {}, + Ok(_) => panic!("Expected error"), + } + } + + #[test] + fn from_valid_bytes_into_json_response() { + let json = serde_json::json!({ "result": 42 }); + match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(response) => assert_eq!(response.0, json), + } + } +} diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs new file mode 100644 index 00000000000..58f77bdcaba --- /dev/null +++ b/lightning-block-sync/src/lib.rs @@ -0,0 +1,126 @@ +//! A lightweight client for keeping in sync with chain activity. +//! +//! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers +//! and data. +//! +//! Enabling feature `rest-client` or `rpc-client` allows configuring the client to fetch blocks +//! using Bitcoin Core's REST or RPC interface, respectively. +//! +//! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`, +//! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime. +//! +//! [`BlockSource`]: trait.BlockSource.html + +#[cfg(any(feature = "rest-client", feature = "rpc-client"))] +pub mod http; + +#[cfg(feature = "rest-client")] +pub mod rest; + +#[cfg(feature = "rpc-client")] +pub mod rpc; + +#[cfg(any(feature = "rest-client", feature = "rpc-client"))] +mod convert; + +#[cfg(any(feature = "rest-client", feature = "rpc-client"))] +mod utils; + +use bitcoin::blockdata::block::{Block, BlockHeader}; +use bitcoin::hash_types::BlockHash; +use bitcoin::util::uint::Uint256; + +use std::future::Future; +use std::pin::Pin; + +/// Abstract type for retrieving block headers and data. +pub trait BlockSource : Sync + Send { + /// Returns the header for a given hash. A height hint may be provided in case a block source + /// cannot easily find headers based on a hash. This is merely a hint and thus the returned + /// header must have the same hash as was requested. Otherwise, an error must be returned. + /// + /// Implementations that cannot find headers based on the hash should return a `Transient` error + /// when `height_hint` is `None`. + fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData>; + + /// Returns the block for a given hash. A headers-only block source should return a `Transient` + /// error. + fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; + + // TODO: Phrase in terms of `Poll` once added. + /// Returns the hash of the best block and, optionally, its height. When polling a block source, + /// the height is passed to `get_header` to allow for a more efficient lookup. + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)>; +} + +/// Result type for `BlockSource` requests. +type BlockSourceResult = Result; + +// TODO: Replace with BlockSourceResult once `async` trait functions are supported. For details, +// see: https://areweasyncyet.rs. +/// Result type for asynchronous `BlockSource` requests. +type AsyncBlockSourceResult<'a, T> = Pin> + 'a + Send>>; + +/// Error type for `BlockSource` requests. +/// +/// Transient errors may be resolved when re-polling, but no attempt will be made to re-poll on +/// persistent errors. +pub struct BlockSourceError { + kind: BlockSourceErrorKind, + error: Box, +} + +/// The kind of `BlockSourceError`, either persistent or transient. +#[derive(Clone, Copy)] +pub enum BlockSourceErrorKind { + /// Indicates an error that won't resolve when retrying a request (e.g., invalid data). + Persistent, + + /// Indicates an error that may resolve when retrying a request (e.g., unresponsive). + Transient, +} + +impl BlockSourceError { + /// Creates a new persistent error originated from the given error. + pub fn persistent(error: E) -> Self + where E: Into> { + Self { + kind: BlockSourceErrorKind::Persistent, + error: error.into(), + } + } + + /// Creates a new transient error originated from the given error. + pub fn transient(error: E) -> Self + where E: Into> { + Self { + kind: BlockSourceErrorKind::Transient, + error: error.into(), + } + } + + /// Returns the kind of error. + pub fn kind(&self) -> BlockSourceErrorKind { + self.kind + } + + /// Converts the error into the underlying error. + pub fn into_inner(self) -> Box { + self.error + } +} + +/// A block header and some associated data. This information should be available from most block +/// sources (and, notably, is available in Bitcoin Core's RPC and REST interfaces). +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct BlockHeaderData { + /// The block header itself. + pub header: BlockHeader, + + /// The block height where the genesis block has height 0. + pub height: u32, + + /// The total chain work in expected number of double-SHA256 hashes required to build a chain + /// of equivalent weight. + pub chainwork: Uint256, +} diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs new file mode 100644 index 00000000000..3c2e76e23d7 --- /dev/null +++ b/lightning-block-sync/src/rest.rs @@ -0,0 +1,110 @@ +use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse}; + +use bitcoin::blockdata::block::Block; +use bitcoin::hash_types::BlockHash; +use bitcoin::hashes::hex::ToHex; + +use std::convert::TryFrom; +use std::convert::TryInto; + +/// A simple REST client for requesting resources using HTTP `GET`. +pub struct RestClient { + endpoint: HttpEndpoint, + client: HttpClient, +} + +impl RestClient { + /// Creates a new REST client connected to the given endpoint. + /// + /// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest). + pub fn new(endpoint: HttpEndpoint) -> std::io::Result { + let client = HttpClient::connect(&endpoint)?; + Ok(Self { endpoint, client }) + } + + /// Requests a resource encoded in `F` format and interpreted as type `T`. + async fn request_resource(&mut self, resource_path: &str) -> std::io::Result + where F: TryFrom, Error = std::io::Error> + TryInto { + let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); + let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path); + self.client.get::(&uri, &host).await?.try_into() + } +} + +impl BlockSource for RestClient { + fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { + let resource_path = format!("headers/1/{}.json", header_hash.to_hex()); + Ok(self.request_resource::(&resource_path).await?) + }) + } + + fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { + let resource_path = format!("block/{}.bin", header_hash.to_hex()); + Ok(self.request_resource::(&resource_path).await?) + }) + } + + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + Box::pin(async move { + Ok(self.request_resource::("chaininfo.json").await?) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::BinaryResponse; + use crate::http::client_tests::{HttpServer, MessageBody}; + + /// Parses binary data as a string-encoded `u32`. + impl TryInto for BinaryResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + match std::str::from_utf8(&self.0) { + Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + Ok(s) => match u32::from_str_radix(s, 10) { + Err(e) => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, e)), + Ok(n) => Ok(n), + } + } + } + } + + #[tokio::test] + async fn request_unknown_resource() { + let server = HttpServer::responding_with_not_found(); + let mut client = RestClient::new(server.endpoint()).unwrap(); + + match client.request_resource::("/").await { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn request_malformed_resource() { + let server = HttpServer::responding_with_ok(MessageBody::Content("foo")); + let mut client = RestClient::new(server.endpoint()).unwrap(); + + match client.request_resource::("/").await { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn request_valid_resource() { + let server = HttpServer::responding_with_ok(MessageBody::Content(42)); + let mut client = RestClient::new(server.endpoint()).unwrap(); + + match client.request_resource::("/").await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(n) => assert_eq!(n, 42), + } + } +} diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs new file mode 100644 index 00000000000..34cbd2e02c0 --- /dev/null +++ b/lightning-block-sync/src/rpc.rs @@ -0,0 +1,197 @@ +use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::http::{HttpClient, HttpEndpoint, JsonResponse}; + +use bitcoin::blockdata::block::Block; +use bitcoin::hash_types::BlockHash; +use bitcoin::hashes::hex::ToHex; + +use serde_json; + +use std::convert::TryFrom; +use std::convert::TryInto; +use std::sync::atomic::{AtomicUsize, Ordering}; + +/// A simple RPC client for calling methods using HTTP `POST`. +pub struct RpcClient { + basic_auth: String, + endpoint: HttpEndpoint, + client: HttpClient, + id: AtomicUsize, +} + +impl RpcClient { + /// Creates a new RPC client connected to the given endpoint with the provided credentials. The + /// credentials should be a base64 encoding of a user name and password joined by a colon, as is + /// required for HTTP basic access authentication. + pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { + let client = HttpClient::connect(&endpoint)?; + Ok(Self { + basic_auth: "Basic ".to_string() + credentials, + endpoint, + client, + id: AtomicUsize::new(0), + }) + } + + /// Calls a method with the response encoded in JSON format and interpreted as type `T`. + async fn call_method(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result + where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { + let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); + let uri = self.endpoint.path(); + let content = serde_json::json!({ + "method": method, + "params": params, + "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() + }); + + let mut response = self.client.post::(&uri, &host, &self.basic_auth, content) + .await?.0; + if !response.is_object() { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); + } + + let error = &response["error"]; + if !error.is_null() { + // TODO: Examine error code for a more precise std::io::ErrorKind. + let message = error["message"].as_str().unwrap_or("unknown error"); + return Err(std::io::Error::new(std::io::ErrorKind::Other, message)); + } + + let result = &mut response["result"]; + if result.is_null() { + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")); + } + + JsonResponse(result.take()).try_into() + } +} + +impl BlockSource for RpcClient { + fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { + let header_hash = serde_json::json!(header_hash.to_hex()); + Ok(self.call_method("getblockheader", &[header_hash]).await?) + }) + } + + fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { + let header_hash = serde_json::json!(header_hash.to_hex()); + let verbosity = serde_json::json!(0); + Ok(self.call_method("getblock", &[header_hash, verbosity]).await?) + }) + } + + fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + Box::pin(async move { + Ok(self.call_method("getblockchaininfo", &[]).await?) + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::http::client_tests::{HttpServer, MessageBody}; + + /// Credentials encoded in base64. + const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA=="; + + /// Converts a JSON value into `u64`. + impl TryInto for JsonResponse { + type Error = std::io::Error; + + fn try_into(self) -> std::io::Result { + match self.0.as_u64() { + None => Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "not a number")), + Some(n) => Ok(n), + } + } + } + + #[tokio::test] + async fn call_method_returning_unknown_response() { + let server = HttpServer::responding_with_not_found(); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound), + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_malfomred_response() { + let response = serde_json::json!("foo"); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON object"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_error() { + let response = serde_json::json!({ + "error": { "code": -8, "message": "invalid parameter" }, + }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + let invalid_block_hash = serde_json::json!("foo"); + match client.call_method::("getblock", &[invalid_block_hash]).await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::Other); + assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_missing_result() { + let response = serde_json::json!({ "result": null }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "expected JSON result"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_malformed_result() { + let response = serde_json::json!({ "result": "foo" }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); + assert_eq!(e.get_ref().unwrap().to_string(), "not a number"); + }, + Ok(_) => panic!("Expected error"), + } + } + + #[tokio::test] + async fn call_method_returning_valid_result() { + let response = serde_json::json!({ "result": 654470 }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + + match client.call_method::("getblockcount", &[]).await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok(count) => assert_eq!(count, 654470), + } + } +} diff --git a/lightning-block-sync/src/utils.rs b/lightning-block-sync/src/utils.rs new file mode 100644 index 00000000000..96a2e578877 --- /dev/null +++ b/lightning-block-sync/src/utils.rs @@ -0,0 +1,54 @@ +use bitcoin::hashes::hex::FromHex; +use bitcoin::util::uint::Uint256; + +pub fn hex_to_uint256(hex: &str) -> Result { + let bytes = <[u8; 32]>::from_hex(hex)?; + Ok(Uint256::from_be_bytes(bytes)) +} + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::util::uint::Uint256; + + #[test] + fn hex_to_uint256_empty_str() { + assert!(hex_to_uint256("").is_err()); + } + + #[test] + fn hex_to_uint256_too_short_str() { + let hex = String::from_utf8(vec![b'0'; 32]).unwrap(); + assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 32))); + } + + #[test] + fn hex_to_uint256_too_long_str() { + let hex = String::from_utf8(vec![b'0'; 128]).unwrap(); + assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidLength(64, 128))); + } + + #[test] + fn hex_to_uint256_odd_length_str() { + let hex = String::from_utf8(vec![b'0'; 65]).unwrap(); + assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::OddLengthString(65))); + } + + #[test] + fn hex_to_uint256_invalid_char() { + let hex = String::from_utf8(vec![b'G'; 64]).unwrap(); + assert_eq!(hex_to_uint256(&hex), Err(bitcoin::hashes::hex::Error::InvalidChar(b'G'))); + } + + #[test] + fn hex_to_uint256_lowercase_str() { + let hex: String = std::iter::repeat("0123456789abcdef").take(4).collect(); + assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4])); + } + + #[test] + fn hex_to_uint256_uppercase_str() { + let hex: String = std::iter::repeat("0123456789ABCDEF").take(4).collect(); + assert_eq!(hex_to_uint256(&hex).unwrap(), Uint256([0x0123456789abcdefu64; 4])); + } +}