Code Patterns
The guide pages show you individual API calls. This page shows you the patterns you need for production — iterating through pages of results, retrying failed requests, pulling conversations efficiently, importing contacts in bulk, and keeping OAuth tokens fresh. Pick your language from the tabs on the right.
Paginating Through Results
Every list endpoint returns a page of results. To get all records, loop through pages until the number field in the response equals totalPages - 1. Pages are 0-indexed, so that is the last page.
Set size=200 (the maximum) to minimize the number of requests. The response includes totalElements if you need the total count up front.
import requests
def get_all_contacts(auth, size=200, timeout=30):
    """Fetch every contact, page by page.

    Args:
        auth: Credentials handed straight to ``requests.get`` as its ``auth``
            argument (the usage example passes a ``(username, password)``
            tuple).
        size: Page size to request. Defaults to 200, which this guide
            describes as the maximum.
        timeout: Seconds to wait on each request. Requests applies no
            timeout unless one is given, so without it a stalled connection
            would block this loop forever.

    Returns:
        A list holding the ``content`` entries of every page, in page order.

    Raises:
        requests.exceptions.HTTPError: If a page comes back with an error
            status (via ``raise_for_status``).
        requests.exceptions.RequestException: On network failure or timeout.
    """
    all_contacts = []
    page = 0
    while True:
        resp = requests.get(
            "https://a.eztexting.com/v1/contacts",
            params={"page": page, "size": size},
            auth=auth,
            timeout=timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        all_contacts.extend(data["content"])
        # Pages are 0-indexed, so the last one is totalPages - 1. Comparing
        # with >= also stops the loop when totalPages is 0 (0 >= -1).
        if data["number"] >= data["totalPages"] - 1:
            break
        page += 1
    return all_contacts
contacts = get_all_contacts(("your_username", "your_password"))
print(f"Total contacts: {len(contacts)}")
Retry with Exponential Backoff
Network errors and 429 rate limit responses are temporary. Retry with exponential backoff instead of failing immediately. Start at 1 second, double each attempt, cap at 30 seconds.
The Retry-After header on 429 responses tells you exactly how long to wait — check for it before falling back to exponential backoff.
import time
import requests
def api_call_with_retry(method, url, max_retries=5, **kwargs):
    """Make an API call, retrying on 429 responses and connection errors.

    The wait before each retry is the ``Retry-After`` header of a 429
    response when it carries a whole number of seconds; otherwise it is
    exponential backoff (1s, 2s, 4s, ... capped at 30s).

    Args:
        method: HTTP method name, passed to ``requests.request``.
        url: Full request URL.
        max_retries: Total number of attempts to make before giving up.
        **kwargs: Forwarded to ``requests.request``. ``timeout`` defaults to
            30 seconds when the caller does not supply one.

    Returns:
        The ``requests`` response of the first attempt that was not rate
        limited and did not have an error status.

    Raises:
        requests.exceptions.HTTPError: For any non-429 error status; these
            are not retried.
        requests.exceptions.ConnectionError: If the final attempt still
            fails to connect.
        RuntimeError: If every attempt was rate limited.
    """
    # Requests waits indefinitely unless given a timeout; a hung call would
    # otherwise never reach the retry logic at all.
    kwargs.setdefault("timeout", 30)

    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        backoff = min(2 ** attempt, 30)

        try:
            resp = requests.request(method, url, **kwargs)
        except requests.exceptions.ConnectionError:
            if is_last_attempt:
                raise
            print(f"Connection error. Retrying in {backoff}s...")
            time.sleep(backoff)
            continue

        if resp.status_code != 429:
            resp.raise_for_status()
            return resp

        # No point sleeping when there is no attempt left to make.
        if is_last_attempt:
            break

        # Prefer the server's own hint. Retry-After may also be an HTTP
        # date; that form is not parsed here and falls back to backoff.
        wait = backoff
        retry_after = resp.headers.get("Retry-After", "").strip()
        if retry_after.isdecimal():
            wait = int(retry_after)
        print(f"Rate limited. Retrying in {wait}s...")
        time.sleep(wait)

    raise RuntimeError(f"Failed after {max_retries} attempts")
# Usage
resp = api_call_with_retry(
"POST",
"https://a.eztexting.com/v1/messages",
auth=("your_username", "your_password"),
json={"toNumbers": ["15551234567"], "message": "Hello!"},
)
Batch Contact Import
Use POST /v1/contacts/batch to import multiple contacts at once. Break large imports into chunks to stay within rate limits and handle partial failures.
Add a 2-second pause between chunks to avoid rate limiting. You can also add contacts to groups in the same call using groupIdsAdd.
import requests
import time
def batch_import_contacts(contacts, auth, group_ids=None, chunk_size=100,
                          timeout=30):
    """Import contacts in batches and report the outcome of each chunk.

    Args:
        contacts: Sequence of contact dicts to import.
        auth: Credentials handed to ``requests.post`` as its ``auth``
            argument.
        group_ids: Optional group ids; when given they are sent as
            ``groupIdsAdd`` with every chunk.
        chunk_size: Number of contacts per request. Must be at least 1.
        timeout: Seconds to wait on each request before giving up.

    Returns:
        One dict per chunk with keys ``chunk`` (1-based index), ``count``
        (contacts sent) and ``status`` (HTTP status code). Error statuses
        are recorded rather than raised, so the caller decides what to retry.

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    # A negative step would make the range empty and silently import
    # nothing; zero would fail with an unrelated-looking range() error.
    if chunk_size < 1:
        raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")

    results = []
    total = len(contacts)
    starts = range(0, total, chunk_size)
    for chunk_number, start in enumerate(starts, start=1):
        chunk = contacts[start:start + chunk_size]
        body = {"contacts": chunk}
        if group_ids:
            body["groupIdsAdd"] = group_ids
        resp = requests.post(
            "https://a.eztexting.com/v1/contacts/batch",
            auth=auth,
            json=body,
            timeout=timeout,
        )
        results.append({
            "chunk": chunk_number,
            "count": len(chunk),
            "status": resp.status_code,
        })
        # Pause between batches to respect rate limits (not after the last).
        if start + chunk_size < total:
            time.sleep(2)
    return results
# Usage
contacts = [
{"phoneNumber": "15551234567", "firstName": "Jane", "lastName": "Doe"},
{"phoneNumber": "15559876543", "firstName": "John", "lastName": "Smith"},
# ... hundreds more
]
results = batch_import_contacts(
contacts,
auth=("your_username", "your_password"),
group_ids=["12345"],
)
for r in results:
    print(f"Chunk {r['chunk']}: {r['count']} contacts — HTTP {r['status']}")
Pulling Conversations Efficiently
Fetching replies one message at a time gets expensive fast. Use the Conversations API or the Messages list endpoint with filters to pull data in bulk instead. Which one fits depends on your use case.
GET /v1/conversations returns threaded views — up to 200 per page. Lazy-load individual threads with GET /v1/conversations/conversation/{userNumber}/{contactNumber} only when needed. Alternatively, GET /v1/messages with date and direction filters lets you pull all inbound or outbound messages in a window without per-thread calls.
import requests
BASE = "https://a.eztexting.com/v1"
def get_conversations(auth, unread_only=False, timeout=30):
    """Page through all conversation threads.

    Args:
        auth: Credentials handed to ``requests.get`` as its ``auth`` argument.
        unread_only: When true, adds the ``filters[unread][eq]=true`` query
            parameter to every page request.
        timeout: Seconds to wait on each request. Requests applies no
            timeout unless one is given, so without it a stalled connection
            would block this loop forever.

    Returns:
        A list holding the ``content`` entries of every page, in page order.

    Raises:
        requests.exceptions.HTTPError: If a page comes back with an error
            status.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    results = []
    page = 0
    while True:
        params = {"page": page, "size": 200}
        if unread_only:
            params["filters[unread][eq]"] = "true"
        r = requests.get(
            f"{BASE}/conversations",
            params=params,
            auth=auth,
            timeout=timeout,
        )
        r.raise_for_status()
        data = r.json()
        results.extend(data["content"])
        # Pages are 0-indexed, so the last one is totalPages - 1.
        if data["number"] >= data["totalPages"] - 1:
            break
        page += 1
    return results
def get_thread(user_num, contact_num, auth, page=0, size=50, timeout=30):
    """Load one conversation thread on demand.

    Args:
        user_num: The account's number, used as the first path segment.
        contact_num: The contact's number, used as the second path segment.
        auth: Credentials handed to ``requests.get`` as its ``auth`` argument.
        page: 0-indexed page of the thread to fetch.
        size: Number of entries per page.
        timeout: Seconds to wait for the response before giving up.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.exceptions.HTTPError: If the response has an error status.
        requests.exceptions.RequestException: On network failure or timeout.
    """
    r = requests.get(
        f"{BASE}/conversations/conversation"
        f"/{user_num}/{contact_num}",
        params={"page": page, "size": size},
        auth=auth,
        timeout=timeout,
    )
    r.raise_for_status()
    return r.json()
auth = ("your_username", "your_password")
# List all threads (paginated, 200/page)
convos = get_conversations(auth, unread_only=True)
print(f"Threads: {len(convos)}")
# Load a single thread when needed
thread = get_thread("15559876543", "15551234567", auth)
for m in thread["content"]:
    print(f" {m['direction']}: {m['message']}")
OAuth2 Token Refresh
Access tokens expire. For long-running processes, check the token before each request and refresh it automatically when it's close to expiring.
Keep a 60-second buffer (refresh before actual expiry) to avoid mid-request failures. If the refresh token itself fails, fall back to a full re-authentication.
import requests
import time
class EzTextingClient:
    """Client with automatic OAuth2 token refresh.

    Call ``request`` for every API call; it makes sure a usable access token
    is in place first, refreshing or re-authenticating as needed.
    """

    BASE = "https://a.eztexting.com/v1"

    def __init__(self, app_key, app_secret, timeout=30):
        """Store credentials; no network call is made until first use.

        Args:
            app_key: Application key sent as ``appKey`` when authenticating.
            app_secret: Application secret sent as ``appSecret``.
            timeout: Default timeout in seconds for every HTTP call made by
                this client, since Requests applies none on its own.
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.timeout = timeout
        self.access_token = None
        self.refresh_token = None
        # Epoch seconds after which the token is treated as expired.
        self.expires_at = 0

    def authenticate(self):
        """Obtain a fresh token pair from the app key and secret.

        Raises:
            requests.exceptions.HTTPError: If the token endpoint responds
                with an error status.
        """
        resp = requests.post(
            f"{self.BASE}/tokens/create",
            json={
                "appKey": self.app_key,
                "appSecret": self.app_secret,
            },
            timeout=self.timeout,
        )
        resp.raise_for_status()
        self._set_tokens(resp.json())

    def _set_tokens(self, data):
        """Record the tokens and expiry from a token endpoint response."""
        self.access_token = data["accessToken"]
        self.refresh_token = data["refreshToken"]
        # Treat the token as expired 60 seconds early so a request never
        # starts with a token that lapses while it is in flight.
        self.expires_at = time.time() + data["expiresInSeconds"] - 60

    def ensure_token(self):
        """Make sure ``access_token`` is usable, refreshing if necessary.

        Does nothing while the current token is still inside its validity
        window. Otherwise tries the refresh token first and falls back to a
        full ``authenticate`` if that fails or no refresh token is held.
        """
        if self.access_token and time.time() < self.expires_at:
            return
        if self.refresh_token:
            try:
                resp = requests.post(
                    f"{self.BASE}/tokens/refresh",
                    json={"refreshToken": self.refresh_token},
                    timeout=self.timeout,
                )
                resp.raise_for_status()
                self._set_tokens(resp.json())
                return
            except (requests.exceptions.RequestException, KeyError, ValueError):
                # Refresh rejected, unreachable, or returned an unusable
                # body: deliberately fall through to full re-authentication.
                # Anything else is a programming error and should surface.
                pass
        self.authenticate()

    def request(self, method, path, **kwargs):
        """Send an authenticated request to ``BASE + path``.

        Args:
            method: HTTP method name.
            path: Path (and optional query string) appended to ``BASE``.
            **kwargs: Forwarded to ``requests.request``. ``headers`` is
                copied before the Authorization header is added, and
                ``timeout`` defaults to the client's timeout.

        Returns:
            The ``requests`` response; its status is not checked here.
        """
        self.ensure_token()
        # Copy so the caller's dict is not mutated (it would otherwise keep
        # a stale bearer token after the next refresh).
        headers = dict(kwargs.pop("headers", None) or {})
        headers["Authorization"] = f"Bearer {self.access_token}"
        kwargs.setdefault("timeout", self.timeout)
        return requests.request(
            method, f"{self.BASE}{path}", headers=headers, **kwargs
        )
# Usage
client = EzTextingClient("your_app_key", "your_app_secret")
resp = client.request("GET", "/contacts?page=0&size=10")
print(resp.json())
Tips
- All patterns work with Basic Auth or OAuth2. For production services that run continuously, use the OAuth2 token refresh pattern to avoid storing passwords.
- Combine patterns — use retry logic inside your pagination loop to handle transient failures mid-iteration without losing progress.
- For very large imports (10K+ contacts), consider running the batch import as a background job with progress tracking rather than a synchronous request.
- If your app displays message threads, always use the Conversations API with server-side caching. Fetching replies per-message is the most common cause of rate limit issues — the Conversations API can cut API calls by 99%.