diff --git a/arrow-avro/Cargo.toml b/arrow-avro/Cargo.toml index e2280b251ff6..8db404923c30 100644 --- a/arrow-avro/Cargo.toml +++ b/arrow-avro/Cargo.toml @@ -56,6 +56,7 @@ bzip2 = { version = "0.6.0", optional = true } xz = { version = "0.1", default-features = false, optional = true } crc = { version = "3.0", optional = true } uuid = "1.17" +strum_macros = "0.27" [dev-dependencies] arrow-data = { workspace = true } diff --git a/arrow-avro/src/lib.rs b/arrow-avro/src/lib.rs index ae13c3861842..8087a908d673 100644 --- a/arrow-avro/src/lib.rs +++ b/arrow-avro/src/lib.rs @@ -33,10 +33,10 @@ /// Implements the primary reader interface and record decoding logic. pub mod reader; -// Avro schema parsing and representation -// -// Provides types for parsing and representing Avro schema definitions. -mod schema; +/// Avro schema parsing and representation +/// +/// Provides types for parsing and representing Avro schema definitions. +pub mod schema; /// Compression codec implementations for Avro /// diff --git a/arrow-avro/src/schema.rs b/arrow-avro/src/schema.rs index c3e4549c8c38..539e7b02f306 100644 --- a/arrow-avro/src/schema.rs +++ b/arrow-avro/src/schema.rs @@ -15,12 +15,28 @@ // specific language governing permissions and limitations // under the License. +use arrow_schema::ArrowError; use serde::{Deserialize, Serialize}; +use serde_json::{json, Value}; +use std::cmp::PartialEq; +use std::collections::hash_map::Entry; use std::collections::HashMap; +use strum_macros::AsRefStr; /// The metadata key used for storing the JSON encoded [`Schema`] pub const SCHEMA_METADATA_KEY: &str = "avro.schema"; +/// The Avro single‑object encoding “magic” bytes (`0xC3 0x01`) +pub const SINGLE_OBJECT_MAGIC: [u8; 2] = [0xC3, 0x01]; + +/// Compare two Avro schemas for equality (identical schemas). +/// Returns true if the schemas have the same parsing canonical form (i.e., logically identical). 
+pub fn compare_schemas(writer: &Schema, reader: &Schema) -> Result { + let canon_writer = generate_canonical_form(writer)?; + let canon_reader = generate_canonical_form(reader)?; + Ok(canon_writer == canon_reader) +} + /// Either a [`PrimitiveType`] or a reference to a previously defined named type /// /// @@ -39,8 +55,9 @@ pub enum TypeName<'a> { /// A primitive type /// /// -#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AsRefStr)] #[serde(rename_all = "camelCase")] +#[strum(serialize_all = "lowercase")] pub enum PrimitiveType { /// null: no value Null, @@ -260,6 +277,376 @@ pub struct Fixed<'a> { pub attributes: Attributes<'a>, } +/// A wrapper for an Avro schema in its JSON string representation. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub struct AvroSchema { + /// The Avro schema as a JSON string. + pub json_string: String, +} + +impl AvroSchema { + /// Creates a new `AvroSchema` from a JSON string. + pub fn new(json_string: String) -> Self { + Self { json_string } + } + + /// Deserializes and returns the `AvroSchema`. + /// + /// The returned schema borrows from `self`. + pub fn schema(&self) -> Result, ArrowError> { + serde_json::from_str(self.json_string.as_str()) + .map_err(|e| ArrowError::ParseError(format!("Invalid Avro schema JSON: {e}"))) + } + + /// Returns the Rabin fingerprint of the schema. + pub fn fingerprint(&self) -> Result { + generate_fingerprint_rabin(&self.schema()?) + } +} + +/// Supported fingerprint algorithms for Avro schema identification. +/// Currently only `Rabin` is supported, `SHA256` and `MD5` support will come in a future update +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Default)] +pub enum FingerprintAlgorithm { + /// 64‑bit CRC‑64‑AVRO Rabin fingerprint. + #[default] + Rabin, +} + +/// A schema fingerprint in one of the supported formats. +/// +/// This is used as the key inside `SchemaStore` `HashMap`. 
Each `SchemaStore` +/// instance always stores only one variant, matching its configured +/// `FingerprintAlgorithm`, but the enum makes the API uniform. +/// Currently only `Rabin` is supported +/// +/// +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +pub enum Fingerprint { + /// A 64-bit Rabin fingerprint. + Rabin(u64), +} + +/// Allow easy extraction of the algorithm used to create a fingerprint. +impl From<&Fingerprint> for FingerprintAlgorithm { + fn from(fp: &Fingerprint) -> Self { + match fp { + Fingerprint::Rabin(_) => FingerprintAlgorithm::Rabin, + } + } +} + +/// Generates a fingerprint for the given `Schema` using the specified `FingerprintAlgorithm`. +pub(crate) fn generate_fingerprint( + schema: &Schema, + hash_type: FingerprintAlgorithm, +) -> Result { + let canonical = generate_canonical_form(schema).map_err(|e| { + ArrowError::ComputeError(format!("Failed to generate canonical form for schema: {e}")) + })?; + match hash_type { + FingerprintAlgorithm::Rabin => { + Ok(Fingerprint::Rabin(compute_fingerprint_rabin(&canonical))) + } + } +} + +/// Generates the 64-bit Rabin fingerprint for the given `Schema`. +/// +/// The fingerprint is computed from the canonical form of the schema. +/// This is also known as `CRC-64-AVRO`. +/// +/// # Returns +/// A `Fingerprint::Rabin` variant containing the 64-bit fingerprint. +pub fn generate_fingerprint_rabin(schema: &Schema) -> Result { + generate_fingerprint(schema, FingerprintAlgorithm::Rabin) +} + +/// Generates the Parsing Canonical Form for the given [`Schema`]. +/// +/// The canonical form is a standardized JSON representation of the schema, +/// primarily used for generating a schema fingerprint for equality checking. +/// +/// This form strips attributes that do not affect the schema's identity, +/// such as `doc` fields, `aliases`, and any properties not defined in the +/// Avro specification. 
+/// +/// +pub fn generate_canonical_form(schema: &Schema) -> Result { + build_canonical(schema, None) +} + +/// An in-memory cache of Avro schemas, indexed by their fingerprint. +/// +/// `SchemaStore` provides a mechanism to store and retrieve Avro schemas efficiently. +/// Each schema is associated with a unique [`Fingerprint`], which is generated based +/// on the schema's canonical form and a specific hashing algorithm. +/// +/// A `SchemaStore` instance is configured to use a single [`FingerprintAlgorithm`] such as Rabin, +/// MD5 (not yet supported), or SHA256 (not yet supported) for all its operations. +/// This ensures consistency when generating fingerprints and looking up schemas. +/// All schemas registered will have their fingerprint computed with this algorithm, and +/// lookups must use a matching fingerprint. +/// +/// # Examples +/// +/// ```no_run +/// // Create a new store with the default Rabin fingerprinting. +/// use arrow_avro::schema::{AvroSchema, SchemaStore}; +/// +/// let mut store = SchemaStore::new(); +/// let schema = AvroSchema::new("\"string\"".to_string()); +/// // Register the schema to get its fingerprint. +/// let fingerprint = store.register(schema.clone()).unwrap(); +/// // Use the fingerprint to look up the schema. +/// let retrieved_schema = store.lookup(&fingerprint).cloned(); +/// assert_eq!(retrieved_schema, Some(schema)); +/// ``` +#[derive(Debug, Clone, Default)] +pub struct SchemaStore { + /// The hashing algorithm used for generating fingerprints. + fingerprint_algorithm: FingerprintAlgorithm, + /// A map from a schema's fingerprint to the schema itself. + schemas: HashMap, +} + +impl TryFrom<&[AvroSchema]> for SchemaStore { + type Error = ArrowError; + + /// Creates a `SchemaStore` from a slice of schemas. + /// Each schema in the slice is registered with the new store. 
+ fn try_from(schemas: &[AvroSchema]) -> Result { + let mut store = SchemaStore::new(); + for schema in schemas { + store.register(schema.clone())?; + } + Ok(store) + } +} + +impl SchemaStore { + /// Creates an empty `SchemaStore` using the default fingerprinting algorithm (64-bit Rabin). + pub fn new() -> Self { + Self::default() + } + + /// Registers a schema with the store and returns its fingerprint. + /// + /// A fingerprint is calculated for the given schema using the store's configured + /// hash type. If a schema with the same fingerprint does not already exist in the + /// store, the new schema is inserted. If the fingerprint already exists, the stored + /// schema is kept, and an error is returned if it is not equal to the new schema. + /// + /// # Arguments + /// + /// * `schema` - The `AvroSchema` to register. + /// + /// # Returns + /// + /// A `Result` containing the `Fingerprint` of the schema if successful, + /// or an `ArrowError` on failure. + pub fn register(&mut self, schema: AvroSchema) -> Result { + let fingerprint = generate_fingerprint(&schema.schema()?, self.fingerprint_algorithm)?; + match self.schemas.entry(fingerprint) { + Entry::Occupied(entry) => { + if entry.get() != &schema { + return Err(ArrowError::ComputeError(format!( + "Schema fingerprint collision detected for fingerprint {fingerprint:?}" + ))); + } + } + Entry::Vacant(entry) => { + entry.insert(schema); + } + } + Ok(fingerprint) + } + + /// Looks up a schema by its `Fingerprint`. + /// + /// # Arguments + /// + /// * `fingerprint` - A reference to the `Fingerprint` of the schema to look up. + /// + /// # Returns + /// + /// An `Option` containing a reference to the `AvroSchema` if found, otherwise `None`. + pub fn lookup(&self, fingerprint: &Fingerprint) -> Option<&AvroSchema> { + self.schemas.get(fingerprint) + } + + /// Returns a `Vec` containing **all unique [`Fingerprint`]s** currently + /// held by this [`SchemaStore`]. + /// + /// The order of the returned fingerprints is unspecified and should not be + /// relied upon. 
+ pub fn fingerprints(&self) -> Vec { + self.schemas.keys().copied().collect() + } + + /// Returns the `FingerprintAlgorithm` used by the `SchemaStore` for fingerprinting. + pub(crate) fn fingerprint_algorithm(&self) -> FingerprintAlgorithm { + self.fingerprint_algorithm + } +} + +fn quote(s: &str) -> Result { + serde_json::to_string(s) + .map_err(|e| ArrowError::ComputeError(format!("Failed to quote string: {e}"))) +} + +// Avro names are defined by a `name` and an optional `namespace`. +// The full name is composed of the namespace and the name, separated by a dot. +// +// Avro specification defines two ways to specify a full name: +// 1. The `name` attribute contains the full name (e.g., "a.b.c.d"). +// In this case, the `namespace` attribute is ignored. +// 2. The `name` attribute contains the simple name (e.g., "d") and the +// `namespace` attribute contains the namespace (e.g., "a.b.c"). +// +// Each part of the name must match the regex `^[A-Za-z_][A-Za-z0-9_]*$`. +// Complex paths with quotes or backticks like `a."hi".b` are not supported. +// +// This function constructs the full name and extracts the namespace, +// handling both ways of specifying the name. It prioritizes a namespace +// defined within the `name` attribute itself, then the explicit `namespace_attr`, +// and finally the `enclosing_ns`. +fn make_full_name( + name: &str, + namespace_attr: Option<&str>, + enclosing_ns: Option<&str>, +) -> (String, Option) { + // `name` already contains a dot then treat as full-name, ignore namespace. + if let Some((ns, _)) = name.rsplit_once('.') { + return (name.to_string(), Some(ns.to_string())); + } + match namespace_attr.or(enclosing_ns) { + Some(ns) => (format!("{ns}.{name}"), Some(ns.to_string())), + None => (name.to_string(), None), + } +} + +fn build_canonical(schema: &Schema, enclosing_ns: Option<&str>) -> Result { + Ok(match schema { + Schema::TypeName(tn) | Schema::Type(Type { r#type: tn, .. 
}) => match tn { + TypeName::Primitive(pt) => quote(pt.as_ref())?, + TypeName::Ref(name) => { + let (full_name, _) = make_full_name(name, None, enclosing_ns); + quote(&full_name)? + } + }, + Schema::Union(branches) => format!( + "[{}]", + branches + .iter() + .map(|b| build_canonical(b, enclosing_ns)) + .collect::, _>>()? + .join(",") + ), + Schema::Complex(ct) => match ct { + ComplexType::Record(r) => { + let (full_name, child_ns) = make_full_name(r.name, r.namespace, enclosing_ns); + let fields = r + .fields + .iter() + .map(|f| { + let field_type = + build_canonical(&f.r#type, child_ns.as_deref().or(enclosing_ns))?; + Ok(format!( + r#"{{"name":{},"type":{}}}"#, + quote(f.name)?, + field_type + )) + }) + .collect::, ArrowError>>()? + .join(","); + format!( + r#"{{"name":{},"type":"record","fields":[{fields}]}}"#, + quote(&full_name)?, + ) + } + ComplexType::Enum(e) => { + let (full_name, _) = make_full_name(e.name, e.namespace, enclosing_ns); + let symbols = e + .symbols + .iter() + .map(|s| quote(s)) + .collect::, _>>()? + .join(","); + format!( + r#"{{"name":{},"type":"enum","symbols":[{symbols}]}}"#, + quote(&full_name)? + ) + } + ComplexType::Array(arr) => format!( + r#"{{"type":"array","items":{}}}"#, + build_canonical(&arr.items, enclosing_ns)? + ), + ComplexType::Map(map) => format!( + r#"{{"type":"map","values":{}}}"#, + build_canonical(&map.values, enclosing_ns)? + ), + ComplexType::Fixed(f) => { + let (full_name, _) = make_full_name(f.name, f.namespace, enclosing_ns); + format!( + r#"{{"name":{},"type":"fixed","size":{}}}"#, + quote(&full_name)?, + f.size + ) + } + }, + }) +} + +/// 64‑bit Rabin fingerprint as described in the Avro spec. +const EMPTY: u64 = 0xc15d_213a_a4d7_a795; + +/// Build one entry of the polynomial‑division table. +/// +/// We cannot yet write `for _ in 0..8` here: `for` loops rely on +/// `Iterator::next`, which is not `const` on stable Rust. 
Until the +/// `const_for` feature (tracking issue #87575) is stabilized, a `while` +/// loop is the only option in a `const fn` +const fn one_entry(i: usize) -> u64 { + let mut fp = i as u64; + let mut j = 0; + while j < 8 { + fp = (fp >> 1) ^ (EMPTY & (0u64.wrapping_sub(fp & 1))); + j += 1; + } + fp +} + +/// Build the full 256‑entry table at compile time. +/// +/// We cannot yet write `for _ in 0..256` here: `for` loops rely on +/// `Iterator::next`, which is not `const` on stable Rust. Until the +/// `const_for` feature (tracking issue #87575) is stabilized, a `while` +/// loop is the only option in a `const fn` +const fn build_table() -> [u64; 256] { + let mut table = [0u64; 256]; + let mut i = 0; + while i < 256 { + table[i] = one_entry(i); + i += 1; + } + table +} + +/// The pre‑computed table. +static FINGERPRINT_TABLE: [u64; 256] = build_table(); + +/// Computes the 64-bit Rabin fingerprint for a given canonical schema string. +/// This implementation is based on the Avro specification for schema fingerprinting. 
+pub(crate) fn compute_fingerprint_rabin(canonical_form: &str) -> u64 { + let mut fp = EMPTY; + for &byte in canonical_form.as_bytes() { + let idx = ((fp as u8) ^ byte) as usize; + fp = (fp >> 8) ^ FINGERPRINT_TABLE[idx]; + } + fp +} + #[cfg(test)] mod tests { use super::*; @@ -267,6 +654,34 @@ mod tests { use arrow_schema::{DataType, Fields, TimeUnit}; use serde_json::json; + fn int_schema() -> Schema<'static> { + Schema::TypeName(TypeName::Primitive(PrimitiveType::Int)) + } + + fn record_schema() -> Schema<'static> { + Schema::Complex(ComplexType::Record(Record { + name: "record1", + namespace: Some("test.namespace"), + doc: Some("A test record"), + aliases: vec![], + fields: vec![ + Field { + name: "field1", + doc: Some("An integer field"), + r#type: int_schema(), + default: None, + }, + Field { + name: "field2", + doc: None, + r#type: Schema::TypeName(TypeName::Primitive(PrimitiveType::String)), + default: None, + }, + ], + attributes: Attributes::default(), + })) + } + #[test] fn test_deserialize() { let t: Schema = serde_json::from_str("\"string\"").unwrap(); @@ -562,4 +977,147 @@ mod tests { })) ); } + + #[test] + fn test_new_schema_store() { + let store = SchemaStore::new(); + assert!(store.schemas.is_empty()); + } + + #[test] + fn test_try_from_schemas_rabin() { + let int_avro_schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); + let schemas = vec![int_avro_schema.clone(), record_avro_schema.clone()]; + let store = SchemaStore::try_from(schemas.as_slice()).unwrap(); + let int_fp = int_avro_schema.fingerprint().unwrap(); + assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); + let rec_fp = record_avro_schema.fingerprint().unwrap(); + assert_eq!(store.lookup(&rec_fp).cloned(), Some(record_avro_schema)); + } + + #[test] + fn test_try_from_with_duplicates() { + let int_avro_schema = 
AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let record_avro_schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); + let schemas = vec![ + int_avro_schema.clone(), + record_avro_schema, + int_avro_schema.clone(), + ]; + let store = SchemaStore::try_from(schemas.as_slice()).unwrap(); + assert_eq!(store.schemas.len(), 2); + let int_fp = int_avro_schema.fingerprint().unwrap(); + assert_eq!(store.lookup(&int_fp).cloned(), Some(int_avro_schema)); + } + + #[test] + fn test_register_and_lookup_rabin() { + let mut store = SchemaStore::new(); + let schema = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let fp_enum = store.register(schema.clone()).unwrap(); + let Fingerprint::Rabin(fp_val) = fp_enum; + assert_eq!( + store.lookup(&Fingerprint::Rabin(fp_val)).cloned(), + Some(schema.clone()) + ); + assert!(store + .lookup(&Fingerprint::Rabin(fp_val.wrapping_add(1))) + .is_none()); + } + + #[test] + fn test_register_duplicate_schema() { + let mut store = SchemaStore::new(); + let schema1 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let schema2 = AvroSchema::new(serde_json::to_string(&int_schema()).unwrap()); + let fingerprint1 = store.register(schema1).unwrap(); + let fingerprint2 = store.register(schema2).unwrap(); + assert_eq!(fingerprint1, fingerprint2); + assert_eq!(store.schemas.len(), 1); + } + + #[test] + fn test_canonical_form_generation_primitive() { + let schema = int_schema(); + let canonical_form = generate_canonical_form(&schema).unwrap(); + assert_eq!(canonical_form, r#""int""#); + } + + #[test] + fn test_canonical_form_generation_record() { + let schema = record_schema(); + let expected_canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#; + let canonical_form = generate_canonical_form(&schema).unwrap(); + assert_eq!(canonical_form, expected_canonical_form); + } + + #[test] + fn 
test_fingerprint_calculation() { + let canonical_form = r#"{"fields":[{"name":"a","type":"long"},{"name":"b","type":"string"}],"name":"test","type":"record"}"#; + let expected_fingerprint = 10505236152925314060; + let fingerprint = compute_fingerprint_rabin(canonical_form); + assert_eq!(fingerprint, expected_fingerprint); + } + + #[test] + fn test_register_and_lookup_complex_schema() { + let mut store = SchemaStore::new(); + let schema = AvroSchema::new(serde_json::to_string(&record_schema()).unwrap()); + let canonical_form = r#"{"name":"test.namespace.record1","type":"record","fields":[{"name":"field1","type":"int"},{"name":"field2","type":"string"}]}"#; + let expected_fingerprint = + Fingerprint::Rabin(super::compute_fingerprint_rabin(canonical_form)); + let fingerprint = store.register(schema.clone()).unwrap(); + assert_eq!(fingerprint, expected_fingerprint); + let looked_up = store.lookup(&fingerprint).cloned(); + assert_eq!(looked_up, Some(schema)); + } + + #[test] + fn test_fingerprints_returns_all_keys() { + let mut store = SchemaStore::new(); + let fp_int = store + .register(AvroSchema::new( + serde_json::to_string(&int_schema()).unwrap(), + )) + .unwrap(); + let fp_record = store + .register(AvroSchema::new( + serde_json::to_string(&record_schema()).unwrap(), + )) + .unwrap(); + let fps = store.fingerprints(); + assert_eq!(fps.len(), 2); + assert!(fps.contains(&fp_int)); + assert!(fps.contains(&fp_record)); + } + + #[test] + fn test_canonical_form_strips_attributes() { + let schema_with_attrs = Schema::Complex(ComplexType::Record(Record { + name: "record_with_attrs", + namespace: None, + doc: Some("This doc should be stripped"), + aliases: vec!["alias1", "alias2"], + fields: vec![Field { + name: "f1", + doc: Some("field doc"), + r#type: Schema::Type(Type { + r#type: TypeName::Primitive(PrimitiveType::Bytes), + attributes: Attributes { + logical_type: Some("decimal"), + additional: HashMap::from([("precision", json!(4))]), + }, + }), + default: None, + }], 
+ attributes: Attributes { + logical_type: None, + additional: HashMap::from([("custom_attr", json!("value"))]), + }, + })); + let expected_canonical_form = r#"{"name":"record_with_attrs","type":"record","fields":[{"name":"f1","type":"bytes"}]}"#; + let canonical_form = generate_canonical_form(&schema_with_attrs).unwrap(); + assert_eq!(canonical_form, expected_canonical_form); + } }