@@ -844,7 +844,7 @@ impl BlueskyStreamInner {
844844 let ( _sink, mut messages) = stream. into_stream ( ) ;
845845
846846 loop {
847- if * self . status . read ( ) ! = StreamStatus :: Stopped {
847+ if * self . status . read ( ) = = StreamStatus :: Stopped {
848848 return Ok ( ( ) ) ;
849849 }
850850
@@ -868,16 +868,15 @@ impl BlueskyStreamInner {
868868 Some ( result) = messages. next( ) => {
869869 * self . last_message_time. write( ) = Some ( Instant :: now( ) ) ;
870870
871- if * self . status. read( ) == StreamStatus :: Running {
872- match result {
873- Ok ( msg) => {
874- self . handle_message( msg) ;
875- }
876- Err ( e) => {
877- error!( "BlueskyStream {} message error: {}" , self . source_id, e) ;
878- }
871+ match result {
872+ Ok ( msg) => {
873+ self . handle_message( msg) ;
874+ }
875+ Err ( e) => {
876+ error!( "BlueskyStream {} message error: {}" , self . source_id, e) ;
879877 }
880878 }
879+
881880 }
882881 _ = tokio:: time:: sleep( Duration :: from_secs( 1 ) ) => {
883882 self . flush_expired_batches( ) . await ;
@@ -896,13 +895,15 @@ impl BlueskyStreamInner {
896895 } => {
897896 * self . current_cursor . write ( ) = Some ( time_us) ;
898897
899- if let Some ( post) = self . parse_commit ( & did, time_us, & commit) {
900- if self . should_include_post ( & post) {
901- debug ! (
902- "BlueskyStream {} accepted post from {} ({})" ,
903- self . source_id, post. did, post. uri
904- ) ;
905- self . pending_batch . add_post ( post) ;
898+ if * self . status . read ( ) == StreamStatus :: Running {
899+ if let Some ( post) = self . parse_commit ( & did, time_us, & commit) {
900+ if self . should_include_post ( & post) {
901+ debug ! (
902+ "BlueskyStream {} accepted post from {} ({})" ,
903+ self . source_id, post. did, post. uri
904+ ) ;
905+ self . pending_batch . add_post ( post) ;
906+ }
906907 }
907908 }
908909 }
0 commit comments