Skip to content

Commit e89fd6c

Browse files
authored
fix(pubsub): fix out of order issue when exactly once is enabled (#9472)
* fix(pubsub): fix out of ordering issue with exactly once * add ordering test and fix race condition with resource cleanup * remove TODO comment
1 parent 92e7b7f commit e89fd6c

File tree

2 files changed

+112
-20
lines changed

2 files changed

+112
-20
lines changed

pubsub/integration_test.go

Lines changed: 106 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1177,29 +1177,32 @@ func TestIntegration_OrderedKeys_Basic(t *testing.T) {
11771177
}
11781178

11791179
received := make(chan string, numItems)
1180+
ctx, cancel := context.WithCancel(ctx)
11801181
go func() {
1181-
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
1182-
defer msg.Ack()
1183-
if msg.OrderingKey != orderingKey {
1184-
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
1185-
}
1186-
1187-
received <- string(msg.Data)
1188-
}); err != nil {
1189-
if c := status.Code(err); c != codes.Canceled {
1190-
t.Error(err)
1182+
for i := 0; i < numItems; i++ {
1183+
select {
1184+
case r := <-received:
1185+
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
1186+
t.Errorf("%d: got %s, want %s", i, got, want)
1187+
}
1188+
case <-time.After(30 * time.Second):
1189+
t.Errorf("timed out after 30s waiting for item %d", i)
1190+
cancel()
11911191
}
11921192
}
1193+
cancel()
11931194
}()
11941195

1195-
for i := 0; i < numItems; i++ {
1196-
select {
1197-
case r := <-received:
1198-
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
1199-
t.Fatalf("%d: got %s, want %s", i, got, want)
1200-
}
1201-
case <-time.After(30 * time.Second):
1202-
t.Fatalf("timed out after 30s waiting for item %d", i)
1196+
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
1197+
defer msg.Ack()
1198+
if msg.OrderingKey != orderingKey {
1199+
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
1200+
}
1201+
1202+
received <- string(msg.Data)
1203+
}); err != nil {
1204+
if c := status.Code(err); c != codes.Canceled {
1205+
t.Error(err)
12031206
}
12041207
}
12051208
}
@@ -1445,6 +1448,91 @@ func TestIntegration_OrderedKeys_SubscriptionOrdering(t *testing.T) {
14451448
}
14461449
}
14471450

1451+
func TestIntegration_OrderingWithExactlyOnce(t *testing.T) {
1452+
ctx := context.Background()
1453+
client := integrationTestClient(ctx, t, option.WithEndpoint("us-west1-pubsub.googleapis.com:443"))
1454+
defer client.Close()
1455+
1456+
topic, err := createTopicWithRetry(ctx, t, client, topicIDs.New(), nil)
1457+
if err != nil {
1458+
t.Fatal(err)
1459+
}
1460+
defer topic.Delete(ctx)
1461+
defer topic.Stop()
1462+
exists, err := topic.Exists(ctx)
1463+
if err != nil {
1464+
t.Fatal(err)
1465+
}
1466+
if !exists {
1467+
t.Fatalf("topic %v should exist, but it doesn't", topic)
1468+
}
1469+
var sub *Subscription
1470+
if sub, err = createSubWithRetry(ctx, t, client, subIDs.New(), SubscriptionConfig{
1471+
Topic: topic,
1472+
EnableMessageOrdering: true,
1473+
EnableExactlyOnceDelivery: true,
1474+
}); err != nil {
1475+
t.Fatal(err)
1476+
}
1477+
defer sub.Delete(ctx)
1478+
exists, err = sub.Exists(ctx)
1479+
if err != nil {
1480+
t.Fatal(err)
1481+
}
1482+
if !exists {
1483+
t.Fatalf("subscription %s should exist, but it doesn't", sub.ID())
1484+
}
1485+
1486+
topic.PublishSettings.DelayThreshold = time.Second
1487+
topic.EnableMessageOrdering = true
1488+
1489+
orderingKey := "some-ordering-key"
1490+
numItems := 10
1491+
for i := 0; i < numItems; i++ {
1492+
r := topic.Publish(ctx, &Message{
1493+
ID: fmt.Sprintf("id-%d", i),
1494+
Data: []byte(fmt.Sprintf("item-%d", i)),
1495+
OrderingKey: orderingKey,
1496+
})
1497+
go func() {
1498+
if _, err := r.Get(ctx); err != nil {
1499+
t.Error(err)
1500+
}
1501+
}()
1502+
}
1503+
1504+
received := make(chan string, numItems)
1505+
ctx, cancel := context.WithCancel(ctx)
1506+
go func() {
1507+
for i := 0; i < numItems; i++ {
1508+
select {
1509+
case r := <-received:
1510+
if got, want := r, fmt.Sprintf("item-%d", i); got != want {
1511+
t.Errorf("%d: got %s, want %s", i, got, want)
1512+
}
1513+
case <-time.After(30 * time.Second):
1514+
t.Errorf("timed out after 30s waiting for item %d", i)
1515+
cancel()
1516+
}
1517+
}
1518+
cancel()
1519+
}()
1520+
1521+
if err := sub.Receive(ctx, func(ctx context.Context, msg *Message) {
1522+
defer msg.Ack()
1523+
if msg.OrderingKey != orderingKey {
1524+
t.Errorf("got ordering key %s, expected %s", msg.OrderingKey, orderingKey)
1525+
}
1526+
1527+
received <- string(msg.Data)
1528+
}); err != nil {
1529+
if c := status.Code(err); c != codes.Canceled {
1530+
t.Error(err)
1531+
}
1532+
}
1533+
1534+
}
1535+
14481536
func TestIntegration_CreateSubscription_DeadLetterPolicy(t *testing.T) {
14491537
t.Parallel()
14501538
ctx := context.Background()

pubsub/iterator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -313,9 +313,13 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
313313
}
314314
}
315315
// Only return for processing messages that were successfully modack'ed.
316+
// Iterate over the original messages slice for ordering.
316317
v := make([]*ipubsub.Message, 0, len(pendingMessages))
317-
for _, m := range pendingMessages {
318-
v = append(v, m)
318+
for _, m := range msgs {
319+
ackID := msgAckID(m)
320+
if _, ok := pendingMessages[ackID]; ok {
321+
v = append(v, m)
322+
}
319323
}
320324
return v, nil
321325
}

0 commit comments

Comments
 (0)