1717
1818//! "crypto" DataFusion functions
1919
20- use arrow:: array:: StringArray ;
2120use arrow:: array:: { Array , ArrayRef , BinaryArray , OffsetSizeTrait } ;
21+ use arrow:: array:: { AsArray , GenericStringArray , StringArray , StringViewArray } ;
2222use arrow:: datatypes:: DataType ;
2323use blake2:: { Blake2b512 , Blake2s256 , Digest } ;
2424use blake3:: Hasher as Blake3 ;
2525use datafusion_common:: cast:: as_binary_array;
2626
27+ use arrow:: compute:: StringArrayType ;
2728use datafusion_common:: plan_err;
2829use datafusion_common:: {
29- cast:: { as_generic_binary_array, as_generic_string_array } ,
30- exec_err , internal_err , DataFusionError , Result , ScalarValue ,
30+ cast:: as_generic_binary_array, exec_err , internal_err , DataFusionError , Result ,
31+ ScalarValue ,
3132} ;
3233use datafusion_expr:: ColumnarValue ;
3334use md5:: Md5 ;
@@ -121,9 +122,9 @@ pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
121122 }
122123 let digest_algorithm = match & args[ 1 ] {
123124 ColumnarValue :: Scalar ( scalar) => match scalar {
124- ScalarValue :: Utf8 ( Some ( method) ) | ScalarValue :: LargeUtf8 ( Some ( method ) ) => {
125- method . parse :: < DigestAlgorithm > ( )
126- }
125+ ScalarValue :: Utf8View ( Some ( method) )
126+ | ScalarValue :: Utf8 ( Some ( method ) )
127+ | ScalarValue :: LargeUtf8 ( Some ( method ) ) => method . parse :: < DigestAlgorithm > ( ) ,
127128 other => exec_err ! ( "Unsupported data type {other:?} for function digest" ) ,
128129 } ,
129130 ColumnarValue :: Array ( _) => {
@@ -132,6 +133,7 @@ pub fn digest(args: &[ColumnarValue]) -> Result<ColumnarValue> {
132133 } ?;
133134 digest_process ( & args[ 0 ] , digest_algorithm)
134135}
136+
135137impl FromStr for DigestAlgorithm {
136138 type Err = DataFusionError ;
137139 fn from_str ( name : & str ) -> Result < DigestAlgorithm > {
@@ -166,12 +168,14 @@ impl FromStr for DigestAlgorithm {
166168 } )
167169 }
168170}
171+
169172impl fmt:: Display for DigestAlgorithm {
170173 fn fmt ( & self , f : & mut fmt:: Formatter ) -> fmt:: Result {
171174 write ! ( f, "{}" , format!( "{self:?}" ) . to_lowercase( ) )
172175 }
173176}
174- // /// computes md5 hash digest of the given input
177+
178+ /// computes md5 hash digest of the given input
175179pub fn md5 ( args : & [ ColumnarValue ] ) -> Result < ColumnarValue > {
176180 if args. len ( ) != 1 {
177181 return exec_err ! (
@@ -180,7 +184,9 @@ pub fn md5(args: &[ColumnarValue]) -> Result<ColumnarValue> {
180184 DigestAlgorithm :: Md5
181185 ) ;
182186 }
187+
183188 let value = digest_process ( & args[ 0 ] , DigestAlgorithm :: Md5 ) ?;
189+
184190 // md5 requires special handling because of its unique utf8 return type
185191 Ok ( match value {
186192 ColumnarValue :: Array ( array) => {
@@ -214,7 +220,8 @@ pub fn utf8_or_binary_to_binary_type(
214220 name : & str ,
215221) -> Result < DataType > {
216222 Ok ( match arg_type {
217- DataType :: LargeUtf8
223+ DataType :: Utf8View
224+ | DataType :: LargeUtf8
218225 | DataType :: Utf8
219226 | DataType :: Binary
220227 | DataType :: LargeBinary => DataType :: Binary ,
@@ -296,8 +303,30 @@ impl DigestAlgorithm {
296303 where
297304 T : OffsetSizeTrait ,
298305 {
299- let input_value = as_generic_string_array :: < T > ( value) ?;
300- let array: ArrayRef = match self {
306+ let array = match value. data_type ( ) {
307+ DataType :: Utf8 | DataType :: LargeUtf8 => {
308+ let v = value. as_string :: < T > ( ) ;
309+ self . digest_utf8_array_impl :: < & GenericStringArray < T > > ( v)
310+ }
311+ DataType :: Utf8View => {
312+ let v = value. as_string_view ( ) ;
313+ self . digest_utf8_array_impl :: < & StringViewArray > ( v)
314+ }
315+ other => {
316+ return exec_err ! ( "unsupported type for digest_utf_array: {other:?}" )
317+ }
318+ } ;
319+ Ok ( ColumnarValue :: Array ( array) )
320+ }
321+
322+ pub fn digest_utf8_array_impl < ' a , StringArrType > (
323+ self ,
324+ input_value : StringArrType ,
325+ ) -> ArrayRef
326+ where
327+ StringArrType : StringArrayType < ' a > ,
328+ {
329+ match self {
301330 Self :: Md5 => digest_to_array ! ( Md5 , input_value) ,
302331 Self :: Sha224 => digest_to_array ! ( Sha224 , input_value) ,
303332 Self :: Sha256 => digest_to_array ! ( Sha256 , input_value) ,
@@ -318,8 +347,7 @@ impl DigestAlgorithm {
318347 . collect ( ) ;
319348 Arc :: new ( binary_array)
320349 }
321- } ;
322- Ok ( ColumnarValue :: Array ( array) )
350+ }
323351 }
324352}
325353pub fn digest_process (
@@ -328,6 +356,7 @@ pub fn digest_process(
328356) -> Result < ColumnarValue > {
329357 match value {
330358 ColumnarValue :: Array ( a) => match a. data_type ( ) {
359+ DataType :: Utf8View => digest_algorithm. digest_utf8_array :: < i32 > ( a. as_ref ( ) ) ,
331360 DataType :: Utf8 => digest_algorithm. digest_utf8_array :: < i32 > ( a. as_ref ( ) ) ,
332361 DataType :: LargeUtf8 => digest_algorithm. digest_utf8_array :: < i64 > ( a. as_ref ( ) ) ,
333362 DataType :: Binary => digest_algorithm. digest_binary_array :: < i32 > ( a. as_ref ( ) ) ,
@@ -339,7 +368,9 @@ pub fn digest_process(
339368 ) ,
340369 } ,
341370 ColumnarValue :: Scalar ( scalar) => match scalar {
342- ScalarValue :: Utf8 ( a) | ScalarValue :: LargeUtf8 ( a) => {
371+ ScalarValue :: Utf8View ( a)
372+ | ScalarValue :: Utf8 ( a)
373+ | ScalarValue :: LargeUtf8 ( a) => {
343374 Ok ( digest_algorithm
344375 . digest_scalar ( a. as_ref ( ) . map ( |s : & String | s. as_bytes ( ) ) ) )
345376 }
0 commit comments