@@ -407,8 +407,8 @@ impl StreamingSyncIteration {
407407 SyncStateMachineTransition :: Empty
408408 } else {
409409 // Periodically check whether any subscriptions that are part of this stream
410- // have become expired. We currently do this by re-creating the request and
411- // aborting the iteration if it has changed.
410+ // are expired. We currently do this by re-creating the request and aborting the
411+ // iteration if it has changed.
412412 let updated_request = self
413413 . adapter
414414 . collect_subscription_requests ( self . options . include_defaults ) ?;
@@ -600,6 +600,15 @@ impl StreamingSyncIteration {
600600 Ok ( progress)
601601 }
602602
603+ /// Reconciles local stream subscriptions with service-side state received in a checkpoint.
604+ ///
605+ /// This involves:
606+ ///
607+ /// 1. Marking local streams that don't exist in the checkpoint as inactive or deleting them.
608+ /// 2. Creating new subscriptions for auto-subscribed streams we weren't tracking before.
609+ /// 3. Associating buckets in the checkpoint with the stream subscriptions that created them.
610+ /// 4. Reporting errors for stream subscriptions that are marked as errorenous in the
611+ /// checkpoint.
603612 fn resolve_subscription_state (
604613 & self ,
605614 tracked : & TrackedCheckpoint ,
@@ -628,7 +637,6 @@ impl StreamingSyncIteration {
628637 } ) ;
629638 } ) ?;
630639
631- // If they don't exist already, create default subscriptions included in checkpoint
632640 for ( server_index, subscription) in tracked. streams . iter ( ) . enumerate ( ) {
633641 let matching_local_subscriptions = tracked_subscriptions
634642 . iter_mut ( )
@@ -640,57 +648,56 @@ impl StreamingSyncIteration {
640648 local. local . active = true ;
641649 local. local . is_default = subscription. is_default ;
642650 has_local = true ;
651+ }
643652
644- // Warn if this local subscription has errors. This search is quadratic because we
645- // only get the index of the faulty subscription with an error, which we then map
646- // to the local id via `tracked.requested_subscription_ids` and finally compare with
647- // one of the matching subscription here. That's fine though because we don't expect
648- // most of the streams to error in practice.
649- for error in & * subscription. errors {
650- if let StreamSubscriptionErrorCause :: ExplicitSubscription ( index) =
651- error. subscription
652- {
653+ for error in & * subscription. errors {
654+ match error. subscription {
655+ StreamSubscriptionErrorCause :: Default => {
656+ event. instructions . push ( Instruction :: LogLine {
657+ severity : LogSeverity :: WARNING ,
658+ line : Cow :: Owned ( format ! (
659+ "Default subscription {} has errors: {}" ,
660+ subscription. name, error. message
661+ ) ) ,
662+ } ) ;
663+ }
664+ StreamSubscriptionErrorCause :: ExplicitSubscription ( index) => {
653665 let Some ( local_id_for_error) =
654666 tracked. requested_subscriptions . subscription_ids . get ( index)
655667 else {
656668 continue ;
657669 } ;
658670
659- if * local_id_for_error == local. local . id {
660- let mut desc = String :: new ( ) ;
661- let _ = write ! (
662- & mut desc,
663- "Subscription to stream {} " ,
664- local. local. stream_name
665- ) ;
666- if let Some ( params) = & local. local . local_params {
667- let _ = write ! ( & mut desc, "(with parameters {params})" ) ;
668- } else {
669- desc. push_str ( "(without parameters)" ) ;
671+ // Find the matching explicit subscription to contextualize this error
672+ // message with the name of the stream and parameters used for the
673+ // subscription.
674+ for local in & tracked_subscriptions {
675+ if * local_id_for_error == local. local . id {
676+ let mut desc = String :: new ( ) ;
677+ let _ = write ! (
678+ & mut desc,
679+ "Subscription to stream {} " ,
680+ local. local. stream_name
681+ ) ;
682+ if let Some ( params) = & local. local . local_params {
683+ let _ = write ! ( & mut desc, "(with parameters {params})" ) ;
684+ } else {
685+ desc. push_str ( "(without parameters)" ) ;
686+ }
687+
688+ let _ =
689+ write ! ( & mut desc, " could not be resolved: {}" , error. message) ;
690+ event. instructions . push ( Instruction :: LogLine {
691+ severity : LogSeverity :: WARNING ,
692+ line : Cow :: Owned ( desc) ,
693+ } ) ;
670694 }
671-
672- let _ = write ! ( & mut desc, " could not be resolved: {}" , error. message) ;
673- event. instructions . push ( Instruction :: LogLine {
674- severity : LogSeverity :: WARNING ,
675- line : Cow :: Owned ( desc) ,
676- } ) ;
677695 }
678696 }
679- }
680- }
681-
682- for error in & * subscription. errors {
683- if let StreamSubscriptionErrorCause :: Default = error. subscription {
684- event. instructions . push ( Instruction :: LogLine {
685- severity : LogSeverity :: WARNING ,
686- line : Cow :: Owned ( format ! (
687- "Default subscription {} has errors: {}" ,
688- subscription. name, error. message
689- ) ) ,
690- } ) ;
691- }
697+ } ;
692698 }
693699
700+ // If they don't exist already, create default subscriptions included in checkpoint
694701 if !has_local && subscription. is_default {
695702 let local = self . adapter . create_default_subscription ( subscription) ?;
696703 tracked_subscriptions. push ( LocalAndServerSubscription {
0 commit comments