ArmorIQ SDK

Connection Pooling

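Reuse SDK clients across threads with a simple lock-protected pool instead of constructing a new client for every task. If every pooled client is in use, the pool hands out a newly created client rather than blocking.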

from armoriq_sdk import ArmorIQClient
import threading

class ArmorIQClientPool:
    """Lock-protected pool of reusable ``ArmorIQClient`` instances.

    ``pool_size`` clients are created eagerly in the constructor. When all of
    them are checked out, ``get_client`` builds an additional client with the
    same credentials instead of blocking. Clients handed back through
    ``return_client`` are always kept, so the pool can grow past
    ``pool_size`` under load.

    Args:
        api_key: Forwarded to ``ArmorIQClient(api_key=...)``.
        user_id: Forwarded to ``ArmorIQClient(user_id=...)``.
        agent_id: Forwarded to ``ArmorIQClient(agent_id=...)``.
        pool_size: Number of clients to create up front.
    """

    def __init__(self, api_key, user_id, agent_id, pool_size=5):
        # Remember the credentials so that overflow clients created after
        # construction are configured exactly like the pooled ones.
        self._client_kwargs = {
            "api_key": api_key,
            "user_id": user_id,
            "agent_id": agent_id,
        }
        self.pool = [self._new_client() for _ in range(pool_size)]
        self.lock = threading.Lock()
        # Clients currently idle; a copy so ``self.pool`` keeps the full
        # list of eagerly created clients.
        self.available = list(self.pool)

    def _new_client(self):
        """Build a client from the credentials given to the constructor."""
        return ArmorIQClient(**self._client_kwargs)

    def get_client(self):
        """Check out a client.

        Returns:
            The most recently returned idle client, or a newly constructed
            client when none is idle.
        """
        with self.lock:
            if self.available:
                return self.available.pop()
        # Pool exhausted: build the extra client outside the lock so a slow
        # constructor does not stall other threads using the pool.
        return self._new_client()

    def return_client(self, client):
        """Hand ``client`` back so a later ``get_client`` call can reuse it."""
        with self.lock:
            self.available.append(client)

# Usage: build one pool at module level and share it between workers.
# The "..." strings are placeholders for real credentials.
pool = ArmorIQClientPool(
    api_key="...",
    user_id="...",
    agent_id="...",
    pool_size=10,
)

def process_task(task):
    """Run one task with a client borrowed from the module-level ``pool``.

    The client is handed back to the pool in ``finally``, so it is returned
    whether ``invoke`` succeeds or raises. The ``...`` argument to ``invoke``
    is a placeholder for the real call arguments.
    """
    borrowed = pool.get_client()
    try:
        # Use the borrowed client
        return borrowed.invoke(...)
    finally:
        pool.return_client(borrowed)