We adopted Google Cloud PubSub as our messaging platform at the start of 2020. At the time, PubSub had grown out of Google Cloud's big data offering, and it was clearly tailored to those scenarios. One of the challenges we initially faced was dealing with errors during message processing; thanks to a new release from Google, this is now supported out of the box.
Data availability with PubSub
Our architecture is built around the premise that data should always be available for anyone to innovate on. As such, we have decided to use GCP PubSub as one key part of our data platform; whenever a data element is created or modified, it is published for others to consume and react to.
Typically, this flow works something like this; for the details, see the PubSub documentation. The challenging step is #5, where the Subscriber sends an Acknowledgement back to PubSub, telling it that the message has been successfully processed.
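To make the acknowledgement semantics concrete, here is a minimal toy model (our own illustration, not the PubSub client API) of at-least-once delivery: a message stays in the subscription until the subscriber acknowledges it, and an unacknowledged message is simply delivered again.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Predicate;

// Toy model of at-least-once delivery: a message remains pending in the
// subscription until the handler acknowledges it (returns true).
public class AckFlow {
    static class Subscription {
        private final Queue<String> pending = new ArrayDeque<>();

        void publish(String msg) { pending.add(msg); }

        // Deliver the next message; if the handler does not ack,
        // the message is re-queued and will be delivered again.
        boolean deliverNext(Predicate<String> handler) {
            String msg = pending.poll();
            if (msg == null) return false;
            boolean acked = handler.test(msg);
            if (!acked) pending.add(msg); // no ack => redelivery
            return true;
        }

        int backlog() { return pending.size(); }
    }

    public static void main(String[] args) {
        Subscription sub = new Subscription();
        sub.publish("product-42");
        sub.deliverNext(msg -> false); // processing fails, no ack
        System.out.println(sub.backlog()); // message is still pending
        sub.deliverNext(msg -> true);  // processed and acked
        System.out.println(sub.backlog()); // backlog is empty
    }
}
```

This redelivery behavior is exactly what caused the problem described next.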
Indefinite Delivery – Bounce Storms
Early on, we discovered that if the Acknowledgement is not sent, the message will just bounce back and forth between the Subscription and the Subscriber, indefinitely. This was a real setback, but something we could overcome by adding proper error handling to our Subscribers. Coming from a background of working a lot with JMS and Apache ActiveMQ, we were looking for something similar to Dead Letter handling.
At this point, most of our Subscribers grew considerably in lines of code: each had to detect and handle errors, and each microservice had to implement dead letter handling all by itself.
@Override
protected void consume(BasicAcknowledgeablePubsubMessage basicAcknowledgeablePubsubMessage) {
    String data = null;
    Map<String, String> headers = null;
    try {
        data = basicAcknowledgeablePubsubMessage.getPubsubMessage().getData().toStringUtf8();
        headers = basicAcknowledgeablePubsubMessage.getPubsubMessage().getAttributesMap();
        if (correctRecipient(headers.get(HEADER_RECIPIENT), SERVICE_NAME)) {
            Product p = pmService.parseProduct(data);
            String requestId = !StringUtils.isEmpty(headers.get(HEADER_REQUESTID))
                    ? headers.get(HEADER_REQUESTID)
                    : UUID.randomUUID().toString(); // for now, this will always be a random UUID
            LocalDateTime modifiedTs = null;
            if (headers.containsKey(HEADER_MODIFIEDTS)) {
                modifiedTs = LocalDateTime.parse(headers.get(HEADER_MODIFIEDTS), MODIFIEDTS_PARSER);
            }
            pmService.processProduct(p, modifiedTs, requestId);
        }
    } catch (RuntimeException | JsonProcessingException e) {
        log.error("Could not receive message", e);
        // nack only bounces the message; we need to decide whether the failure is
        // transient, infrastructural, or something wrong with the message itself,
        // otherwise we will get a (re)bounce storm.
        // Instead, send the message to an error / retry topic.
        if (ENABLE_GCP_DLQ) {
            publisher.send(PRODUCT_ERROR_TOPIC, data, headers != null ? headers : Collections.emptyMap());
        } else {
            log.error("Could not parse message: {}", data);
        }
    } finally {
        doBlocking(basicAcknowledgeablePubsubMessage.ack());
    }
}
Another issue with this setup was that for every subscriber, we needed an additional error topic and an additional subscriber on that error topic, so that our messages were not lost.
The final issue with this setup was that we never implemented retries: intermittent problems (e.g. a redeploy of another service) would immediately cause the message to be sent to the DLQ.
Support for dead lettering and retries
Thanks to the latest release of GCP PubSub, we now have support for dead lettering and retries out of the box. Using config-connector, we can configure our subscriptions like this:
---
apiVersion: pubsub.cnrm.cloud.google.com/v1beta1
kind: PubSubSubscription
metadata:
  labels:
    {{- include "springboot.labels" . | nindent 4 }}
  name: product-topic-product-service
spec:
  ackDeadlineSeconds: 300
  messageRetentionDuration: 604800s
  retainAckedMessages: false
  topicRef:
    name: product-topic
  deadLetterPolicy:
    deadLetterTopicRef:
      name: global-dlq
    maxDeliveryAttempts: 5
  retryPolicy:
    minimumBackoff: 10s
    maximumBackoff: 600s
---
apiVersion: pubsub.cnrm.cloud.google.com/v1beta1
kind: PubSubSubscription
metadata:
  labels:
    {{- include "springboot.labels" . | nindent 4 }}
  name: product-topic-product-service-error-subscriber
spec:
  ackDeadlineSeconds: 15
  messageRetentionDuration: 604800s
  retainAckedMessages: false
  topicRef:
    name: global-dlq
  filter: attributes.CloudPubSubDeadLetterSourceSubscription = "product-topic-product-service"
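With a retryPolicy in place, PubSub waits with an exponential backoff between minimumBackoff and maximumBackoff before redelivering a nacked message. The exact schedule is internal to PubSub (and jittered), but a simple sketch of capped exponential doubling, using the 10s/600s bounds from the config above, illustrates the shape of the delays:

```java
import java.time.Duration;

// Illustration only: PubSub's real backoff schedule is internal and jittered.
// This sketches capped exponential doubling between the configured bounds
// (minimumBackoff: 10s, maximumBackoff: 600s in the subscription above).
public class RetryBackoff {
    static Duration backoff(int attempt, Duration min, Duration max) {
        // attempt 1 waits min, attempt 2 waits 2*min, ... capped at max
        long seconds = min.getSeconds() << Math.min(attempt - 1, 30);
        return seconds >= max.getSeconds() ? max : Duration.ofSeconds(seconds);
    }

    public static void main(String[] args) {
        Duration min = Duration.ofSeconds(10), max = Duration.ofSeconds(600);
        for (int attempt = 1; attempt <= 7; attempt++) {
            System.out.println("attempt " + attempt + " -> wait "
                    + backoff(attempt, min, max).getSeconds() + "s");
        }
    }
}
```

The growing delays give transient problems, such as a redeploying downstream service, time to resolve before the message is dead-lettered after maxDeliveryAttempts.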
PubSub Dead Letter attributes
When PubSub detects that a message has been delivered more than the configured maxDeliveryAttempts, it sends the message to the topic referenced by deadLetterTopicRef. Besides forwarding the message to the dead letter topic, it also adds a few attributes to the message:
- CloudPubSubDeadLetterSourceDeliveryCount
- CloudPubSubDeadLetterSourceSubscription
- CloudPubSubDeadLetterSourceSubscriptionProject
- CloudPubSubDeadLetterSourceTopicPublishTime
Using these attributes, we can add a filter to our error subscriber so that it receives only the broken messages from product-service. Our applications now only need to set up their retry and dead letter configuration and decide whether to send an ack() or a nack() back to GCP PubSub, reducing both the complexity and the risk of manual configuration errors.
@Override
protected void consume(BasicAcknowledgeablePubsubMessage msg) {
    String data = null;
    Map<String, String> headers = null;
    try {
        data = msg.getPubsubMessage().getData().toStringUtf8();
        headers = msg.getPubsubMessage().getAttributesMap();
        if (correctRecipient(headers.get(HEADER_RECIPIENT), SERVICE_NAME)) {
            Product p = pmService.parseProduct(data);
            String requestId = !StringUtils.isEmpty(headers.get(HEADER_REQUESTID))
                    ? headers.get(HEADER_REQUESTID)
                    : UUID.randomUUID().toString(); // for now, this will always be a random UUID
            LocalDateTime modifiedTs = null;
            if (headers.containsKey(HEADER_MODIFIEDTS)) {
                modifiedTs = LocalDateTime.parse(headers.get(HEADER_MODIFIEDTS), MODIFIEDTS_PARSER);
            }
            pmService.processProduct(p, modifiedTs, requestId);
        }
        // also ack messages addressed to other recipients, so they are not redelivered
        msg.ack();
    } catch (RuntimeException | JsonProcessingException e) {
        log.error("Could not receive message", e);
        msg.nack(); // PubSub retries with backoff and dead-letters after maxDeliveryAttempts
    }
}
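On the other side, the subscriber attached to the error subscription can use the CloudPubSubDeadLetter* attributes that PubSub sets on forwarded messages. A sketch of such triage logic, operating on a plain attributes map (the method name and message format are our own illustration, not a library API):

```java
import java.util.Map;

// Sketch of triage logic for an error-topic subscriber. The attribute keys
// are the ones PubSub sets on dead-lettered messages; the method itself is
// our own illustration, not part of any client library.
public class DeadLetterTriage {
    static String describe(Map<String, String> attributes) {
        String source = attributes.getOrDefault(
                "CloudPubSubDeadLetterSourceSubscription", "unknown");
        String deliveries = attributes.getOrDefault(
                "CloudPubSubDeadLetterSourceDeliveryCount", "?");
        return "message from " + source + " gave up after " + deliveries + " deliveries";
    }

    public static void main(String[] args) {
        System.out.println(describe(Map.of(
                "CloudPubSubDeadLetterSourceSubscription", "product-topic-product-service",
                "CloudPubSubDeadLetterSourceDeliveryCount", "5")));
    }
}
```

In practice this is where we alert, persist the broken message for inspection, or re-publish it once the underlying issue is fixed.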
Summary
Using the latest features of GCP PubSub, it is now possible to have proper retry management and dead letter handling of broken messages and, most importantly, to reduce the complexity of our subscriber microservices. This makes GCP PubSub much more suitable for traditional application messaging.