Skip to content

Commit bcf3a6c

Browse files
authored
Merge pull request #1054 from muzarski/serializable_request_error
errors: remove ParseError
2 parents ab07be6 + 62c661b commit bcf3a6c

File tree

22 files changed

+574
-305
lines changed

22 files changed

+574
-305
lines changed

scylla-cql/src/frame/frame_errors.rs

Lines changed: 108 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,61 +1,129 @@
1+
use std::error::Error;
12
use std::sync::Arc;
23

4+
pub use super::request::{
5+
auth_response::AuthResponseSerializationError,
6+
batch::{BatchSerializationError, BatchStatementSerializationError},
7+
execute::ExecuteSerializationError,
8+
prepare::PrepareSerializationError,
9+
query::{QueryParametersSerializationError, QuerySerializationError},
10+
register::RegisterSerializationError,
11+
startup::StartupSerializationError,
12+
};
13+
314
use super::response::CqlResponseKind;
415
use super::TryFromPrimitiveError;
5-
use crate::cql_to_rust::CqlTypeError;
6-
use crate::frame::value::SerializeValuesError;
7-
use crate::types::deserialize::{DeserializationError, TypeCheckError};
8-
use crate::types::serialize::SerializationError;
16+
use crate::types::deserialize::DeserializationError;
917
use thiserror::Error;
1018

