File tree 1 file changed +17
-11
lines changed 1 file changed +17
-11
lines changed Original file line number Diff line number Diff line change @@ -832,23 +832,29 @@ impl Parseable {
832
832
/// Updates schema by merging schemas stored by ingestors when running in Query mode
833
833
pub async fn update_schema_when_distributed (
834
834
& self ,
835
- tables : & Vec < String > ,
835
+ streams : & Vec < String > ,
836
836
) -> Result < ( ) , EventError > {
837
837
if self . options . mode != Mode :: Query {
838
838
return Ok ( ( ) ) ;
839
839
}
840
840
841
- for table in tables {
842
- if let Ok ( schemas) = self . storage . get_object_store ( ) . fetch_schemas ( table) . await {
843
- let new_schema = Schema :: try_merge ( schemas) ?;
844
- // commit schema merges the schema internally and updates the schema in storage.
845
- self . storage
846
- . get_object_store ( )
847
- . commit_schema ( table, new_schema. clone ( ) )
848
- . await ?;
841
+ for stream_name in streams {
842
+ let Ok ( schemas) = self
843
+ . storage
844
+ . get_object_store ( )
845
+ . fetch_schemas ( stream_name)
846
+ . await
847
+ else {
848
+ continue ;
849
+ } ;
850
+ let new_schema = Schema :: try_merge ( schemas) ?;
851
+ // commit schema merges the schema internally and updates the schema in storage.
852
+ self . storage
853
+ . get_object_store ( )
854
+ . commit_schema ( stream_name, new_schema. clone ( ) )
855
+ . await ?;
849
856
850
- self . get_stream ( table) ?. commit_schema ( new_schema) ?;
851
- }
857
+ self . get_stream ( stream_name) ?. commit_schema ( new_schema) ?;
852
858
}
853
859
854
860
Ok ( ( ) )
You can’t perform that action at this time.
0 commit comments