CoolR Data Integration Process Walkthrough
- Version: 1.0
- Last Updated: February 24, 2026
- Audience: Data Engineers, Developers, DevOps Teams
Table of Contents
- Integration Setup
- Access Method Implementation
- Common Integration Patterns
- Platform-Specific Examples
- Troubleshooting Guide
Integration Setup
Prerequisites
Before starting your integration, ensure you have:
- CoolR export schedule configured (contact your account manager)
- Export notification method chosen (Email or Webhook)
- Storage access credentials (depends on chosen method)
- Target data platform ready (database, data warehouse, or BI tool)
Access Method Implementation
Method 1: Managed Identity (Recommended for Azure)
Use Case: Your application runs in Azure (VM, Function App, Data Factory, Synapse)
Step 1: Enable Managed Identity
# Enable system-assigned managed identity on your Azure resource
az vm identity assign --name MyDataPipeline --resource-group MyResourceGroup
# Get the principal ID
az vm identity show --name MyDataPipeline --resource-group MyResourceGroup --query principalId
Step 2: Grant Storage Access
# Grant Storage Blob Data Reader role to your managed identity
az role assignment create \
--role "Storage Blob Data Reader" \
--assignee <PRINCIPAL_ID> \
--scope /subscriptions/<SUB_ID>/resourceGroups/<RG>/providers/Microsoft.Storage/storageAccounts/<STORAGE_ACCOUNT>
Step 3: Access Files (Python Example)
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
# Authenticate using managed identity
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(
account_url="https://coolrexports.blob.core.windows.net",
credential=credential
)
# List and download files
container_client = blob_service_client.get_container_client("exports")
for blob in container_client.list_blobs(name_starts_with="Full/YourClient/"):
    blob_client = container_client.get_blob_client(blob.name)
    # blob.name includes the virtual directory path, so keep just the file name
    local_path = f"./data/{blob.name.split('/')[-1]}"
    with open(local_path, "wb") as file:
        file.write(blob_client.download_blob().readall())
    print(f"Downloaded: {blob.name}")
Method 2: Workload Identity Federation
Use Case: Running in Kubernetes, GitHub Actions, or non-Azure cloud
Step 1: Create Federated Credential
# For Kubernetes workload
az identity federated-credential create \
--name coolr-k8s-integration \
--identity-name MyK8sIdentity \
--resource-group MyResourceGroup \
--issuer https://kubernetes.example.com \
--subject system:serviceaccount:default:coolr-integration \
--audience api://AzureADTokenExchange
Step 2: Configure Kubernetes Service Account
apiVersion: v1
kind: ServiceAccount
metadata:
name: coolr-integration
namespace: default
annotations:
azure.workload.identity/client-id: "<MANAGED_IDENTITY_CLIENT_ID>"
Step 3: Use in Pod
# Same code as Managed Identity - DefaultAzureCredential handles federation
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
credential = DefaultAzureCredential()
blob_service_client = BlobServiceClient(
account_url="https://coolrexports.blob.core.windows.net",
credential=credential
)
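Note that the pod running this code must also opt in to workload identity via a label and reference the service account from Step 2, otherwise no federated token is injected. A minimal pod sketch (the pod name and image are placeholders):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: coolr-integration-job
  namespace: default
  labels:
    azure.workload.identity/use: "true"   # required for the webhook to inject the token
spec:
  serviceAccountName: coolr-integration   # the service account from Step 2
  containers:
    - name: sync
      image: registry.example.com/coolr-sync:latest   # placeholder image
```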
Method 3: Webhook Notification
Use Case: Event-driven architecture, real-time processing
Step 1: Create Webhook Endpoint
from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
@app.route('/api/exports/notification', methods=['POST'])
def handle_export_notification():
try:
# Parse webhook payload
data = request.get_json()
client_name = data['clientName']
export_type = data['exportType']
export_date = data['exportDateTime']
files = data['files']
print(f"Received export notification: {client_name} - {export_type}")
# Download each file
for file in files:
file_name = file['fileName']
download_url = file['downloadUrl'] # Includes SAS token
# Validate the URL is from the expected Azure Blob Storage host before downloading.
# In production, also verify the webhook request signature/HMAC and restrict
# access to known source IPs.
allowed_host = "coolrexports.blob.core.windows.net"
from urllib.parse import urlparse
parsed = urlparse(download_url)
if parsed.scheme != "https" or parsed.hostname != allowed_host:
raise ValueError(f"Unexpected download URL host: {parsed.hostname}")
            response = requests.get(download_url, timeout=60)
            response.raise_for_status()
            with open(f"./downloads/{file_name}", "wb") as f:
                f.write(response.content)
            print(f"Downloaded: {file_name}")
# Trigger your ETL pipeline
trigger_etl_pipeline(client_name, export_type, files)
return jsonify({"status": "success"}), 200
except Exception as e:
print(f"Error processing webhook: {e}")
return jsonify({"status": "error", "message": str(e)}), 500
def trigger_etl_pipeline(client, export_type, files):
# Your pipeline logic here
pass
if __name__ == '__main__':
    # 'adhoc' creates a throwaway self-signed certificate (requires pyOpenSSL);
    # use a real certificate or a TLS-terminating proxy in production
    app.run(host='0.0.0.0', port=8080, ssl_context='adhoc')
Step 2: Secure Your Endpoint
# Add webhook signature validation
import hmac
import hashlib
WEBHOOK_SECRET = "your-shared-secret"
def validate_webhook_signature(request):
    signature = request.headers.get('X-CoolR-Signature')
    if not signature:
        return False  # reject requests that omit the signature header
    payload = request.get_data()
    expected_signature = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(signature, expected_signature)
@app.route('/api/exports/notification', methods=['POST'])
def handle_export_notification():
if not validate_webhook_signature(request):
return jsonify({"error": "Invalid signature"}), 401
# ... rest of handler
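To exercise the validation locally, you can compute a matching signature for a sample payload and send it in the header yourself; the secret must equal the server-side value, and the endpoint URL in your test request is whatever you deployed above:

```python
import hashlib
import hmac
import json

WEBHOOK_SECRET = "your-shared-secret"  # must match the server-side secret

def sign_payload(payload: bytes, secret: str = WEBHOOK_SECRET) -> str:
    # Hex-encoded HMAC-SHA256 -- the same scheme the endpoint verifies
    return hmac.new(secret.encode(), payload, hashlib.sha256).hexdigest()

if __name__ == "__main__":
    body = json.dumps({"test": "payload"}).encode()
    print("X-CoolR-Signature:", sign_payload(body))
```

Send the printed value in the `X-CoolR-Signature` header of a test POST (for example with curl) and confirm the endpoint accepts it, then flip one byte of the body and confirm it is rejected.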
Method 4: SAS Token
Use Case: Quick setup, temporary access, testing
Step 1: Receive SAS Token
You'll receive a SAS token via email or webhook notification:
https://coolrexports.blob.core.windows.net/exports/Full/YourClient/20260224120000?sv=2021-06-08&se=2026-03-03&sr=d&sp=rl&sig=...
Step 2: Download Files (Python)
from azure.storage.blob import ContainerClient
# Use the SAS URL from notification
sas_url = "https://coolrexports.blob.core.windows.net/exports/Full/YourClient/20260224120000?sv=..."
container_client = ContainerClient.from_container_url(sas_url)
# List and download all files in the export
for blob in container_client.list_blobs():
    blob_client = container_client.get_blob_client(blob.name)
    # Keep just the file name; blob.name may include a folder path
    local_path = f"./data/{blob.name.split('/')[-1]}"
    with open(local_path, "wb") as file:
        file.write(blob_client.download_blob().readall())
    print(f"Downloaded: {blob.name}")
Step 3: Download Files (PowerShell)
# Using AzCopy (fastest for large files)
$sasUrl = "https://coolrexports.blob.core.windows.net/exports/Full/YourClient/20260224120000?sv=..."
azcopy copy $sasUrl "C:\Data\CoolR\" --recursive
# Or using Azure PowerShell
Install-Module -Name Az.Storage
$sasToken = "?sv=2021-06-08&se=2026-03-03&sr=d&sp=rl&sig=..."  # query-string portion of the SAS URL
$ctx = New-AzStorageContext -SasToken $sasToken -StorageAccountName "coolrexports"
Get-AzStorageBlob -Container "exports" -Context $ctx |
    Get-AzStorageBlobContent -Destination "C:\Data\CoolR\"
Method 5: SFTP
Use Case: Legacy systems, non-cloud infrastructure
Step 1: Configure SFTP Credentials
Provide to CoolR team:
- SFTP hostname
- Port (default: 22)
- Username
- Authentication method (password or SSH key)
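If you choose SSH key authentication (recommended for automated syncs), a key pair can be generated as follows; the file name `coolr-sftp` and the comment are example choices:

```shell
# Create ~/.ssh if needed, then generate an Ed25519 key pair with no passphrase
mkdir -p ~/.ssh
ssh-keygen -t ed25519 -f ~/.ssh/coolr-sftp -N "" -C "coolr-sftp-sync"

# Share the public key with the CoolR team; keep the private key secret
cat ~/.ssh/coolr-sftp.pub
```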
Step 2: Download Files (Python)
import paramiko
# Connect to SFTP server
transport = paramiko.Transport(('your-sftp-server.com', 22))
transport.connect(username='coolr-user', password='your-password')
sftp = paramiko.SFTPClient.from_transport(transport)
# Navigate to export directory
sftp.chdir('/coolr/prod/lrsukz-464/20260224120000')
# Download all files
for filename in sftp.listdir():
    local_path = f"./data/{filename}"
    sftp.get(filename, local_path)
    print(f"Downloaded: {filename}")
sftp.close()
transport.close()
Step 3: Automated SFTP Sync (Bash)
#!/bin/bash
SFTP_HOST="your-sftp-server.com"
SFTP_USER="coolr-user"
REMOTE_DIR="/coolr/prod/lrsukz-464"
LOCAL_DIR="/data/coolr"
# Download new files using SFTP with SSH key authentication (recommended for automation).
# Ensure your SSH key is configured in ~/.ssh/authorized_keys on the SFTP server.
sftp -oBatchMode=yes "$SFTP_USER@$SFTP_HOST" <<EOF
cd "$REMOTE_DIR"
lcd "$LOCAL_DIR"
mget *.parquet
EOF
echo "SFTP sync completed"
Common Integration Patterns
Pattern 1: Initial Full Load + Incremental Delta
Use Case: Most common pattern for data warehouses
import pandas as pd
from sqlalchemy import create_engine, text
# Database connection
engine = create_engine('postgresql://user:pass@localhost/warehouse')
def initial_load():
"""One-time full data load"""
print("Starting initial load...")
# Download and load each export file
files = ['location', 'asset', 'product', 'order', 'orderitem']
for table in files:
# Read parquet file
df = pd.read_parquet(f'./data/Full/YourClient/20260224/{table}.parquet')
# Filter active records only
df = df[df['IsDeleted'] == 0]
# Load to database
df.to_sql(table, engine, if_exists='replace', index=False)
# Track watermark
max_modified = df['ModifiedOn'].max()
update_watermark(table, max_modified)
print(f"Loaded {len(df)} records into {table}")
def delta_update():
"""Regular incremental updates"""
print("Starting delta update...")
files = ['location', 'asset', 'product', 'order', 'orderitem']
for table in files:
# Get last processed timestamp
last_watermark = get_watermark(table)
# Read delta parquet file
df = pd.read_parquet(f'./data/Delta/YourClient/latest/{table}.parquet')
# Filter to only new changes
df = df[pd.to_datetime(df['ModifiedOn']) > last_watermark]
if len(df) == 0:
print(f"No new records for {table}")
continue
# Handle deletes
deletes = df[df['IsDeleted'] == 1]
if len(deletes) > 0:
# Use parameterized query for IDs; table names come from the fixed `files` list above.
delete_ids = [int(v) for v in deletes[f'{table}Id']]
placeholders = ', '.join([f':id_{i}' for i in range(len(delete_ids))])
params = {f'id_{i}': delete_ids[i] for i in range(len(delete_ids))}
with engine.begin() as conn:
conn.execute(text(f"DELETE FROM {table} WHERE {table}Id IN ({placeholders})"), params)
# Upsert active records
active = df[df['IsDeleted'] == 0]
if len(active) > 0:
active.to_sql(f'{table}_staging', engine, if_exists='replace', index=False)
# Merge from staging; table names come from the fixed `files` list above.
with engine.begin() as conn:
conn.execute(text(f"""
INSERT INTO {table}
SELECT * FROM {table}_staging
ON CONFLICT ({table}Id) DO UPDATE SET
ModifiedOn = EXCLUDED.ModifiedOn
-- ... other columns
"""))
# Update watermark
max_modified = df['ModifiedOn'].max()
update_watermark(table, max_modified)
print(f"Processed {len(df)} changes for {table}")
def get_watermark(table):
with engine.connect() as conn:
result = conn.execute(
text("SELECT last_modified FROM watermark WHERE table_name = :table"),
{"table": table}
)
row = result.fetchone()
return row[0] if row else pd.Timestamp('1900-01-01')
def update_watermark(table, timestamp):
with engine.begin() as conn:
conn.execute(
text("""
INSERT INTO watermark (table_name, last_modified, updated_at)
VALUES (:table, :ts, NOW())
ON CONFLICT (table_name) DO UPDATE SET
last_modified = EXCLUDED.last_modified,
updated_at = NOW()
"""),
{"table": table, "ts": timestamp}
)
# Run initial load once
initial_load()
# Schedule delta_update() to run every 6 hours
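One simple way to schedule the delta job, assuming the functions above are saved as a script (`coolr_sync.py` is a hypothetical name and path), is a cron entry:

```
# crontab -e: run the delta update every 6 hours, appending output to a log
0 */6 * * * /usr/bin/python3 /opt/etl/coolr_sync.py >> /var/log/coolr_sync.log 2>&1
```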
Pattern 2: Real-Time Webhook Processing
Use Case: Event-driven pipelines with immediate processing
from flask import Flask, request
from azure.storage.blob import BlobServiceClient
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import urlparse
app = Flask(__name__)
engine = create_engine('postgresql://user:pass@localhost/warehouse')
ALLOWED_BLOB_HOST = "coolrexports.blob.core.windows.net"
@app.route('/webhook', methods=['POST'])
def process_export():
data = request.json
# Extract export details
export_type = data['exportType']
files = data['files']
for file in files:
# Download file
download_url = file['downloadUrl']
table_name = file['fileName'].replace('.parquet', '')
# Validate URL host before fetching to prevent SSRF.
# In production, also verify the webhook request signature/HMAC.
parsed = urlparse(download_url)
if parsed.scheme != "https" or parsed.hostname != ALLOWED_BLOB_HOST:
raise ValueError(f"Unexpected download URL host: {parsed.hostname}")
# Read directly from validated URL
df = pd.read_parquet(download_url)
if export_type == 'Full':
# Replace entire table
df[df['IsDeleted'] == 0].to_sql(table_name, engine, if_exists='replace', index=False)
else:
# Delta processing
process_delta(table_name, df)
print(f"Processed {table_name}: {len(df)} records")
# Trigger downstream jobs
trigger_bi_refresh()
return {"status": "success"}, 200
def process_delta(table, df):
# Handle deletes
deletes = df[df['IsDeleted'] == 1]
if len(deletes) > 0:
# Use parameterized query for IDs; table names come from validated file names above.
delete_ids = [int(v) for v in deletes[f'{table}Id']]
placeholders = ', '.join([f':id_{i}' for i in range(len(delete_ids))])
params = {f'id_{i}': delete_ids[i] for i in range(len(delete_ids))}
with engine.begin() as conn:
conn.execute(text(f"DELETE FROM {table} WHERE {table}Id IN ({placeholders})"), params)
# Upsert active records
active = df[df['IsDeleted'] == 0]
if len(active) > 0:
# Use pandas upsert or staging table approach
active.to_sql(f'{table}_temp', engine, if_exists='replace', index=False)
with engine.begin() as conn:
conn.execute(text(f"INSERT INTO {table} SELECT * FROM {table}_temp ON CONFLICT DO UPDATE ..."))
if __name__ == '__main__':
app.run(port=8080)
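The `trigger_bi_refresh()` call above is left to you. As one hedged sketch, a Power BI dataset refresh can be started through the Power BI REST API; the group ID, dataset ID, and token acquisition are placeholders you would supply (the token is typically obtained via MSAL or azure-identity with the Power BI API scope):

```python
import requests

POWER_BI_API = "https://api.powerbi.com/v1.0/myorg"

def build_refresh_url(group_id: str, dataset_id: str) -> str:
    # Refresh endpoint: POST /groups/{groupId}/datasets/{datasetId}/refreshes
    return f"{POWER_BI_API}/groups/{group_id}/datasets/{dataset_id}/refreshes"

def trigger_bi_refresh(group_id: str, dataset_id: str, access_token: str) -> None:
    # group_id, dataset_id and access_token are placeholders
    response = requests.post(
        build_refresh_url(group_id, dataset_id),
        headers={"Authorization": f"Bearer {access_token}"},
        timeout=30,
    )
    response.raise_for_status()  # the service answers 202 Accepted on success
```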
Platform-Specific Examples
Power BI Integration
Step 1: Open Power Query Editor
- Open Power BI Desktop
- Click Home > Transform data > Transform data
- This opens the Power Query Editor
Step 2: Create a New Blank Query
- In Power Query Editor, click Home > New Source > Blank Query
- A new query called Query1 appears in the left panel
- Rename it (right-click > Rename) to something like ActiveOutlets (to match the main table returned by the M query below)
Step 3: Open the Advanced Editor
- With your blank query selected, click Home > Advanced Editor
- Clear the default content
- Paste in the full M query:
let
Source = AzureStorage.Blobs("https://coolr3data3exports.blob.core.windows.net/folder"),
// Keep only actual files - paths with exactly 3 segments (timestamp/subfolder/guid.parquet)
FilesOnly = Table.SelectRows(Source, each List.Count(Text.Split([Name], "/")) = 3),
// Add timestamp column
WithTimestamp = Table.AddColumn(FilesOnly, "Timestamp", each Text.BeforeDelimiter([Name], "/")),
// Filter to dump blobs only
DumpBlobs = Table.SelectRows(WithTimestamp, each Text.Contains([Name], "/outlet_dump_")),
// Latest dump timestamp
LatestDumpTimestamp = List.Max(DumpBlobs[Timestamp]),
// Filter to latest dump
LatestDump = Table.SelectRows(DumpBlobs, each [Timestamp] = LatestDumpTimestamp),
// Take the first parquet file
OutletContent = LatestDump{0}[Content],
// Parse parquet
OutletData = Parquet.Document(OutletContent),
// Filter active records
ActiveOutlets = Table.SelectRows(OutletData, each [IsDeleted] = 0)
in
ActiveOutlets
Step 4: Handle Authentication
Power BI will prompt you to authenticate against Azure Blob Storage:
- A credential dialog will appear (or click Edit Credentials to open it)
- Choose Shared Access Signature (SAS)
- SAS: paste the SAS token URL generated for the client
- Set the Privacy Level to Organizational (or None for testing)
- Click Connect
Step 5: Verify the Data Loads
- The query should execute and show a preview of ActiveOutlets
- Check that columns such as LocationId, Name, and IsDeleted appear correctly
- If you get an error, inspect the output of each intermediate step (for example LatestDumpTimestamp) and let us know where results stop appearing
You can repeat this step for each dataset.
Snowflake Integration
-- Step 1: Create external stage
CREATE OR REPLACE STAGE coolr_exports
URL = 'azure://coolrexports.blob.core.windows.net/exports'
CREDENTIALS = (AZURE_SAS_TOKEN = '?sv=2021...');
-- Step 2: Create file format
CREATE OR REPLACE FILE FORMAT parquet_format
TYPE = PARQUET
COMPRESSION = SNAPPY;
-- Step 3: Load data using COPY INTO
COPY INTO location
FROM @coolr_exports/Full/YourClient/20260224/location.parquet
FILE_FORMAT = (FORMAT_NAME = parquet_format)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
PURGE = FALSE;
-- Step 4: Automated incremental load
CREATE OR REPLACE TASK load_delta_location
WAREHOUSE = COMPUTE_WH
SCHEDULE = 'USING CRON 0 */6 * * * UTC' -- Every 6 hours
AS
MERGE INTO location AS target
USING (
SELECT $1:LocationId::INT AS LocationId,
$1:Name::VARCHAR AS Name,
$1:ModifiedOn::TIMESTAMP AS ModifiedOn,
$1:IsDeleted::BOOLEAN AS IsDeleted
FROM @coolr_exports/Delta/YourClient/latest/location.parquet
(FILE_FORMAT => parquet_format)
) AS source
ON target.LocationId = source.LocationId
WHEN MATCHED AND source.IsDeleted = TRUE THEN DELETE
WHEN MATCHED AND source.IsDeleted = FALSE THEN UPDATE SET
target.Name = source.Name,
target.ModifiedOn = source.ModifiedOn
WHEN NOT MATCHED AND source.IsDeleted = FALSE THEN INSERT
(LocationId, Name, ModifiedOn, IsDeleted)
VALUES (source.LocationId, source.Name, source.ModifiedOn, source.IsDeleted);
-- Enable task
ALTER TASK load_delta_location RESUME;
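To confirm the task is actually running, Snowflake's task metadata can be queried (object names as defined above):

```sql
-- The task state should be 'started' after RESUME
SHOW TASKS LIKE 'load_delta_location';

-- Recent executions and any errors
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'load_delta_location'))
ORDER BY scheduled_time DESC
LIMIT 10;
```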
Apache Airflow DAG
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.microsoft.azure.hooks.wasb import WasbHook
from datetime import datetime, timedelta
import pandas as pd
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2026, 2, 24),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'coolr_delta_import',
default_args=default_args,
description='Import CoolR delta exports',
schedule_interval='0 */6 * * *', # Every 6 hours
catchup=False
)
def download_export(**context):
    import os
    hook = WasbHook(wasb_conn_id='azure_coolr')
    # Blob names look like 'Delta/YourClient/<timestamp>/<table>.parquet';
    # the timestamp folders sort lexicographically, so max() yields the latest export
    blobs = hook.get_blobs_list(container_name='exports', prefix='Delta/YourClient/')
    latest_export = max('/'.join(b.split('/')[:3]) for b in blobs) + '/'
    # Download every parquet file in the latest export folder
    os.makedirs('/tmp/coolr', exist_ok=True)
    for blob in hook.get_blobs_list(container_name='exports', prefix=latest_export):
        hook.get_file(
            file_path=f"/tmp/coolr/{blob.split('/')[-1]}",
            container_name='exports',
            blob_name=blob
        )
    return latest_export
def process_delta(**context):
from sqlalchemy import create_engine
engine = create_engine('postgresql://user:pass@localhost/warehouse')
files = ['location', 'asset', 'product', 'order', 'orderitem']
for table in files:
df = pd.read_parquet(f'/tmp/coolr/{table}.parquet')
# Process deletes and upserts
# ... (similar to earlier examples)
print(f"Processed {table}: {len(df)} records")
download_task = PythonOperator(
task_id='download_export',
python_callable=download_export,
dag=dag
)
process_task = PythonOperator(
task_id='process_delta',
python_callable=process_delta,
dag=dag
)
download_task >> process_task
Troubleshooting Guide
Issue 1: Cannot Download Files - Access Denied
Symptoms:
- HTTP 403 Forbidden errors
- "This request is not authorized to perform this operation"
Solutions:
# Check 1: Verify SAS token hasn't expired
from urllib.parse import parse_qs, urlparse
url = "your-sas-url-here"
params = parse_qs(urlparse(url).query)
expiry = params.get('se', [''])[0]
print(f"SAS Token expires: {expiry}")
# Check 2: Verify managed identity has correct role
# Azure CLI
az role assignment list --assignee <PRINCIPAL_ID> --scope <STORAGE_ACCOUNT_SCOPE>
# Check 3: Test connection with minimal code
from azure.storage.blob import BlobServiceClient
from azure.identity import DefaultAzureCredential
try:
client = BlobServiceClient(
account_url="https://coolrexports.blob.core.windows.net",
credential=DefaultAzureCredential()
)
    containers = list(client.list_containers())  # force the request; the pager is lazy
print("Connection successful!")
except Exception as e:
print(f"Connection failed: {e}")
Issue 2: Parquet Files Won't Load
Symptoms:
- "Unable to open Parquet file"
- Schema errors
Solutions:
# Check 1: Verify file isn't corrupted
import pyarrow.parquet as pq
try:
table = pq.read_table('location.parquet')
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
except Exception as e:
print(f"File error: {e}")
# Check 2: Handle schema evolution
df = pd.read_parquet('location.parquet', engine='pyarrow')
# Add missing columns if schema changed
expected_columns = ['LocationId', 'Code', 'Name', 'IsDeleted', 'ModifiedOn']
for col in expected_columns:
if col not in df.columns:
df[col] = None
# Check 3: Use different Parquet engine
df = pd.read_parquet('location.parquet', engine='fastparquet') # Alternative engine
Issue 3: Delta Processing Misses Records
Symptoms:
- Records not appearing in target database
- Duplicate records appearing
Solutions:
# Check 1: Verify watermark tracking
from sqlalchemy import create_engine, text

def verify_watermark():
    engine = create_engine('your-connection-string')
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM watermark ORDER BY updated_at DESC"))
        for row in result:
            print(f"Table: {row.table_name}, Last Modified: {row.last_modified}")
# Check 2: Check for timezone issues
df = pd.read_parquet('delta.parquet')
print(f"ModifiedOn dtype: {df['ModifiedOn'].dtype}")
print(f"Sample values: {df['ModifiedOn'].head()}")
# Normalize to UTC (handles both naive and tz-aware values)
df['ModifiedOn'] = pd.to_datetime(df['ModifiedOn'], utc=True)
# Check 3: Audit delta processing
def audit_delta(table_name):
df = pd.read_parquet(f'{table_name}.parquet')
print(f"Total records: {len(df)}")
print(f"Deleted records: {len(df[df['IsDeleted'] == 1])}")
print(f"Active records: {len(df[df['IsDeleted'] == 0])}")
print(f"Date range: {df['ModifiedOn'].min()} to {df['ModifiedOn'].max()}")
Issue 4: Webhook Not Receiving Notifications
Symptoms:
- No webhook calls received
- Sporadic webhook delivery
Solutions:
# Check 1: Verify webhook endpoint is accessible
# Test with curl
curl -X POST https://your-domain.com/webhook \
-H "Content-Type: application/json" \
-d '{"test": "payload"}'
# Check 2: Add detailed logging
import logging
logging.basicConfig(level=logging.DEBUG)
@app.route('/webhook', methods=['POST'])
def webhook_handler():
logging.info(f"Webhook received from {request.remote_addr}")
logging.info(f"Headers: {request.headers}")
logging.info(f"Payload: {request.get_data()}")
# Your processing logic
return {"status": "success"}, 200
# Check 3: Implement retry mechanism
from flask import Flask
from functools import wraps
import time
def retry_on_failure(max_retries=3):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
for attempt in range(max_retries):
try:
return f(*args, **kwargs)
except Exception as e:
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt)
return wrapper
return decorator
@app.route('/webhook', methods=['POST'])
@retry_on_failure(max_retries=3)
def webhook_handler():
# Your logic here
pass
Issue 5: Large File Performance
Symptoms:
- Slow download speeds
- Memory errors when loading files
Solutions:
# Solution 1: Use chunked reading for large Parquet files
import pyarrow.parquet as pq
parquet_file = pq.ParquetFile('large_file.parquet')
for batch in parquet_file.iter_batches(batch_size=10000):
df = batch.to_pandas()
# Process chunk
df.to_sql('table', engine, if_exists='append', index=False)
# Solution 2: Use AzCopy for faster downloads
import subprocess
subprocess.run([
'azcopy', 'copy',
'https://coolrexports.blob.core.windows.net/exports/Full/YourClient/?sv=...',
'./data/',
'--recursive'
])
# Solution 3: Stream directly to database without saving to disk
from azure.storage.blob import BlobClient
import io
blob_client = BlobClient.from_blob_url(blob_url)  # blob_url: full blob URL including SAS token
stream = io.BytesIO()
blob_client.download_blob().readinto(stream)
stream.seek(0)
df = pd.read_parquet(stream)
df.to_sql('table', engine, if_exists='append', index=False)
Best Practices Summary
- Always use ModifiedOn for delta processing - Don't rely on CreatedOn
- Handle IsDeleted properly - Include deleted records in delta sync
- Track watermarks per table - Don't use a single global watermark
- Use Parquet format - Much faster and smaller than CSV
- Implement retry logic - Network failures happen, be resilient
- Monitor export processing - Set up alerts for failures and delays
- Test with small datasets first - Validate your logic before full production
- Use managed identity when possible - Most secure option for Azure
- Keep SAS tokens secure - Never commit to git, use secrets management
- Document your pipeline - Future you will thank present you
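To make the SAS-token point concrete, here is a minimal sketch that reads the token from the environment instead of source code; `COOLR_SAS_URL` is an example variable name, and a secret manager (such as Azure Key Vault) is preferable in production:

```python
import os

def get_sas_url() -> str:
    # COOLR_SAS_URL is an example name; in production, load the value from a
    # secret manager rather than committing tokens to git.
    sas_url = os.environ.get("COOLR_SAS_URL")
    if not sas_url:
        raise RuntimeError(
            "COOLR_SAS_URL is not set; refusing to fall back to a hard-coded token"
        )
    return sas_url
```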
Need Help?
- Review the main specification: DATA_INTEGRATION_SPECIFICATION.md
- Migrating from API: API_MIGRATION_GUIDE.md
- Contact: support@coolrgroup.com