(2025-June-30) Using abbreviations and acronyms can speed up communication, but it can also exclude those who are not familiar with them. Sometimes this exclusion is unintentional; the worst case, though, is when a speaker knowingly acts as a gatekeeper, wielding specialized professional abbreviations as a display of personal pride or achievement, often with a hint of arrogance or disregard for others’ understanding.
I think there is real value in briefly explaining a shortened term whenever its expansion has been unintentionally left out; it’s a small gesture that goes a long way toward clarity and inclusion. As for the latter case, when jargon is used to exclude or impress, just don’t be that way! 😊
Image by Robert Owen-Wahl from Pixabay
There is a scene in the Shrek 2 animated movie where Donkey can’t help but annoy everyone with his endless question: “Are we there yet?” Shrek tries to keep his cool, but after hearing the same question over and over again, there is nothing left of his patience but a scream.
Imagine you’re working on a data integration project that aims to collect and store any new data changes from your Salesforce platform or any other source system. You want to retrieve those change events as soon as they become available, which instantly puts you in the position of asking, “Are there any updates?” And you may find yourself asking that question over and over again.
Salesforce offers a robust Streaming API built on a publish-subscribe model. Instead of repeatedly querying Salesforce with “Are there any updates?”, you can rely on this API to receive notifications whenever new records appear. This approach enables you to create connected applications that achieve near real-time data synchronization without the overhead of constant polling.
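For comparison, the polling approach that streaming replaces looks roughly like the sketch below. It is only an illustration of the anti-pattern: the helper name, the Account object, and the SystemModstamp filter are my own assumptions, and every empty poll still costs an API call.

from datetime import datetime
from simple_salesforce import Salesforce

def poll_for_changes(sf: Salesforce, last_checked: datetime):
    # Hypothetical polling helper: re-query on a schedule and filter on SystemModstamp
    # to ask Salesforce "is there anything new since last time?"
    since = last_checked.strftime('%Y-%m-%dT%H:%M:%SZ')  # SOQL datetime literal in UTC
    soql = f"SELECT Id, Name, SystemModstamp FROM Account WHERE SystemModstamp > {since}"
    result = sf.query(soql)
    return result.get("records", [])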
Here are several key stages to enable this streaming workflow:
First, you select the objects you want to track (e.g., Account, Contact, Opportunity) in the Salesforce Change Data Capture setup. Once enabled, Salesforce automatically generates change events whenever records in these objects are created, updated, deleted, or undeleted.
Next, your client subscribes to the corresponding event channels, one per tracked object, for example:
- /data/AccountChangeEvent
- /data/CaseChangeEvent
Each change event delivered on a channel then carries (a sample decoded message is shown after this list):
- The change type (CREATE, UPDATE, DELETE, UNDELETE)
- Record IDs
- Changed fields and their new values
- Transaction and commit metadata
- A Replay ID (a unique identifier for the event)
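To make the list above concrete, here is roughly what one decoded change event message looks like as it arrives over the subscription. This is a hand-written illustration, not captured output: the record ID, field values, and header contents are made up, and the exact header fields can vary by object and API version.

# Illustrative shape of a decoded CDC message (all values are made up)
sample_event = {
    "channel": "/data/AccountChangeEvent",
    "data": {
        "event": {"replayId": 1052},                  # used for checkpointing and replay
        "payload": {
            "ChangeEventHeader": {
                "entityName": "Account",
                "changeType": "UPDATE",               # CREATE / UPDATE / DELETE / UNDELETE
                "recordIds": ["001XXXXXXXXXXXXXXX"],
                "changedFields": ["Name", "Phone"],
                "transactionKey": "0001f0d2-...",     # transaction and commit metadata
                "commitTimestamp": 1719700000000,
            },
            "Name": "Acme Corp",                      # changed fields with their new values
            "Phone": "+1-555-0100",
        },
    },
}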
How Salesforce Change Data Capture (CDC) Streaming Works Behind the Scenes
Implementing Salesforce CDC Streaming in Azure Functions with Python
I’m going to describe a few key functions for implementing Salesforce CDC data integration using a Python Azure Function App. You can always extend this example and add more functional steps as needed.
Obtain a Salesforce session with an access token using OAuth 2.0
def get_salesforce_session(self):
    # Obtain Salesforce session with access token using OAuth 2.0.
    try:
        token_url = f'{INSTANCE_URL}/services/oauth2/token'
        payload = {
            "grant_type": "client_credentials",
            "client_id": CLIENT_ID,
            "client_secret": CLIENT_SECRET
        }
        response = requests.post(token_url, data=payload)
        response.raise_for_status()
        token_data = response.json()
        access_token = token_data.get("access_token")
        instance_url = token_data.get("instance_url")
        if not access_token or not instance_url:
            raise Exception("Failed to obtain access token or instance URL")
        self.sf = Salesforce(instance_url=instance_url, session_id=access_token)
        logger.info("Successfully authenticated with Salesforce")
        return self.sf
    except Exception as e:
        logger.error(f"Error obtaining Salesforce session: {e}")
        raise
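The functions in this post reference module-level names (INSTANCE_URL, CLIENT_ID, CLIENT_SECRET, BLOB_SERVICE_CLIENT, CHECKPOINT_CONTAINER, logger) that are not shown explicitly. A minimal sketch of that setup follows; the environment variable names and the use of Function App settings are my assumptions, only the identifiers themselves come from the code.

# Assumed module-level setup for the snippets in this post. The identifiers match
# the code above; reading them from Function App settings is my own convention.
import json
import logging
import os
import time

import requests
from azure.core.exceptions import ResourceNotFoundError
from azure.storage.blob import BlobServiceClient
from simple_salesforce import Salesforce

logger = logging.getLogger(__name__)

INSTANCE_URL = os.environ["SF_INSTANCE_URL"]        # e.g. https://yourorg.my.salesforce.com
CLIENT_ID = os.environ["SF_CLIENT_ID"]              # connected app consumer key
CLIENT_SECRET = os.environ["SF_CLIENT_SECRET"]      # connected app consumer secret
CHECKPOINT_CONTAINER = os.environ.get("CHECKPOINT_CONTAINER", "cdc-checkpoints")
BLOB_SERVICE_CLIENT = BlobServiceClient.from_connection_string(os.environ["AzureWebJobsStorage"])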
Perform a single CometD handshake for all channels.
def cometd_handshake(self, cometd_url):
    # Perform single CometD handshake for all channels.
    self.session = self.get_cometd_session()
    handshake_payload = [{
        "version": "1.0",
        "minimumVersion": "0.9",
        "channel": "/meta/handshake",
        "supportedConnectionTypes": ["long-polling"],
        "advice": {"timeout": 60000, "interval": 0}
    }]
    response = self.session.post(cometd_url, json=handshake_payload)
    if response.status_code == 200:
        result = response.json()[0]
        if result.get('successful'):
            self.client_id = result['clientId']
            logger.info(f"CometD handshake successful. Client ID: {self.client_id}")
            return True
    logger.error(f"Handshake failed: {response.text}")
    return False
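The handshake relies on get_cometd_session(), which isn't included in the post. A minimal sketch follows, assuming it simply wraps a requests.Session that carries the OAuth access token as a Bearer header; the CometD endpoint pattern and API version in the comment are likewise my assumptions.

def get_cometd_session(self):
    # Assumed helper: a plain requests.Session that sends the OAuth access token
    # obtained in get_salesforce_session() as a Bearer header on every CometD call.
    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {self.sf.session_id}",
        "Content-Type": "application/json"
    })
    return session

# The CometD endpoint itself follows this pattern (63.0 is only an example API version):
# cometd_url = f"{INSTANCE_URL}/cometd/63.0"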
Subscribe to all CDC channels one by one, using replayIds from saved checkpoints
def cometd_subscribe_all(self, cometd_url):
    # Subscribe to all CDC channels one by one, using replayIds from Azure Blob checkpoints.
    replay_ids = {}
    for channel in self.channels:
        # Construct Azure-safe blob name
        blob_name = f"Checkpoint{channel.replace('/', '_').replace(':', '')}.json"
        blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container=CHECKPOINT_CONTAINER, blob=blob_name)
        try:
            # Attempt to read the blob content
            blob_data = blob_client.download_blob().readall()
            replay_id = json.loads(blob_data).get("replayId")
            replay_ids[channel] = replay_id
        except ResourceNotFoundError:
            replay_ids[channel] = None
            logger.info(f"No checkpoint found for channel {channel}. Starting fresh.")
        except (json.JSONDecodeError, Exception) as e:
            replay_ids[channel] = None
            logger.error(f"Failed to load checkpoint for channel {channel}: {e}", exc_info=True)

    def build_payload(channel, replay_id):
        return {
            "channel": "/meta/subscribe",
            "clientId": self.client_id,
            "subscription": channel,
            "ext": {
                "payload.format": "FULL",
                "replay": {channel: replay_id}
            }
        }

    final_channels = []
    for channel in self.channels:
        current_replay_id = replay_ids[channel] if replay_ids[channel] is not None else -1
        payload = build_payload(channel, current_replay_id)
        response = self.session.post(cometd_url, json=[payload])
        if response.status_code != 200:
            logger.error(f"Failed to connect to channel {channel}. HTTP {response.status_code}: {response.text}")
            continue
        results = response.json()
        for result in results:
            result_channel = result.get('channel', '')
            if result_channel.startswith("/meta/"):
                # Subscription confirmation
                if result.get('successful'):
                    logger.info(f"Subscribed to channel: {channel} with replayId={current_replay_id}.")
                    final_channels.append(channel)
                else:
                    error = result.get('error', '')
                    logger.warning(f"Subscription failed for {channel} with replayId={current_replay_id}: {error}")
                    if "invalid" in error.lower() and "replayid" in error.lower():
                        logger.info(f"Retrying subscription for {channel} with replayId = -1")
                        retry_payload = build_payload(channel, -1)
                        retry_response = self.session.post(cometd_url, json=[retry_payload])
                        retry_result = retry_response.json()[0]
                        if retry_response.status_code == 200 and retry_result.get('successful'):
                            logger.info(f"Subscribed to channel: {channel} with replayId = -1 after retry")
                            final_channels.append(channel)
                            save_replay_id(channel, -1)
                        else:
                            logger.error(f"Retry failed for {channel}: {retry_response.text}")
                    elif "403::User not allowed" in error:
                        logger.error(f"Permission denied for channel {channel}. Skipping.")
                    else:
                        logger.error(f"Unknown error for channel {channel}: {error}")
                        logger.warning(f"Subscription result: {result}")
            elif result_channel.startswith("/data/"):
                # Data message bundled in subscription response
                logger.info(f"Received data event for channel {result_channel}: {json.dumps(result)}")
                self.on_message(result)
    self.channels = final_channels
    return bool(final_channels)
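The subscription code above calls save_replay_id() and self.on_message(), which are also omitted from the post. Here is a hedged sketch of how they might look: the checkpoint blob naming mirrors the subscribe function, but the rest (storing the event, advancing the checkpoint per message) is my own assumption about the design.

def save_replay_id(channel, replay_id):
    # Persist the last processed replayId for a channel as a small JSON checkpoint blob,
    # using the same blob naming convention that cometd_subscribe_all() reads back.
    blob_name = f"Checkpoint{channel.replace('/', '_').replace(':', '')}.json"
    blob_client = BLOB_SERVICE_CLIENT.get_blob_client(container=CHECKPOINT_CONTAINER, blob=blob_name)
    blob_client.upload_blob(json.dumps({"replayId": replay_id}), overwrite=True)

def on_message(self, message):
    # Assumed method of the same streaming class: store the raw event JSON and
    # advance the checkpoint for its channel.
    channel = message.get("channel", "")
    replay_id = message.get("data", {}).get("event", {}).get("replayId")
    logger.info(f"Processing event from {channel} with replayId={replay_id}")
    # ... persist the event JSON to storage here ...
    if replay_id is not None:
        save_replay_id(channel, replay_id)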
Maintain a single long-polling connection for all channels
def cometd_connect(self, cometd_url):
    # Maintain single long-polling connection for all channels.
    MAX_RUNTIME = 260  # 4m20s for a 5m timeout (40s buffer)
    start_time = time.time()
    while True:
        # Timeout check at the start of each iteration
        elapsed = time.time() - start_time
        if elapsed > MAX_RUNTIME:
            logger.info("Approaching function timeout - exiting gracefully")
            return True  # Graceful exit, run() will treat as successful
        try:
            connect_payload = [{
                "channel": "/meta/connect",
                "clientId": self.client_id,
                "connectionType": "long-polling"
            }]
            response = self.session.post(cometd_url, json=connect_payload, timeout=40)
            if response.status_code == 200:
                messages = response.json()
                for message in messages:
                    if message.get('channel') in self.channels:
                        # CDC event
                        self.on_message(message)
                    elif message.get('error') == '403::Unknown client':
                        logger.warning("Client expired, reconnecting...")
                        return False
            else:
                logger.error(f"Connect failed: {response.text}")
                time.sleep(10)
        except requests.exceptions.ReadTimeout:
            logger.info("Long-polling timeout reached, re-establishing connection...")
            continue  # Immediately try again
        except requests.exceptions.RequestException as e:
            logger.error(f"Connect error: {e}")
            time.sleep(10)
        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            time.sleep(10)
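Finally, these methods need something to tie them together. The post doesn't include run() or the function entry point, so the sketch below shows one plausible wiring as a timer-triggered Azure Function (Python v2 programming model). The class name, channel list, schedule, and API version are all assumptions for illustration.

import azure.functions as func

app = func.FunctionApp()

@app.timer_trigger(schedule="0 */5 * * * *", arg_name="timer")   # every 5 minutes (assumed)
def salesforce_cdc_listener(timer: func.TimerRequest) -> None:
    # Hypothetical orchestration: authenticate, handshake, subscribe, then long-poll
    # until the MAX_RUNTIME guard in cometd_connect() hands control back before the timeout.
    client = SalesforceCDCClient(channels=["/data/AccountChangeEvent", "/data/CaseChangeEvent"])
    client.get_salesforce_session()
    cometd_url = f"{INSTANCE_URL}/cometd/63.0"                    # API version is just an example
    if client.cometd_handshake(cometd_url) and client.cometd_subscribe_all(cometd_url):
        client.cometd_connect(cometd_url)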
Since I didn’t post the entire code of my Azure Function App, you may have inferred some of the missing details from the code comments. All the processed event JSON messages were saved to Azure Storage, along with checkpoint blobs that stored the last replayId of each successfully processed message. This allowed the application to safely reconnect to the channels after a connection disruption and retrieve all the “missed” event messages from the downtime.
Testing confirmed that this replayId-based mechanism was reliable and stable. Kudos to the Salesforce team for retaining all those messages in the platform’s internal event bus for the configured CDC entities (change events are kept for up to 72 hours)!