3030import java .util .concurrent .*;
3131import org .junit .Before ;
3232import org .junit .Test ;
33+ import org .mockito .invocation .InvocationOnMock ;
34+ import org .mockito .stubbing .Answer ;
3335import org .threeten .bp .Duration ;
3436
3537public class MessageDispatcherTest {
3638 private static final ByteString MESSAGE_DATA = ByteString .copyFromUtf8 ("message-data" );
3739 private static final int DELIVERY_INFO_COUNT = 3 ;
3840 private static final String ACK_ID = "ACK-ID" ;
41+ private static final String ORDERING_KEY = "KEY" ;
3942 private static final ReceivedMessage TEST_MESSAGE =
4043 ReceivedMessage .newBuilder ()
4144 .setAckId (ACK_ID )
4245 .setMessage (PubsubMessage .newBuilder ().setData (MESSAGE_DATA ).build ())
4346 .setDeliveryAttempt (DELIVERY_INFO_COUNT )
4447 .build ();
48+ private static final ByteString ORDERED_MESSAGE_DATA_1 = ByteString .copyFromUtf8 ("message-data1" );
49+ private static final ReceivedMessage ORDERED_TEST_MESSAGE_1 =
50+ ReceivedMessage .newBuilder ()
51+ .setAckId ("ACK-ID-1" )
52+ .setMessage (
53+ PubsubMessage .newBuilder ()
54+ .setData (ORDERED_MESSAGE_DATA_1 )
55+ .setOrderingKey (ORDERING_KEY )
56+ .build ())
57+ .build ();
58+ private static final ByteString ORDERED_MESSAGE_DATA_2 = ByteString .copyFromUtf8 ("message-data2" );
59+ private static final ReceivedMessage ORDERED_TEST_MESSAGE_2 =
60+ ReceivedMessage .newBuilder ()
61+ .setAckId ("ACK-ID-2" )
62+ .setMessage (
63+ PubsubMessage .newBuilder ()
64+ .setData (ORDERED_MESSAGE_DATA_2 )
65+ .setOrderingKey (ORDERING_KEY )
66+ .build ())
67+ .build ();
4568 private static final int MAX_SECONDS_PER_ACK_EXTENSION = 60 ;
4669 private static final int MIN_ACK_DEADLINE_SECONDS = 10 ;
4770 private static final Duration MAX_ACK_EXTENSION_PERIOD = Duration .ofMinutes (60 );
@@ -494,6 +517,84 @@ public void testAckExtensionDefaultsExactlyOnceDeliveryEnabledThenDisabled() {
494517 Math .toIntExact (Subscriber .MAX_STREAM_ACK_DEADLINE .getSeconds ()));
495518 }
496519
520+ @ Test
521+ public void testOrderedDeliveryOrderingDisabled () throws Exception {
522+ MessageReceiver mockMessageReceiver = mock (MessageReceiver .class );
523+ MessageDispatcher messageDispatcher =
524+ getMessageDispatcher (mockMessageReceiver , Executors .newFixedThreadPool (5 ));
525+
526+ // This would normally be set from the streaming pull response in the
527+ // StreamingSubscriberConnection
528+ messageDispatcher .setMessageOrderingEnabled (false );
529+
530+ CountDownLatch receiveCalls = new CountDownLatch (2 );
531+
532+ doAnswer (
533+ new Answer <Void >() {
534+ public Void answer (InvocationOnMock invocation ) throws Exception {
535+ Thread .sleep (1000 );
536+ receiveCalls .countDown ();
537+ return null ;
538+ }
539+ })
540+ .when (mockMessageReceiver )
541+ .receiveMessage (eq (ORDERED_TEST_MESSAGE_1 .getMessage ()), any (AckReplyConsumer .class ));
542+ doAnswer (
543+ new Answer <Void >() {
544+ public Void answer (InvocationOnMock invocation ) {
545+ // Ensure the previous method didn't finish and we could process in parallel.
546+ assertEquals (2 , receiveCalls .getCount ());
547+ receiveCalls .countDown ();
548+ return null ;
549+ }
550+ })
551+ .when (mockMessageReceiver )
552+ .receiveMessage (eq (ORDERED_TEST_MESSAGE_2 .getMessage ()), any (AckReplyConsumer .class ));
553+
554+ messageDispatcher .processReceivedMessages (
555+ Arrays .asList (ORDERED_TEST_MESSAGE_1 , ORDERED_TEST_MESSAGE_2 ));
556+ receiveCalls .await ();
557+ }
558+
559+ @ Test
560+ public void testOrderedDeliveryOrderingEnabled () throws Exception {
561+ MessageReceiver mockMessageReceiver = mock (MessageReceiver .class );
562+ MessageDispatcher messageDispatcher =
563+ getMessageDispatcher (mockMessageReceiver , Executors .newFixedThreadPool (5 ));
564+
565+ // This would normally be set from the streaming pull response in the
566+ // StreamingSubscriberConnection
567+ messageDispatcher .setMessageOrderingEnabled (true );
568+
569+ CountDownLatch receiveCalls = new CountDownLatch (2 );
570+
571+ doAnswer (
572+ new Answer <Void >() {
573+ public Void answer (InvocationOnMock invocation ) throws Exception {
574+ Thread .sleep (1000 );
575+ receiveCalls .countDown ();
576+ return null ;
577+ }
578+ })
579+ .when (mockMessageReceiver )
580+ .receiveMessage (eq (ORDERED_TEST_MESSAGE_1 .getMessage ()), any (AckReplyConsumer .class ));
581+ doAnswer (
582+ new Answer <Void >() {
583+ public Void answer (InvocationOnMock invocation ) {
584+ // Ensure the previous method has finished completely.
585+ assertEquals (1 , receiveCalls .getCount ());
586+ receiveCalls .countDown ();
587+ return null ;
588+ }
589+ })
590+ .when (mockMessageReceiver )
591+ .receiveMessage (eq (ORDERED_TEST_MESSAGE_2 .getMessage ()), any (AckReplyConsumer .class ));
592+
593+ messageDispatcher .processReceivedMessages (
594+ Arrays .asList (ORDERED_TEST_MESSAGE_1 , ORDERED_TEST_MESSAGE_2 ));
595+ receiveCalls .await ();
596+ }
597+
497598 @ Test
498599 public void testAckExtensionCustomMinExactlyOnceDeliveryDisabledThenEnabled () {
499600 int customMinSeconds = 30 ;
@@ -569,20 +670,28 @@ private void assertMinAndMaxAckDeadlines(
569670 }
570671
571672 private MessageDispatcher getMessageDispatcher () {
572- return getMessageDispatcher (mock (MessageReceiver .class ));
673+ return getMessageDispatcher (mock (MessageReceiver .class ), MoreExecutors . directExecutor () );
573674 }
574675
575676 private MessageDispatcher getMessageDispatcher (MessageReceiver messageReceiver ) {
576- return getMessageDispatcherFromBuilder (MessageDispatcher .newBuilder (messageReceiver ));
677+ return getMessageDispatcherFromBuilder (
678+ MessageDispatcher .newBuilder (messageReceiver ), MoreExecutors .directExecutor ());
679+ }
680+
681+ private MessageDispatcher getMessageDispatcher (
682+ MessageReceiver messageReceiver , Executor executor ) {
683+ return getMessageDispatcherFromBuilder (MessageDispatcher .newBuilder (messageReceiver ), executor );
577684 }
578685
579686 private MessageDispatcher getMessageDispatcher (
580687 MessageReceiverWithAckResponse messageReceiverWithAckResponse ) {
581688 return getMessageDispatcherFromBuilder (
582- MessageDispatcher .newBuilder (messageReceiverWithAckResponse ));
689+ MessageDispatcher .newBuilder (messageReceiverWithAckResponse ),
690+ MoreExecutors .directExecutor ());
583691 }
584692
585- private MessageDispatcher getMessageDispatcherFromBuilder (MessageDispatcher .Builder builder ) {
693+ private MessageDispatcher getMessageDispatcherFromBuilder (
694+ MessageDispatcher .Builder builder , Executor executor ) {
586695 MessageDispatcher messageDispatcher =
587696 builder
588697 .setAckProcessor (mockAckProcessor )
@@ -594,7 +703,7 @@ private MessageDispatcher getMessageDispatcherFromBuilder(MessageDispatcher.Buil
594703 .setMaxDurationPerAckExtensionDefaultUsed (true )
595704 .setAckLatencyDistribution (mock (Distribution .class ))
596705 .setFlowController (mock (FlowController .class ))
597- .setExecutor (MoreExecutors . directExecutor () )
706+ .setExecutor (executor )
598707 .setSystemExecutor (systemExecutor )
599708 .setApiClock (clock )
600709 .build ();
0 commit comments