CoolR API to Data Export Migration Guide
- Version: 1.0
- Last Updated: February 24, 2026
- Audience: Teams Currently Using CoolR REST API
Table of Contents
- Why Migrate?
- API vs Export Comparison
- Migration Strategies
- Endpoint Mapping
- Code Migration Examples
- Rollback Procedures
- Common Challenges
Why Migrate?
Current API Limitations
If you're using the CoolR REST API (https://docs.coolrgroup.com/docs/integration/planning), you've likely experienced:
| Challenge | Impact |
|---|---|
| Complex Pagination | Need to implement skip/take logic across millions of records |
| Rate Limiting | API throttling slows down large data pulls |
| Timeout Issues | Long-running queries fail mid-execution |
| Incremental Sync Complexity | Manual timestamp tracking for each endpoint |
| Network Dependency | Real-time API calls required during ETL windows |
| Cost | API compute costs for large data volumes |
Data Export Benefits
| Benefit | Value |
|---|---|
| No Pagination | Single file contains all data |
| No Rate Limits | Download at full network speed |
| Reliability | Pre-generated files, no query timeouts |
| Automated Delta | Built-in ModifiedOn filtering |
| Offline Processing | Download once, process multiple times |
| Cost Effective | Storage costs less than API compute costs |
| Better Performance | Parquet format loads 10-50x faster than JSON API responses |
API vs Export Comparison
Data Retrieval Pattern
Old API Approach:
```python
import time
import requests

# Complex pagination loop
skip = 0
take = 1000
all_locations = []
while True:
    response = requests.get(
        f"https://api.coolr.com/v1/locations?skip={skip}&take={take}",
        headers={"Authorization": f"Bearer {token}"}
    )
    data = response.json()
    if not data:
        break
    all_locations.extend(data)
    skip += take
    time.sleep(0.5)  # Avoid rate limits
# Took 15 minutes for 50,000 records
```
New Export Approach:
```python
# Single file download
import pandas as pd

df = pd.read_parquet('location.parquet')
# Took 5 seconds for 50,000 records
```
Incremental Updates
Old API Approach:
```python
import requests

# Manual timestamp tracking per endpoint
last_sync = get_last_sync_timestamp('locations')
response = requests.get(
    f"https://api.coolr.com/v1/locations?modifiedSince={last_sync}",
    headers={"Authorization": f"Bearer {token}"}
)
locations = response.json()
# Repeat for each endpoint: locations, assets, products, orders...
# Need to track timestamps separately for each
```
New Export Approach:
```python
import pandas as pd

# Automatic delta handling
df = pd.read_parquet('Delta/YourClient/latest/location.parquet')
# All changes since last export, pre-filtered by CoolR
```
Migration Strategies
Strategy 1: Parallel Run (Recommended)
Timeline: 2-4 weeks. Risk: Low. Effort: Medium.
Run both the API and Export integrations simultaneously, and validate that Export data matches the API before cutover.
Week 1-2: Setup Export integration alongside API
Week 2-3: Compare data quality and completeness
Week 3-4: Gradually shift traffic to Export
Week 4+: Decommission API integration
Validation Script:
```python
import pandas as pd
import requests

def validate_migration(table_name):
    # Fetch from API (old method)
    api_data = fetch_from_api(table_name)
    api_df = pd.DataFrame(api_data)

    # Load from Export (new method)
    export_df = pd.read_parquet(f'{table_name}.parquet')
    export_df = export_df[export_df['IsDeleted'] == 0]  # Active records only

    # Compare
    print(f"API records: {len(api_df)}")
    print(f"Export records: {len(export_df)}")

    # Check for missing records (ID columns are PascalCase, e.g. LocationId)
    id_col = f'{table_name.capitalize()}Id'
    api_ids = set(api_df[id_col])
    export_ids = set(export_df[id_col])
    missing_in_export = api_ids - export_ids
    missing_in_api = export_ids - api_ids

    if missing_in_export:
        print(f"⚠️ Missing in Export: {missing_in_export}")
    if missing_in_api:
        print(f"ℹ️ Extra in Export (recently added): {missing_in_api}")
    if not missing_in_export:
        print(f"✅ {table_name} validation passed")

validate_migration('location')
validate_migration('asset')
validate_migration('product')
```
Strategy 2: Phased Migration
Timeline: 4-8 weeks. Risk: Very Low. Effort: High.
Migrate one data domain at a time, maintaining API for unmigrated domains.
Phase 1: Master data (Location, Asset, Product) -> Export
Keep API for transactional data
Phase 2: Transactional data (Order, Inventory) -> Export
Keep API for real-time queries
Phase 3: All data on Export, decommission API
Hybrid Integration Example:
```python
import pandas as pd

def load_locations():
    # Already migrated to Export
    return pd.read_parquet('location.parquet')

def load_orders():
    # Still using API during migration; wrap in a DataFrame so it merges cleanly
    return pd.DataFrame(fetch_from_api_with_pagination('/v1/orders'))

def build_report():
    locations = load_locations()  # From Export
    orders = load_orders()        # From API
    report = orders.merge(locations, on='LocationId')
    return report
```
Strategy 3: Big Bang Migration
Timeline: 1 week. Risk: High. Effort: Low.
⚠️ Not Recommended unless:
- Small dataset (less than 10K records total)
- Non-critical reporting system
- Can tolerate downtime
Endpoint Mapping
Map your current API endpoints to Export files:
| API Endpoint | Export File | Notes |
|---|---|---|
| GET /v1/locations | location.parquet | Filter IsDeleted = 0 for active only |
| GET /v1/locations?modifiedSince= | Delta: location.parquet | Pre-filtered by ModifiedOn |
| GET /v1/assets | asset.parquet | Includes AssetTypeCode and AssetTypeName |
| GET /v1/assets?locationId= | asset.parquet + filter | Use pandas: df[df['LocationId'] == location_id] |
| GET /v1/products | product.parquet | Includes category and manufacturer |
| GET /v1/orders | order.parquet | Parent order records |
| GET /v1/orders/{id}/items | orderitem.parquet | Filter: orderitem[orderitem['OrderId'] == id] |
| GET /v1/inventory | inventory.parquet | Snapshot at export time |
| GET /v1/scheduledorders | scheduledorder.parquet + scheduledorderitem.parquet | Two files for recurring orders |
Code Migration Examples
Example 1: Location Listing
Before (API):
```python
import requests

API_URL = "https://api.coolr.com/v1"
API_KEY = "your-api-key"

def get_all_locations():
    headers = {"Authorization": f"Bearer {API_KEY}"}
    skip = 0
    take = 100
    all_locations = []
    while True:
        response = requests.get(
            f"{API_URL}/locations?skip={skip}&take={take}",
            headers=headers
        )
        response.raise_for_status()
        locations = response.json()
        if not locations:
            break
        all_locations.extend(locations)
        skip += take
    return all_locations

# Usage
locations = get_all_locations()
print(f"Found {len(locations)} locations")
```
After (Export):
```python
import pandas as pd

def get_all_locations():
    df = pd.read_parquet('location.parquet')
    # Filter active locations only (equivalent to API behavior)
    df = df[df['IsDeleted'] == False]
    # Convert to list of dicts if needed for compatibility
    return df.to_dict('records')

# Usage
locations = get_all_locations()
print(f"Found {len(locations)} locations")
```
Example 2: Incremental Sync
Before (API):
```python
from datetime import datetime
import requests

def sync_locations_since_last_run():
    # Get last sync time from database
    last_sync = get_last_sync_time('locations')

    # API call with timestamp filter
    response = requests.get(
        f"{API_URL}/locations",
        params={
            "modifiedSince": last_sync.isoformat(),
            "skip": 0,
            "take": 1000
        },
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    updated_locations = response.json()

    # Update database
    for location in updated_locations:
        upsert_location(location)

    # Save new sync timestamp
    save_last_sync_time('locations', datetime.utcnow())
    return len(updated_locations)
```
After (Export):
```python
import pandas as pd
from sqlalchemy import create_engine, text

def sync_locations_since_last_run():
    engine = create_engine('your-db-connection')

    # Get watermark
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT last_modified FROM watermark WHERE table_name = 'location'")
        )
        last_sync = result.fetchone()[0]

    # Read delta export
    df = pd.read_parquet('Delta/YourClient/latest/location.parquet')

    # Filter to records modified after watermark
    df = df[pd.to_datetime(df['ModifiedOn']) > last_sync]

    # Upsert = delete every changed row first, then re-insert the active
    # ones (to_sql alone would duplicate rows that already exist).
    changed_ids = [int(v) for v in df['LocationId']]
    if changed_ids:
        placeholders = ', '.join([f':id_{i}' for i in range(len(changed_ids))])
        params = {f'id_{i}': changed_ids[i] for i in range(len(changed_ids))}
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM location WHERE LocationId IN ({placeholders})"), params)

    # Re-insert active records; deleted records stay deleted
    active = df[df['IsDeleted'] == False]
    if len(active) > 0:
        active.to_sql('location', engine, if_exists='append', index=False,
                      method='multi')  # Batch insert

    # Update watermark
    if len(df) > 0:
        new_watermark = df['ModifiedOn'].max()
        with engine.begin() as conn:
            conn.execute(
                text("UPDATE watermark SET last_modified = :ts WHERE table_name = 'location'"),
                {"ts": new_watermark}
            )
    return len(df)
```
Example 3: Orders with Line Items
Before (API):
```python
import requests

def get_order_with_items(order_id):
    # Fetch order header
    response = requests.get(
        f"{API_URL}/orders/{order_id}",
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    order = response.json()

    # Fetch order items (separate API call)
    response = requests.get(
        f"{API_URL}/orders/{order_id}/items",
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    items = response.json()

    order['items'] = items
    return order

# Usage
order = get_order_with_items(12345)
print(f"Order {order['OrderNumber']} has {len(order['items'])} items")
```
After (Export):
```python
import pandas as pd

def get_order_with_items(order_id):
    # Load both files
    orders_df = pd.read_parquet('order.parquet')
    items_df = pd.read_parquet('orderitem.parquet')

    # Get specific order
    order = orders_df[orders_df['OrderId'] == order_id].iloc[0]

    # Get items for this order
    items = items_df[items_df['OrderId'] == order_id]

    # Convert to dict and add items
    order_dict = order.to_dict()
    order_dict['items'] = items.to_dict('records')
    return order_dict

# Usage
order = get_order_with_items(12345)
print(f"Order {order['OrderNumber']} has {len(order['items'])} items")
```
Example 4: Filtering and Aggregation
Before (API):
```python
import requests

def get_active_assets_by_location(location_id):
    # API doesn't support complex filtering, need to fetch all and filter
    skip = 0
    take = 1000
    filtered_assets = []
    while True:
        response = requests.get(
            f"{API_URL}/assets?skip={skip}&take={take}",
            headers={"Authorization": f"Bearer {API_KEY}"}
        )
        assets = response.json()
        if not assets:
            break
        # Manual filtering
        for asset in assets:
            if asset['LocationId'] == location_id and asset['Status'] == 'Active':
                filtered_assets.append(asset)
        skip += take
    return filtered_assets

# Slow: Had to fetch ALL assets to filter for one location
```
After (Export):
```python
import pandas as pd

def get_active_assets_by_location(location_id):
    df = pd.read_parquet('asset.parquet')
    # Efficient filtering with pandas
    filtered = df[
        (df['LocationId'] == location_id) &
        (df['Status'] == 'Active') &
        (df['IsDeleted'] == False)
    ]
    return filtered.to_dict('records')

# Fast: Single file read + efficient filtering
```
Rollback Procedures
Emergency Rollback Plan
If you encounter critical issues with Export integration:
Step 1: Immediate Rollback
```python
# Flip feature flag to use API
USE_EXPORT_INTEGRATION = False  # Set to False to roll back

def get_locations():
    if USE_EXPORT_INTEGRATION:
        return load_from_export()
    else:
        return load_from_api()  # Fallback to API
```
Step 2: Root Cause Analysis
```python
import time
import pandas as pd

# Capture logs and metrics
def load_from_export():
    try:
        start_time = time.time()
        df = pd.read_parquet('location.parquet')
        # Log success metrics
        log_metric('export_load_success', 1)
        log_metric('export_load_time', time.time() - start_time)
        log_metric('export_record_count', len(df))
        return df
    except Exception as e:
        # Log failure details
        log_error('export_load_failed', str(e))
        log_metric('export_load_success', 0)
        # Fallback to API
        return load_from_api()
```
Step 3: Gradual Re-enable
```python
# Canary deployment: 10% traffic to Export
import random

def get_locations():
    use_export = random.random() < 0.10  # 10% on Export
    if use_export:
        return load_from_export()
    else:
        return load_from_api()

# Monitor metrics, increase percentage gradually
```
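Note that random.random() re-rolls on every call, so the same caller can flip between sources. If callers should stay sticky during the canary, bucket a stable key instead. A sketch (the hashing scheme is an assumption on our part, not a CoolR requirement):

```python
import hashlib

def use_export_for(key, percent=10):
    """Deterministically bucket a stable key (e.g. a client or job ID):
    the same key always lands in the same bucket, so a caller stays on
    one source for the whole canary."""
    digest = hashlib.sha256(str(key).encode()).hexdigest()
    return int(digest, 16) % 100 < percent

def get_locations(client_id):
    if use_export_for(client_id, percent=10):
        return load_from_export()
    return load_from_api()
```

Raising the canary from 10% to 50% to 100% then only means changing the percent argument, and any anomaly can be traced back to a fixed set of keys.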
Common Challenges
Challenge 1: Real-Time Data Needs
Problem: "We need real-time data, exports are too delayed"
Solution:
- Use Delta exports every 1-6 hours (configurable)
- For true real-time (<5 min latency), keep the API for specific high-frequency endpoints
- Most reporting doesn't actually need real-time data
```python
import pandas as pd

# Hybrid approach
def get_location_data(location_id, real_time_required=False):
    if real_time_required:
        # Use API for real-time critical data
        return fetch_from_api(f'/v1/locations/{location_id}')
    else:
        # Use Export for reporting/analytics
        df = pd.read_parquet('location.parquet')
        return df[df['LocationId'] == location_id].to_dict('records')[0]
```
Challenge 2: Complex API Query Parameters
Problem: "API supports advanced filtering we rely on"
Solution: Use pandas for local filtering (usually faster anyway)
```python
# API filtering
response = requests.get(
    f"{API_URL}/assets",
    params={
        "status": "Active",
        "assetType": "Cooler",
        "state": "CA",
        "installedAfter": "2025-01-01"
    }
)

# Export equivalent (often faster!)
df = pd.read_parquet('asset.parquet')
filtered = df[
    (df['Status'] == 'Active') &
    (df['AssetTypeCode'] == 'Cooler') &
    (df['State'] == 'CA') &
    (df['InstallationDate'] > '2025-01-01') &
    (df['IsDeleted'] == False)
]
```
Challenge 3: Custom API Endpoints
Problem: "We use custom endpoints not in standard exports"
Solution:
- Request custom export files from CoolR team
- Or keep API for custom endpoints, use Export for standard data
- Most custom queries can be built from standard exports with joins
```python
import pandas as pd

# Custom endpoint: GET /v1/reports/cooler-performance
# Can often be built from standard exports
locations = pd.read_parquet('location.parquet')
assets = pd.read_parquet('asset.parquet')
inventory = pd.read_parquet('inventory.parquet')

# Build custom report
report = assets.merge(locations, on='LocationId') \
               .merge(inventory, on='AssetId') \
               .groupby(['State', 'AssetTypeCode']) \
               .agg({'QuantityOnHand': 'sum'})
```
Challenge 4: Authentication Differences
Problem: "Different auth mechanism than API"
Solution: Authentication is simpler with exports!
| Method | API | Export |
|---|---|---|
| API Key | ✅ Required | ❌ Not used |
| OAuth | ✅ Complex flow | ❌ Not needed |
| Managed Identity | ❌ Not supported | ✅ Recommended |
| SAS Token | ❌ Not supported | ✅ Supported |
| SFTP | ❌ Not supported | ✅ Supported |
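With a SAS token, for example, "authentication" reduces to a signed query string appended to the blob URL; there is no token exchange at runtime. A sketch of composing the download URL (the account, container, and token values are placeholders; the download call via the azure-storage-blob package is shown commented):

```python
def sas_download_url(account, container, blob_path, sas_token):
    """Compose a SAS-authenticated Azure blob URL. The SAS token is issued
    by CoolR (or your storage admin) and already encodes the permissions
    and expiry, so no credentials are handled in code."""
    return (f"https://{account}.blob.core.windows.net/"
            f"{container}/{blob_path}?{sas_token.lstrip('?')}")

url = sas_download_url("youraccount", "exports",
                       "Delta/YourClient/latest/location.parquet",
                       "sv=2024-01-01&sig=placeholder")
# With azure-storage-blob installed, the download is one call:
# from azure.storage.blob import BlobClient
# data = BlobClient.from_blob_url(url).download_blob().readall()
```

Managed Identity removes even the token from your code: the Azure SDK picks up the identity of the VM or function that runs the job.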
Migration Checklist
Pre-Migration
- Inventory current API endpoints in use
- Map API endpoints to Export files (see Endpoint Mapping)
- Choose migration strategy (Parallel / Phased / Big Bang)
- Set up Export access (Managed Identity, SAS Token, or SFTP)
- Test Export download in non-production environment
- Validate data completeness vs API
During Migration
- Implement parallel data loading (API + Export)
- Compare data quality between sources
- Measure performance improvements
- Update documentation and runbooks
- Train team on new integration pattern
- Monitor error rates and data freshness
Post-Migration
- Gradually increase Export traffic percentage
- Monitor dashboard for anomalies
- Decommission API integration code
- Remove API credentials and cleanup
- Document lessons learned
- Celebrate successful migration! 🎉
Performance Comparison
Real-world migration results:
| Metric | API (Before) | Export (After) | Improvement |
|---|---|---|---|
| Initial Load Time | 45 minutes | 3 minutes | 15x faster |
| Delta Sync Time | 12 minutes | 30 seconds | 24x faster |
| Monthly API Costs | $450 | $0 | 100% savings |
| Storage Costs | $0 | $25 | New cost |
| Total Monthly Cost | $450 | $25 | 94% savings |
| Data Freshness | 15 min | 4 hours | Acceptable trade-off |
| Code Complexity | 500 lines | 100 lines | 80% reduction |
| Error Rate | 2.3% | 0.1% | 95% reduction |
Support Resources
- Migration Questions: Contact your CoolR account manager
- Technical Issues: support@coolrgroup.com
- Integration Specification: See DATA_INTEGRATION_SPECIFICATION.md
- Implementation Examples: See INTEGRATION_PROCESS_WALKTHROUGH.md
Ready to start your migration? Begin with the Parallel Run strategy and validate Export data quality before committing to full migration.