Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 19 additions & 5 deletions lib/executor/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@ use hive_router_query_planner::planner::plan_nodes::{FetchNode, FetchRewrite, Qu

use crate::{
headers::plan::ResponseHeaderAggregator,
response::{graphql_error::GraphQLError, storage::ResponsesStorage, value::Value},
response::{
graphql_error::{GraphQLError, GraphQLErrorPath},
storage::ResponsesStorage,
value::Value,
},
};

pub struct ExecutionContext<'a> {
Expand Down Expand Up @@ -38,10 +42,20 @@ impl<'a> ExecutionContext<'a> {
}
}

pub fn handle_errors(&mut self, errors: Option<Vec<GraphQLError>>) {
if let Some(errors) = errors {
for error in errors {
self.errors.push(error);
pub fn handle_errors(
&mut self,
errors: Option<Vec<GraphQLError>>,
entity_index_error_map: Option<HashMap<&usize, Vec<GraphQLErrorPath>>>,
) {
if let Some(response_errors) = errors {
for response_error in response_errors {
if let Some(entity_index_error_map) = &entity_index_error_map {
let normalized_errors =
response_error.normalize_entity_error(entity_index_error_map);
self.errors.extend(normalized_errors);
} else {
self.errors.push(response_error);
}
}
}
}
Expand Down
106 changes: 95 additions & 11 deletions lib/executor/src/execution/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ use crate::{
response::project_by_operation,
},
response::{
graphql_error::GraphQLError, merge::deep_merge, subgraph_response::SubgraphResponse,
graphql_error::{GraphQLError, GraphQLErrorPath},
merge::deep_merge,
subgraph_response::SubgraphResponse,
value::Value,
},
utils::{
Expand Down Expand Up @@ -420,7 +422,7 @@ impl<'exec> Executor<'exec> {
ctx: &mut ExecutionContext<'exec>,
response_bytes: Bytes,
fetch_node_id: i64,
) -> Option<(Value<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
) -> Option<(SubgraphResponse<'exec>, Option<&'exec Vec<FetchRewrite>>)> {
let idx = ctx.response_storage.add_response(response_bytes);
// SAFETY: The `bytes` are transmuted to the lifetime `'a` of the `ExecutionContext`.
// This is safe because the `response_storage` is part of the `ExecutionContext` (`ctx`)
Expand Down Expand Up @@ -452,9 +454,7 @@ impl<'exec> Executor<'exec> {
}
};

ctx.handle_errors(response.errors);

Some((response.data, output_rewrites))
Some((response, output_rewrites))
}

fn process_job_result(
Expand All @@ -472,16 +472,18 @@ impl<'exec> Executor<'exec> {
&mut ctx.response_headers_aggregator,
)?;

if let Some((mut data, output_rewrites)) =
if let Some((mut response, output_rewrites)) =
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
{
ctx.handle_errors(response.errors, None);
if let Some(output_rewrites) = output_rewrites {
for output_rewrite in output_rewrites {
output_rewrite.rewrite(&self.schema_metadata.possible_types, &mut data);
output_rewrite
.rewrite(&self.schema_metadata.possible_types, &mut response.data);
}
}

deep_merge(&mut ctx.final_response, data);
deep_merge(&mut ctx.final_response, response.data);
}
}
ExecutionJob::FlattenFetch(job) => {
Expand All @@ -493,10 +495,10 @@ impl<'exec> Executor<'exec> {
&mut ctx.response_headers_aggregator,
)?;

if let Some((mut data, output_rewrites)) =
if let Some((mut response, output_rewrites)) =
self.process_subgraph_response(ctx, job.response.body, job.fetch_node_id)
{
if let Some(mut entities) = data.take_entities() {
if let Some(mut entities) = response.data.take_entities() {
if let Some(output_rewrites) = output_rewrites {
for output_rewrite in output_rewrites {
for entity in &mut entities {
Expand All @@ -508,15 +510,33 @@ impl<'exec> Executor<'exec> {

let mut index = 0;
let normalized_path = job.flatten_node_path.as_slice();
// If there is an error in the response, then collect the paths for normalizing the error
let initial_error_path = response
.errors
.as_ref()
.map(|_| GraphQLErrorPath::with_capacity(normalized_path.len() + 2));
let mut entity_index_error_map = response
.errors
.as_ref()
.map(|_| HashMap::with_capacity(entities.len()));
traverse_and_callback_mut(
&mut ctx.final_response,
normalized_path,
self.schema_metadata,
&mut |target| {
initial_error_path,
&mut |target, error_path| {
let hash = job.representation_hashes[index];
if let Some(entity_index) =
job.representation_hash_to_index.get(&hash)
{
if let (Some(error_path), Some(entity_index_error_map)) =
(error_path, entity_index_error_map.as_mut())
{
let error_paths = entity_index_error_map
.entry(entity_index)
.or_insert_with(Vec::new);
error_paths.push(error_path);
}
if let Some(entity) = entities.get(*entity_index) {
// SAFETY: `new_val` is a clone of an entity that lives for `'a`.
// The transmute is to satisfy the compiler, but the lifetime
Expand All @@ -529,6 +549,7 @@ impl<'exec> Executor<'exec> {
index += 1;
},
);
ctx.handle_errors(response.errors, entity_index_error_map);
}
}
}
Expand Down Expand Up @@ -714,6 +735,8 @@ fn select_fetch_variables<'a>(

#[cfg(test)]
mod tests {
use crate::{context::ExecutionContext, response::graphql_error::GraphQLErrorPath};

use super::select_fetch_variables;
use sonic_rs::Value;
use std::collections::{BTreeSet, HashMap};
Expand Down Expand Up @@ -768,4 +791,65 @@ mod tests {

assert!(selected.is_none());
}
#[test]
/**
* We have the same entity in two different paths ["a", 0] and ["b", 1],
* and the subgraph response has an error for this entity.
* So we should duplicate the error for both paths.
*/
fn normalize_entity_errors_correctly() {
use crate::response::graphql_error::{GraphQLError, GraphQLErrorPathSegment};
use std::collections::HashMap;
let mut ctx = ExecutionContext::default();
let mut entity_index_error_map: HashMap<&usize, Vec<GraphQLErrorPath>> = HashMap::new();
entity_index_error_map.insert(
&0,
vec![
GraphQLErrorPath {
segments: vec![
GraphQLErrorPathSegment::String("a".to_string()),
GraphQLErrorPathSegment::Index(0),
],
},
GraphQLErrorPath {
segments: vec![
GraphQLErrorPathSegment::String("b".to_string()),
GraphQLErrorPathSegment::Index(1),
],
},
],
);
let response_errors = vec![GraphQLError {
message: "Error 1".to_string(),
locations: None,
path: Some(GraphQLErrorPath {
segments: vec![
GraphQLErrorPathSegment::String("_entities".to_string()),
GraphQLErrorPathSegment::Index(0),
GraphQLErrorPathSegment::String("field1".to_string()),
],
}),
extensions: None,
}];
ctx.handle_errors(Some(response_errors), Some(entity_index_error_map));
assert_eq!(ctx.errors.len(), 2);
assert_eq!(ctx.errors[0].message, "Error 1");
assert_eq!(
ctx.errors[0].path.as_ref().unwrap().segments,
vec![
GraphQLErrorPathSegment::String("a".to_string()),
GraphQLErrorPathSegment::Index(0),
GraphQLErrorPathSegment::String("field1".to_string())
]
);
assert_eq!(ctx.errors[1].message, "Error 1");
assert_eq!(
ctx.errors[1].path.as_ref().unwrap().segments,
vec![
GraphQLErrorPathSegment::String("b".to_string()),
GraphQLErrorPathSegment::Index(1),
GraphQLErrorPathSegment::String("field1".to_string())
]
);
}
}
86 changes: 83 additions & 3 deletions lib/executor/src/response/graphql_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use graphql_parser::Pos;
use graphql_tools::validation::utils::ValidationError;
use serde::{de, Deserialize, Deserializer, Serialize};
use sonic_rs::Value;
use std::fmt;
use std::{collections::HashMap, fmt};

#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
Expand All @@ -11,7 +11,7 @@ pub struct GraphQLError {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub locations: Option<Vec<GraphQLErrorLocation>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub path: Option<Vec<GraphQLErrorPathSegment>>,
pub path: Option<GraphQLErrorPath>,
pub extensions: Option<Value>,
}

Expand Down Expand Up @@ -46,13 +46,43 @@ impl From<&Pos> for GraphQLErrorLocation {
}
}

impl GraphQLError {
pub fn entity_index_and_path<'a>(&'a self) -> Option<EntityIndexAndPath<'a>> {
self.path.as_ref().and_then(|p| p.entity_index_and_path())
}

pub fn normalize_entity_error(
self,
entity_index_error_map: &HashMap<&usize, Vec<GraphQLErrorPath>>,
) -> Vec<GraphQLError> {
if let Some(entity_index_and_path) = &self.entity_index_and_path() {
if let Some(entity_error_paths) =
entity_index_error_map.get(&entity_index_and_path.entity_index)
{
return entity_error_paths
.iter()
.map(|error_path| {
let mut new_error_path = error_path.clone();
new_error_path.extend_from_slice(entity_index_and_path.rest_of_path);
GraphQLError {
path: Some(new_error_path),
..self.clone()
}
})
.collect();
}
}
vec![self]
}
}

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct GraphQLErrorLocation {
pub line: usize,
pub column: usize,
}

#[derive(Clone, Debug, Serialize)]
#[derive(Clone, Debug, Serialize, PartialEq)]
pub enum GraphQLErrorPathSegment {
String(String),
Index(usize),
Expand Down Expand Up @@ -110,3 +140,53 @@ impl<'de> Deserialize<'de> for GraphQLErrorPathSegment {
deserializer.deserialize_any(PathSegmentVisitor)
}
}

#[derive(Clone, Debug, Deserialize, Serialize, Default)]
pub struct GraphQLErrorPath {
#[serde(flatten)]
pub segments: Vec<GraphQLErrorPathSegment>,
}

pub struct EntityIndexAndPath<'a> {
pub entity_index: usize,
pub rest_of_path: &'a [GraphQLErrorPathSegment],
}

impl GraphQLErrorPath {
pub fn with_capacity(capacity: usize) -> Self {
GraphQLErrorPath {
segments: Vec::with_capacity(capacity),
}
}
pub fn concat(&self, segment: GraphQLErrorPathSegment) -> Self {
let mut new_path = self.segments.clone();
new_path.push(segment);
GraphQLErrorPath { segments: new_path }
}

pub fn concat_index(&self, index: usize) -> Self {
self.concat(GraphQLErrorPathSegment::Index(index))
}

pub fn concat_str(&self, field: String) -> Self {
self.concat(GraphQLErrorPathSegment::String(field))
}

pub fn extend_from_slice(&mut self, other: &[GraphQLErrorPathSegment]) {
self.segments.extend_from_slice(other);
}

pub fn entity_index_and_path<'a>(&'a self) -> Option<EntityIndexAndPath<'a>> {
match &self.segments.as_slice() {
[GraphQLErrorPathSegment::String(maybe_entities), GraphQLErrorPathSegment::Index(entity_index), rest_of_path @ ..]
if maybe_entities == "_entities" =>
{
Some(EntityIndexAndPath {
entity_index: *entity_index,
rest_of_path,
})
}
_ => None,
}
}
}
Loading
Loading