
CoolR API to Data Export Migration Guide

  • Version: 1.0
  • Last Updated: February 24, 2026
  • Audience: Teams Currently Using CoolR REST API

Table of Contents

  1. Why Migrate?
  2. API vs Export Comparison
  3. Migration Strategies
  4. Endpoint Mapping
  5. Code Migration Examples
  6. Rollback Procedures
  7. Common Challenges

Why Migrate?

Current API Limitations

If you're using the CoolR REST API (https://docs.coolrgroup.com/docs/integration/planning), you've likely experienced:

| Challenge | Impact |
| --- | --- |
| Complex Pagination | Need to implement skip/take logic across millions of records |
| Rate Limiting | API throttling slows down large data pulls |
| Timeout Issues | Long-running queries fail mid-execution |
| Incremental Sync Complexity | Manual timestamp tracking for each endpoint |
| Network Dependency | Real-time API calls required during ETL windows |
| Cost | API compute costs for large data volumes |

Data Export Benefits

| Benefit | Value |
| --- | --- |
| No Pagination | Single file contains all data |
| No Rate Limits | Download at full network speed |
| Reliability | Pre-generated files, no query timeouts |
| Automated Delta | Built-in ModifiedOn filtering |
| Offline Processing | Download once, process multiple times |
| Cost Effective | Storage costs less than API compute costs |
| Better Performance | Parquet format loads 10-50x faster than JSON API responses |

API vs Export Comparison

Data Retrieval Pattern

Old API Approach:

# Complex pagination loop
import time

import requests

skip = 0
take = 1000
all_locations = []

while True:
    response = requests.get(
        f"https://api.coolr.com/v1/locations?skip={skip}&take={take}",
        headers={"Authorization": f"Bearer {token}"}
    )
    data = response.json()

    if not data:
        break

    all_locations.extend(data)
    skip += take
    time.sleep(0.5)  # Avoid rate limits

# Took 15 minutes for 50,000 records

New Export Approach:

# Single file download
import pandas as pd

df = pd.read_parquet('location.parquet')
# Took 5 seconds for 50,000 records

Incremental Updates

Old API Approach:

import requests

# Manual timestamp tracking per endpoint
last_sync = get_last_sync_timestamp('locations')

response = requests.get(
    f"https://api.coolr.com/v1/locations?modifiedSince={last_sync}",
    headers={"Authorization": f"Bearer {token}"}
)

locations = response.json()

# Repeat for each endpoint: locations, assets, products, orders...
# Need to track timestamps separately for each

New Export Approach:

# Automatic delta handling
df = pd.read_parquet('Delta/YourClient/latest/location.parquet')
# All changes since last export, pre-filtered by CoolR

Migration Strategies

Strategy 1: Parallel Run (Recommended)

Timeline: 2-4 weeks | Risk: Low | Effort: Medium

Run both API and Export integrations simultaneously, validate Export data matches API before cutover.

Week 1-2: Setup Export integration alongside API
Week 2-3: Compare data quality and completeness
Week 3-4: Gradually shift traffic to Export
Week 4+: Decommission API integration

Validation Script:

import pandas as pd
import requests

def validate_migration(table_name):
    # Fetch from API (old method); fetch_from_api is your existing API loader
    api_data = fetch_from_api(table_name)
    api_df = pd.DataFrame(api_data)

    # Load from Export (new method)
    export_df = pd.read_parquet(f'{table_name}.parquet')
    export_df = export_df[export_df['IsDeleted'] == 0]  # Active records only

    # Compare
    print(f"API records: {len(api_df)}")
    print(f"Export records: {len(export_df)}")

    # Check for missing records; export columns are PascalCase (e.g. LocationId)
    id_col = f'{table_name.capitalize()}Id'
    api_ids = set(api_df[id_col])
    export_ids = set(export_df[id_col])

    missing_in_export = api_ids - export_ids
    missing_in_api = export_ids - api_ids

    if missing_in_export:
        print(f"⚠️ Missing in Export: {missing_in_export}")
    if missing_in_api:
        print(f"ℹ️ Extra in Export (recently added): {missing_in_api}")

    if not missing_in_export:
        print(f"✅ {table_name} validation passed")

validate_migration('location')
validate_migration('asset')
validate_migration('product')

Strategy 2: Phased Migration

Timeline: 4-8 weeks | Risk: Very Low | Effort: High

Migrate one data domain at a time, maintaining API for unmigrated domains.

Phase 1: Master data (Location, Asset, Product) -> Export
Keep API for transactional data
Phase 2: Transactional data (Order, Inventory) -> Export
Keep API for real-time queries
Phase 3: All data on Export, decommission API

Hybrid Integration Example:

import pandas as pd

def load_locations():
    # Already migrated to Export
    return pd.read_parquet('location.parquet')

def load_orders():
    # Still using API during migration
    return fetch_from_api_with_pagination('/v1/orders')

def build_report():
    locations = load_locations()  # From Export
    orders = pd.DataFrame(load_orders())  # From API (list of dicts -> DataFrame)

    report = orders.merge(locations, on='LocationId')
    return report

Strategy 3: Big Bang Migration

Timeline: 1 week | Risk: High | Effort: Low

⚠️ Not Recommended unless:

  • Small dataset (less than 10K records total)
  • Non-critical reporting system
  • Can tolerate downtime
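If those conditions do hold, the cutover itself can be a short script: truncate-and-reload every table from the full export in one maintenance window. A minimal sketch, where `read_export` and `load_into_warehouse` are placeholders for your own I/O (names are illustrative, not part of the CoolR tooling):

```python
import pandas as pd

def big_bang_cutover(tables, read_export, load_into_warehouse):
    """Replace every warehouse table from the full export in one window."""
    counts = {}
    for table in tables:
        df = read_export(table)             # e.g. pd.read_parquet(f"{table}.parquet")
        df = df[df["IsDeleted"] == False]   # Keep active records only
        load_into_warehouse(table, df)      # e.g. df.to_sql(table, engine, ...)
        counts[table] = len(df)
    return counts

# Dry run against an in-memory stand-in for the export files
fake_export = {
    "location": pd.DataFrame({"LocationId": [1, 2, 3],
                              "IsDeleted": [False, False, True]}),
}
loaded = {}
counts = big_bang_cutover(["location"], fake_export.__getitem__,
                          lambda t, df: loaded.__setitem__(t, df))
print(counts)  # {'location': 2}
```

Even here, keep the API credentials alive until validation passes, so the rollback path in the next sections remains open.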

Endpoint Mapping

Map your current API endpoints to Export files:

| API Endpoint | Export File | Notes |
| --- | --- | --- |
| GET /v1/locations | location.parquet | Filter IsDeleted = 0 for active only |
| GET /v1/locations?modifiedSince= | Delta: location.parquet | Pre-filtered by ModifiedOn |
| GET /v1/assets | asset.parquet | Includes AssetTypeCode and AssetTypeName |
| GET /v1/assets?locationId= | asset.parquet + filter | Use pandas: df[df['LocationId'] == location_id] |
| GET /v1/products | product.parquet | Includes category and manufacturer |
| GET /v1/orders | order.parquet | Parent order records |
| GET /v1/orders/{id}/items | orderitem.parquet | Filter: orderitem[orderitem['OrderId'] == id] |
| GET /v1/inventory | inventory.parquet | Snapshot at export time |
| GET /v1/scheduledorders | scheduledorder.parquet + scheduledorderitem.parquet | Two files for recurring orders |
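During a phased migration this mapping can also live in code: a small routing table lets a hybrid loader decide per endpoint whether to hit a file or fall back to the API. A sketch (the dict contents mirror the mapping above; the function name is illustrative):

```python
# Endpoints already migrated resolve to an export file; everything else
# stays on the API until its phase completes.
ENDPOINT_TO_EXPORT = {
    "/v1/locations": "location.parquet",
    "/v1/assets": "asset.parquet",
    "/v1/products": "product.parquet",
    "/v1/orders": "order.parquet",
    "/v1/inventory": "inventory.parquet",
}

def resolve_source(endpoint):
    # Strip the query string so '/v1/assets?locationId=7' matches '/v1/assets'
    path = endpoint.split("?")[0]
    export_file = ENDPOINT_TO_EXPORT.get(path)
    if export_file:
        return ("export", export_file)
    return ("api", path)

print(resolve_source("/v1/assets?locationId=7"))        # ('export', 'asset.parquet')
print(resolve_source("/v1/reports/cooler-performance")) # ('api', '/v1/reports/cooler-performance')
```

Query-string filters (locationId, modifiedSince) become pandas filters on the resolved file, as shown in the Code Migration Examples below.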

Code Migration Examples

Example 1: Location Listing

Before (API):

import requests

API_URL = "https://api.coolr.com/v1"
API_KEY = "your-api-key"

def get_all_locations():
    headers = {"Authorization": f"Bearer {API_KEY}"}
    skip = 0
    take = 100
    all_locations = []

    while True:
        response = requests.get(
            f"{API_URL}/locations?skip={skip}&take={take}",
            headers=headers
        )
        response.raise_for_status()

        locations = response.json()
        if not locations:
            break

        all_locations.extend(locations)
        skip += take

    return all_locations

# Usage
locations = get_all_locations()
print(f"Found {len(locations)} locations")

After (Export):

import pandas as pd

def get_all_locations():
    df = pd.read_parquet('location.parquet')

    # Filter active locations only (equivalent to API behavior)
    df = df[df['IsDeleted'] == False]

    # Convert to list of dicts if needed for compatibility
    return df.to_dict('records')

# Usage
locations = get_all_locations()
print(f"Found {len(locations)} locations")

Example 2: Incremental Sync

Before (API):

from datetime import datetime, timezone

import requests

def sync_locations_since_last_run():
    # Get last sync time from database
    last_sync = get_last_sync_time('locations')

    # API call with timestamp filter
    response = requests.get(
        f"{API_URL}/locations",
        params={
            "modifiedSince": last_sync.isoformat(),
            "skip": 0,
            "take": 1000
        },
        headers={"Authorization": f"Bearer {API_KEY}"}
    )

    updated_locations = response.json()

    # Update database
    for location in updated_locations:
        upsert_location(location)

    # Save new sync timestamp
    save_last_sync_time('locations', datetime.now(timezone.utc))

    return len(updated_locations)

After (Export):

import pandas as pd
from sqlalchemy import create_engine, text

def sync_locations_since_last_run():
    engine = create_engine('your-db-connection')

    # Get watermark
    with engine.connect() as conn:
        result = conn.execute(
            text("SELECT last_modified FROM watermark WHERE table_name = 'location'")
        )
        last_sync = result.fetchone()[0]

    # Read delta export
    df = pd.read_parquet('Delta/YourClient/latest/location.parquet')

    # Filter to records modified after watermark
    df = df[pd.to_datetime(df['ModifiedOn']) > last_sync]

    # Remove deleted rows AND any rows we are about to re-insert
    # (delete-then-insert stands in for a true upsert and avoids duplicates)
    changed_ids = [int(v) for v in df['LocationId']]
    if changed_ids:
        placeholders = ', '.join(f':id_{i}' for i in range(len(changed_ids)))
        params = {f'id_{i}': changed_ids[i] for i in range(len(changed_ids))}
        with engine.begin() as conn:
            conn.execute(text(f"DELETE FROM location WHERE LocationId IN ({placeholders})"), params)

    # Insert the current version of active records
    active = df[df['IsDeleted'] == False]
    if len(active) > 0:
        active.to_sql('location', engine, if_exists='append', index=False,
                      method='multi')  # Batch insert

    # Update watermark
    if len(df) > 0:
        new_watermark = df['ModifiedOn'].max()
        with engine.begin() as conn:
            conn.execute(
                text("UPDATE watermark SET last_modified = :ts WHERE table_name = 'location'"),
                {"ts": new_watermark}
            )

    return len(df)

Example 3: Orders with Line Items

Before (API):

def get_order_with_items(order_id):
    # Fetch order header
    response = requests.get(
        f"{API_URL}/orders/{order_id}",
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    order = response.json()

    # Fetch order items (separate API call)
    response = requests.get(
        f"{API_URL}/orders/{order_id}/items",
        headers={"Authorization": f"Bearer {API_KEY}"}
    )
    items = response.json()

    order['items'] = items
    return order

# Usage
order = get_order_with_items(12345)
print(f"Order {order['OrderNumber']} has {len(order['items'])} items")

After (Export):

import pandas as pd

def get_order_with_items(order_id):
    # Load both files
    orders_df = pd.read_parquet('order.parquet')
    items_df = pd.read_parquet('orderitem.parquet')

    # Get specific order
    order = orders_df[orders_df['OrderId'] == order_id].iloc[0]

    # Get items for this order
    items = items_df[items_df['OrderId'] == order_id]

    # Convert to dict and add items
    order_dict = order.to_dict()
    order_dict['items'] = items.to_dict('records')

    return order_dict

# Usage
order = get_order_with_items(12345)
print(f"Order {order['OrderNumber']} has {len(order['items'])} items")

Example 4: Filtering and Aggregation

Before (API):

def get_active_assets_by_location(location_id):
    # API doesn't support complex filtering, need to fetch all and filter
    skip = 0
    take = 1000
    filtered_assets = []

    while True:
        response = requests.get(
            f"{API_URL}/assets?skip={skip}&take={take}",
            headers={"Authorization": f"Bearer {API_KEY}"}
        )
        assets = response.json()

        if not assets:
            break

        # Manual filtering
        for asset in assets:
            if asset['LocationId'] == location_id and asset['Status'] == 'Active':
                filtered_assets.append(asset)

        skip += take

    return filtered_assets

# Slow: Had to fetch ALL assets to filter for one location

After (Export):

import pandas as pd

def get_active_assets_by_location(location_id):
    df = pd.read_parquet('asset.parquet')

    # Efficient filtering with pandas
    filtered = df[
        (df['LocationId'] == location_id) &
        (df['Status'] == 'Active') &
        (df['IsDeleted'] == False)
    ]

    return filtered.to_dict('records')

# Fast: Single file read + efficient filtering

Rollback Procedures

Emergency Rollback Plan

If you encounter critical issues with Export integration:

Step 1: Immediate Rollback

# Flip feature flag to use API
USE_EXPORT_INTEGRATION = False  # Set to False

def get_locations():
    if USE_EXPORT_INTEGRATION:
        return load_from_export()
    else:
        return load_from_api()  # Fallback to API

Step 2: Root Cause Analysis

import time

import pandas as pd

# Capture logs and metrics
def load_from_export():
    try:
        start_time = time.time()
        df = pd.read_parquet('location.parquet')

        # Log success metrics
        log_metric('export_load_success', 1)
        log_metric('export_load_time', time.time() - start_time)
        log_metric('export_record_count', len(df))

        return df
    except Exception as e:
        # Log failure details
        log_error('export_load_failed', str(e))
        log_metric('export_load_success', 0)

        # Fallback to API
        return load_from_api()

Step 3: Gradual Re-enable

# Canary deployment: 10% traffic to Export
import random

def get_locations():
    use_export = random.random() < 0.10  # 10% on Export

    if use_export:
        return load_from_export()
    else:
        return load_from_api()

# Monitor metrics, increase percentage gradually

Common Challenges

Challenge 1: Real-Time Data Needs

Problem: "We need real-time data, exports are too delayed"

Solution:

  • Use Delta exports every 1-6 hours (configurable)
  • For true real-time needs (less than 5 minutes of latency), keep the API for specific high-frequency endpoints
  • Most reporting doesn't actually need real-time data
# Hybrid approach
def get_location_data(location_id, real_time_required=False):
    if real_time_required:
        # Use API for real-time critical data
        return fetch_from_api(f'/v1/locations/{location_id}')
    else:
        # Use Export for reporting/analytics
        df = pd.read_parquet('location.parquet')
        return df[df['LocationId'] == location_id].to_dict('records')[0]

Challenge 2: Complex API Query Parameters

Problem: "API supports advanced filtering we rely on"

Solution: Use pandas for local filtering (usually faster anyway)

# API filtering
response = requests.get(
    f"{API_URL}/assets",
    params={
        "status": "Active",
        "assetType": "Cooler",
        "state": "CA",
        "installedAfter": "2025-01-01"
    }
)

# Export equivalent (often faster!)
df = pd.read_parquet('asset.parquet')
filtered = df[
    (df['Status'] == 'Active') &
    (df['AssetTypeCode'] == 'Cooler') &
    (df['State'] == 'CA') &
    (df['InstallationDate'] > '2025-01-01') &
    (df['IsDeleted'] == False)
]

Challenge 3: Custom API Endpoints

Problem: "We use custom endpoints not in standard exports"

Solution:

  • Request custom export files from CoolR team
  • Or keep API for custom endpoints, use Export for standard data
  • Most custom queries can be built from standard exports with joins
# Custom endpoint: GET /v1/reports/cooler-performance
# Can often be built from standard exports
import pandas as pd

locations = pd.read_parquet('location.parquet')
assets = pd.read_parquet('asset.parquet')
inventory = pd.read_parquet('inventory.parquet')

# Build custom report
report = (
    assets.merge(locations, on='LocationId')
          .merge(inventory, on='AssetId')
          .groupby(['State', 'AssetTypeCode'])
          .agg({'QuantityOnHand': 'sum'})
)

Challenge 4: Authentication Differences

Problem: "Different auth mechanism than API"

Solution: Authentication is simpler with exports!

| Method | API | Export |
| --- | --- | --- |
| API Key | ✅ Required | ❌ Not used |
| OAuth | ✅ Complex flow | ❌ Not needed |
| Managed Identity | ❌ Not supported | ✅ Recommended |
| SAS Token | ❌ Not supported | ✅ Supported |
| SFTP | ❌ Not supported | ✅ Supported |
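In the SAS-token case, "authentication" reduces to appending the token to the file URL. A minimal sketch, assuming the exports live in Azure Blob Storage; the account name, container, and path layout here are illustrative, not the actual CoolR values:

```python
# Hypothetical helper: build a downloadable URL for an export file from a
# SAS token supplied out of band.
def export_blob_url(account, container, blob_path, sas_token):
    token = sas_token.lstrip("?")  # Accept tokens with or without leading '?'
    return f"https://{account}.blob.core.windows.net/{container}/{blob_path}?{token}"

url = export_blob_url(
    "coolrexports", "yourclient",
    "Delta/YourClient/latest/location.parquet",
    "?sv=...&sig=...",
)
print(url)
# With fsspec/adlfs installed, pandas can often read such a URL directly:
# df = pd.read_parquet(url)
```

No token refresh flow, no Authorization header; rotate the SAS token on whatever schedule your security policy requires.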

Migration Checklist

Pre-Migration

  • Inventory current API endpoints in use
  • Map API endpoints to Export files (see Endpoint Mapping)
  • Choose migration strategy (Parallel / Phased / Big Bang)
  • Set up Export access (Managed Identity, SAS Token, or SFTP)
  • Test Export download in non-production environment
  • Validate data completeness vs API

During Migration

  • Implement parallel data loading (API + Export)
  • Compare data quality between sources
  • Measure performance improvements
  • Update documentation and runbooks
  • Train team on new integration pattern
  • Monitor error rates and data freshness

Post-Migration

  • Gradually increase Export traffic percentage
  • Monitor dashboard for anomalies
  • Decommission API integration code
  • Remove API credentials and cleanup
  • Document lessons learned
  • Celebrate successful migration! 🎉

Performance Comparison

Real-world migration results:

| Metric | API (Before) | Export (After) | Improvement |
| --- | --- | --- | --- |
| Initial Load Time | 45 minutes | 3 minutes | 15x faster |
| Delta Sync Time | 12 minutes | 30 seconds | 24x faster |
| Monthly API Costs | $450 | $0 | 100% savings |
| Storage Costs | $0 | $25 | New cost |
| Total Monthly Cost | $450 | $25 | 94% savings |
| Data Freshness | 15 min | 4 hours | Acceptable trade-off |
| Code Complexity | 500 lines | 100 lines | 80% reduction |
| Error Rate | 2.3% | 0.1% | 95% reduction |

Support Resources

  • Migration Questions: Contact your CoolR account manager
  • Technical Issues: support@coolrgroup.com
  • Integration Specification: See DATA_INTEGRATION_SPECIFICATION.md
  • Implementation Examples: See INTEGRATION_PROCESS_WALKTHROUGH.md

Ready to start your migration? Begin with the Parallel Run strategy and validate Export data quality before committing to full migration.