11-
#[derive(Error, Debug)]
12-
pub enum FrameError {
13-
#[error(transparent)]
14-
Parse(#[from] ParseError),
19+
/// An error returned by `parse_response_body_extensions`.
20+
///
21+
/// It represents an error that occurred during deserialization of
22+
/// frame body extensions. These extensions include tracing id,
23+
/// warnings and custom payload.
24+
///
25+
/// Possible error kinds:
26+
/// - failed to decompress frame body (decompression is required for further deserialization)
27+
/// - failed to deserialize tracing id (body ext.)
28+
/// - failed to deserialize warnings list (body ext.)
29+
/// - failed to deserialize custom payload map (body ext.)
30+
#[derive(Error, Debug, Clone)]
31+
#[non_exhaustive]
32+
pub enum FrameBodyExtensionsParseError {
33+
/// Frame is compressed, but no compression was negotiated for the connection.
1534
#[error("Frame is compressed, but no compression negotiated for connection.")]
1635
NoCompressionNegotiated,
36+
37+
/// Failed to deserialize frame trace id.
38+
#[error("Malformed trace id: {0}")]
39+
TraceIdParse(LowLevelDeserializationError),
40+
41+
/// Failed to deserialize warnings attached to frame.
42+
#[error("Malformed warnings list: {0}")]
43+
WarningsListParse(LowLevelDeserializationError),
44+
45+
/// Failed to deserialize frame's custom payload.
46+
#[error("Malformed custom payload map: {0}")]
47+
CustomPayloadMapParse(LowLevelDeserializationError),
48+
49+
/// Failed to decompress frame body (snap).
50+
#[error("Snap decompression error: {0}")]
51+
SnapDecompressError(Arc<dyn Error + Sync + Send>),
52+
53+
/// Failed to decompress frame body (lz4).
54+
#[error("Error decompressing lz4 data {0}")]
55+
Lz4DecompressError(Arc<dyn Error + Sync + Send>),
56+
}
57+
58+
/// An error that occurred during frame header deserialization.
59+
#[derive(Debug, Error)]
60+
#[non_exhaustive]
61+
pub enum FrameHeaderParseError {
62+
/// Failed to read the frame header from the socket.
63+
#[error("Failed to read the frame header: {0}")]
64+
HeaderIoError(std::io::Error),
65+
66+
/// Received a frame marked as coming from a client.
1767
#[error("Received frame marked as coming from a client")]
1868
FrameFromClient,
69+
70+
// FIXME: this should not belong here. User always expects a frame from server.
71+
// This variant is only used in scylla-proxy - need to investigate it later.
1972
#[error("Received frame marked as coming from the server")]
2073
FrameFromServer,
74+
75+
/// Received a frame with unsupported version.
2176
#[error("Received a frame from version {0}, but only 4 is supported")]
2277
VersionNotSupported(u8),
78+
79+
/// Received unknown response opcode.
80+
#[error("Unrecognized response opcode {0}")]
81+
UnknownResponseOpcode(#[from] TryFromPrimitiveError<u8>),
82+
83+
/// Failed to read frame body from the socket.
84+
#[error("Failed to read a chunk of response body. Expected {0} more bytes, error: {1}")]
85+
BodyChunkIoError(usize, std::io::Error),
86+
87+
/// Connection was closed before whole frame was read.
2388
#[error("Connection was closed before body was read: missing {0} out of {1}")]
2489
ConnectionClosed(usize, usize),
25-
#[error("Frame decompression failed.")]
26-
FrameDecompression,
27-
#[error("Frame compression failed.")]
28-
FrameCompression,
29-
#[error(transparent)]
30-
StdIoError(#[from] std::io::Error),
31-
#[error("Unrecognized opcode{0}")]
32-
TryFromPrimitiveError(#[from] TryFromPrimitiveError<u8>),
33-
#[error("Error compressing lz4 data {0}")]
34-
Lz4CompressError(#[from] lz4_flex::block::CompressError),
35-
#[error("Error decompressing lz4 data {0}")]
36-
Lz4DecompressError(#[from] lz4_flex::block::DecompressError),
3790
}
3891

39-
#[derive(Error, Debug)]
40-
pub enum ParseError {
41-
#[error("Low-level deserialization failed: {0}")]
42-
LowLevelDeserializationError(#[from] LowLevelDeserializationError),
43-
#[error("Could not serialize frame: {0}")]
44-
BadDataToSerialize(String),
45-
#[error("Could not deserialize frame: {0}")]
46-
BadIncomingData(String),
47-
#[error(transparent)]
48-
DeserializationError(#[from] DeserializationError),
49-
#[error(transparent)]
50-
DeserializationTypeCheckError(#[from] TypeCheckError),
51-
#[error(transparent)]
52-
IoError(#[from] std::io::Error),
53-
#[error(transparent)]
54-
SerializeValuesError(#[from] SerializeValuesError),
55-
#[error(transparent)]
56-
SerializationError(#[from] SerializationError),
57-
#[error(transparent)]
58-
CqlTypeError(#[from] CqlTypeError),
92+
/// An error that occurred during CQL request serialization.
93+
#[non_exhaustive]
94+
#[derive(Error, Debug, Clone)]
95+
pub enum CqlRequestSerializationError {
96+
/// Failed to serialize STARTUP request.
97+
#[error("Failed to serialize STARTUP request: {0}")]
98+
StartupSerialization(#[from] StartupSerializationError),
99+
100+
/// Failed to serialize REGISTER request.
101+
#[error("Failed to serialize REGISTER request: {0}")]
102+
RegisterSerialization(#[from] RegisterSerializationError),
103+
104+
/// Failed to serialize AUTH_RESPONSE request.
105+
#[error("Failed to serialize AUTH_RESPONSE request: {0}")]
106+
AuthResponseSerialization(#[from] AuthResponseSerializationError),
107+
108+
/// Failed to serialize BATCH request.
109+
#[error("Failed to serialize BATCH request: {0}")]
110+
BatchSerialization(#[from] BatchSerializationError),
111+
112+
/// Failed to serialize PREPARE request.
113+
#[error("Failed to serialize PREPARE request: {0}")]
114+
PrepareSerialization(#[from] PrepareSerializationError),
115+
116+
/// Failed to serialize EXECUTE request.
117+
#[error("Failed to serialize EXECUTE request: {0}")]
118+
ExecuteSerialization(#[from] ExecuteSerializationError),
119+
120+
/// Failed to serialize QUERY request.
121+
#[error("Failed to serialize QUERY request: {0}")]
122+
QuerySerialization(#[from] QuerySerializationError),
123+
124+
/// Request body compression failed.
125+
#[error("Snap compression error: {0}")]
126+
SnapCompressError(Arc<dyn Error + Sync + Send>),
59127
}
60128

61129
/// An error type returned when deserialization of CQL

scylla-cql/src/frame/mod.rs

Lines changed: 33 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,16 @@ pub mod value;
99
#[cfg(test)]
1010
mod value_tests;
1111

12-
use crate::frame::frame_errors::FrameError;
1312
use bytes::{Buf, BufMut, Bytes};
13+
use frame_errors::{
14+
CqlRequestSerializationError, FrameBodyExtensionsParseError, FrameHeaderParseError,
15+
};
1416
use thiserror::Error;
1517
use tokio::io::{AsyncRead, AsyncReadExt};
1618
use uuid::Uuid;
1719

1820
use std::fmt::Display;
21+
use std::sync::Arc;
1922
use std::{collections::HashMap, convert::TryFrom};
2023

2124
use request::SerializableRequest;
@@ -72,7 +75,7 @@ impl SerializedRequest {
7275
req: &R,
7376
compression: Option<Compression>,
7477
tracing: bool,
75-
) -> Result<SerializedRequest, FrameError> {
78+
) -> Result<SerializedRequest, CqlRequestSerializationError> {
7679
let mut flags = 0;
7780
let mut data = vec![0; HEADER_SIZE];
7881

@@ -128,19 +131,22 @@ impl Default for FrameParams {
128131

129132
pub async fn read_response_frame(
130133
reader: &mut (impl AsyncRead + Unpin),
131-
) -> Result<(FrameParams, ResponseOpcode, Bytes), FrameError> {
134+
) -> Result<(FrameParams, ResponseOpcode, Bytes), FrameHeaderParseError> {
132135
let mut raw_header = [0u8; HEADER_SIZE];
133-
reader.read_exact(&mut raw_header[..]).await?;
136+
reader
137+
.read_exact(&mut raw_header[..])
138+
.await
139+
.map_err(FrameHeaderParseError::HeaderIoError)?;
134140

135141
let mut buf = &raw_header[..];
136142

137143
// TODO: Validate version
138144
let version = buf.get_u8();
139145
if version & 0x80 != 0x80 {
140-
return Err(FrameError::FrameFromClient);
146+
return Err(FrameHeaderParseError::FrameFromClient);
141147
}
142148
if version & 0x7F != 0x04 {
143-
return Err(FrameError::VersionNotSupported(version & 0x7f));
149+
return Err(FrameHeaderParseError::VersionNotSupported(version & 0x7f));
144150
}
145151

146152
let flags = buf.get_u8();
@@ -159,10 +165,12 @@ pub async fn read_response_frame(
159165

160166
let mut raw_body = Vec::with_capacity(length).limit(length);
161167
while raw_body.has_remaining_mut() {
162-
let n = reader.read_buf(&mut raw_body).await?;
168+
let n = reader.read_buf(&mut raw_body).await.map_err(|err| {
169+
FrameHeaderParseError::BodyChunkIoError(raw_body.remaining_mut(), err)
170+
})?;
163171
if n == 0 {
164172
// EOF, too early
165-
return Err(FrameError::ConnectionClosed(
173+
return Err(FrameHeaderParseError::ConnectionClosed(
166174
raw_body.remaining_mut(),
167175
length,
168176
));
@@ -183,18 +191,19 @@ pub fn parse_response_body_extensions(
183191
flags: u8,
184192
compression: Option<Compression>,
185193
mut body: Bytes,
186-
) -> Result<ResponseBodyWithExtensions, FrameError> {
194+
) -> Result<ResponseBodyWithExtensions, FrameBodyExtensionsParseError> {
187195
if flags & FLAG_COMPRESSION != 0 {
188196
if let Some(compression) = compression {
189197
body = decompress(&body, compression)?.into();
190198
} else {
191-
return Err(FrameError::NoCompressionNegotiated);
199+
return Err(FrameBodyExtensionsParseError::NoCompressionNegotiated);
192200
}
193201
}
194202

195203
let trace_id = if flags & FLAG_TRACING != 0 {
196204
let buf = &mut &*body;
197-
let trace_id = types::read_uuid(buf).map_err(frame_errors::ParseError::from)?;
205+
let trace_id =
206+
types::read_uuid(buf).map_err(FrameBodyExtensionsParseError::TraceIdParse)?;
198207
body.advance(16);
199208
Some(trace_id)
200209
} else {
@@ -204,7 +213,8 @@ pub fn parse_response_body_extensions(
204213
let warnings = if flags & FLAG_WARNING != 0 {
205214
let body_len = body.len();
206215
let buf = &mut &*body;
207-
let warnings = types::read_string_list(buf).map_err(frame_errors::ParseError::from)?;
216+
let warnings = types::read_string_list(buf)
217+
.map_err(FrameBodyExtensionsParseError::WarningsListParse)?;
208218
let buf_len = buf.len();
209219
body.advance(body_len - buf_len);
210220
warnings
@@ -215,7 +225,8 @@ pub fn parse_response_body_extensions(
215225
let custom_payload = if flags & FLAG_CUSTOM_PAYLOAD != 0 {
216226
let body_len = body.len();
217227
let buf = &mut &*body;
218-
let payload_map = types::read_bytes_map(buf).map_err(frame_errors::ParseError::from)?;
228+
let payload_map = types::read_bytes_map(buf)
229+
.map_err(FrameBodyExtensionsParseError::CustomPayloadMapParse)?;
219230
let buf_len = buf.len();
220231
body.advance(body_len - buf_len);
221232
Some(payload_map)
@@ -235,7 +246,7 @@ fn compress_append(
235246
uncomp_body: &[u8],
236247
compression: Compression,
237248
out: &mut Vec<u8>,
238-
) -> Result<(), FrameError> {
249+
) -> Result<(), CqlRequestSerializationError> {
239250
match compression {
240251
Compression::Lz4 => {
241252
let uncomp_len = uncomp_body.len() as u32;
@@ -250,23 +261,27 @@ fn compress_append(
250261
out.resize(old_size + snap::raw::max_compress_len(uncomp_body.len()), 0);
251262
let compressed_size = snap::raw::Encoder::new()
252263
.compress(uncomp_body, &mut out[old_size..])
253-
.map_err(|_| FrameError::FrameCompression)?;
264+
.map_err(|err| CqlRequestSerializationError::SnapCompressError(Arc::new(err)))?;
254265
out.truncate(old_size + compressed_size);
255266
Ok(())
256267
}
257268
}
258269
}
259270

260-
fn decompress(mut comp_body: &[u8], compression: Compression) -> Result<Vec<u8>, FrameError> {
271+
fn decompress(
272+
mut comp_body: &[u8],
273+
compression: Compression,
274+
) -> Result<Vec<u8>, FrameBodyExtensionsParseError> {
261275
match compression {
262276
Compression::Lz4 => {
263277
let uncomp_len = comp_body.get_u32() as usize;
264-
let uncomp_body = lz4_flex::decompress(comp_body, uncomp_len)?;
278+
let uncomp_body = lz4_flex::decompress(comp_body, uncomp_len)
279+
.map_err(|err| FrameBodyExtensionsParseError::Lz4DecompressError(Arc::new(err)))?;
265280
Ok(uncomp_body)
266281
}
267282
Compression::Snappy => snap::raw::Decoder::new()
268283
.decompress_vec(comp_body)
269-
.map_err(|_| FrameError::FrameDecompression),
284+
.map_err(|err| FrameBodyExtensionsParseError::SnapDecompressError(Arc::new(err))),
270285
}
271286
}
272287

scylla-cql/src/frame/request/auth_response.rs

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
use crate::frame::frame_errors::ParseError;
1+
use std::num::TryFromIntError;
2+
3+
use thiserror::Error;
4+
5+
use crate::frame::frame_errors::CqlRequestSerializationError;
26

37
use crate::frame::request::{RequestOpcode, SerializableRequest};
48
use crate::frame::types::write_bytes_opt;
@@ -11,7 +15,17 @@ pub struct AuthResponse {
1115
impl SerializableRequest for AuthResponse {
1216
const OPCODE: RequestOpcode = RequestOpcode::AuthResponse;
1317

14-
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), ParseError> {
15-
Ok(write_bytes_opt(self.response.as_ref(), buf)?)
18+
fn serialize(&self, buf: &mut Vec<u8>) -> Result<(), CqlRequestSerializationError> {
19+
Ok(write_bytes_opt(self.response.as_ref(), buf)
20+
.map_err(AuthResponseSerializationError::ResponseSerialization)?)
1621
}
1722
}
23+
24+
/// An error type returned when serialization of AUTH_RESPONSE request fails.
25+
#[non_exhaustive]
26+
#[derive(Error, Debug, Clone)]
27+
pub enum AuthResponseSerializationError {
28+
/// Maximum response's body length exceeded.
29+
#[error("AUTH_RESPONSE body bytes length too big: {0}")]
30+
ResponseSerialization(TryFromIntError),
31+
}

0 commit comments

Comments
 (0)