@@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM
313313 'undefined' | non_neg_integer (), qistate ()}.
314314
315315recover (# resource { virtual_host = VHost } = Name , Terms , MsgStoreRecovered ,
316- ContainsCheckFun , OnSyncFun , OnSyncMsgFun , Context ) ->
316+ ContainsCheckFun , OnSyncFun , OnSyncMsgFun ,
317+ % % We only allow using this module when converting to v2.
318+ convert ) ->
317319 #{segment_entry_count := SegmentEntryCount } = rabbit_vhost :read_config (VHost ),
318320 put (segment_entry_count , SegmentEntryCount ),
319321 VHostDir = rabbit_vhost :msg_store_dir_path (VHost ),
@@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered,
323325 CleanShutdown = Terms /= non_clean_shutdown ,
324326 case CleanShutdown andalso MsgStoreRecovered of
325327 true -> case proplists :get_value (segments , Terms , non_clean_shutdown ) of
326- non_clean_shutdown -> init_dirty (false , ContainsCheckFun , State1 , Context );
328+ non_clean_shutdown -> init_dirty (false , ContainsCheckFun , State1 );
327329 RecoveredCounts -> init_clean (RecoveredCounts , State1 )
328330 end ;
329- false -> init_dirty (CleanShutdown , ContainsCheckFun , State1 , Context )
331+ false -> init_dirty (CleanShutdown , ContainsCheckFun , State1 )
330332 end .
331333
332334-spec terminate (rabbit_types :vhost (), [any ()], qistate ()) -> qistate ().
@@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) ->
644646-define (RECOVER_BYTES , 2 ).
645647-define (RECOVER_COUNTER_SIZE , 2 ).
646648
647- init_dirty (CleanShutdown , ContainsCheckFun , State , Context ) ->
649+ init_dirty (CleanShutdown , ContainsCheckFun , State ) ->
648650 % % Recover the journal completely. This will also load segments
649651 % % which have entries in the journal and remove duplicates. The
650652 % % counts will correctly reflect the combination of the segment
@@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) ->
679681 % % recovery fails with a crash.
680682 State2 = flush_journal (State1 # qistate { segments = Segments1 ,
681683 dirty_count = DirtyCount }),
682- case Context of
683- convert ->
684- {Count , Bytes , State2 };
685- main ->
686- % % We try to see if there are segment files from the v2 index.
687- case rabbit_file :wildcard (" .*\\ .qi" , Dir ) of
688- % % We are recovering a dirty queue that was using the v2 index or in
689- % % the process of converting from v2 to v1.
690- [_ |_ ] ->
691- # resource {virtual_host = VHost , name = QName } = State2 # qistate .queue_name ,
692- rabbit_log :info (" Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert" ,
693- [QName , VHost , Count ]),
694- CountersRef = counters :new (? RECOVER_COUNTER_SIZE , []),
695- State3 = recover_index_v2_dirty (State2 , ContainsCheckFun , CountersRef ),
696- {Count + counters :get (CountersRef , ? RECOVER_COUNT ),
697- Bytes + counters :get (CountersRef , ? RECOVER_BYTES ),
698- State3 };
699- % % Otherwise keep default values.
700- [] ->
701- {Count , Bytes , State2 }
702- end
703- end .
704-
705- recover_index_v2_dirty (State0 = # qistate { queue_name = Name ,
706- on_sync = OnSyncFun ,
707- on_sync_msg = OnSyncMsgFun },
708- ContainsCheckFun , CountersRef ) ->
709- # resource {virtual_host = VHost , name = QName } = Name ,
710- rabbit_log :info (" Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown" , [QName , VHost ]),
711- % % We cannot use the counts/bytes because some messages may be in both
712- % % the v1 and v2 indexes after a crash.
713- {_ , _ , V2State } = rabbit_classic_queue_index_v2 :recover (Name , non_clean_shutdown , true ,
714- ContainsCheckFun , OnSyncFun , OnSyncMsgFun ,
715- convert ),
716- State = recover_index_v2_common (State0 , V2State , CountersRef ),
717- rabbit_log :info (" Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1" ,
718- [QName , VHost , counters :get (CountersRef , ? RECOVER_COUNT )]),
719- State .
720-
721- % % At this point all messages are persistent because transient messages
722- % % were dropped during the v2 index recovery.
723- recover_index_v2_common (State0 = # qistate { queue_name = Name , dir = Dir },
724- V2State , CountersRef ) ->
725- % % Use a temporary per-queue store state to read embedded messages.
726- StoreState0 = rabbit_classic_queue_store_v2 :init (Name ),
727- % % Go through the v2 index and publish messages to v1 index.
728- {LoSeqId , HiSeqId , _ } = rabbit_classic_queue_index_v2 :bounds (V2State ),
729- % % When resuming after a crash we need to double check the messages that are both
730- % % in the v1 and v2 index (effectively the messages below the upper bound of the
731- % % v1 index that are about to be written to it).
732- {_ , V1HiSeqId , _ } = bounds (State0 ),
733- SkipFun = fun
734- (SeqId , FunState0 ) when SeqId < V1HiSeqId ->
735- case read (SeqId , SeqId + 1 , FunState0 ) of
736- % % Message already exists, skip.
737- {[_ ], FunState } ->
738- {skip , FunState };
739- % % Message doesn't exist, write.
740- {[], FunState } ->
741- {write , FunState }
742- end ;
743- % % Message is out of bounds of the v1 index.
744- (_ , FunState ) ->
745- {write , FunState }
746- end ,
747- % % We use a common function also used with conversion on policy change.
748- {State1 , _StoreState } = rabbit_variable_queue :convert_from_v2_to_v1_loop (Name , State0 , V2State , StoreState0 ,
749- {CountersRef , ? RECOVER_COUNT , ? RECOVER_BYTES },
750- LoSeqId , HiSeqId , SkipFun ),
751- % % Delete any remaining v2 index files.
752- OldFiles = rabbit_file :wildcard (" .*\\ .qi" , Dir )
753- ++ rabbit_file :wildcard (" .*\\ .qs" , Dir ),
754- _ = [rabbit_file :delete (filename :join (Dir , F )) || F <- OldFiles ],
755- % % Ensure that everything in the v1 index is written to disk.
756- State = flush (State1 ),
757- % % Clean up all the garbage that we have surely been creating.
758- garbage_collect (),
759- State .
684+ {Count , Bytes , State2 }.
760685
761686terminate (State = # qistate { journal_handle = JournalHdl ,
762687 segments = Segments }) ->
0 commit comments