Connection Pooling
Reuse SDK clients with a simple pool.
from armoriq_sdk import ArmorIQClient
import threading
class ArmorIQClientPool:
def __init__(self, api_key, user_id, agent_id, pool_size=5):
self.pool = [
ArmorIQClient(api_key=api_key, user_id=user_id, agent_id=agent_id)
for _ in range(pool_size)
]
self.lock = threading.Lock()
self.available = list(self.pool)
def get_client(self):
with self.lock:
if self.available:
return self.available.pop()
# Pool exhausted, create new client
return ArmorIQClient(...)
def return_client(self, client):
with self.lock:
self.available.append(client)
# Usage
pool = ArmorIQClientPool(api_key="...", user_id="...", agent_id="...", pool_size=10)
def process_task(task):
client = pool.get_client()
try:
# Use client
result = client.invoke(...)
return result
finally:
pool.return_client(client)