Skip to content

Add SqliteStore backend #100

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 6 additions & 48 deletions src/io/fs_store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
#[cfg(target_os = "windows")]
extern crate winapi;

use super::KVStore;
use super::*;

use std::collections::HashMap;
use std::fs;
use std::io::{BufReader, Read, Write};
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};

#[cfg(not(target_os = "windows"))]
Expand Down Expand Up @@ -45,7 +44,8 @@ pub struct FilesystemStore {
}

impl FilesystemStore {
pub(crate) fn new(dest_dir: PathBuf) -> Self {
pub(crate) fn new(mut dest_dir: PathBuf) -> Self {
dest_dir.push("fs_store");
let locks = Mutex::new(HashMap::new());
Self { dest_dir, locks }
}
Expand Down Expand Up @@ -248,34 +248,15 @@ impl Read for FilesystemReader {

impl KVStorePersister for FilesystemStore {
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;

let parent_directory = dest_file_path.parent().ok_or_else(|| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;
let namespace = parent_directory.display().to_string();

let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;
let key = dest_without_namespace.display().to_string();

self.write(&namespace, &key, &object.encode())?;
Ok(())
let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?;
self.write(&namespace, &key, &object.encode())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test::utils::random_storage_path;
use lightning::util::persist::KVStorePersister;
use lightning::util::ser::Readable;

use proptest::prelude::*;
proptest! {
Expand All @@ -284,31 +265,8 @@ mod tests {
let rand_dir = random_storage_path();

let fs_store = FilesystemStore::new(rand_dir.into());
let namespace = "testspace";
let key = "testkey";

// Test the basic KVStore operations.
fs_store.write(namespace, key, &data).unwrap();

let listed_keys = fs_store.list(namespace).unwrap();
assert_eq!(listed_keys.len(), 1);
assert_eq!(listed_keys[0], "testkey");

let mut reader = fs_store.read(namespace, key).unwrap();
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
assert_eq!(data, read_data);

fs_store.remove(namespace, key).unwrap();

let listed_keys = fs_store.list(namespace).unwrap();
assert_eq!(listed_keys.len(), 0);

// Test KVStorePersister
let prefixed_key = format!("{}/{}", namespace, key);
fs_store.persist(&prefixed_key, &data).unwrap();
let mut reader = fs_store.read(namespace, key).unwrap();
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
assert_eq!(data, read_data);
do_read_write_remove_list_persist(&data, &fs_store);
}
}
}
62 changes: 62 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
//! Objects and traits for data persistence.

pub(crate) mod fs_store;
pub(crate) mod sqlite_store;
pub(crate) mod utils;

pub use fs_store::FilesystemStore;
pub use sqlite_store::SqliteStore;

use lightning::util::persist::KVStorePersister;

use std::io::Read;
use std::path::PathBuf;
use std::str::FromStr;

// The namespacs and keys LDK uses for persisting
pub(crate) const CHANNEL_MANAGER_PERSISTENCE_NAMESPACE: &str = "";
Expand Down Expand Up @@ -72,3 +76,61 @@ pub trait KVStore: KVStorePersister {
/// Will return an empty list if the `namespace` is unknown.
fn list(&self, namespace: &str) -> std::io::Result<Vec<String>>;
}

fn get_namespace_and_key_from_prefixed(
prefixed_key: &str,
) -> lightning::io::Result<(String, String)> {
let dest_file_path = PathBuf::from_str(prefixed_key).map_err(|_| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;

let parent_directory = dest_file_path.parent().ok_or_else(|| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;
let namespace = parent_directory.display().to_string();

let dest_without_namespace = dest_file_path.strip_prefix(&namespace).map_err(|_| {
let msg = format!("Could not persist file for key {}.", prefixed_key);
lightning::io::Error::new(lightning::io::ErrorKind::InvalidInput, msg)
})?;
let key = dest_without_namespace.display().to_string();

Ok((namespace, key))
}

#[cfg(test)]
fn do_read_write_remove_list_persist<K: KVStore>(data: &[u8; 32], kv_store: &K) {
use lightning::util::ser::Readable;

let namespace = "testspace";
let key = "testkey";

// Test the basic KVStore operations.
kv_store.write(namespace, key, data).unwrap();

// Test empty namespace is allowed, but not empty key.
kv_store.write("", key, data).unwrap();
assert!(kv_store.write(namespace, "", data).is_err());

let listed_keys = kv_store.list(namespace).unwrap();
assert_eq!(listed_keys.len(), 1);
assert_eq!(listed_keys[0], key);

let mut reader = kv_store.read(namespace, key).unwrap();
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
assert_eq!(*data, read_data);

kv_store.remove(namespace, key).unwrap();

let listed_keys = kv_store.list(namespace).unwrap();
assert_eq!(listed_keys.len(), 0);

// Test KVStorePersister
let prefixed_key = format!("{}/{}", namespace, key);
kv_store.persist(&prefixed_key, &data).unwrap();
let mut reader = kv_store.read(namespace, key).unwrap();
let read_data: [u8; 32] = Readable::read(&mut reader).unwrap();
assert_eq!(*data, read_data);
}
192 changes: 192 additions & 0 deletions src/io/sqlite_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
use super::*;

use lightning::util::persist::KVStorePersister;
use lightning::util::ser::Writeable;

use rusqlite::{named_params, Connection};

use std::fs;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

// The database file name.
const SQLITE_DB_FILE: &str = "ldk_node.sqlite";

// The table in which we store all data.
const KV_TABLE_NAME: &str = "ldk_node_data";

// The current SQLite `user_version`, which we can use if we'd ever need to do a schema migration.
const SCHEMA_USER_VERSION: u16 = 1;

/// A [`KVStore`] implementation that writes to and reads from an [SQLite] database.
///
/// [SQLite]: https://sqlite.org
pub struct SqliteStore {
connection: Arc<Mutex<Connection>>,
}

impl SqliteStore {
pub(crate) fn new(dest_dir: PathBuf) -> Self {
fs::create_dir_all(dest_dir.clone()).unwrap_or_else(|_| {
panic!("Failed to create database destination directory: {}", dest_dir.display())
});
let mut db_file_path = dest_dir.clone();
db_file_path.push(SQLITE_DB_FILE);

let connection = Connection::open(db_file_path.clone()).unwrap_or_else(|_| {
panic!("Failed to open/create database file: {}", db_file_path.display())
});

connection
.pragma(Some(rusqlite::DatabaseName::Main), "user_version", SCHEMA_USER_VERSION, |_| {
Ok(())
})
.unwrap_or_else(|_| panic!("Failed to set PRAGMA user_version"));

let sql = format!(
"CREATE TABLE IF NOT EXISTS {} (
namespace TEXT NOT NULL,
key TEXT NOT NULL CHECK (key <> ''),
value BLOB, PRIMARY KEY ( namespace, key )
);",
KV_TABLE_NAME
);
connection
.execute(&sql, [])
.unwrap_or_else(|_| panic!("Failed to create table: {}", KV_TABLE_NAME));

let connection = Arc::new(Mutex::new(connection));
Self { connection }
}
}

impl KVStore for SqliteStore {
type Reader = Cursor<Vec<u8>>;

fn read(&self, namespace: &str, key: &str) -> std::io::Result<Self::Reader> {
let locked_conn = self.connection.lock().unwrap();
let sql =
format!("SELECT value FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);

let res = locked_conn
.query_row(
&sql,
named_params! {
":namespace": namespace,
":key": key,
},
|row| row.get(0),
)
.map_err(|e| match e {
rusqlite::Error::QueryReturnedNoRows => {
let msg =
format!("Failed to read as key could not be found: {}/{}", namespace, key);
std::io::Error::new(std::io::ErrorKind::NotFound, msg)
}
e => {
let msg = format!("Failed to read from key {}/{}: {}", namespace, key, e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
}
})?;
Ok(Cursor::new(res))
}

fn write(&self, namespace: &str, key: &str, buf: &[u8]) -> std::io::Result<()> {
let locked_conn = self.connection.lock().unwrap();

let sql = format!(
"INSERT OR REPLACE INTO {} (namespace, key, value) VALUES (:namespace, :key, :value);",
KV_TABLE_NAME
);

locked_conn
.execute(
&sql,
named_params! {
":namespace": namespace,
":key": key,
":value": buf,
},
)
.map(|_| ())
.map_err(|e| {
let msg = format!("Failed to write to key {}/{}: {}", namespace, key, e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
})
}

fn remove(&self, namespace: &str, key: &str) -> std::io::Result<bool> {
let locked_conn = self.connection.lock().unwrap();

let sql = format!("DELETE FROM {} WHERE namespace=:namespace AND key=:key;", KV_TABLE_NAME);
let changes = locked_conn
.execute(
&sql,
named_params! {
":namespace": namespace,
":key": key,
},
)
.map_err(|e| {
let msg = format!("Failed to delete key {}/{}: {}", namespace, key, e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
})?;

let was_present = changes != 0;

Ok(was_present)
}

fn list(&self, namespace: &str) -> std::io::Result<Vec<String>> {
let locked_conn = self.connection.lock().unwrap();

let sql = format!("SELECT key FROM {} WHERE namespace=:namespace", KV_TABLE_NAME);
let mut stmt = locked_conn.prepare(&sql).map_err(|e| {
let msg = format!("Failed to prepare statement: {}", e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
})?;

let mut keys = Vec::new();

let rows_iter = stmt
.query_map(named_params! {":namespace": namespace, }, |row| row.get(0))
.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
})?;

for k in rows_iter {
keys.push(k.map_err(|e| {
let msg = format!("Failed to retrieve queried rows: {}", e);
std::io::Error::new(std::io::ErrorKind::Other, msg)
})?);
}

Ok(keys)
}
}

impl KVStorePersister for SqliteStore {
fn persist<W: Writeable>(&self, prefixed_key: &str, object: &W) -> lightning::io::Result<()> {
let (namespace, key) = get_namespace_and_key_from_prefixed(prefixed_key)?;
self.write(&namespace, &key, &object.encode())
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::test::utils::random_storage_path;

use proptest::prelude::*;
proptest! {
#[test]
fn read_write_remove_list_persist(data in any::<[u8; 32]>()) {
let rand_dir = random_storage_path();
let sqlite_store = SqliteStore::new(rand_dir.into());

do_read_write_remove_list_persist(&data, &sqlite_store);
}
}
}
Loading