We adopted Google Cloud PubSub as our messaging platform at the start of 2020. At the time, PubSub was part of Google Cloud's big data offering, and it was clearly tailored to those scenarios. One of the challenges we initially faced was handling errors during message processing; thanks to a new release from Google, this is now supported out of the box.

Data availability with PubSub

Our architecture is built around the premise that data should always be available for anyone to innovate on. We therefore decided to make GCP PubSub a key part of our data platform: whenever a data element is created or modified, it is published for others to consume and react to.

Typically, the flow works like this:

Figure: PubSub flow

For details on this flow, see the PubSub documentation. The challenging step was #5, where the Subscriber sends an Acknowledgement back to PubSub to tell it that the message has been successfully processed.

Indefinite Delivery – Bounce Storms

Early on, we discovered that if the Acknowledgement is not sent, the message just bounces back and forth between the Subscription and the Subscriber indefinitely. This was a real setback, but one we could overcome by adding proper error handling to our Subscribers. Coming from a background of working extensively with JMS and Apache ActiveMQ, we were looking for something similar to dead letter handling.
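To make the failure mode concrete, here is a toy in-memory model. It is plain Java, not the Pub/Sub client, and `BounceStormDemo`, `deliverUntilAcked` and `safetyCap` are names made up for illustration. The model hands the message straight back whenever the handler does not acknowledge it. Real PubSub instead redelivers after a nack or once the ack deadline expires.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Toy model only (hypothetical names): a message that is never acknowledged
// is handed back to the subscriber again and again.
class BounceStormDemo {

    /**
     * Delivers one message to the handler until the handler acknowledges it
     * (returns true). safetyCap exists only so the demo terminates; without it,
     * a handler that always fails would loop forever.
     */
    static int deliverUntilAcked(String message, Predicate<String> handler, int safetyCap) {
        Deque<String> subscription = new ArrayDeque<>();
        subscription.add(message);
        int deliveries = 0;
        while (!subscription.isEmpty() && deliveries < safetyCap) {
            String m = subscription.poll();
            deliveries++;
            boolean acked = handler.test(m);
            if (!acked) {
                subscription.add(m); // no ack: the message comes straight back
            }
        }
        return deliveries;
    }

    public static void main(String[] args) {
        // A handler that can never process this payload, e.g. malformed JSON.
        System.out.println(deliverUntilAcked("{broken", m -> false, 1_000)); // 1000
        // A healthy handler acknowledges on the first delivery.
        System.out.println(deliverUntilAcked("{\"id\":1}", m -> true, 1_000)); // 1
    }
}
```

In the toy, only the artificial cap stops the loop. In production nothing does, and that is what we mean by a bounce storm.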

At this point, most of our Subscribers had grown considerably in lines of code: they had to detect and handle errors, and each microservice had to do its own dead letter handling.

import com.fasterxml.jackson.core.JsonProcessingException;
import java.time.LocalDateTime;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
// BasicAcknowledgeablePubsubMessage comes from Spring Cloud GCP (its package depends on the version in use).

@Override
protected void consume(BasicAcknowledgeablePubsubMessage basicAcknowledgeablePubsubMessage) {
    String data = null;
    Map<String, String> headers = null;
    try {
        data = basicAcknowledgeablePubsubMessage.getPubsubMessage().getData().toStringUtf8();
        headers = basicAcknowledgeablePubsubMessage.getPubsubMessage().getAttributesMap();

        if (correctRecipient(headers.get(HEADER_RECIPIENT), SERVICE_NAME)) {
            Product p = pmService.parseProduct(data);

            // for now, this will always be a random UUID
            String requestId = headers.getOrDefault(HEADER_REQUESTID, "");
            if (requestId.isEmpty()) {
                requestId = UUID.randomUUID().toString();
            }
            LocalDateTime modifiedTs = headers.containsKey(HEADER_MODIFIEDTS)
                    ? LocalDateTime.parse(headers.get(HEADER_MODIFIEDTS), MODIFIEDTS_PARSER)
                    : null;

            pmService.processProduct(p, modifiedTs, requestId);
        }
    } catch (RuntimeException | JsonProcessingException e) {
        log.error("Could not process message", e);
        // nack() only bounces the message back. We would have to decide whether the error is
        // transient (infrastructure) or caused by the message itself, otherwise we get a
        // (re)bounce storm. Instead, forward the message to a dedicated error topic.
        if (ENABLE_GCP_DLQ) {
            publisher.send(PRODUCT_ERROR_TOPIC, data, headers != null ? headers : Collections.emptyMap());
        } else {
            log.error("Could not parse message: {}", data);
        }
    } finally {
        // Always ack, even after a failure, so the message is never redelivered.
        doBlocking(basicAcknowledgeablePubsubMessage.ack());
    }
}

Another issue with this setup was that for every subscriber, we needed an additional error topic and an additional subscriber on that error topic so that our messages were not lost.

The final issue with this setup was that we never implemented retries, so intermittent issues (e.g. a redeploy of another service) would immediately send the message to the DLQ.
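For comparison, here is a minimal sketch of the retry step we never built. `RetrySketch` and `withRetries` are hypothetical names, and the fixed pause between attempts is an assumption. The point is only that a failure caused by, say, a redeploying dependency can succeed on a later attempt instead of going straight to the DLQ.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of our services: retry a processing step a few
// times before giving up, so short outages do not dead-letter the message.
class RetrySketch {

    static <T> T withRetries(Callable<T> step, int maxAttempts, long pauseMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return step.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again after a pause
                if (attempt < maxAttempts) {
                    Thread.sleep(pauseMillis);
                }
            }
        }
        throw last; // every attempt failed: only now is the message a DLQ candidate
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulates a downstream service that is redeploying during the first two calls.
        String result = withRetries(() -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("downstream unavailable");
            }
            return "processed";
        }, 5, 10);
        System.out.println(result + " after " + calls.get() + " attempts"); // processed after 3 attempts
    }
}
```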

Support for dead lettering and retries

Thanks to the latest release of GCP PubSub, we now have dead lettering and retries out of the box. Using Config Connector, we can configure the subscriptions in our Helm charts like this:

---
apiVersion: pubsub.cnrm.cloud.google.com/v1beta1
kind: PubSubSubscription
metadata:
  labels:
  {{- include "springboot.labels" . | nindent 4 }}
  name: product-topic-product-service
spec:
  ackDeadlineSeconds: 300
  messageRetentionDuration: 604800s
  retainAckedMessages: false
  topicRef:
    name: product-topic
  deadLetterPolicy:
    deadLetterTopicRef:
      name: global-dlq
    maxDeliveryAttempts: 5
  retryPolicy:
    minimumBackoff: 10s
    maximumBackoff: 600s
---
apiVersion: pubsub.cnrm.cloud.google.com/v1beta1
kind: PubSubSubscription
metadata:
  labels:
  {{- include "springboot.labels" . | nindent 4 }}
  name: product-topic-product-service-error-subscriber
spec:
  ackDeadlineSeconds: 15
  messageRetentionDuration: 604800s
  retainAckedMessages: false
  topicRef:
    name: global-dlq
  filter: attributes.CloudPubSubDeadLetterSourceSubscription = "product-topic-product-service"
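With maxDeliveryAttempts: 5, a message that keeps failing is delivered five times, with four backoff waits in between, before it is forwarded to global-dlq. The sketch below estimates those waits. PubSub describes the retry policy as exponential backoff between minimumBackoff and maximumBackoff. The simple doubling curve used here is an assumption and may not match PubSub's actual schedule. Processing time and ack-deadline expiries are ignored, and `BackoffEstimate` is a hypothetical name.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

// Rough estimate only: ASSUMES the backoff doubles from min and is capped at max.
class BackoffEstimate {

    /** Waits between consecutive deliveries of a message that is nacked every time. */
    static List<Duration> waits(Duration min, Duration max, int maxDeliveryAttempts) {
        List<Duration> result = new ArrayList<>();
        Duration next = min;
        // n deliveries are separated by n - 1 waits; after the last failed delivery
        // the message goes to the dead letter topic instead of being retried.
        for (int i = 1; i < maxDeliveryAttempts; i++) {
            result.add(next.compareTo(max) > 0 ? max : next);
            next = next.multipliedBy(2);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values from the product-topic-product-service subscription above.
        List<Duration> w = waits(Duration.ofSeconds(10), Duration.ofSeconds(600), 5);
        long total = w.stream().mapToLong(Duration::getSeconds).sum();
        System.out.println(w + " total=" + total + "s"); // [PT10S, PT20S, PT40S, PT1M20S] total=150s
    }
}
```

Under that assumption, a persistently failing product message would be dead-lettered after roughly 150 seconds of backoff (10 + 20 + 40 + 80), well before the 600-second cap matters.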

PubSub Dead Letter attributes

When PubSub detects that a message has reached the configured maxDeliveryAttempts without being acknowledged, it forwards the message to the topic referenced by deadLetterTopicRef. It also adds a few attributes to the forwarded message:

  • CloudPubSubDeadLetterSourceDeliveryCount
  • CloudPubSubDeadLetterSourceSubscription
  • CloudPubSubDeadLetterSourceSubscriptionProject
  • CloudPubSubDeadLetterSourceTopicPublishTime
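The sketch below shows what these attributes give an error consumer, using plain maps in place of real messages. It also shows what the filter on product-topic-product-service-error-subscriber selects. `DeadLetterInfo`, `fromAttributes` and `filterBySource` are hypothetical names, and `order-topic-order-service` is an invented second subscription. The delivery count is assumed to arrive as a decimal string. The real filter is evaluated by PubSub on the server side.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical helper type, not a Pub/Sub or Spring API. The attribute names are
// the ones PubSub adds when it dead-letters a message.
record DeadLetterInfo(String sourceSubscription, String sourceProject,
                      int deliveryCount, String sourceTopicPublishTime) {

    static final String SOURCE_SUBSCRIPTION = "CloudPubSubDeadLetterSourceSubscription";

    /** Returns empty when the message carries no dead letter attributes. */
    static Optional<DeadLetterInfo> fromAttributes(Map<String, String> attrs) {
        String subscription = attrs.get(SOURCE_SUBSCRIPTION);
        if (subscription == null) {
            return Optional.empty();
        }
        // Assumes the delivery count is sent as a decimal string.
        int count = Integer.parseInt(
                attrs.getOrDefault("CloudPubSubDeadLetterSourceDeliveryCount", "0"));
        return Optional.of(new DeadLetterInfo(
                subscription,
                attrs.get("CloudPubSubDeadLetterSourceSubscriptionProject"),
                count,
                // Kept as a raw string: this sketch makes no assumption about its format.
                attrs.get("CloudPubSubDeadLetterSourceTopicPublishTime")));
    }

    /** In-memory equivalent of: attributes.CloudPubSubDeadLetterSourceSubscription = "<name>" */
    static List<Map<String, String>> filterBySource(List<Map<String, String>> dlq, String name) {
        return dlq.stream()
                .filter(attrs -> name.equals(attrs.get(SOURCE_SUBSCRIPTION)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map<String, String>> globalDlq = List.of(
                Map.of(SOURCE_SUBSCRIPTION, "product-topic-product-service",
                       "CloudPubSubDeadLetterSourceDeliveryCount", "5"),
                Map.of(SOURCE_SUBSCRIPTION, "order-topic-order-service")); // hypothetical other service
        List<Map<String, String>> mine = filterBySource(globalDlq, "product-topic-product-service");
        DeadLetterInfo info = fromAttributes(mine.get(0)).orElseThrow();
        System.out.println(mine.size() + " " + info.deliveryCount()); // 1 5
    }
}
```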

Using these attributes, we can add a filter to our error subscription so that it only receives the broken messages from product-service. Our applications now only need to set up their retry and dead letter configuration and decide whether to send an ack() or a nack() back to GCP PubSub. This reduces both complexity and the risk of manual configuration errors.

import com.fasterxml.jackson.core.JsonProcessingException;
import java.time.LocalDateTime;
import java.util.Map;
import java.util.UUID;
// BasicAcknowledgeablePubsubMessage comes from Spring Cloud GCP (its package depends on the version in use).

@Override
protected void consume(BasicAcknowledgeablePubsubMessage msg) {
    try {
        String data = msg.getPubsubMessage().getData().toStringUtf8();
        Map<String, String> headers = msg.getPubsubMessage().getAttributesMap();

        if (correctRecipient(headers.get(HEADER_RECIPIENT), SERVICE_NAME)) {
            Product p = pmService.parseProduct(data);

            // for now, this will always be a random UUID
            String requestId = headers.getOrDefault(HEADER_REQUESTID, "");
            if (requestId.isEmpty()) {
                requestId = UUID.randomUUID().toString();
            }
            LocalDateTime modifiedTs = headers.containsKey(HEADER_MODIFIEDTS)
                    ? LocalDateTime.parse(headers.get(HEADER_MODIFIEDTS), MODIFIEDTS_PARSER)
                    : null;

            pmService.processProduct(p, modifiedTs, requestId);
        }
        // Also ack messages meant for other services; otherwise they would be
        // redelivered until they end up in the dead letter topic.
        msg.ack();
    } catch (RuntimeException | JsonProcessingException e) {
        log.error("Could not process message", e);
        // PubSub redelivers with the subscription's retryPolicy backoff and forwards
        // the message to global-dlq once maxDeliveryAttempts is reached.
        msg.nack();
    }
}
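The rule this consumer follows can be sketched without Spring and PubSub types. It acks when the message has been handled or is addressed to another service, and nacks when processing throws. `Acknowledgeable`, `RecordingMessage` and `AckNackHandler` are hypothetical stand-ins for `BasicAcknowledgeablePubsubMessage` and the subscriber. The recipient check is simplified to a string comparison, and the service names are illustrative.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for an acknowledgeable message; not a Spring Cloud GCP type.
interface Acknowledgeable {
    void ack();
    void nack();
}

// Test double that records which outcome the handler chose.
class RecordingMessage implements Acknowledgeable {
    String outcome = "none";
    public void ack() { outcome = "ack"; }
    public void nack() { outcome = "nack"; }
}

class AckNackHandler {

    /**
     * Processes the payload if it is addressed to this service. Acks on success and for
     * messages meant for other services; nacks on failure so the subscription's retry
     * policy, and eventually its dead letter policy, take over.
     */
    static void handle(String recipient, String serviceName, String payload,
                       Consumer<String> process, Acknowledgeable msg) {
        boolean handled;
        try {
            if (serviceName.equals(recipient)) {
                process.accept(payload); // may throw on bad data or downstream errors
            }
            handled = true;
        } catch (RuntimeException e) {
            handled = false;
        }
        if (handled) {
            msg.ack();
        } else {
            msg.nack();
        }
    }

    public static void main(String[] args) {
        RecordingMessage good = new RecordingMessage();
        handle("product-service", "product-service", "{}", payload -> { }, good);
        RecordingMessage broken = new RecordingMessage();
        handle("product-service", "product-service", "{",
                payload -> { throw new IllegalArgumentException("bad json"); }, broken);
        System.out.println(good.outcome + " " + broken.outcome); // ack nack
    }
}
```

The sketch decides on the outcome first and calls ack() or nack() once, outside the try block. That way, an exception thrown by the acknowledgement itself cannot be mistaken for a processing failure.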

Summary

Using the latest features of GCP PubSub, we now get proper retry management and dead letter handling of broken messages. Most importantly, our subscribing microservices have become much less complex. This makes GCP PubSub much better suited for traditional application messaging.
