-
Notifications
You must be signed in to change notification settings - Fork 1.7k
feat(spark): implement Spark try_parse_url function
#17485
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
e995629
impl try_parse_url spark function
rafafrdz 8869878
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 54e0125
suggestions
rafafrdz 944d204
fix parse_url
rafafrdz bf39835
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 87405e2
fix parse_url
rafafrdz 532bd38
fix parse_url
rafafrdz 82f5a9b
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 37dc796
suggestions
rafafrdz 36ead8b
suggestions
rafafrdz a07f7eb
suggestions
rafafrdz 80e7259
suggestions
rafafrdz b58b4c1
edit
rafafrdz 3cc0cbe
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 47d9a21
tests and clippy
rafafrdz 1e40033
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 2e0cc6a
suggestions and tests
rafafrdz ae48283
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 6904c7d
Merge branch 'main' into feat/try_parse_url
Jefffrey 60488a6
Merge branch 'main' into feat/try_parse_url
rafafrdz 8626fdb
fixing parse_url
rafafrdz 3c97788
Merge remote-tracking branch 'upstream/main' into feat/try_parse_url
rafafrdz 80e2dbc
fixing parse_url
rafafrdz e70efa3
Merge branch 'main' into feat/try_parse_url
alamb File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,13 +26,13 @@ use arrow::datatypes::DataType; | |
| use datafusion_common::cast::{ | ||
| as_large_string_array, as_string_array, as_string_view_array, | ||
| }; | ||
| use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result}; | ||
| use datafusion_common::{exec_datafusion_err, exec_err, Result}; | ||
| use datafusion_expr::{ | ||
| ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, | ||
| Volatility, | ||
| }; | ||
| use datafusion_functions::utils::make_scalar_function; | ||
| use url::Url; | ||
| use url::{ParseError, Url}; | ||
|
|
||
| #[derive(Debug, PartialEq, Eq, Hash)] | ||
| pub struct ParseUrl { | ||
|
|
@@ -49,20 +49,7 @@ impl ParseUrl { | |
| pub fn new() -> Self { | ||
| Self { | ||
| signature: Signature::one_of( | ||
| vec![ | ||
| TypeSignature::Uniform( | ||
| 1, | ||
| vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], | ||
| ), | ||
| TypeSignature::Uniform( | ||
| 2, | ||
| vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], | ||
| ), | ||
| TypeSignature::Uniform( | ||
| 3, | ||
| vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8], | ||
| ), | ||
| ], | ||
| vec![TypeSignature::String(2), TypeSignature::String(3)], | ||
| Volatility::Immutable, | ||
| ), | ||
| } | ||
|
|
@@ -95,11 +82,22 @@ impl ParseUrl { | |
| /// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed | ||
| /// | ||
| fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> { | ||
| Url::parse(value) | ||
| .map_err(|e| exec_datafusion_err!("{e:?}")) | ||
| let url: std::result::Result<Url, ParseError> = Url::parse(value); | ||
| if let Err(ParseError::RelativeUrlWithoutBase) = url { | ||
| return if !value.contains("://") { | ||
| Ok(None) | ||
| } else { | ||
| Err(exec_datafusion_err!("The url is invalid: {value}. Use `try_parse_url` to tolerate invalid URL and return NULL instead. SQLSTATE: 22P02")) | ||
| }; | ||
| }; | ||
| url.map_err(|e| exec_datafusion_err!("{e:?}")) | ||
| .map(|url| match part { | ||
| "HOST" => url.host_str().map(String::from), | ||
| "PATH" => Some(url.path().to_string()), | ||
| "PATH" => { | ||
| let path: String = url.path().to_string(); | ||
| let path: String = if path == "/" { "".to_string() } else { path }; | ||
| Some(path) | ||
| } | ||
| "QUERY" => match key { | ||
| None => url.query().map(String::from), | ||
| Some(key) => url | ||
|
|
@@ -146,35 +144,7 @@ impl ScalarUDFImpl for ParseUrl { | |
| } | ||
|
|
||
| fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> { | ||
| if arg_types.len() < 2 || arg_types.len() > 3 { | ||
| return plan_err!( | ||
| "{} expects 2 or 3 arguments, but got {}", | ||
| self.name(), | ||
| arg_types.len() | ||
| ); | ||
| } | ||
| match arg_types.len() { | ||
| 2 | 3 => { | ||
| if arg_types | ||
| .iter() | ||
| .any(|arg| matches!(arg, DataType::LargeUtf8)) | ||
| { | ||
| Ok(DataType::LargeUtf8) | ||
| } else if arg_types | ||
| .iter() | ||
| .any(|arg| matches!(arg, DataType::Utf8View)) | ||
| { | ||
| Ok(DataType::Utf8View) | ||
| } else { | ||
| Ok(DataType::Utf8) | ||
| } | ||
| } | ||
| _ => plan_err!( | ||
| "`{}` expects 2 or 3 arguments, got {}", | ||
| &self.name(), | ||
| arg_types.len() | ||
| ), | ||
| } | ||
| Ok(arg_types[0].clone()) | ||
| } | ||
|
|
||
| fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> { | ||
|
|
@@ -200,6 +170,13 @@ impl ScalarUDFImpl for ParseUrl { | |
| /// - The output array type (StringArray or LargeStringArray) is determined by input types | ||
| /// | ||
| fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> { | ||
| spark_handled_parse_url(args, |x| x) | ||
| } | ||
|
|
||
| pub fn spark_handled_parse_url( | ||
| args: &[ArrayRef], | ||
| handler_err: impl Fn(Result<Option<String>>) -> Result<Option<String>>, | ||
| ) -> Result<ArrayRef> { | ||
| if args.len() < 2 || args.len() > 3 { | ||
| return exec_err!( | ||
| "{} expects 2 or 3 arguments, but got {}", | ||
|
|
@@ -212,6 +189,7 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> { | |
| let part = &args[1]; | ||
|
|
||
| let result = if args.len() == 3 { | ||
| // In this case, the 'key' argument is passed | ||
| let key = &args[2]; | ||
|
|
||
| match (url.data_type(), part.data_type(), key.data_type()) { | ||
|
|
@@ -220,20 +198,23 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> { | |
| as_string_array(url)?, | ||
| as_string_array(part)?, | ||
| as_string_array(key)?, | ||
| handler_err, | ||
| ) | ||
| } | ||
| (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => { | ||
| process_parse_url::<_, _, _, StringViewArray>( | ||
| as_string_view_array(url)?, | ||
| as_string_view_array(part)?, | ||
| as_string_view_array(key)?, | ||
| handler_err, | ||
| ) | ||
| } | ||
| (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => { | ||
| process_parse_url::<_, _, _, LargeStringArray>( | ||
| as_large_string_array(url)?, | ||
| as_large_string_array(part)?, | ||
| as_large_string_array(key)?, | ||
| handler_err, | ||
| ) | ||
| } | ||
| _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), | ||
|
|
@@ -253,20 +234,23 @@ fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> { | |
| as_string_array(url)?, | ||
| as_string_array(part)?, | ||
| &key, | ||
| handler_err, | ||
| ) | ||
| } | ||
| (DataType::Utf8View, DataType::Utf8View) => { | ||
| process_parse_url::<_, _, _, StringViewArray>( | ||
| as_string_view_array(url)?, | ||
| as_string_view_array(part)?, | ||
| &key, | ||
| handler_err, | ||
| ) | ||
| } | ||
| (DataType::LargeUtf8, DataType::LargeUtf8) => { | ||
| process_parse_url::<_, _, _, LargeStringArray>( | ||
| as_large_string_array(url)?, | ||
| as_large_string_array(part)?, | ||
| &key, | ||
| handler_err, | ||
| ) | ||
| } | ||
| _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args), | ||
|
|
@@ -279,6 +263,7 @@ fn process_parse_url<'a, A, B, C, T>( | |
| url_array: &'a A, | ||
| part_array: &'a B, | ||
| key_array: &'a C, | ||
| handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>, | ||
| ) -> Result<ArrayRef> | ||
| where | ||
| &'a A: StringArrayType<'a>, | ||
|
|
@@ -292,11 +277,156 @@ where | |
| .zip(key_array.iter()) | ||
| .map(|((url, part), key)| { | ||
| if let (Some(url), Some(part), key) = (url, part, key) { | ||
| ParseUrl::parse(url, part, key) | ||
| handle(ParseUrl::parse(url, part, key)) | ||
| } else { | ||
| Ok(None) | ||
| } | ||
| }) | ||
| .collect::<Result<T>>() | ||
| .map(|array| Arc::new(array) as ArrayRef) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use arrow::array::{ArrayRef, Int32Array, StringArray}; | ||
| use datafusion_common::Result; | ||
| use std::array::from_ref; | ||
| use std::sync::Arc; | ||
|
|
||
| fn sa(vals: &[Option<&str>]) -> ArrayRef { | ||
| Arc::new(StringArray::from(vals.to_vec())) as ArrayRef | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_host() -> Result<()> { | ||
| let got = ParseUrl::parse("https://example.com/a?x=1", "HOST", None)?; | ||
| assert_eq!(got, Some("example.com".to_string())); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_query_no_key_vs_with_key() -> Result<()> { | ||
| let got_all = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", None)?; | ||
| assert_eq!(got_all, Some("a=1&b=2".to_string())); | ||
|
|
||
| let got_a = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", Some("a"))?; | ||
| assert_eq!(got_a, Some("1".to_string())); | ||
|
|
||
| let got_c = ParseUrl::parse("https://ex.com/p?a=1&b=2", "QUERY", Some("c"))?; | ||
| assert_eq!(got_c, None); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_ref_protocol_userinfo_file_authority() -> Result<()> { | ||
| let url = "ftp://user:[email protected]:21/files?x=1#frag"; | ||
| assert_eq!(ParseUrl::parse(url, "REF", None)?, Some("frag".to_string())); | ||
| assert_eq!( | ||
| ParseUrl::parse(url, "PROTOCOL", None)?, | ||
| Some("ftp".to_string()) | ||
| ); | ||
| assert_eq!( | ||
| ParseUrl::parse(url, "USERINFO", None)?, | ||
| Some("user:pwd".to_string()) | ||
| ); | ||
| assert_eq!( | ||
| ParseUrl::parse(url, "FILE", None)?, | ||
| Some("/files?x=1".to_string()) | ||
| ); | ||
| assert_eq!( | ||
| ParseUrl::parse(url, "AUTHORITY", None)?, | ||
| Some("user:[email protected]".to_string()) | ||
| ); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_path_root_is_empty_string() -> Result<()> { | ||
| let got = ParseUrl::parse("https://example.com/", "PATH", None)?; | ||
| assert_eq!(got, Some("".to_string())); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_parse_malformed_url_returns_error() -> Result<()> { | ||
| let got = ParseUrl::parse("notaurl", "HOST", None)?; | ||
| assert_eq!(got, None); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_utf8_two_args() -> Result<()> { | ||
| let urls = sa(&[Some("https://example.com/a?x=1"), Some("https://ex.com/")]); | ||
| let parts = sa(&[Some("HOST"), Some("PATH")]); | ||
|
|
||
| let out = spark_handled_parse_url(&[urls, parts], |x| x)?; | ||
| let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap(); | ||
|
|
||
| assert_eq!(out_sa.len(), 2); | ||
| assert_eq!(out_sa.value(0), "example.com"); | ||
| assert_eq!(out_sa.value(1), ""); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_utf8_three_args_query_key() -> Result<()> { | ||
| let urls = sa(&[ | ||
| Some("https://example.com/a?x=1&y=2"), | ||
| Some("https://ex.com/?a=1"), | ||
| ]); | ||
| let parts = sa(&[Some("QUERY"), Some("QUERY")]); | ||
| let keys = sa(&[Some("y"), Some("b")]); | ||
|
|
||
| let out = spark_handled_parse_url(&[urls, parts, keys], |x| x)?; | ||
| let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap(); | ||
|
|
||
| assert_eq!(out_sa.len(), 2); | ||
| assert_eq!(out_sa.value(0), "2"); | ||
| assert!(out_sa.is_null(1)); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_spark_userinfo_and_nulls() -> Result<()> { | ||
| let urls = sa(&[ | ||
| Some("ftp://user:[email protected]:21/files"), | ||
| Some("https://example.com"), | ||
| None, | ||
| ]); | ||
| let parts = sa(&[Some("USERINFO"), Some("USERINFO"), Some("USERINFO")]); | ||
|
|
||
| let out = spark_handled_parse_url(&[urls, parts], |x| x)?; | ||
| let out_sa = out.as_any().downcast_ref::<StringArray>().unwrap(); | ||
|
|
||
| assert_eq!(out_sa.len(), 3); | ||
| assert_eq!(out_sa.value(0), "user:pwd"); | ||
| assert!(out_sa.is_null(1)); | ||
| assert!(out_sa.is_null(2)); | ||
| Ok(()) | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_invalid_arg_count() { | ||
| let urls = sa(&[Some("https://example.com")]); | ||
| let err = spark_handled_parse_url(from_ref(&urls), |x| x).unwrap_err(); | ||
| assert!(format!("{err}").contains("expects 2 or 3 arguments")); | ||
|
|
||
| let parts = sa(&[Some("HOST")]); | ||
| let keys = sa(&[Some("x")]); | ||
| let err = | ||
| spark_handled_parse_url(&[urls, parts, keys, sa(&[Some("extra")])], |x| x) | ||
| .unwrap_err(); | ||
| assert!(format!("{err}").contains("expects 2 or 3 arguments")); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_non_string_types_error() { | ||
| let urls = sa(&[Some("https://example.com")]); | ||
| let bad_part = Arc::new(Int32Array::from(vec![1])) as ArrayRef; | ||
|
|
||
| let err = spark_handled_parse_url(&[urls, bad_part], |x| x).unwrap_err(); | ||
| let msg = format!("{err}"); | ||
| assert!(msg.contains("expects STRING arguments")); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel quite a few of these tests could be done as slt tests; @alamb do we have a preference on where tests should be done? Should we prefer slt over Rust tests, and fall back to Rust only when it is something that slt can't handle?
Took a look at https://datafusion.apache.org/contributor-guide/testing.html but it doesn't mention whether we have a specific preference, other than slt tests being easier to maintain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I personally prefer slt tests, but I agree we don't have clear guidance.