From 870358bbab7493598db9f84935439272b0ca5672 Mon Sep 17 00:00:00 2001 From: baishen Date: Fri, 15 Nov 2024 12:47:34 +0800 Subject: [PATCH 1/4] feat(query): Support `query_max_failures` setting --- src/common/exception/src/context.rs | 1 + src/common/exception/src/exception.rs | 20 +- src/common/exception/src/exception_code.rs | 491 +++++++++--------- src/common/exception/src/exception_into.rs | 29 +- .../exception/tests/it/exception_flight.rs | 1 + src/common/grpc/src/dns_resolver.rs | 1 + .../servers/http/v1/query/execute_state.rs | 71 +-- src/query/settings/src/settings_default.rs | 9 +- .../settings/src/settings_getter_setter.rs | 8 + .../mutation/mutator/recluster_mutator.rs | 2 +- src/query/storages/orc/src/hashable_schema.rs | 3 +- src/query/storages/orc/src/table.rs | 3 +- .../parquet_rs/parquet_reader/row_group.rs | 4 +- .../storages/parquet/src/parquet_rs/schema.rs | 2 +- 14 files changed, 350 insertions(+), 295 deletions(-) diff --git a/src/common/exception/src/context.rs b/src/common/exception/src/context.rs index 304fe0ef6ecf0..5a91ea21e4c45 100644 --- a/src/common/exception/src/context.rs +++ b/src/common/exception/src/context.rs @@ -41,6 +41,7 @@ impl ErrorCode { name: self.name, display_text: self.display_text, detail: self.detail, + is_retryable: self.is_retryable, span: self.span, cause: self.cause, backtrace: self.backtrace, diff --git a/src/common/exception/src/exception.rs b/src/common/exception/src/exception.rs index 668aeba8c6b53..31175d5730be1 100644 --- a/src/common/exception/src/exception.rs +++ b/src/common/exception/src/exception.rs @@ -33,6 +33,7 @@ pub struct ErrorCode { pub(crate) name: String, pub(crate) display_text: String, pub(crate) detail: String, + pub(crate) is_retryable: bool, pub(crate) span: Span, // cause is only used to contain an `anyhow::Error`. // TODO: remove `cause` when we completely get rid of `anyhow::Error`. @@ -118,6 +119,17 @@ impl ErrorCode { } } + pub fn is_retryable(&self) -> bool { + self.is_retryable + } + + pub fn set_is_retryable(self, is_retryable: bool) -> Self { + Self { + is_retryable, + ..self + } + } + pub fn span(&self) -> Span { self.span } @@ -193,12 +205,13 @@ impl Display for ErrorCode { impl ErrorCode { /// All std error will be converted to InternalError #[track_caller] - pub fn from_std_error(error: T) -> Self { + pub fn from_std_error(error: T, is_retryable: bool) -> Self { ErrorCode { code: 1001, name: String::from("FromStdError"), display_text: error.to_string(), detail: String::new(), + is_retryable, span: None, cause: None, backtrace: capture(), @@ -214,6 +227,7 @@ impl ErrorCode { name: String::from("Internal"), display_text: error.clone(), detail: String::new(), + is_retryable: false, span: None, cause: None, backtrace: capture(), @@ -229,6 +243,7 @@ impl ErrorCode { name: String::from("Internal"), display_text: error, detail: String::new(), + is_retryable: false, span: None, cause: None, stacks: vec![], @@ -242,6 +257,7 @@ impl ErrorCode { name: impl ToString, display_text: String, detail: String, + is_retryable: bool, cause: Option>, backtrace: StackTrace, ) -> Self { @@ -249,6 +265,7 @@ impl ErrorCode { code, display_text: display_text.clone(), detail, + is_retryable, span: None, cause, backtrace, @@ -313,6 +330,7 @@ impl Clone for ErrorCode { &self.name, self.display_text(), self.detail.clone(), + self.is_retryable, None, self.backtrace(), ) diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 3ac860b8f7a0c..202120d009261 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -18,7 +18,7 @@ use crate::exception_backtrace::capture; use crate::ErrorCode; macro_rules! build_exceptions { - ($($(#[$meta:meta])* $body:ident($code:expr)),*$(,)*) => { + ($($(#[$meta:meta])* $body:ident($code:expr, $is_retryable:expr)),*$(,)*) => { impl ErrorCode { $( @@ -38,6 +38,7 @@ macro_rules! build_exceptions { stringify!($body), display_text.into(), String::new(), + $is_retryable, None, bt, ) @@ -49,7 +50,7 @@ macro_rules! build_exceptions { // Internal errors [0, 2000]. build_exceptions! { - Ok(0), + Ok(0, false), /// Internal means this is the internal error that no action /// can be taken by neither developers or users. @@ -63,7 +64,7 @@ build_exceptions! { /// This error should never be used to for error checking. An error /// that returns as internal error could be assigned a separate error /// code at anytime. - Internal(1001), + Internal(1001, false), /// Unimplemented means this is a not implemented feature. /// @@ -73,361 +74,361 @@ build_exceptions! { /// /// It's OK to use this error code for not implemented feature in /// our dependences. For example, in arrow. - Unimplemented(1002), + Unimplemented(1002, false), // Legacy error codes, we will refactor them one by one. - UnknownDatabase(1003), - UnknownDatabaseId(1004), - SyntaxException(1005), - BadArguments(1006), - IllegalDataType(1007), - UnknownFunction(1008), - BadDataValueType(1010), - EmptyData(1016), - DataStructMissMatch(1017), - BadDataArrayLength(1018), - UnknownTableId(1020), - UnknownTable(1025), - UnknownView(1026), - UnknownAggregateFunction(1027), - NumberArgumentsNotMatch(1028), - EmptyDataFromServer(1030), - NotFoundClusterNode(1035), - BadAddressFormat(1036), - DnsParseError(1037), - CannotConnectNode(1038), - TooManyUserConnections(1041), - AbortedSession(1042), - AbortedQuery(1043), - ClosedQuery(1044), - CannotListenerPort(1045), - BadBytes(1046), - InitPrometheusFailure(1047), - Overflow(1049), - TLSConfigurationFailure(1052), - UnknownSession(1053), - SHA1CheckFailed(1057), - UnknownColumn(1058), - StrParseError(1060), - IllegalGrant(1061), - ManagementModePermissionDenied(1062), - PermissionDenied(1063), - UnmarshalError(1064), - SemanticError(1065), - NeedChangePasswordDenied(1066), - UnknownException(1067), - TokioError(1068), - HttpNotFound(1072), - UnknownFormat(1074), - UnknownCompressionType(1075), - InvalidCompressionData(1076), - InvalidTimezone(1078), - InvalidDate(1079), - InvalidTimestamp(1080), - InvalidClusterKeys(1081), - UnknownFragmentExchange(1082), - TenantIsEmpty(1101), - IndexOutOfBounds(1102), - LayoutError(1103), - PanicError(1104), - TableInfoError(1106), - ReadTableDataError(1107), - AddColumnExistError(1108), - DropColumnEmptyError(1109), + UnknownDatabase(1003, false), + UnknownDatabaseId(1004, false), + SyntaxException(1005, false), + BadArguments(1006, false), + IllegalDataType(1007, false), + UnknownFunction(1008, false), + BadDataValueType(1010, false), + EmptyData(1016, false), + DataStructMissMatch(1017, false), + BadDataArrayLength(1018, false), + UnknownTableId(1020, false), + UnknownTable(1025, false), + UnknownView(1026, false), + UnknownAggregateFunction(1027, false), + NumberArgumentsNotMatch(1028, false), + EmptyDataFromServer(1030, false), + NotFoundClusterNode(1035, false), + BadAddressFormat(1036, false), + DnsParseError(1037, false), + CannotConnectNode(1038, true), + TooManyUserConnections(1041, false), + AbortedSession(1042, false), + AbortedQuery(1043, false), + ClosedQuery(1044, false), + CannotListenerPort(1045, false), + BadBytes(1046, false), + InitPrometheusFailure(1047, false), + Overflow(1049, false), + TLSConfigurationFailure(1052, false), + UnknownSession(1053, false), + SHA1CheckFailed(1057, false), + UnknownColumn(1058, false), + StrParseError(1060, false), + IllegalGrant(1061, false), + ManagementModePermissionDenied(1062, false), + PermissionDenied(1063, false), + UnmarshalError(1064, false), + SemanticError(1065, false), + NeedChangePasswordDenied(1066, false), + UnknownException(1067, false), + TokioError(1068, false), + HttpNotFound(1072, false), + UnknownFormat(1074, false), + UnknownCompressionType(1075, false), + InvalidCompressionData(1076, false), + InvalidTimezone(1078, false), + InvalidDate(1079, false), + InvalidTimestamp(1080, false), + InvalidClusterKeys(1081, false), + UnknownFragmentExchange(1082, false), + TenantIsEmpty(1101, false), + IndexOutOfBounds(1102, false), + LayoutError(1103, false), + PanicError(1104, false), + TableInfoError(1106, false), + ReadTableDataError(1107, false), + AddColumnExistError(1108, false), + DropColumnEmptyError(1109, false), // create table or alter table add column with internal column name - TableWithInternalColumnName(1110), - EmptyShareEndpointConfig(1111), - LicenceDenied(1112), - UnknownDatamask(1113), - UnmatchColumnDataType(1114), - VirtualColumnNotFound(1115), - VirtualColumnAlreadyExists(1116), - ColumnReferencedByComputedColumn(1117), - ColumnReferencedByInvertedIndex(1118), + TableWithInternalColumnName(1110, false), + EmptyShareEndpointConfig(1111, false), + LicenceDenied(1112, false), + UnknownDatamask(1113, false), + UnmatchColumnDataType(1114, false), + VirtualColumnNotFound(1115, false), + VirtualColumnAlreadyExists(1116, false), + ColumnReferencedByComputedColumn(1117, false), + ColumnReferencedByInvertedIndex(1118, false), // The table is not a clustered table. - UnclusteredTable(1118), - UnknownCatalog(1119), - UnknownCatalogType(1120), - UnmatchMaskPolicyReturnType(1121), - Timeout(1122), - Outdated(1123), + UnclusteredTable(1118, false), + UnknownCatalog(1119, false), + UnknownCatalogType(1120, false), + UnmatchMaskPolicyReturnType(1121, false), + Timeout(1122, false), + Outdated(1123, false), // sequence - OutofSequenceRange(1124), - WrongSequenceCount(1125), - UnknownSequence(1126), - UnknownQuery(1127), + OutofSequenceRange(1124, false), + WrongSequenceCount(1125, false), + UnknownSequence(1126, false), + UnknownQuery(1127, false), // Data Related Errors /// ParquetFileInvalid is used when given parquet file is invalid. - ParquetFileInvalid(1201), + ParquetFileInvalid(1201, false), /// InvalidUtf8String is used when given string is not a valid utf8 string. - InvalidUtf8String(1202), + InvalidUtf8String(1202, false), // Table related errors starts here. /// TableOptionInvalid is used when users input an invalid option. /// /// For example: try to set a reserved table option. - TableOptionInvalid(1301), + TableOptionInvalid(1301, false), /// TableEngineMismatch is used when users try to do not supported /// operations on specified engine. /// /// For example: drop on `view` engine. - TableEngineNotSupported(1302), + TableEngineNotSupported(1302, false), /// TableSchemaMismatch is used when table's schema is not match with input /// /// For example: try to with 3 columns into a table with 4 columns. - TableSchemaMismatch(1303), + TableSchemaMismatch(1303, false), // License related errors starts here /// LicenseKeyParseError is used when license key cannot be pared by the jwt public key /// /// For example: license key is not valid - LicenseKeyParseError(1401), + LicenseKeyParseError(1401, false), /// LicenseKeyInvalid is used when license key verification error occurs /// /// For example: license key is expired - LicenseKeyInvalid(1402), - EnterpriseFeatureNotEnable(1403), - LicenseKeyExpired(1404), + LicenseKeyInvalid(1402, false), + EnterpriseFeatureNotEnable(1403, false), + LicenseKeyExpired(1404, false), - BackgroundJobAlreadyExists(1501), - UnknownBackgroundJob(1502), + BackgroundJobAlreadyExists(1501, false), + UnknownBackgroundJob(1502, false), - InvalidRowIdIndex(1503), + InvalidRowIdIndex(1503, false), // Index related errors. - UnsupportedIndex(1601), - RefreshIndexError(1602), - IndexOptionInvalid(1603), + UnsupportedIndex(1601, false), + RefreshIndexError(1602, false), + IndexOptionInvalid(1603, false), // Cloud control error codes - CloudControlConnectError(1701), - CloudControlNotEnabled(1702), - IllegalCloudControlMessageFormat(1703), + CloudControlConnectError(1701, true), + CloudControlNotEnabled(1702, false), + IllegalCloudControlMessageFormat(1703, false), // Geometry errors. - GeometryError(1801), - InvalidGeometryFormat(1802), + GeometryError(1801, false), + InvalidGeometryFormat(1802, false), // Tantivy errors. - TantivyError(1901), - TantivyOpenReadError(1902), - TantivyQueryParserError(1903), + TantivyError(1901, false), + TantivyOpenReadError(1902, false), + TantivyQueryParserError(1903, false), - ReqwestError(1910) + ReqwestError(1910, false) } // Meta service errors [2001, 3000]. build_exceptions! { // Meta service does not work. - MetaServiceError(2001), - InvalidConfig(2002), - MetaStorageError(2003), - InvalidArgument(2004), + MetaServiceError(2001, false), + InvalidConfig(2002, false), + MetaStorageError(2003, false), + InvalidArgument(2004, false), // Meta service replied with invalid data - InvalidReply(2005), + InvalidReply(2005, false), - TableVersionMismatched(2009), - OCCRetryFailure(2011), - TableNotWritable(2012), - TableHistoricalDataNotFound(2013), - DuplicatedUpsertFiles(2014), - TableAlreadyLocked(2015), - TableLockExpired(2016), + TableVersionMismatched(2009, false), + OCCRetryFailure(2011, false), + TableNotWritable(2012, false), + TableHistoricalDataNotFound(2013, false), + DuplicatedUpsertFiles(2014, false), + TableAlreadyLocked(2015, false), + TableLockExpired(2016, false), // User api error codes. - UnknownUser(2201), - UserAlreadyExists(2202), - IllegalUserInfoFormat(2203), - UnknownRole(2204), - InvalidRole(2206), - UnknownNetworkPolicy(2207), - NetworkPolicyAlreadyExists(2208), - IllegalNetworkPolicy(2209), - NetworkPolicyIsUsedByUser(2210), - UnknownPasswordPolicy(2211), - PasswordPolicyAlreadyExists(2212), - IllegalPasswordPolicy(2213), - PasswordPolicyIsUsedByUser(2214), - InvalidPassword(2215), - RoleAlreadyExists(2216), - IllegalRole(2217), - IllegalUser(2218), + UnknownUser(2201, false), + UserAlreadyExists(2202, false), + IllegalUserInfoFormat(2203, false), + UnknownRole(2204, false), + InvalidRole(2206, false), + UnknownNetworkPolicy(2207, false), + NetworkPolicyAlreadyExists(2208, false), + IllegalNetworkPolicy(2209, false), + NetworkPolicyIsUsedByUser(2210, false), + UnknownPasswordPolicy(2211, false), + PasswordPolicyAlreadyExists(2212, false), + IllegalPasswordPolicy(2213, false), + PasswordPolicyIsUsedByUser(2214, false), + InvalidPassword(2215, false), + RoleAlreadyExists(2216, false), + IllegalRole(2217, false), + IllegalUser(2218, false), // Meta api error codes. - DatabaseAlreadyExists(2301), - TableAlreadyExists(2302), - ViewAlreadyExists(2306), - CreateTableWithDropTime(2307), - UndropTableAlreadyExists(2308), - UndropTableHasNoHistory(2309), - CreateDatabaseWithDropTime(2310), - UndropDbHasNoHistory(2312), - UndropTableWithNoDropTime(2313), - DropTableWithDropTime(2314), - DropDbWithDropTime(2315), - UndropDbWithNoDropTime(2316), - TxnRetryMaxTimes(2317), + DatabaseAlreadyExists(2301, false), + TableAlreadyExists(2302, false), + ViewAlreadyExists(2306, false), + CreateTableWithDropTime(2307, false), + UndropTableAlreadyExists(2308, false), + UndropTableHasNoHistory(2309, false), + CreateDatabaseWithDropTime(2310, false), + UndropDbHasNoHistory(2312, false), + UndropTableWithNoDropTime(2313, false), + DropTableWithDropTime(2314, false), + DropDbWithDropTime(2315, false), + UndropDbWithNoDropTime(2316, false), + TxnRetryMaxTimes(2317, false), /// `CatalogNotSupported` should be raised when defining a catalog, which is: /// - not supported by the application, like creating a `HIVE` catalog but `HIVE` feature not enabled; /// - forbidden to create, like creating a `DEFAULT` catalog - CatalogNotSupported(2318), + CatalogNotSupported(2318, false), /// `CatalogAlreadyExists` should be raised when defining a catalog, which is: /// - having the same name as a already exist, like `default` /// - and without `IF NOT EXISTS` - CatalogAlreadyExists(2319), + CatalogAlreadyExists(2319, false), /// `CatalogNotFound` should be raised when trying to drop a catalog that is: /// - not exists. /// - and without `IF EXISTS` - CatalogNotFound(2320), + CatalogNotFound(2320, false), /// data mask error codes - DatamaskAlreadyExists(2321), + DatamaskAlreadyExists(2321, false), - CommitTableMetaError(2322), - CreateAsDropTableWithoutDropTime(2323), + CommitTableMetaError(2322, false), + CreateAsDropTableWithoutDropTime(2323, false), // Cluster error codes. - ClusterUnknownNode(2401), - ClusterNodeAlreadyExists(2402), + ClusterUnknownNode(2401, false), + ClusterNodeAlreadyExists(2402, false), // Stage error codes. - UnknownStage(2501), - StageAlreadyExists(2502), - IllegalUserStageFormat(2503), - StageFileAlreadyExists(2504), - IllegalStageFileFormat(2505), - StagePermissionDenied(2506), + UnknownStage(2501, false), + StageAlreadyExists(2502, false), + IllegalUserStageFormat(2503, false), + StageFileAlreadyExists(2504, false), + IllegalStageFileFormat(2505, false), + StagePermissionDenied(2506, false), // FileFormat error codes. - UnknownFileFormat(2507), - IllegalFileFormat(2508), - FileFormatAlreadyExists(2509), + UnknownFileFormat(2507, false), + IllegalFileFormat(2508, false), + FileFormatAlreadyExists(2509, false), // Connection error codes. - UnknownConnection(2510), - IllegalConnection(2511), - ConnectionAlreadyExists(2512), + UnknownConnection(2510, false), + IllegalConnection(2511, false), + ConnectionAlreadyExists(2512, false), // User defined function error codes. - IllegalUDFFormat(2601), - UnknownUDF(2602), - UdfAlreadyExists(2603), - UDFServerConnectError(2604), - UDFSchemaMismatch(2605), - UnsupportedDataType(2606), - UDFDataError(2607), + IllegalUDFFormat(2601, false), + UnknownUDF(2602, false), + UdfAlreadyExists(2603, false), + UDFServerConnectError(2604, true), + UDFSchemaMismatch(2605, false), + UnsupportedDataType(2606, false), + UDFDataError(2607, false), // Database error codes. - UnknownDatabaseEngine(2701), - UnknownTableEngine(2702), - UnsupportedEngineParams(2703), + UnknownDatabaseEngine(2701, false), + UnknownTableEngine(2702, false), + UnsupportedEngineParams(2703, false), // Share error codes. - ShareAlreadyExists(2705), - UnknownShare(2706), - UnknownShareId(2707), - ShareAccountsAlreadyExists(2708), - UnknownShareAccounts(2709), - WrongShareObject(2710), - WrongShare(2711), - ShareHasNoGrantedDatabase(2712), - ShareHasNoGrantedPrivilege(2713), - ShareEndpointAlreadyExists(2714), - UnknownShareEndpoint(2715), - UnknownShareEndpointId(2716), - CannotAccessShareTable(2717), - CannotShareDatabaseCreatedFromShare(2718), - ShareStorageError(2719), + ShareAlreadyExists(2705, false), + UnknownShare(2706, false), + UnknownShareId(2707, false), + ShareAccountsAlreadyExists(2708, false), + UnknownShareAccounts(2709, false), + WrongShareObject(2710, false), + WrongShare(2711, false), + ShareHasNoGrantedDatabase(2712, false), + ShareHasNoGrantedPrivilege(2713, false), + ShareEndpointAlreadyExists(2714, false), + UnknownShareEndpoint(2715, false), + UnknownShareEndpointId(2716, false), + CannotAccessShareTable(2717, false), + CannotShareDatabaseCreatedFromShare(2718, false), + ShareStorageError(2719, false), // Index error codes. - CreateIndexWithDropTime(2720), - IndexAlreadyExists(2721), - UnknownIndex(2722), - DropIndexWithDropTime(2723), - GetIndexWithDropTime(2724), - DuplicatedIndexColumnId(2725), - IndexColumnIdNotFound(2726), + CreateIndexWithDropTime(2720, false), + IndexAlreadyExists(2721, false), + UnknownIndex(2722, false), + DropIndexWithDropTime(2723, false), + GetIndexWithDropTime(2724, false), + DuplicatedIndexColumnId(2725, false), + IndexColumnIdNotFound(2726, false), // Stream error codes. - UnknownStream(2730), - UnknownStreamId(2731), - StreamAlreadyExists(2732), - IllegalStream(2733), - StreamVersionMismatched(2734), - WithOptionInvalid(2735), + UnknownStream(2730, false), + UnknownStreamId(2731, false), + StreamAlreadyExists(2732, false), + IllegalStream(2733, false), + StreamVersionMismatched(2734, false), + WithOptionInvalid(2735, false), // dynamic error codes. - IllegalDynamicTable(2740), + IllegalDynamicTable(2740, false), // Variable error codes. - UnknownVariable(2801), - OnlySupportAsciiChars(2802), - WrongValueForVariable(2803), + UnknownVariable(2801, false), + OnlySupportAsciiChars(2802, false), + WrongValueForVariable(2803, false), // Tenant quota error codes. - IllegalTenantQuotaFormat(2901), - TenantQuotaUnknown(2902), - TenantQuotaExceeded(2903), + IllegalTenantQuotaFormat(2901, false), + TenantQuotaUnknown(2902, false), + TenantQuotaExceeded(2903, false), // Script error codes. - ScriptSemanticError(3001), - ScriptExecutionError(3002), + ScriptSemanticError(3001, false), + ScriptExecutionError(3002, false), // sequence - SequenceError(3101), + SequenceError(3101, false), - // Share error codes(continue). - ErrorShareEndpointCredential(3111), - WrongSharePrivileges(3112), + // Share error codes(continue, false). + ErrorShareEndpointCredential(3111, false), + WrongSharePrivileges(3112, false), // dictionary - DictionaryAlreadyExists(3113), - UnknownDictionary(3114), - DictionarySourceError(3115), + DictionaryAlreadyExists(3113, false), + UnknownDictionary(3114, false), + DictionarySourceError(3115, false), // Procedure - UnknownProcedure(3130), - ProcedureAlreadyExists(3131), - IllegalProcedureFormat(3132), + UnknownProcedure(3130, false), + ProcedureAlreadyExists(3131, false), + IllegalProcedureFormat(3132, false), } // Storage errors [3001, 4000]. build_exceptions! { - StorageNotFound(3001), - StoragePermissionDenied(3002), - StorageUnavailable(3901), - StorageUnsupported(3902), - StorageInsecure(3903), - DeprecatedIndexFormat(3904), - InvalidOperation(3905), - StorageOther(4000), - UnresolvableConflict(4001), + StorageNotFound(3001, false), + StoragePermissionDenied(3002, false), + StorageUnavailable(3901, false), + StorageUnsupported(3902, false), + StorageInsecure(3903, false), + DeprecatedIndexFormat(3904, false), + InvalidOperation(3905, false), + StorageOther(4000, false), + UnresolvableConflict(4001, false), // transaction error codes - CurrentTransactionIsAborted(4002), - TransactionTimeout(4003), - InvalidSessionState(4004), + CurrentTransactionIsAborted(4002, false), + TransactionTimeout(4003, false), + InvalidSessionState(4004, false), // recluster error codes - NoNeedToRecluster(4011), - NoNeedToCompact(4012), + NoNeedToRecluster(4011, false), + NoNeedToCompact(4012, false), - RefreshTableInfoFailure(4012), + RefreshTableInfoFailure(4012, false), } // Service errors [5001,6000]. build_exceptions! { // A task that already stopped and can not stop twice. - AlreadyStopped(5002), + AlreadyStopped(5002, false), // auth related - AuthenticateFailure(5100), + AuthenticateFailure(5100, false), // the flowing 4 code is used by clients - SessionTokenExpired(5101), - RefreshTokenExpired(5102), - SessionTokenNotFound(5103), - RefreshTokenNotFound(5104) + SessionTokenExpired(5101, false), + RefreshTokenExpired(5102, false), + SessionTokenNotFound(5103, false), + RefreshTokenNotFound(5104, false) } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index 72788198bd350..b3a5507976b9e 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -65,6 +65,7 @@ impl From for ErrorCode { "anyhow", format!("{}, source: {:?}", error, error.source()), String::new(), + false, Some(Box::new(OtherErrors::AnyHow { error })), capture(), ) @@ -79,13 +80,13 @@ impl From<&str> for ErrorCode { impl From for ErrorCode { fn from(error: std::num::ParseIntError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } impl From for ErrorCode { fn from(error: std::str::ParseBoolError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } @@ -97,13 +98,13 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: std::num::ParseFloatError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } impl From for ErrorCode { fn from(error: std::num::TryFromIntError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } @@ -112,7 +113,7 @@ impl From for ErrorCode { use databend_common_arrow::arrow::error::Error; match error { Error::NotYetImplemented(v) => ErrorCode::Unimplemented(format!("arrow: {v}")), - v => ErrorCode::from_std_error(v), + v => ErrorCode::from_std_error(v, false), } } } @@ -123,26 +124,26 @@ impl From for ErrorCode { arrow_schema::ArrowError::NotYetImplemented(v) => { ErrorCode::Unimplemented(format!("arrow: {v}")) } - v => ErrorCode::from_std_error(v), + v => ErrorCode::from_std_error(v, false), } } } impl From for ErrorCode { fn from(error: parquet::errors::ParquetError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } impl From for ErrorCode { fn from(error: bincode::error::EncodeError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } impl From for ErrorCode { fn from(error: bincode::error::DecodeError) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } @@ -153,6 +154,7 @@ impl From for ErrorCode { "EncodeError", format!("{error:?}"), String::new(), + false, None, capture(), ) @@ -166,6 +168,7 @@ impl From for ErrorCode { "DecodeError", format!("{error:?}"), String::new(), + false, None, capture(), ) @@ -174,13 +177,13 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: serde_json::Error) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, false) } } impl From for ErrorCode { fn from(v: std::convert::Infallible) -> Self { - ErrorCode::from_std_error(v) + ErrorCode::from_std_error(v, false) } } @@ -198,7 +201,7 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: http::Error) -> Self { - ErrorCode::from_std_error(error) + ErrorCode::from_std_error(error, true) } } @@ -303,6 +306,7 @@ impl From<&SerializedError> for ErrorCode { se.name.clone(), se.message.clone(), String::new(), + false, None, se.backtrace.clone(), ) @@ -376,6 +380,7 @@ impl From for ErrorCode { serialized_error.name, serialized_error.message, String::new(), + false, None, serialized_error.backtrace, ) diff --git a/src/common/exception/tests/it/exception_flight.rs b/src/common/exception/tests/it/exception_flight.rs index 59db53a0404ec..03cdc29219564 100644 --- a/src/common/exception/tests/it/exception_flight.rs +++ b/src/common/exception/tests/it/exception_flight.rs @@ -24,6 +24,7 @@ fn test_serialize() -> Result<()> { "test_name", String::from("test_message"), String::new(), + false, None, StackTrace::capture(), ) diff --git a/src/common/grpc/src/dns_resolver.rs b/src/common/grpc/src/dns_resolver.rs index 7d816eb502ad4..62062eb4abf2c 100644 --- a/src/common/grpc/src/dns_resolver.rs +++ b/src/common/grpc/src/dns_resolver.rs @@ -64,6 +64,7 @@ impl DNSResolver { error.name(), error.message(), String::new(), + true, None, error.backtrace(), )), diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index b384c69afd27e..6e647e1ac0318 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -409,39 +409,48 @@ async fn execute( executor: Arc>, ) -> Result<(), ExecutionError> { let make_error = || format!("failed to execute {}", interpreter.name()); - - let mut data_stream = interpreter - .execute(ctx.clone()) - .await - .with_context(make_error)?; - match data_stream.next().await { - None => { - let block = DataBlock::empty_with_schema(schema); - block_sender.send(block, 0).await; - Executor::stop::<()>(&executor, Ok(())).await; - block_sender.close(); - } - Some(Err(err)) => { - Executor::stop(&executor, Err(err)).await; - block_sender.close(); - } - Some(Ok(block)) => { - let size = block.num_rows(); - block_sender.send(block, size).await; - while let Some(block_r) = data_stream.next().await { - match block_r { - Ok(block) => { - block_sender.send(block.clone(), block.num_rows()).await; - } - Err(err) => { - block_sender.close(); - return Err(err.with_context(make_error())); - } - }; + let settings = ctx.get_settings(); + let mut query_max_failures = settings.get_query_max_failures().unwrap_or(0); + loop { + let mut data_stream = interpreter + .execute(ctx.clone()) + .await + .with_context(make_error)?; + match data_stream.next().await { + None => { + let block = DataBlock::empty_with_schema(schema); + block_sender.send(block, 0).await; + Executor::stop::<()>(&executor, Ok(())).await; + block_sender.close(); + } + Some(Err(err)) => { + // If the error is retryable, such as network error, we can retry multiple times + if err.is_retryable() && query_max_failures > 0 { + query_max_failures -= 1; + continue; + } + Executor::stop(&executor, Err(err)).await; + block_sender.close(); + } + Some(Ok(block)) => { + let size = block.num_rows(); + block_sender.send(block, size).await; + while let Some(block_r) = data_stream.next().await { + match block_r { + Ok(block) => { + block_sender.send(block.clone(), block.num_rows()).await; + } + Err(err) => { + block_sender.close(); + return Err(err.with_context(make_error())); + } + }; + } + Executor::stop::<()>(&executor, Ok(())).await; + block_sender.close(); } - Executor::stop::<()>(&executor, Ok(())).await; - block_sender.close(); } + break; } Ok(()) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 35e74afbb3122..2deaaddc5ca62 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -941,6 +941,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(1..=1024*1024)), }), + ("query_max_failures", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Sets the query maximum failure retry times.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=5)), + }), ]); Ok(Arc::new(DefaultSettings { @@ -1012,7 +1018,8 @@ impl DefaultSettings { } fn max_memory_usage() -> Result { - let memory_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; + let memory_info = + sys_info::mem_info().map_err(|err| ErrorCode::from_std_error(err, false))?; Ok(match GlobalConfig::try_get_instance() { None => 1024 * memory_info.total * 80 / 100, diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 8938fcd616ddb..2d2e4467d5005 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -809,4 +809,12 @@ impl Settings { pub fn set_short_sql_max_length(&self, val: u64) -> Result<()> { self.try_set_u64("short_sql_max_length", val) } + + pub fn get_query_max_failures(&self) -> Result { + self.try_get_u64("query_max_failures") + } + + pub fn set_query_max_failures(&self, val: u64) -> Result<()> { + self.try_set_u64("query_max_failures", val) + } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index 5de6cb538442b..ee13d1beb4b12 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -196,7 +196,7 @@ impl ReclusterMutator { } // Compute memory threshold and maximum number of blocks allowed for reclustering - let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; + let mem_info = sys_info::mem_info().map_err(|err| ErrorCode::from_std_error(err, false))?; let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize; let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 30 / 100); // specify a rather small value, so that `recluster_block_size` might be tuned to lower value. diff --git a/src/query/storages/orc/src/hashable_schema.rs b/src/query/storages/orc/src/hashable_schema.rs index d2b07a848b404..9cc20f1745dd3 100644 --- a/src/query/storages/orc/src/hashable_schema.rs +++ b/src/query/storages/orc/src/hashable_schema.rs @@ -37,7 +37,8 @@ pub struct HashableSchema { impl HashableSchema { pub fn try_create(arrow_schema: SchemaRef) -> Result { let table_schema = Arc::new( - TableSchema::try_from(arrow_schema.as_ref()).map_err(ErrorCode::from_std_error)?, + TableSchema::try_from(arrow_schema.as_ref()) + .map_err(|err| ErrorCode::from_std_error(err, false))?, ); let data_schema = Arc::new(DataSchema::from(table_schema.clone())); diff --git a/src/query/storages/orc/src/table.rs b/src/query/storages/orc/src/table.rs index d3da0a13008ab..87f6ce45f24bb 100644 --- a/src/query/storages/orc/src/table.rs +++ b/src/query/storages/orc/src/table.rs @@ -83,7 +83,8 @@ impl OrcTable { let arrow_schema = Self::prepare_metas(first_file, operator.clone()).await?; let table_schema = Arc::new( - TableSchema::try_from(arrow_schema.as_ref()).map_err(ErrorCode::from_std_error)?, + TableSchema::try_from(arrow_schema.as_ref()) + .map_err(|err| ErrorCode::from_std_error(err, false))?, ); let table_info = create_orc_table_info(table_schema.clone(), stage_info)?; diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs b/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs index 15c0c68b68943..d8894dbeec9b3 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs @@ -296,7 +296,9 @@ where T: Send + 'static, { match databend_common_base::runtime::try_spawn_blocking(f) { - Ok(handler) => handler.await.map_err(ErrorCode::from_std_error)?, + Ok(handler) => handler + .await + .map_err(|err| ErrorCode::from_std_error(err, true))?, Err(f) => f(), } } diff --git a/src/query/storages/parquet/src/parquet_rs/schema.rs b/src/query/storages/parquet/src/parquet_rs/schema.rs index 37fef98f2a011..e1f89b7bec970 100644 --- a/src/query/storages/parquet/src/parquet_rs/schema.rs +++ b/src/query/storages/parquet/src/parquet_rs/schema.rs @@ -50,5 +50,5 @@ pub(crate) fn arrow_to_table_schema(schema: &ArrowSchema) -> Result .map(|f| Arc::new(lower_field_name(f))) .collect::>(); let schema = ArrowSchema::new_with_metadata(fields, schema.metadata().clone()); - TableSchema::try_from(&schema).map_err(ErrorCode::from_std_error) + TableSchema::try_from(&schema).map_err(|err| ErrorCode::from_std_error(err, false)) } From ea5395ff8bca007b4d27a1628432a9baec1b3f4e Mon Sep 17 00:00:00 2001 From: baishen Date: Wed, 4 Dec 2024 18:00:55 +0800 Subject: [PATCH 2/4] fix --- src/common/exception/src/context.rs | 1 - src/common/exception/src/exception.rs | 20 +- src/common/exception/src/exception_code.rs | 491 +++++++++--------- src/common/exception/src/exception_into.rs | 27 +- .../exception/tests/it/exception_flight.rs | 1 - src/common/grpc/src/dns_resolver.rs | 1 - .../servers/http/v1/query/execute_state.rs | 71 ++- src/query/settings/src/settings_default.rs | 3 +- .../mutation/mutator/recluster_mutator.rs | 2 +- src/query/storages/orc/src/hashable_schema.rs | 3 +- src/query/storages/orc/src/table.rs | 3 +- .../parquet_rs/parquet_reader/row_group.rs | 4 +- .../storages/parquet/src/parquet_rs/schema.rs | 2 +- 13 files changed, 294 insertions(+), 335 deletions(-) diff --git a/src/common/exception/src/context.rs b/src/common/exception/src/context.rs index 5a91ea21e4c45..304fe0ef6ecf0 100644 --- a/src/common/exception/src/context.rs +++ b/src/common/exception/src/context.rs @@ -41,7 +41,6 @@ impl ErrorCode { name: self.name, display_text: self.display_text, detail: self.detail, - is_retryable: self.is_retryable, span: self.span, cause: self.cause, backtrace: self.backtrace, diff --git a/src/common/exception/src/exception.rs b/src/common/exception/src/exception.rs index 31175d5730be1..668aeba8c6b53 100644 --- a/src/common/exception/src/exception.rs +++ b/src/common/exception/src/exception.rs @@ -33,7 +33,6 @@ pub struct ErrorCode { pub(crate) name: String, pub(crate) display_text: String, pub(crate) detail: String, - pub(crate) is_retryable: bool, pub(crate) span: Span, // cause is only used to contain an `anyhow::Error`. // TODO: remove `cause` when we completely get rid of `anyhow::Error`. @@ -119,17 +118,6 @@ impl ErrorCode { } } - pub fn is_retryable(&self) -> bool { - self.is_retryable - } - - pub fn set_is_retryable(self, is_retryable: bool) -> Self { - Self { - is_retryable, - ..self - } - } - pub fn span(&self) -> Span { self.span } @@ -205,13 +193,12 @@ impl Display for ErrorCode { impl ErrorCode { /// All std error will be converted to InternalError #[track_caller] - pub fn from_std_error(error: T, is_retryable: bool) -> Self { + pub fn from_std_error(error: T) -> Self { ErrorCode { code: 1001, name: String::from("FromStdError"), display_text: error.to_string(), detail: String::new(), - is_retryable, span: None, cause: None, backtrace: capture(), @@ -227,7 +214,6 @@ impl ErrorCode { name: String::from("Internal"), display_text: error.clone(), detail: String::new(), - is_retryable: false, span: None, cause: None, backtrace: capture(), @@ -243,7 +229,6 @@ impl ErrorCode { name: String::from("Internal"), display_text: error, detail: String::new(), - is_retryable: false, span: None, cause: None, stacks: vec![], @@ -257,7 +242,6 @@ impl ErrorCode { name: impl ToString, display_text: String, detail: String, - is_retryable: bool, cause: Option>, backtrace: StackTrace, ) -> Self { @@ -265,7 +249,6 @@ impl ErrorCode { code, display_text: display_text.clone(), detail, - is_retryable, span: None, cause, backtrace, @@ -330,7 +313,6 @@ impl Clone for ErrorCode { &self.name, self.display_text(), self.detail.clone(), - self.is_retryable, None, self.backtrace(), ) diff --git a/src/common/exception/src/exception_code.rs b/src/common/exception/src/exception_code.rs index 202120d009261..3ac860b8f7a0c 100644 --- a/src/common/exception/src/exception_code.rs +++ b/src/common/exception/src/exception_code.rs @@ -18,7 +18,7 @@ use crate::exception_backtrace::capture; use crate::ErrorCode; macro_rules! build_exceptions { - ($($(#[$meta:meta])* $body:ident($code:expr, $is_retryable:expr)),*$(,)*) => { + ($($(#[$meta:meta])* $body:ident($code:expr)),*$(,)*) => { impl ErrorCode { $( @@ -38,7 +38,6 @@ macro_rules! build_exceptions { stringify!($body), display_text.into(), String::new(), - $is_retryable, None, bt, ) @@ -50,7 +49,7 @@ macro_rules! build_exceptions { // Internal errors [0, 2000]. build_exceptions! { - Ok(0, false), + Ok(0), /// Internal means this is the internal error that no action /// can be taken by neither developers or users. @@ -64,7 +63,7 @@ build_exceptions! { /// This error should never be used to for error checking. An error /// that returns as internal error could be assigned a separate error /// code at anytime. - Internal(1001, false), + Internal(1001), /// Unimplemented means this is a not implemented feature. /// @@ -74,361 +73,361 @@ build_exceptions! { /// /// It's OK to use this error code for not implemented feature in /// our dependences. For example, in arrow. - Unimplemented(1002, false), + Unimplemented(1002), // Legacy error codes, we will refactor them one by one. - UnknownDatabase(1003, false), - UnknownDatabaseId(1004, false), - SyntaxException(1005, false), - BadArguments(1006, false), - IllegalDataType(1007, false), - UnknownFunction(1008, false), - BadDataValueType(1010, false), - EmptyData(1016, false), - DataStructMissMatch(1017, false), - BadDataArrayLength(1018, false), - UnknownTableId(1020, false), - UnknownTable(1025, false), - UnknownView(1026, false), - UnknownAggregateFunction(1027, false), - NumberArgumentsNotMatch(1028, false), - EmptyDataFromServer(1030, false), - NotFoundClusterNode(1035, false), - BadAddressFormat(1036, false), - DnsParseError(1037, false), - CannotConnectNode(1038, true), - TooManyUserConnections(1041, false), - AbortedSession(1042, false), - AbortedQuery(1043, false), - ClosedQuery(1044, false), - CannotListenerPort(1045, false), - BadBytes(1046, false), - InitPrometheusFailure(1047, false), - Overflow(1049, false), - TLSConfigurationFailure(1052, false), - UnknownSession(1053, false), - SHA1CheckFailed(1057, false), - UnknownColumn(1058, false), - StrParseError(1060, false), - IllegalGrant(1061, false), - ManagementModePermissionDenied(1062, false), - PermissionDenied(1063, false), - UnmarshalError(1064, false), - SemanticError(1065, false), - NeedChangePasswordDenied(1066, false), - UnknownException(1067, false), - TokioError(1068, false), - HttpNotFound(1072, false), - UnknownFormat(1074, false), - UnknownCompressionType(1075, false), - InvalidCompressionData(1076, false), - InvalidTimezone(1078, false), - InvalidDate(1079, false), - InvalidTimestamp(1080, false), - InvalidClusterKeys(1081, false), - UnknownFragmentExchange(1082, false), - TenantIsEmpty(1101, false), - IndexOutOfBounds(1102, false), - LayoutError(1103, false), - PanicError(1104, false), - TableInfoError(1106, false), - ReadTableDataError(1107, false), - AddColumnExistError(1108, false), - DropColumnEmptyError(1109, false), + UnknownDatabase(1003), + UnknownDatabaseId(1004), + SyntaxException(1005), + BadArguments(1006), + IllegalDataType(1007), + UnknownFunction(1008), + BadDataValueType(1010), + EmptyData(1016), + DataStructMissMatch(1017), + BadDataArrayLength(1018), + UnknownTableId(1020), + UnknownTable(1025), + UnknownView(1026), + UnknownAggregateFunction(1027), + NumberArgumentsNotMatch(1028), + EmptyDataFromServer(1030), + NotFoundClusterNode(1035), + BadAddressFormat(1036), + DnsParseError(1037), + CannotConnectNode(1038), + TooManyUserConnections(1041), + AbortedSession(1042), + AbortedQuery(1043), + ClosedQuery(1044), + CannotListenerPort(1045), + BadBytes(1046), + InitPrometheusFailure(1047), + Overflow(1049), + TLSConfigurationFailure(1052), + UnknownSession(1053), + SHA1CheckFailed(1057), + UnknownColumn(1058), + StrParseError(1060), + IllegalGrant(1061), + ManagementModePermissionDenied(1062), + PermissionDenied(1063), + UnmarshalError(1064), + SemanticError(1065), + NeedChangePasswordDenied(1066), + UnknownException(1067), + TokioError(1068), + HttpNotFound(1072), + UnknownFormat(1074), + UnknownCompressionType(1075), + InvalidCompressionData(1076), + InvalidTimezone(1078), + InvalidDate(1079), + InvalidTimestamp(1080), + InvalidClusterKeys(1081), + UnknownFragmentExchange(1082), + TenantIsEmpty(1101), + IndexOutOfBounds(1102), + LayoutError(1103), + PanicError(1104), + TableInfoError(1106), + ReadTableDataError(1107), + AddColumnExistError(1108), + DropColumnEmptyError(1109), // create table or alter table add column with internal column name - TableWithInternalColumnName(1110, false), - EmptyShareEndpointConfig(1111, false), - LicenceDenied(1112, false), - UnknownDatamask(1113, false), - UnmatchColumnDataType(1114, false), - VirtualColumnNotFound(1115, false), - VirtualColumnAlreadyExists(1116, false), - ColumnReferencedByComputedColumn(1117, false), - ColumnReferencedByInvertedIndex(1118, false), + TableWithInternalColumnName(1110), + EmptyShareEndpointConfig(1111), + LicenceDenied(1112), + UnknownDatamask(1113), + UnmatchColumnDataType(1114), + VirtualColumnNotFound(1115), + VirtualColumnAlreadyExists(1116), + ColumnReferencedByComputedColumn(1117), + ColumnReferencedByInvertedIndex(1118), // The table is not a clustered table. - UnclusteredTable(1118, false), - UnknownCatalog(1119, false), - UnknownCatalogType(1120, false), - UnmatchMaskPolicyReturnType(1121, false), - Timeout(1122, false), - Outdated(1123, false), + UnclusteredTable(1118), + UnknownCatalog(1119), + UnknownCatalogType(1120), + UnmatchMaskPolicyReturnType(1121), + Timeout(1122), + Outdated(1123), // sequence - OutofSequenceRange(1124, false), - WrongSequenceCount(1125, false), - UnknownSequence(1126, false), - UnknownQuery(1127, false), + OutofSequenceRange(1124), + WrongSequenceCount(1125), + UnknownSequence(1126), + UnknownQuery(1127), // Data Related Errors /// ParquetFileInvalid is used when given parquet file is invalid. - ParquetFileInvalid(1201, false), + ParquetFileInvalid(1201), /// InvalidUtf8String is used when given string is not a valid utf8 string. - InvalidUtf8String(1202, false), + InvalidUtf8String(1202), // Table related errors starts here. /// TableOptionInvalid is used when users input an invalid option. /// /// For example: try to set a reserved table option. - TableOptionInvalid(1301, false), + TableOptionInvalid(1301), /// TableEngineMismatch is used when users try to do not supported /// operations on specified engine. /// /// For example: drop on `view` engine. - TableEngineNotSupported(1302, false), + TableEngineNotSupported(1302), /// TableSchemaMismatch is used when table's schema is not match with input /// /// For example: try to with 3 columns into a table with 4 columns. - TableSchemaMismatch(1303, false), + TableSchemaMismatch(1303), // License related errors starts here /// LicenseKeyParseError is used when license key cannot be pared by the jwt public key /// /// For example: license key is not valid - LicenseKeyParseError(1401, false), + LicenseKeyParseError(1401), /// LicenseKeyInvalid is used when license key verification error occurs /// /// For example: license key is expired - LicenseKeyInvalid(1402, false), - EnterpriseFeatureNotEnable(1403, false), - LicenseKeyExpired(1404, false), + LicenseKeyInvalid(1402), + EnterpriseFeatureNotEnable(1403), + LicenseKeyExpired(1404), - BackgroundJobAlreadyExists(1501, false), - UnknownBackgroundJob(1502, false), + BackgroundJobAlreadyExists(1501), + UnknownBackgroundJob(1502), - InvalidRowIdIndex(1503, false), + InvalidRowIdIndex(1503), // Index related errors. - UnsupportedIndex(1601, false), - RefreshIndexError(1602, false), - IndexOptionInvalid(1603, false), + UnsupportedIndex(1601), + RefreshIndexError(1602), + IndexOptionInvalid(1603), // Cloud control error codes - CloudControlConnectError(1701, true), - CloudControlNotEnabled(1702, false), - IllegalCloudControlMessageFormat(1703, false), + CloudControlConnectError(1701), + CloudControlNotEnabled(1702), + IllegalCloudControlMessageFormat(1703), // Geometry errors. - GeometryError(1801, false), - InvalidGeometryFormat(1802, false), + GeometryError(1801), + InvalidGeometryFormat(1802), // Tantivy errors. - TantivyError(1901, false), - TantivyOpenReadError(1902, false), - TantivyQueryParserError(1903, false), + TantivyError(1901), + TantivyOpenReadError(1902), + TantivyQueryParserError(1903), - ReqwestError(1910, false) + ReqwestError(1910) } // Meta service errors [2001, 3000]. build_exceptions! { // Meta service does not work. - MetaServiceError(2001, false), - InvalidConfig(2002, false), - MetaStorageError(2003, false), - InvalidArgument(2004, false), + MetaServiceError(2001), + InvalidConfig(2002), + MetaStorageError(2003), + InvalidArgument(2004), // Meta service replied with invalid data - InvalidReply(2005, false), + InvalidReply(2005), - TableVersionMismatched(2009, false), - OCCRetryFailure(2011, false), - TableNotWritable(2012, false), - TableHistoricalDataNotFound(2013, false), - DuplicatedUpsertFiles(2014, false), - TableAlreadyLocked(2015, false), - TableLockExpired(2016, false), + TableVersionMismatched(2009), + OCCRetryFailure(2011), + TableNotWritable(2012), + TableHistoricalDataNotFound(2013), + DuplicatedUpsertFiles(2014), + TableAlreadyLocked(2015), + TableLockExpired(2016), // User api error codes. - UnknownUser(2201, false), - UserAlreadyExists(2202, false), - IllegalUserInfoFormat(2203, false), - UnknownRole(2204, false), - InvalidRole(2206, false), - UnknownNetworkPolicy(2207, false), - NetworkPolicyAlreadyExists(2208, false), - IllegalNetworkPolicy(2209, false), - NetworkPolicyIsUsedByUser(2210, false), - UnknownPasswordPolicy(2211, false), - PasswordPolicyAlreadyExists(2212, false), - IllegalPasswordPolicy(2213, false), - PasswordPolicyIsUsedByUser(2214, false), - InvalidPassword(2215, false), - RoleAlreadyExists(2216, false), - IllegalRole(2217, false), - IllegalUser(2218, false), + UnknownUser(2201), + UserAlreadyExists(2202), + IllegalUserInfoFormat(2203), + UnknownRole(2204), + InvalidRole(2206), + UnknownNetworkPolicy(2207), + NetworkPolicyAlreadyExists(2208), + IllegalNetworkPolicy(2209), + NetworkPolicyIsUsedByUser(2210), + UnknownPasswordPolicy(2211), + PasswordPolicyAlreadyExists(2212), + IllegalPasswordPolicy(2213), + PasswordPolicyIsUsedByUser(2214), + InvalidPassword(2215), + RoleAlreadyExists(2216), + IllegalRole(2217), + IllegalUser(2218), // Meta api error codes. - DatabaseAlreadyExists(2301, false), - TableAlreadyExists(2302, false), - ViewAlreadyExists(2306, false), - CreateTableWithDropTime(2307, false), - UndropTableAlreadyExists(2308, false), - UndropTableHasNoHistory(2309, false), - CreateDatabaseWithDropTime(2310, false), - UndropDbHasNoHistory(2312, false), - UndropTableWithNoDropTime(2313, false), - DropTableWithDropTime(2314, false), - DropDbWithDropTime(2315, false), - UndropDbWithNoDropTime(2316, false), - TxnRetryMaxTimes(2317, false), + DatabaseAlreadyExists(2301), + TableAlreadyExists(2302), + ViewAlreadyExists(2306), + CreateTableWithDropTime(2307), + UndropTableAlreadyExists(2308), + UndropTableHasNoHistory(2309), + CreateDatabaseWithDropTime(2310), + UndropDbHasNoHistory(2312), + UndropTableWithNoDropTime(2313), + DropTableWithDropTime(2314), + DropDbWithDropTime(2315), + UndropDbWithNoDropTime(2316), + TxnRetryMaxTimes(2317), /// `CatalogNotSupported` should be raised when defining a catalog, which is: /// - not supported by the application, like creating a `HIVE` catalog but `HIVE` feature not enabled; /// - forbidden to create, like creating a `DEFAULT` catalog - CatalogNotSupported(2318, false), + CatalogNotSupported(2318), /// `CatalogAlreadyExists` should be raised when defining a catalog, which is: /// - having the same name as a already exist, like `default` /// - and without `IF NOT EXISTS` - CatalogAlreadyExists(2319, false), + CatalogAlreadyExists(2319), /// `CatalogNotFound` should be raised when trying to drop a catalog that is: /// - not exists. /// - and without `IF EXISTS` - CatalogNotFound(2320, false), + CatalogNotFound(2320), /// data mask error codes - DatamaskAlreadyExists(2321, false), + DatamaskAlreadyExists(2321), - CommitTableMetaError(2322, false), - CreateAsDropTableWithoutDropTime(2323, false), + CommitTableMetaError(2322), + CreateAsDropTableWithoutDropTime(2323), // Cluster error codes. - ClusterUnknownNode(2401, false), - ClusterNodeAlreadyExists(2402, false), + ClusterUnknownNode(2401), + ClusterNodeAlreadyExists(2402), // Stage error codes. - UnknownStage(2501, false), - StageAlreadyExists(2502, false), - IllegalUserStageFormat(2503, false), - StageFileAlreadyExists(2504, false), - IllegalStageFileFormat(2505, false), - StagePermissionDenied(2506, false), + UnknownStage(2501), + StageAlreadyExists(2502), + IllegalUserStageFormat(2503), + StageFileAlreadyExists(2504), + IllegalStageFileFormat(2505), + StagePermissionDenied(2506), // FileFormat error codes. - UnknownFileFormat(2507, false), - IllegalFileFormat(2508, false), - FileFormatAlreadyExists(2509, false), + UnknownFileFormat(2507), + IllegalFileFormat(2508), + FileFormatAlreadyExists(2509), // Connection error codes. - UnknownConnection(2510, false), - IllegalConnection(2511, false), - ConnectionAlreadyExists(2512, false), + UnknownConnection(2510), + IllegalConnection(2511), + ConnectionAlreadyExists(2512), // User defined function error codes. - IllegalUDFFormat(2601, false), - UnknownUDF(2602, false), - UdfAlreadyExists(2603, false), - UDFServerConnectError(2604, true), - UDFSchemaMismatch(2605, false), - UnsupportedDataType(2606, false), - UDFDataError(2607, false), + IllegalUDFFormat(2601), + UnknownUDF(2602), + UdfAlreadyExists(2603), + UDFServerConnectError(2604), + UDFSchemaMismatch(2605), + UnsupportedDataType(2606), + UDFDataError(2607), // Database error codes. - UnknownDatabaseEngine(2701, false), - UnknownTableEngine(2702, false), - UnsupportedEngineParams(2703, false), + UnknownDatabaseEngine(2701), + UnknownTableEngine(2702), + UnsupportedEngineParams(2703), // Share error codes. - ShareAlreadyExists(2705, false), - UnknownShare(2706, false), - UnknownShareId(2707, false), - ShareAccountsAlreadyExists(2708, false), - UnknownShareAccounts(2709, false), - WrongShareObject(2710, false), - WrongShare(2711, false), - ShareHasNoGrantedDatabase(2712, false), - ShareHasNoGrantedPrivilege(2713, false), - ShareEndpointAlreadyExists(2714, false), - UnknownShareEndpoint(2715, false), - UnknownShareEndpointId(2716, false), - CannotAccessShareTable(2717, false), - CannotShareDatabaseCreatedFromShare(2718, false), - ShareStorageError(2719, false), + ShareAlreadyExists(2705), + UnknownShare(2706), + UnknownShareId(2707), + ShareAccountsAlreadyExists(2708), + UnknownShareAccounts(2709), + WrongShareObject(2710), + WrongShare(2711), + ShareHasNoGrantedDatabase(2712), + ShareHasNoGrantedPrivilege(2713), + ShareEndpointAlreadyExists(2714), + UnknownShareEndpoint(2715), + UnknownShareEndpointId(2716), + CannotAccessShareTable(2717), + CannotShareDatabaseCreatedFromShare(2718), + ShareStorageError(2719), // Index error codes. - CreateIndexWithDropTime(2720, false), - IndexAlreadyExists(2721, false), - UnknownIndex(2722, false), - DropIndexWithDropTime(2723, false), - GetIndexWithDropTime(2724, false), - DuplicatedIndexColumnId(2725, false), - IndexColumnIdNotFound(2726, false), + CreateIndexWithDropTime(2720), + IndexAlreadyExists(2721), + UnknownIndex(2722), + DropIndexWithDropTime(2723), + GetIndexWithDropTime(2724), + DuplicatedIndexColumnId(2725), + IndexColumnIdNotFound(2726), // Stream error codes. - UnknownStream(2730, false), - UnknownStreamId(2731, false), - StreamAlreadyExists(2732, false), - IllegalStream(2733, false), - StreamVersionMismatched(2734, false), - WithOptionInvalid(2735, false), + UnknownStream(2730), + UnknownStreamId(2731), + StreamAlreadyExists(2732), + IllegalStream(2733), + StreamVersionMismatched(2734), + WithOptionInvalid(2735), // dynamic error codes. - IllegalDynamicTable(2740, false), + IllegalDynamicTable(2740), // Variable error codes. - UnknownVariable(2801, false), - OnlySupportAsciiChars(2802, false), - WrongValueForVariable(2803, false), + UnknownVariable(2801), + OnlySupportAsciiChars(2802), + WrongValueForVariable(2803), // Tenant quota error codes. - IllegalTenantQuotaFormat(2901, false), - TenantQuotaUnknown(2902, false), - TenantQuotaExceeded(2903, false), + IllegalTenantQuotaFormat(2901), + TenantQuotaUnknown(2902), + TenantQuotaExceeded(2903), // Script error codes. - ScriptSemanticError(3001, false), - ScriptExecutionError(3002, false), + ScriptSemanticError(3001), + ScriptExecutionError(3002), // sequence - SequenceError(3101, false), + SequenceError(3101), - // Share error codes(continue, false). - ErrorShareEndpointCredential(3111, false), - WrongSharePrivileges(3112, false), + // Share error codes(continue). + ErrorShareEndpointCredential(3111), + WrongSharePrivileges(3112), // dictionary - DictionaryAlreadyExists(3113, false), - UnknownDictionary(3114, false), - DictionarySourceError(3115, false), + DictionaryAlreadyExists(3113), + UnknownDictionary(3114), + DictionarySourceError(3115), // Procedure - UnknownProcedure(3130, false), - ProcedureAlreadyExists(3131, false), - IllegalProcedureFormat(3132, false), + UnknownProcedure(3130), + ProcedureAlreadyExists(3131), + IllegalProcedureFormat(3132), } // Storage errors [3001, 4000]. build_exceptions! { - StorageNotFound(3001, false), - StoragePermissionDenied(3002, false), - StorageUnavailable(3901, false), - StorageUnsupported(3902, false), - StorageInsecure(3903, false), - DeprecatedIndexFormat(3904, false), - InvalidOperation(3905, false), - StorageOther(4000, false), - UnresolvableConflict(4001, false), + StorageNotFound(3001), + StoragePermissionDenied(3002), + StorageUnavailable(3901), + StorageUnsupported(3902), + StorageInsecure(3903), + DeprecatedIndexFormat(3904), + InvalidOperation(3905), + StorageOther(4000), + UnresolvableConflict(4001), // transaction error codes - CurrentTransactionIsAborted(4002, false), - TransactionTimeout(4003, false), - InvalidSessionState(4004, false), + CurrentTransactionIsAborted(4002), + TransactionTimeout(4003), + InvalidSessionState(4004), // recluster error codes - NoNeedToRecluster(4011, false), - NoNeedToCompact(4012, false), + NoNeedToRecluster(4011), + NoNeedToCompact(4012), - RefreshTableInfoFailure(4012, false), + RefreshTableInfoFailure(4012), } // Service errors [5001,6000]. build_exceptions! { // A task that already stopped and can not stop twice. - AlreadyStopped(5002, false), + AlreadyStopped(5002), // auth related - AuthenticateFailure(5100, false), + AuthenticateFailure(5100), // the flowing 4 code is used by clients - SessionTokenExpired(5101, false), - RefreshTokenExpired(5102, false), - SessionTokenNotFound(5103, false), - RefreshTokenNotFound(5104, false) + SessionTokenExpired(5101), + RefreshTokenExpired(5102), + SessionTokenNotFound(5103), + RefreshTokenNotFound(5104) } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index 2716554c7a275..f2d42093c023a 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -65,7 +65,6 @@ impl From for ErrorCode { "anyhow", format!("{}, source: {:?}", error, error.source()), String::new(), - false, Some(Box::new(OtherErrors::AnyHow { error })), capture(), ) @@ -80,13 +79,13 @@ impl From<&str> for ErrorCode { impl From for ErrorCode { fn from(error: std::num::ParseIntError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } impl From for ErrorCode { fn from(error: std::str::ParseBoolError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } @@ -98,13 +97,13 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: std::num::ParseFloatError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } impl From for ErrorCode { fn from(error: std::num::TryFromIntError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } @@ -114,26 +113,26 @@ impl From for ErrorCode { arrow_schema::ArrowError::NotYetImplemented(v) => { ErrorCode::Unimplemented(format!("arrow: {v}")) } - v => ErrorCode::from_std_error(v, false), + v => ErrorCode::from_std_error(v), } } } impl From for ErrorCode { fn from(error: parquet::errors::ParquetError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } impl From for ErrorCode { fn from(error: bincode::error::EncodeError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } impl From for ErrorCode { fn from(error: bincode::error::DecodeError) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } @@ -144,7 +143,6 @@ impl From for ErrorCode { "EncodeError", format!("{error:?}"), String::new(), - false, None, capture(), ) @@ -158,7 +156,6 @@ impl From for ErrorCode { "DecodeError", format!("{error:?}"), String::new(), - false, None, capture(), ) @@ -167,13 +164,13 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: serde_json::Error) -> Self { - ErrorCode::from_std_error(error, false) + ErrorCode::from_std_error(error) } } impl From for ErrorCode { fn from(v: std::convert::Infallible) -> Self { - ErrorCode::from_std_error(v, false) + ErrorCode::from_std_error(v) } } @@ -191,7 +188,7 @@ impl From for ErrorCode { impl From for ErrorCode { fn from(error: http::Error) -> Self { - ErrorCode::from_std_error(error, true) + ErrorCode::from_std_error(error) } } @@ -296,7 +293,6 @@ impl From<&SerializedError> for ErrorCode { se.name.clone(), se.message.clone(), String::new(), - false, None, se.backtrace.clone(), ) @@ -370,7 +366,6 @@ impl From for ErrorCode { serialized_error.name, serialized_error.message, String::new(), - false, None, serialized_error.backtrace, ) diff --git a/src/common/exception/tests/it/exception_flight.rs b/src/common/exception/tests/it/exception_flight.rs index 03cdc29219564..59db53a0404ec 100644 --- a/src/common/exception/tests/it/exception_flight.rs +++ b/src/common/exception/tests/it/exception_flight.rs @@ -24,7 +24,6 @@ fn test_serialize() -> Result<()> { "test_name", String::from("test_message"), String::new(), - false, None, StackTrace::capture(), ) diff --git a/src/common/grpc/src/dns_resolver.rs b/src/common/grpc/src/dns_resolver.rs index 62062eb4abf2c..7d816eb502ad4 100644 --- a/src/common/grpc/src/dns_resolver.rs +++ b/src/common/grpc/src/dns_resolver.rs @@ -64,7 +64,6 @@ impl DNSResolver { error.name(), error.message(), String::new(), - true, None, error.backtrace(), )), diff --git a/src/query/service/src/servers/http/v1/query/execute_state.rs b/src/query/service/src/servers/http/v1/query/execute_state.rs index e38821bb0d7df..d1e2fe8e0bdfa 100644 --- a/src/query/service/src/servers/http/v1/query/execute_state.rs +++ b/src/query/service/src/servers/http/v1/query/execute_state.rs @@ -411,48 +411,39 @@ async fn execute( executor: Arc>, ) -> Result<(), ExecutionError> { let make_error = || format!("failed to execute {}", interpreter.name()); - let settings = ctx.get_settings(); - let mut query_max_failures = settings.get_query_max_failures().unwrap_or(0); - loop { - let mut data_stream = interpreter - .execute(ctx.clone()) - .await - .with_context(make_error)?; - match data_stream.next().await { - None => { - let block = DataBlock::empty_with_schema(schema); - block_sender.send(block, 0).await; - Executor::stop::<()>(&executor, Ok(())).await; - block_sender.close(); - } - Some(Err(err)) => { - // If the error is retryable, such as network error, we can retry multiple times - if err.is_retryable() && query_max_failures > 0 { - query_max_failures -= 1; - continue; - } - Executor::stop(&executor, Err(err)).await; - block_sender.close(); - } - Some(Ok(block)) => { - let size = block.num_rows(); - block_sender.send(block, size).await; - while let Some(block_r) = data_stream.next().await { - match block_r { - Ok(block) => { - block_sender.send(block.clone(), block.num_rows()).await; - } - Err(err) => { - block_sender.close(); - return Err(err.with_context(make_error())); - } - }; - } - Executor::stop::<()>(&executor, Ok(())).await; - block_sender.close(); + + let mut data_stream = interpreter + .execute(ctx.clone()) + .await + .with_context(make_error)?; + match data_stream.next().await { + None => { + let block = DataBlock::empty_with_schema(schema); + block_sender.send(block, 0).await; + Executor::stop::<()>(&executor, Ok(())).await; + block_sender.close(); + } + Some(Err(err)) => { + Executor::stop(&executor, Err(err)).await; + block_sender.close(); + } + Some(Ok(block)) => { + let size = block.num_rows(); + block_sender.send(block, size).await; + while let Some(block_r) = data_stream.next().await { + match block_r { + Ok(block) => { + block_sender.send(block.clone(), block.num_rows()).await; + } + Err(err) => { + block_sender.close(); + return Err(err.with_context(make_error())); + } + }; } + Executor::stop::<()>(&executor, Ok(())).await; + block_sender.close(); } - break; } Ok(()) } diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 903162fac1c61..e857372585a2a 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -1029,8 +1029,7 @@ impl DefaultSettings { } fn max_memory_usage() -> Result { - let memory_info = - sys_info::mem_info().map_err(|err| ErrorCode::from_std_error(err, false))?; + let memory_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; Ok(match GlobalConfig::try_get_instance() { None => 1024 * memory_info.total * 80 / 100, diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs index ee13d1beb4b12..5de6cb538442b 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs @@ -196,7 +196,7 @@ impl ReclusterMutator { } // Compute memory threshold and maximum number of blocks allowed for reclustering - let mem_info = sys_info::mem_info().map_err(|err| ErrorCode::from_std_error(err, false))?; + let mem_info = sys_info::mem_info().map_err(ErrorCode::from_std_error)?; let recluster_block_size = self.ctx.get_settings().get_recluster_block_size()? as usize; let memory_threshold = recluster_block_size.min(mem_info.avail as usize * 1024 * 30 / 100); // specify a rather small value, so that `recluster_block_size` might be tuned to lower value. diff --git a/src/query/storages/orc/src/hashable_schema.rs b/src/query/storages/orc/src/hashable_schema.rs index 9cc20f1745dd3..d2b07a848b404 100644 --- a/src/query/storages/orc/src/hashable_schema.rs +++ b/src/query/storages/orc/src/hashable_schema.rs @@ -37,8 +37,7 @@ pub struct HashableSchema { impl HashableSchema { pub fn try_create(arrow_schema: SchemaRef) -> Result { let table_schema = Arc::new( - TableSchema::try_from(arrow_schema.as_ref()) - .map_err(|err| ErrorCode::from_std_error(err, false))?, + TableSchema::try_from(arrow_schema.as_ref()).map_err(ErrorCode::from_std_error)?, ); let data_schema = Arc::new(DataSchema::from(table_schema.clone())); diff --git a/src/query/storages/orc/src/table.rs b/src/query/storages/orc/src/table.rs index 87f6ce45f24bb..d3da0a13008ab 100644 --- a/src/query/storages/orc/src/table.rs +++ b/src/query/storages/orc/src/table.rs @@ -83,8 +83,7 @@ impl OrcTable { let arrow_schema = Self::prepare_metas(first_file, operator.clone()).await?; let table_schema = Arc::new( - TableSchema::try_from(arrow_schema.as_ref()) - .map_err(|err| ErrorCode::from_std_error(err, false))?, + TableSchema::try_from(arrow_schema.as_ref()).map_err(ErrorCode::from_std_error)?, ); let table_info = create_orc_table_info(table_schema.clone(), stage_info)?; diff --git a/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs b/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs index d8894dbeec9b3..15c0c68b68943 100644 --- a/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs +++ b/src/query/storages/parquet/src/parquet_rs/parquet_reader/row_group.rs @@ -296,9 +296,7 @@ where T: Send + 'static, { match databend_common_base::runtime::try_spawn_blocking(f) { - Ok(handler) => handler - .await - .map_err(|err| ErrorCode::from_std_error(err, true))?, + Ok(handler) => handler.await.map_err(ErrorCode::from_std_error)?, Err(f) => f(), } } diff --git a/src/query/storages/parquet/src/parquet_rs/schema.rs b/src/query/storages/parquet/src/parquet_rs/schema.rs index 1bd93d428104a..062ad73037b56 100644 --- a/src/query/storages/parquet/src/parquet_rs/schema.rs +++ b/src/query/storages/parquet/src/parquet_rs/schema.rs @@ -59,5 +59,5 @@ pub(crate) fn arrow_to_table_schema( }) .collect::>(); let schema = ArrowSchema::new_with_metadata(fields, schema.metadata().clone()); - TableSchema::try_from(&schema).map_err(|err| ErrorCode::from_std_error(err, false)) + TableSchema::try_from(&schema).map_err(ErrorCode::from_std_error) } From 3e7bb6a20fa177289879493e16aab7b43f1cc5dd Mon Sep 17 00:00:00 2001 From: baishen Date: Thu, 5 Dec 2024 15:41:27 +0800 Subject: [PATCH 3/4] cluster do_action retry --- Cargo.lock | 1 + src/common/exception/Cargo.toml | 1 + src/common/exception/src/exception_into.rs | 7 ++ src/query/service/src/clusters/cluster.rs | 67 ++++++++++++++----- src/query/service/src/clusters/mod.rs | 1 + .../src/interpreters/interpreter_kill.rs | 9 ++- .../interpreters/interpreter_set_priority.rs | 9 ++- .../interpreters/interpreter_system_action.rs | 9 ++- .../interpreter_table_truncate.rs | 9 ++- .../src/servers/admin/v1/query_profiling.rs | 8 ++- .../flight/v1/exchange/exchange_manager.rs | 13 ++-- .../flight/v1/packets/packet_executor.rs | 2 +- .../flight/v1/packets/packet_publisher.rs | 5 +- src/query/settings/src/settings_default.rs | 20 +++--- .../settings/src/settings_getter_setter.rs | 17 +++-- 15 files changed, 128 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f190d2e76270..b4ad0cfb6dddc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3243,6 +3243,7 @@ dependencies = [ "geozero", "gimli 0.31.1", "http 1.1.0", + "hyper 1.4.1", "libc", "object", "once_cell", diff --git a/src/common/exception/Cargo.toml b/src/common/exception/Cargo.toml index bec53de426861..d4071ff4f7578 100644 --- a/src/common/exception/Cargo.toml +++ b/src/common/exception/Cargo.toml @@ -21,6 +21,7 @@ bincode = { workspace = true } geozero = { workspace = true } gimli = { workspace = true } http = { workspace = true } +hyper = { workspace = true } libc = { workspace = true } object = { workspace = true } once_cell = { workspace = true } diff --git a/src/common/exception/src/exception_into.rs b/src/common/exception/src/exception_into.rs index f2d42093c023a..43b4d185b8d8a 100644 --- a/src/common/exception/src/exception_into.rs +++ b/src/common/exception/src/exception_into.rs @@ -353,6 +353,13 @@ impl From for ErrorCode { tonic::Code::Unknown => { let details = status.details(); if details.is_empty() { + if status.source().map_or(false, |e| e.is::()) { + return ErrorCode::CannotConnectNode(format!( + "{}, source: {:?}", + status.message(), + status.source() + )); + } return ErrorCode::UnknownException(format!( "{}, source: {:?}", status.message(), diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index 9c66a8b5770cc..07bedc6f3bebe 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -50,12 +50,14 @@ use futures::future::Either; use futures::Future; use futures::StreamExt; use log::error; +use log::info; use log::warn; use parking_lot::RwLock; use rand::thread_rng; use rand::Rng; use serde::Deserialize; use serde::Serialize; +use tokio::time::sleep; use crate::servers::flight::FlightClient; @@ -81,11 +83,11 @@ pub trait ClusterHelper { fn get_nodes(&self) -> Vec>; - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result>; } @@ -118,11 +120,11 @@ impl ClusterHelper for Cluster { self.nodes.to_vec() } - async fn do_action Deserialize<'de> + Send>( + async fn do_action Deserialize<'de> + Send>( &self, path: &str, message: HashMap, - timeout: u64, + flight_params: FlightParams, ) -> Result> { fn get_node<'a>(nodes: &'a [Arc], id: &str) -> Result<&'a Arc> { for node in nodes { @@ -137,23 +139,47 @@ impl ClusterHelper for Cluster { ))) } - let mut response = HashMap::with_capacity(message.len()); + let mut futures = Vec::with_capacity(message.len()); for (id, message) in message { let node = get_node(&self.nodes, &id)?; - let config = GlobalConfig::instance(); - let flight_address = node.flight_address.clone(); - let node_secret = node.secret.clone(); - - let mut conn = create_client(&config, &flight_address).await?; - response.insert( - id, - conn.do_action::<_, Res>(path, node_secret, message, timeout) - .await?, - ); + futures.push({ + let config = GlobalConfig::instance(); + let flight_address = node.flight_address.clone(); + let node_secret = node.secret.clone(); + + async move { + let mut attempt = 0; + + loop { + let mut conn = create_client(&config, &flight_address).await?; + match conn + .do_action::<_, Res>( + path, + node_secret.clone(), + message.clone(), + flight_params.timeout, + ) + .await + { + Ok(result) => return Ok((id, result)), + Err(e) + if e.code() == ErrorCode::CANNOT_CONNECT_NODE + && attempt < flight_params.retry_times => + { + // only retry when error is network problem + info!("retry do_action, attempt: {}", attempt); + attempt += 1; + sleep(Duration::from_secs(flight_params.retry_interval)).await; + } + Err(e) => return Err(e), + } + } + } + }); } - - Ok(response) + let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?; + Ok(responses.into_iter().collect::>()) } } @@ -537,3 +563,10 @@ pub async fn create_client(config: &InnerConfig, address: &str) -> Result Result { let cluster = self.ctx.get_cluster(); let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_max_flight_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let mut message = HashMap::with_capacity(cluster.nodes.len()); @@ -65,7 +70,7 @@ impl KillInterpreter { } let res = cluster - .do_action::<_, bool>(KILL_QUERY, message, timeout) + .do_action::<_, bool>(KILL_QUERY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_set_priority.rs b/src/query/service/src/interpreters/interpreter_set_priority.rs index 0dda6b9dd656b..05f64b30af65a 100644 --- a/src/query/service/src/interpreters/interpreter_set_priority.rs +++ b/src/query/service/src/interpreters/interpreter_set_priority.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::SetPriorityPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SET_PRIORITY; @@ -61,9 +62,13 @@ impl SetPriorityInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_max_flight_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let res = cluster - .do_action::<_, bool>(SET_PRIORITY, message, timeout) + .do_action::<_, bool>(SET_PRIORITY, message, flight_params) .await?; match res.values().any(|x| *x) { diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index 86e747e865ee7..c3570923ff9d2 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -22,6 +22,7 @@ use databend_common_sql::plans::SystemAction; use databend_common_sql::plans::SystemPlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::SYSTEM_ACTION; @@ -74,9 +75,13 @@ impl Interpreter for SystemActionInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_max_flight_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(SYSTEM_ACTION, message, timeout) + .do_action::<_, ()>(SYSTEM_ACTION, message, flight_params) .await?; } diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 09a19f79cc43d..850ef56f0303c 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -21,6 +21,7 @@ use databend_common_exception::Result; use databend_common_sql::plans::TruncateTablePlan; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::interpreters::Interpreter; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::actions::TRUNCATE_TABLE; @@ -95,9 +96,13 @@ impl Interpreter for TruncateTableInterpreter { } let settings = self.ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_max_flight_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; cluster - .do_action::<_, ()>(TRUNCATE_TABLE, message, timeout) + .do_action::<_, ()>(TRUNCATE_TABLE, message, flight_params) .await?; } diff --git a/src/query/service/src/servers/admin/v1/query_profiling.rs b/src/query/service/src/servers/admin/v1/query_profiling.rs index 649c16baeb3a2..a479f228ac17a 100644 --- a/src/query/service/src/servers/admin/v1/query_profiling.rs +++ b/src/query/service/src/servers/admin/v1/query_profiling.rs @@ -30,6 +30,7 @@ use poem::IntoResponse; use crate::clusters::ClusterDiscovery; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::GET_PROFILE; use crate::sessions::SessionManager; @@ -104,8 +105,13 @@ async fn get_cluster_profile(query_id: &str) -> Result, ErrorCo } } + let flight_params = FlightParams { + timeout: 60, + retry_times: 3, + retry_interval: 3, + }; let res = cluster - .do_action::<_, Option>>(GET_PROFILE, message, 60) + .do_action::<_, Option>>(GET_PROFILE, message, flight_params) .await?; match res.into_values().find(Option::is_some) { diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index 3f929c99f594b..c33fc0c1ca1d4 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -52,6 +52,7 @@ use super::exchange_transform::ExchangeTransform; use super::statistics_receiver::StatisticsReceiver; use super::statistics_sender::StatisticsSender; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -416,13 +417,17 @@ impl DataExchangeManager { actions: QueryFragmentsActions, ) -> Result { let settings = ctx.get_settings(); - let timeout = settings.get_flight_client_timeout()?; + let flight_params = FlightParams { + timeout: settings.get_flight_client_timeout()?, + retry_times: settings.get_max_flight_retry_times()?, + retry_interval: settings.get_flight_retry_interval()?, + }; let root_actions = actions.get_root_actions()?; let conf = GlobalConfig::instance(); // Initialize query env between cluster nodes let query_env = actions.get_query_env()?; - query_env.init(&ctx, timeout).await?; + query_env.init(&ctx, flight_params).await?; // Submit distributed tasks to all nodes. let cluster = ctx.get_cluster(); @@ -431,7 +436,7 @@ impl DataExchangeManager { let local_fragments = query_fragments.remove(&conf.query.node_id); let _: HashMap = cluster - .do_action(INIT_QUERY_FRAGMENTS, query_fragments, timeout) + .do_action(INIT_QUERY_FRAGMENTS, query_fragments, flight_params) .await?; self.set_ctx(&ctx.get_id(), ctx.clone())?; @@ -444,7 +449,7 @@ impl DataExchangeManager { let prepared_query = actions.prepared_query()?; let _: HashMap = cluster - .do_action(START_PREPARED_QUERY, prepared_query, timeout) + .do_action(START_PREPARED_QUERY, prepared_query, flight_params) .await?; Ok(build_res) diff --git a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs index c555184d82912..29999b7727b31 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_executor.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_executor.rs @@ -14,7 +14,7 @@ use crate::servers::flight::v1::packets::QueryFragment; -#[derive(Debug, serde::Serialize, serde::Deserialize)] +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] pub struct QueryFragments { pub query_id: String, pub fragments: Vec, diff --git a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs index 33c5f20e11b7d..12bb478627759 100644 --- a/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs +++ b/src/query/service/src/servers/flight/v1/packets/packet_publisher.rs @@ -34,6 +34,7 @@ use serde::Deserialize; use serde::Serialize; use crate::clusters::ClusterHelper; +use crate::clusters::FlightParams; use crate::servers::flight::v1::actions::INIT_QUERY_ENV; use crate::sessions::QueryContext; use crate::sessions::SessionManager; @@ -140,7 +141,7 @@ pub struct QueryEnv { } impl QueryEnv { - pub async fn init(&self, ctx: &Arc, timeout: u64) -> Result<()> { + pub async fn init(&self, ctx: &Arc, flight_params: FlightParams) -> Result<()> { debug!("Dataflow diagram {:?}", self.dataflow_diagram); let cluster = ctx.get_cluster(); @@ -151,7 +152,7 @@ impl QueryEnv { } let _ = cluster - .do_action::<_, ()>(INIT_QUERY_ENV, message, timeout) + .do_action::<_, ()>(INIT_QUERY_ENV, message, flight_params) .await?; Ok(()) diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index c9eb0af8b94b6..e0219887415a5 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -952,19 +952,23 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), -<<<<<<< HEAD - ("query_max_failures", DefaultSettingValue { - value: UserSettingValue::UInt64(0), - desc: "Sets the query maximum failure retry times.", - mode: SettingMode::Both, - range: Some(SettingRange::Numeric(0..=5)), -======= ("persist_materialized_cte", DefaultSettingValue { value: UserSettingValue::UInt64(0), // 0 for in-memory, 1 for disk desc: "Decides if materialized CTEs should be persisted to disk.", mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), ->>>>>>> main + }), + ("max_flight_connection_retry_times", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The maximum retry count for cluster flight. Disable if 0.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=30)), + }), + ("flight_connection_retry_interval", DefaultSettingValue { + value: UserSettingValue::UInt64(3), + desc: "The retry interval of cluster flight is in seconds.", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=900)), }), ]); diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index fdcd600436097..d591457642e91 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -822,16 +822,15 @@ impl Settings { Ok(self.try_get_u64("enable_distributed_pruning")? == 1) } -<<<<<<< HEAD - pub fn get_query_max_failures(&self) -> Result { - self.try_get_u64("query_max_failures") - } - - pub fn set_query_max_failures(&self, val: u64) -> Result<()> { - self.try_set_u64("query_max_failures", val) -======= pub fn get_persist_materialized_cte(&self) -> Result { Ok(self.try_get_u64("persist_materialized_cte")? != 0) ->>>>>>> main + } + + pub fn get_flight_retry_interval(&self) -> Result { + self.try_get_u64("flight_connection_retry_interval") + } + + pub fn get_max_flight_retry_times(&self) -> Result { + self.try_get_u64("max_flight_connection_retry_times") } } From e91f86d4022f05cdf7731526561b686c36ec2b5c Mon Sep 17 00:00:00 2001 From: baishen Date: Thu, 5 Dec 2024 16:55:10 +0800 Subject: [PATCH 4/4] fix --- src/query/service/src/interpreters/interpreter_kill.rs | 2 +- .../service/src/interpreters/interpreter_set_priority.rs | 2 +- .../service/src/interpreters/interpreter_system_action.rs | 2 +- .../src/interpreters/interpreter_table_truncate.rs | 2 +- .../src/servers/flight/v1/exchange/exchange_manager.rs | 2 +- src/query/settings/src/settings_default.rs | 6 +++--- src/query/settings/src/settings_getter_setter.rs | 8 ++++---- 7 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_kill.rs b/src/query/service/src/interpreters/interpreter_kill.rs index 96655f4bd6176..72309f0c2ecf8 100644 --- a/src/query/service/src/interpreters/interpreter_kill.rs +++ b/src/query/service/src/interpreters/interpreter_kill.rs @@ -57,7 +57,7 @@ impl KillInterpreter { let settings = self.ctx.get_settings(); let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_max_flight_retry_times()?, + retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; diff --git a/src/query/service/src/interpreters/interpreter_set_priority.rs b/src/query/service/src/interpreters/interpreter_set_priority.rs index 05f64b30af65a..1bf830b24be01 100644 --- a/src/query/service/src/interpreters/interpreter_set_priority.rs +++ b/src/query/service/src/interpreters/interpreter_set_priority.rs @@ -64,7 +64,7 @@ impl SetPriorityInterpreter { let settings = self.ctx.get_settings(); let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_max_flight_retry_times()?, + retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; let res = cluster diff --git a/src/query/service/src/interpreters/interpreter_system_action.rs b/src/query/service/src/interpreters/interpreter_system_action.rs index c3570923ff9d2..ca7e3ffb50c7d 100644 --- a/src/query/service/src/interpreters/interpreter_system_action.rs +++ b/src/query/service/src/interpreters/interpreter_system_action.rs @@ -77,7 +77,7 @@ impl Interpreter for SystemActionInterpreter { let settings = self.ctx.get_settings(); let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_max_flight_retry_times()?, + retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; cluster diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 850ef56f0303c..70afa08e38660 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -98,7 +98,7 @@ impl Interpreter for TruncateTableInterpreter { let settings = self.ctx.get_settings(); let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_max_flight_retry_times()?, + retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; cluster diff --git a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs index c33fc0c1ca1d4..1cbb961f798d7 100644 --- a/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs +++ b/src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs @@ -419,7 +419,7 @@ impl DataExchangeManager { let settings = ctx.get_settings(); let flight_params = FlightParams { timeout: settings.get_flight_client_timeout()?, - retry_times: settings.get_max_flight_retry_times()?, + retry_times: settings.get_flight_max_retry_times()?, retry_interval: settings.get_flight_retry_interval()?, }; let root_actions = actions.get_root_actions()?; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index e0219887415a5..ab561c5b43c9c 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -958,17 +958,17 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(0..=1)), }), - ("max_flight_connection_retry_times", DefaultSettingValue { + ("flight_connection_max_retry_times", DefaultSettingValue { value: UserSettingValue::UInt64(3), desc: "The maximum retry count for cluster flight. Disable if 0.", mode: SettingMode::Both, - range: Some(SettingRange::Numeric(0..=30)), + range: Some(SettingRange::Numeric(0..=10)), }), ("flight_connection_retry_interval", DefaultSettingValue { value: UserSettingValue::UInt64(3), desc: "The retry interval of cluster flight is in seconds.", mode: SettingMode::Both, - range: Some(SettingRange::Numeric(0..=900)), + range: Some(SettingRange::Numeric(0..=30)), }), ]); diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index d591457642e91..b76b50d9dc334 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -826,11 +826,11 @@ impl Settings { Ok(self.try_get_u64("persist_materialized_cte")? != 0) } - pub fn get_flight_retry_interval(&self) -> Result { - self.try_get_u64("flight_connection_retry_interval") + pub fn get_flight_max_retry_times(&self) -> Result { + self.try_get_u64("flight_connection_max_retry_times") } - pub fn get_max_flight_retry_times(&self) -> Result { - self.try_get_u64("max_flight_connection_retry_times") + pub fn get_flight_retry_interval(&self) -> Result { + self.try_get_u64("flight_connection_retry_interval") } }