Token Lifecycle

Understanding intent token lifecycle and management

Intent tokens are cryptographically signed credentials that authorize execution of specific actions. Understanding their lifecycle is crucial for secure agent operation.

Token Phases

Phase 1: Plan Capture

captured = client.capture_plan(
    llm="gpt-4",
    prompt="Fetch and analyze data"
)

What Happens:

  • Plan structure created
  • Plan validated against MCP registry
  • Plan stored with unique ID
  • Canonical representation (CSRG) generated

Phase 2: Token Generation

token = client.get_intent_token(
    plan_capture=captured,
    policy={"allow": ["*"], "deny": []},
    validity_seconds=3600
)

What Happens:

  1. Plan canonicalized to CSRG format
  2. Plan hash computed (SHA-256 of canonical form)
  3. Policy applied and validated
  4. JWT token created with:
    • Plan hash
    • Policy hash
    • User/agent identity
    • Expiration time
    • Signature
  5. Token signed by CSRG-IAP using Ed25519
  6. Token returned to agent
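
Steps 1 and 2 above can be illustrated with a minimal sketch. The CSRG canonical form is not specified on this page, so the example assumes that sorted-key, whitespace-free JSON stands in for it. The plan layout and the `canonicalize` and `plan_hash` helpers are hypothetical and are not part of the SDK.

```python
import hashlib
import json


def canonicalize(plan: dict) -> bytes:
    """Serialize a plan deterministically.

    Assumption: sorted-key compact JSON stands in for the CSRG canonical form.
    """
    return json.dumps(plan, sort_keys=True, separators=(",", ":")).encode("utf-8")


def plan_hash(plan: dict) -> str:
    """Return the SHA-256 digest of the canonical plan with a 'sha256:' prefix."""
    return "sha256:" + hashlib.sha256(canonicalize(plan)).hexdigest()


# Hypothetical plan layout, for illustration only.
plan_a = {"goal": "Fetch and analyze data",
          "steps": [{"mcp": "data-mcp", "action": "fetch_data"}]}
plan_b = {"steps": [{"action": "fetch_data", "mcp": "data-mcp"}],
          "goal": "Fetch and analyze data"}

# Key order does not change the hash; any change to the content does.
assert plan_hash(plan_a) == plan_hash(plan_b)
```

Hashing a canonical form rather than the raw plan means two structurally identical plans always produce the same hash, which is what lets the proxy compare a token's plan hash against a stored plan.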

Phase 3: Token Usage

result = client.invoke(
    mcp="data-mcp",
    action="fetch_data",
    intent_token=token,
    params={...}
)

What Happens:

  1. Token sent to ArmorIQ Proxy
  2. Token signature verified
  3. Token expiration checked
  4. Plan hash extracted and verified
  5. Action checked against plan
  6. Policy constraints validated
  7. If all checks pass: action forwarded to MCP
  8. If any check fails: request rejected

Phase 4: Token Expiration

A token expires validity_seconds after it is issued; the expiry time is recorded in the token's exp claim. After expiration:

  • Token becomes invalid
  • All invocations using token will fail
  • New token must be requested
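
The timing rule can be sketched in a few lines. The helper names are hypothetical, and the assumption that the expiry equals the issue time plus validity_seconds is inferred from the sample payload in the Token Structure section, where exp minus iat is 3600.

```python
def expiry_time(issued_at: int, validity_seconds: int) -> int:
    """Assumed rule: exp = iat + validity_seconds (all values in Unix seconds)."""
    return issued_at + validity_seconds


def is_expired(exp: int, now: float) -> bool:
    """A token is usable only while now < exp, so it is expired once now >= exp."""
    return now >= exp


exp = expiry_time(issued_at=1234567800, validity_seconds=3600)
print(exp)                               # 1234571400
print(is_expired(exp, now=1234571399))   # False
print(is_expired(exp, now=1234571400))   # True
```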

Token Structure (JWT)

Header

{
  "alg": "EdDSA",
  "typ": "JWT"
}

Payload

{
  "plan_hash": "sha256:abc123...",
  "policy_hash": "sha256:def456...",
  "user_id": "user_123",
  "agent_id": "agent_xyz",
  "org_id": "org_001",
  "iat": 1234567800,
  "exp": 1234571400,
  "iss": "armoriq-csrg-iap",
  "jti": "token_unique_id"
}

Signature

EdDSA signature using CSRG-IAP's private key
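
For debugging, it can help to read the claims of a token without verifying it. The sketch below builds a JWT-shaped string from the header and part of the payload shown above, with a placeholder in the signature segment, and then decodes it again. The helper names are hypothetical. Decoding is not verification, so claims read this way must not be trusted for authorization.

```python
import base64
import json


def b64url_encode(data: bytes) -> str:
    """Base64url-encode without padding, as compact JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Base64url-decode, restoring the padding that compact JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def decode_unverified(token: str) -> tuple[dict, dict]:
    """Return (header, payload) WITHOUT checking the signature."""
    header_b64, payload_b64, _signature_b64 = token.split(".")
    return json.loads(b64url_decode(header_b64)), json.loads(b64url_decode(payload_b64))


header = {"alg": "EdDSA", "typ": "JWT"}
payload = {"user_id": "user_123", "agent_id": "agent_xyz",
           "iat": 1234567800, "exp": 1234571400}

# The last segment is a placeholder; a real token carries the EdDSA signature bytes.
sample_token = ".".join([
    b64url_encode(json.dumps(header).encode("utf-8")),
    b64url_encode(json.dumps(payload).encode("utf-8")),
    b64url_encode(b"placeholder-signature"),
])

decoded_header, decoded_payload = decode_unverified(sample_token)
print(decoded_header["alg"])                             # EdDSA
print(decoded_payload["exp"] - decoded_payload["iat"])   # 3600
```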

Token Properties

Immutability

Once generated, tokens cannot be modified. Any change invalidates the signature.

# ✗ Bad: Don't try to modify token
token.token = token.token + "extra"  # Signature will fail

# ✓ Good: Generate new token if needed
new_token = client.get_intent_token(captured, validity_seconds=7200)

Non-Transferability

Tokens are bound to specific user/agent IDs and cannot be used by others.

# Token bound to this user/agent
token = client.get_intent_token(captured)

# Another agent cannot use this token
other_client = ArmorIQClient(user_id="other_user", agent_id="other_agent")
other_client.invoke(..., intent_token=token)  # ✗ Fails: user/agent mismatch

Time-Limited

Tokens have explicit expiration times for security.

# Short-lived token (60 seconds)
token_short = client.get_intent_token(captured, validity_seconds=60)

# Long-lived token (1 hour)
token_long = client.get_intent_token(captured, validity_seconds=3600)

# Check expiration
print(f"Expires at: {token_short.expires_at}")

Token Management Best Practices

1. Use Appropriate Validity Periods

# ✓ Good: Match validity to use case
token_quick = client.get_intent_token(captured, validity_seconds=300)   # 5 min for quick tasks
token_batch = client.get_intent_token(captured, validity_seconds=3600)  # 1 hour for batch jobs
token_interactive = client.get_intent_token(captured, validity_seconds=1800)  # 30 min for user sessions

# ✗ Bad: Overly long validity
token_long = client.get_intent_token(captured, validity_seconds=86400)  # 24 hours - too long!

2. Handle Token Expiration Gracefully

from armoriq_sdk.exceptions import TokenExpiredError

def invoke_with_refresh(client, mcp, action, token, captured, params):
    """Invoke with automatic token refresh on expiration."""
    try:
        return client.invoke(mcp, action, token, params)
    except TokenExpiredError:
        # Token expired, get new one
        new_token = client.get_intent_token(captured)
        return client.invoke(mcp, action, new_token, params)

3. Cache Tokens for Repeated Use

import time

class TokenManager:
    def __init__(self, client):
        self.client = client
        self.token = None
        self.captured = None
    
    def ensure_token(self, prompt, validity_seconds=3600):
        """Get or refresh token as needed."""
        if self.token and self.token.expires_at > time.time() + 60:
            return self.token
        
        # Need new token
        self.captured = self.client.capture_plan(llm="gpt-4", prompt=prompt)
        self.token = self.client.get_intent_token(
            self.captured,
            validity_seconds=validity_seconds
        )
        return self.token

# Usage
manager = TokenManager(client)
token = manager.ensure_token("Fetch and analyze data")
result = client.invoke("data-mcp", "fetch_data", token, {...})

4. Revoke Tokens When Done

# Explicit revocation is not directly supported yet; use a short validity period as mitigation
token = client.get_intent_token(captured, validity_seconds=300)  # 5 min only

# For long-running tasks, periodically refresh
for i in range(100):
    if i % 10 == 0:
        # Refresh token every 10 iterations
        token = client.get_intent_token(captured, validity_seconds=300)
    
    result = client.invoke("data-mcp", "process", token, {"batch": i})

Token Verification Process

When you invoke an action, the proxy verifies:

Step 1: Signature Verification

1. Extract JWT header, payload, signature
2. Reconstruct signing input: base64url(header) + "." + base64url(payload)
3. Verify signature using CSRG-IAP public key
4. If signature invalid → REJECT
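
The four steps above can be sketched with the standard library alone. Python's standard library has no Ed25519 support, so this example swaps in HMAC-SHA256 (the JWT "HS256" scheme) with a shared demo key as a stand-in for the EdDSA check. A real verifier would instead check the signature against the CSRG-IAP public key using an Ed25519 library. All names here are hypothetical.

```python
import base64
import hashlib
import hmac


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as compact JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def sign(header_b64: str, payload_b64: str, key: bytes) -> str:
    """Stand-in signer: HMAC-SHA256 over the signing input (NOT EdDSA)."""
    signing_input = f"{header_b64}.{payload_b64}".encode("ascii")
    return b64url(hmac.new(key, signing_input, hashlib.sha256).digest())


def verify(token: str, key: bytes) -> bool:
    """Split the token, rebuild the signing input, and compare signatures."""
    parts = token.split(".")
    if len(parts) != 3:
        return False  # malformed token
    header_b64, payload_b64, signature_b64 = parts
    expected = sign(header_b64, payload_b64, key)
    # Constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), signature_b64.encode("utf-8"))


key = b"demo-key"  # hypothetical shared key; the real scheme uses a key pair
header_b64 = b64url(b'{"alg":"HS256","typ":"JWT"}')
payload_b64 = b64url(b'{"user_id":"user_123"}')
token = f"{header_b64}.{payload_b64}.{sign(header_b64, payload_b64, key)}"

# Swap the payload but keep the original signature.
tampered = ".".join([header_b64, b64url(b'{"user_id":"attacker"}'), token.split(".")[2]])

print(verify(token, key))      # True
print(verify(tampered, key))   # False
```

Because the signature covers the exact header and payload segments, changing a single claim produces a different signing input and the check fails.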

Step 2: Expiration Check

1. Extract "exp" from payload
2. Check if current_time < exp
3. If expired → REJECT

Step 3: Identity Verification

1. Extract user_id, agent_id from payload
2. Compare with request's user_id, agent_id
3. If mismatch → REJECT

Step 4: Plan Verification

1. Extract plan_hash from payload
2. Check if requested action is in plan
3. If action not in plan → REJECT

Step 5: Policy Verification

1. Extract policy_hash from payload
2. Apply policy rules to requested action
3. If action violates policy → REJECT
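
A policy check over the allow/deny shape used in Phase 2 might look like the sketch below. The matching semantics are assumptions, not documented behavior: patterns are treated as shell-style globs, and a deny match always overrides an allow match. The `is_allowed` helper and the `strict_policy` example are hypothetical.

```python
from fnmatch import fnmatchcase


def is_allowed(action: str, policy: dict) -> bool:
    """Assumed semantics: deny rules win; otherwise an allow rule must match."""
    if any(fnmatchcase(action, pattern) for pattern in policy.get("deny", [])):
        return False
    return any(fnmatchcase(action, pattern) for pattern in policy.get("allow", []))


open_policy = {"allow": ["*"], "deny": []}                          # policy from Phase 2
strict_policy = {"allow": ["fetch_*"], "deny": ["fetch_secrets"]}   # hypothetical

print(is_allowed("fetch_data", open_policy))        # True
print(is_allowed("fetch_data", strict_policy))      # True
print(is_allowed("fetch_secrets", strict_policy))   # False
print(is_allowed("delete_data", strict_policy))     # False
```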

Step 6: Rate Limit Check

1. Check invocation count for this user/agent
2. If rate limit exceeded → REJECT
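
This page does not say how the proxy counts invocations. As one possible illustration, the sketch below uses a fixed-window counter keyed by user and agent. The class name, the limit, and the window size are all hypothetical.

```python
from collections import defaultdict


class FixedWindowRateLimiter:
    """Allow at most `limit` invocations per identity in each time window."""

    def __init__(self, limit: int, window_seconds: int):
        self.limit = limit
        self.window_seconds = window_seconds
        # (user_id, agent_id, window index) -> invocation count.
        # Old windows are never evicted here; a real limiter would expire them.
        self._counts = defaultdict(int)

    def allow(self, user_id: str, agent_id: str, now: float) -> bool:
        key = (user_id, agent_id, int(now // self.window_seconds))
        if self._counts[key] >= self.limit:
            return False  # rate limit exceeded -> REJECT
        self._counts[key] += 1
        return True


limiter = FixedWindowRateLimiter(limit=2, window_seconds=60)
results = [limiter.allow("user_123", "agent_xyz", now=t) for t in (0, 10, 20, 60)]
print(results)  # [True, True, False, True]
```

The third call is rejected because it falls in the same 60-second window as the first two. The fourth lands in the next window and is allowed again.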

If all checks pass → ALLOW and forward to MCP
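
The ordering matters: checks run in sequence and the first failure rejects the request, so later checks never see an unverified token. The sketch below models that with a list of named checks. It covers steps 1 to 4 only, and the signature result is passed in as a flag rather than computed. The function names and the `planned_actions` set are hypothetical.

```python
from typing import Callable, Optional


def first_failure(checks: list[tuple[str, Callable[[], bool]]]) -> Optional[str]:
    """Run checks in order and return the name of the first failure, or None."""
    for name, check in checks:
        if not check():
            return name
    return None


def verify_request(payload: dict, request: dict, planned_actions: set,
                   now: float, signature_ok: bool = True) -> str:
    """Apply steps 1-4 in order; steps 5 and 6 would be appended the same way."""
    checks = [
        ("signature", lambda: signature_ok),
        ("expiration", lambda: now < payload["exp"]),
        ("identity", lambda: all(payload[k] == request[k]
                                 for k in ("user_id", "agent_id"))),
        ("plan", lambda: request["action"] in planned_actions),
    ]
    failed = first_failure(checks)
    return "ALLOW" if failed is None else f"REJECT: {failed}"


payload = {"user_id": "user_123", "agent_id": "agent_xyz", "exp": 1234571400}
request = {"user_id": "user_123", "agent_id": "agent_xyz", "action": "fetch_data"}

print(verify_request(payload, request, {"fetch_data"}, now=1234570000))  # ALLOW
print(verify_request(payload, request, {"fetch_data"}, now=1234571400))  # REJECT: expiration
```

Wrapping each check in a lambda keeps evaluation lazy, so a check that depends on trusted claims only runs after the checks before it have passed.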

Token Security Properties

Cryptographic Binding

Tokens are cryptographically bound to:

  • Plan: Cannot execute actions outside plan
  • User/Agent: Cannot be used by different identity
  • Policy: Cannot bypass policy constraints
  • Time: Cannot be used after expiration

Non-Repudiation

Every invocation creates an audit log with:

  • Token ID
  • User/Agent ID
  • Action executed
  • Timestamp
  • Result

Together, these fields let each invocation be traced back to the token and the identity that authorized it.
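
The actual audit log format is not documented on this page. As a rough illustration, a record carrying the five fields listed above could be modeled as below. The class, the field names, and the mapping of "Token ID" to the token's jti claim are assumptions.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class AuditRecord:
    """One hypothetical audit entry per invocation; frozen so it cannot be edited."""
    token_id: str    # assumed to be the token's "jti" claim
    user_id: str
    agent_id: str
    action: str
    timestamp: int   # Unix seconds
    result: str


def to_log_line(record: AuditRecord) -> str:
    """Serialize a record as one JSON line with a stable key order."""
    return json.dumps(asdict(record), sort_keys=True)


record = AuditRecord(
    token_id="token_unique_id",
    user_id="user_123",
    agent_id="agent_xyz",
    action="fetch_data",
    timestamp=1234570000,
    result="ALLOW",
)
print(to_log_line(record))
```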

Defense in Depth

Even if an attacker obtains a token:

  • Cannot modify it (signature verification)
  • Cannot reuse it as different user (identity binding)
  • Cannot execute unplanned actions (plan hash verification)
  • Cannot use after expiration (time-limited)

Common Token Issues

Issue: Token Expired

Symptom: TokenExpiredError when invoking

Solution:

# Refresh token
new_token = client.get_intent_token(captured, validity_seconds=3600)
result = client.invoke(mcp, action, new_token, params)

Issue: Action Not in Plan

Symptom: IntentVerificationError: Action not in plan

Solution:

# Capture new plan that includes the action
captured = client.capture_plan(
    llm="gpt-4",
    prompt="Fetch data and also do analysis"  # Include both actions
)
token = client.get_intent_token(captured)

Issue: Token Signature Invalid

Symptom: InvalidTokenError: Signature verification failed

Solution:

  • Don't modify the token after generation
  • Ensure the token was issued by the legitimate CSRG-IAP
  • Check that the token is not being truncated or altered in transit

Issue: Identity Mismatch

Symptom: AuthenticationError: Token user_id/agent_id mismatch

Solution:

# Use same client that generated token
# ✓ Good
token = client.get_intent_token(captured)
result = client.invoke(mcp, action, token, params)

# ✗ Bad: Different client
other_client = ArmorIQClient(user_id="different_user", ...)
result = other_client.invoke(mcp, action, token, params)  # Fails
