Detailed API documentation for all MCP authentication tools with schemas, examples, and error handling.
The @warpy-auth-sdk/core library provides three core MCP tools for AI agent authentication:
All tools are implemented with Zod schema validation, comprehensive error handling, and support for both direct invocation and Vercel AI SDK integration.
Creating MCP tools for direct use
import { createMCPTools } from '@warpy-auth-sdk/core';
// Self-hosted mode (local JWT verification)
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!,
// Optional: Add database adapter for persistent revocation
// adapter: PrismaAdapter(prisma)
});
// Available tools:
// - mcpTools.agent_login
// - mcpTools.get_session
// - mcpTools.revoke_token
// Each tool has:
// - description: string
// - parameters: ZodObject (schema validation)
// - execute: async functionUsing Warpy Cloud for managed security
import { createMCPShield } from '@warpy-auth-sdk/core';
// Cloud-enhanced mode with automatic fallback
const mcpTools = createMCPShield({
secret: process.env.AUTH_SECRET!,
warpy: {
apiKey: process.env.WARPY_API_KEY // Enables cloud features
},
metrics: { enabled: true }
});
// Same tools, enhanced with cloud security and analyticsCreates a short-lived JWT token for AI agent authentication with scoped permissions. This is the primary entry point for delegating user access to agents.
Input validation schema
import { z } from 'zod';
const agentLoginSchema = z.object({
userId: z.string()
.describe('User ID to impersonate'),
scopes: z.array(z.string())
.describe('Scopes for agent access (e.g., ["debug", "read"])'),
agentId: z.string()
.describe('Unique identifier for the agent'),
expiresIn: z.string()
.optional()
.default('15m')
.describe('Token expiration (e.g., "15m", "1h", "30s")')
});
type AgentLoginInput = z.infer<typeof agentLoginSchema>;| Parameter | Type | Required | Description |
|---|---|---|---|
userId | string | Yes | ID of the user the agent will act on behalf of |
scopes | string[] | Yes | Array of permission scopes (e.g., ["read:profile", "write:comments"]) |
agentId | string | Yes | Unique identifier for the agent (e.g., "claude-assistant", "debug-bot") |
expiresIn | string | No | Token lifetime using time notation ("15m", "1h", "30s", "7d"). Default: "15m" |
Successful agent login
{
success: true,
token: "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySWQiOiJ1c2VyLTEyMyIsImVtYWlsIjoidXNlckBleGFtcGxlLmNvbSIsInNjb3BlcyI6WyJkZWJ1ZyIsInJlYWQiXSwiYWdlbnRJZCI6ImNsYXVkZS1hc3Npc3RhbnQiLCJ0eXBlIjoibWNwLWFnZW50IiwiaWF0IjoxNjk5MjgyNDk2LCJleHAiOjE2OTkyODMzOTZ9.signature",
expires: "2024-11-06T12:49:56.000Z",
message: "Agent claude-assistant logged in as user user-123 with scopes: debug, read"
}Failed agent login
{
success: false,
error: "Invalid user ID or insufficient permissions"
}The returned JWT token contains the following claims in its payload:
Token claims structure
{
// Standard JWT claims
iat: 1699282496, // Issued at (Unix timestamp)
exp: 1699283396, // Expires at (Unix timestamp)
// User identification
userId: "user-123",
email: "user@example.com", // Optional, if available
name: "John Doe", // Optional, if available
// Agent-specific claims
type: "mcp-agent", // Token type (always "mcp-agent")
agentId: "claude-assistant", // Agent identifier
scopes: ["debug", "read"], // Granted permission scopes
// Standard JWT signature
// Verified using AUTH_SECRET
}Scopes should follow a hierarchical naming convention for clarity and flexibility:
action:resource or action:resource:subresourceread:profile - Read user profile datawrite:comments - Create/edit commentsdelete:comments:own - Delete only own commentsadmin:users - Full administrative access to usersread, write, delete, adminread:* (read all resources), *:profile (all actions on profile)admin:* unless absolutely required. Consider implementing time-based scopes that automatically expire after task completion.Calling agent_login directly
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
// Execute the tool
const result = await mcpTools.agent_login.execute({
userId: 'user-123',
scopes: ['read:profile', 'read:orders'],
agentId: 'shopping-assistant',
expiresIn: '30m'
});
if (result.success) {
console.log('Token:', result.token);
console.log('Expires:', result.expires);
// Use token for authenticated requests
const response = await fetch('/api/user/profile', {
headers: {
'Authorization': `Bearer ${result.token}`
}
});
} else {
console.error('Login failed:', result.error);
}Exposing agent_login via API
// app/api/mcp/route.ts
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
export async function POST(request: Request) {
const { tool, args } = await request.json();
if (tool === 'agent_login') {
const result = await mcpTools.agent_login.execute(args);
return Response.json(result);
}
return Response.json({ error: 'Unknown tool' }, { status: 400 });
}
// Client usage:
const response = await fetch('/api/mcp', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
tool: 'agent_login',
args: {
userId: 'user-123',
scopes: ['debug', 'read'],
agentId: 'claude-assistant',
expiresIn: '15m'
}
})
});
const { success, token } = await response.json();Using with generateText()
import { generateText } from 'ai';
import { openai } from '@ai-sdk/openai';
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
const { text, toolCalls } = await generateText({
model: openai('gpt-4-turbo'),
tools: mcpTools,
prompt: 'Login as user-123 with read and debug scopes',
maxToolRoundtrips: 3
});
// Agent automatically calls agent_login tool
// Token is available in toolCalls[0].result.tokenCommon error scenarios and how to handle them:
| Error | Cause | Resolution |
|---|---|---|
| Invalid user ID | User doesn't exist or ID format is wrong | Validate user exists before calling agent_login |
| Invalid scopes | Scopes array is empty or contains invalid values | Ensure at least one valid scope is provided |
| Invalid expiresIn format | Unrecognized time format (e.g., "15 minutes") | Use valid formats: "15m", "1h", "30s", "7d" |
| Token generation failed | Missing or invalid AUTH_SECRET | Ensure AUTH_SECRET is set and at least 32 characters |
Verifies a JWT token and retrieves the associated session information including user details, scopes, and agent identifier. Use this to validate tokens before performing privileged operations.
Input validation schema
import { z } from 'zod';
const getSessionSchema = z.object({
token: z.string()
.describe('JWT token to verify')
});
type GetSessionInput = z.infer<typeof getSessionSchema>;| Parameter | Type | Required | Description |
|---|---|---|---|
token | string | Yes | JWT token to verify (typically from Authorization header) |
Valid token
{
success: true,
session: {
userId: "user-123",
email: "user@example.com",
name: "John Doe",
type: "mcp-agent",
scopes: ["debug", "read"],
agentId: "claude-assistant"
}
}Invalid or expired token
{
success: false,
error: "Invalid or expired token"
}
// Or if token was revoked:
{
success: false,
error: "Token has been revoked"
}| Field | Type | Description |
|---|---|---|
userId | string | ID of the user the agent is acting as |
email | string? | User's email address (if available) |
name | string? | User's display name (if available) |
type | string | Token type (always "mcp-agent" for agent tokens) |
scopes | string[] | Array of granted permission scopes |
agentId | string | Identifier of the agent that owns this token |
Protecting an API endpoint
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
export async function GET(request: Request) {
// Extract token from Authorization header
const authHeader = request.headers.get('Authorization');
if (!authHeader?.startsWith('Bearer ')) {
return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
const token = authHeader.substring(7);
// Verify token
const verification = await mcpTools.get_session.execute({ token });
if (!verification.success) {
return Response.json(
{ error: 'Invalid token' },
{ status: 401 }
);
}
// Check if agent has required scope
const { session } = verification;
if (!session.scopes.includes('read:profile')) {
return Response.json(
{ error: 'Insufficient permissions' },
{ status: 403 }
);
}
// Proceed with operation
const userData = await getUserData(session.userId);
return Response.json(userData);
}Reusable authorization helper
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
export async function requireScopes(
request: Request,
requiredScopes: string[]
): Promise<{ userId: string; agentId: string } | Response> {
const authHeader = request.headers.get('Authorization');
if (!authHeader?.startsWith('Bearer ')) {
return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
const token = authHeader.substring(7);
const verification = await mcpTools.get_session.execute({ token });
if (!verification.success || !verification.session) {
return Response.json({ error: 'Invalid token' }, { status: 401 });
}
const { session } = verification;
const hasAllScopes = requiredScopes.every(scope =>
session.scopes.includes(scope)
);
if (!hasAllScopes) {
return Response.json(
{
error: 'Insufficient permissions',
required: requiredScopes,
granted: session.scopes
},
{ status: 403 }
);
}
return {
userId: session.userId,
agentId: session.agentId
};
}
// Usage in route handlers:
export async function POST(request: Request) {
const auth = await requireScopes(request, ['write:comments']);
if (auth instanceof Response) return auth; // Error response
const { userId, agentId } = auth;
// Perform authorized operation
console.log(`Agent ${agentId} creating comment for user ${userId}`);
// ...
}Periodic validation
// Check if a long-lived token is still valid
async function isTokenStillValid(token: string): Promise<boolean> {
const result = await mcpTools.get_session.execute({ token });
return result.success;
}
// Usage in long-running agent tasks
const token = await authenticateAgent();
setInterval(async () => {
const valid = await isTokenStillValid(token);
if (!valid) {
console.log('Token expired or revoked, re-authenticating...');
token = await authenticateAgent();
}
}, 5 * 60 * 1000); // Check every 5 minutesThe get_session tool automatically checks if a token has been revoked before returning session information. Revocation lists are maintained:
Immediately invalidates an agent token, preventing further use. Once revoked, any subsequentget_session calls with the token will fail. Use this to terminate agent sessions when tasks complete or security incidents occur.
Input validation schema
import { z } from 'zod';
const revokeTokenSchema = z.object({
token: z.string()
.describe('JWT token to revoke')
});
type RevokeTokenInput = z.infer<typeof revokeTokenSchema>;| Parameter | Type | Required | Description |
|---|---|---|---|
token | string | Yes | JWT token to revoke (must be valid format, even if already expired) |
Token revoked successfully
{
success: true,
message: "Token for agent claude-assistant has been revoked"
}Invalid token or revocation failed
{
success: false,
error: "Invalid token"
}
// Or if token format is malformed:
{
success: false,
error: "Failed to revoke token"
}Clean up agent tokens
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
async function performAgentTask(userId: string) {
// Create agent token
const loginResult = await mcpTools.agent_login.execute({
userId,
scopes: ['read:data', 'write:analysis'],
agentId: 'data-analyst-agent',
expiresIn: '1h'
});
if (!loginResult.success) {
throw new Error('Failed to create agent token');
}
const { token } = loginResult;
try {
// Perform task with token
await analyzeUserData(token);
await generateReport(token);
console.log('Task completed successfully');
} finally {
// Always revoke token after task, even if errors occurred
await mcpTools.revoke_token.execute({ token });
console.log('Agent token revoked');
}
}Admin API to revoke tokens
// app/api/admin/revoke-agent-token/route.ts
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
export async function POST(request: Request) {
// Verify admin authentication (your auth logic)
const admin = await verifyAdminAuth(request);
if (!admin) {
return Response.json({ error: 'Unauthorized' }, { status: 401 });
}
const { token } = await request.json();
// Get session info before revoking (for audit log)
const sessionResult = await mcpTools.get_session.execute({ token });
// Revoke the token
const revokeResult = await mcpTools.revoke_token.execute({ token });
if (revokeResult.success) {
// Log the revocation for audit trail
await logAuditEvent({
action: 'agent_token_revoked',
admin: admin.id,
agentId: sessionResult.session?.agentId,
userId: sessionResult.session?.userId,
reason: 'Admin revocation'
});
}
return Response.json(revokeResult);
}Revoke expired tokens periodically
// Background job to clean up expired tokens
import { createMCPTools } from '@warpy-auth-sdk/core';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
async function cleanupExpiredTokens() {
// Get all active agent tokens from your database
const activeTokens = await db.agentToken.findMany({
where: { revoked: false }
});
for (const tokenRecord of activeTokens) {
// Check if token is still valid
const verification = await mcpTools.get_session.execute({
token: tokenRecord.token
});
if (!verification.success) {
// Token expired, revoke and mark in database
await mcpTools.revoke_token.execute({ token: tokenRecord.token });
await db.agentToken.update({
where: { id: tokenRecord.id },
data: { revoked: true, revokedAt: new Date() }
});
console.log(`Cleaned up expired token for agent ${tokenRecord.agentId}`);
}
}
}
// Run every hour
setInterval(cleanupExpiredTokens, 60 * 60 * 1000);How revocation is handled depends on your setup:
| Mode | Storage | Persistence | Scalability |
|---|---|---|---|
| Self-hosted (no adapter) | In-memory Set | Process lifetime only | Single instance |
| With database adapter | Database table | Permanent | Horizontal scaling |
| Cloud Shield | Distributed cache | Cloud-managed | Global, multi-region |
Create, use once, and immediately revoke
async function performOneTimeTask(userId: string, taskData: any) {
// Create short-lived token
const { token } = await mcpTools.agent_login.execute({
userId,
scopes: ['write:task_result'],
agentId: 'one-time-task-agent',
expiresIn: '5m'
});
// Perform task
const result = await executeTask(token, taskData);
// Immediately revoke (don't wait for expiration)
await mcpTools.revoke_token.execute({ token });
return result;
}Start with minimal scopes, escalate as needed
async function interactiveAgentSession(userId: string) {
// Start with read-only access
let { token } = await mcpTools.agent_login.execute({
userId,
scopes: ['read:data'],
agentId: 'interactive-agent',
expiresIn: '15m'
});
// Agent performs read operations
await analyzeData(token);
// User approves write operation, get new token with escalated scope
const needsWrite = await askUserPermission('Agent wants to save results');
if (needsWrite) {
// Revoke read-only token
await mcpTools.revoke_token.execute({ token });
// Create new token with write permission
const newTokenResult = await mcpTools.agent_login.execute({
userId,
scopes: ['read:data', 'write:results'],
agentId: 'interactive-agent',
expiresIn: '15m'
});
token = newTokenResult.token;
await saveResults(token);
}
// Clean up
await mcpTools.revoke_token.execute({ token });
}Multiple agents sharing scoped access
async function coordinatedWorkflow(userId: string) {
// Agent 1: Data collector (read-only)
const collectorToken = await mcpTools.agent_login.execute({
userId,
scopes: ['read:raw_data'],
agentId: 'data-collector',
expiresIn: '30m'
});
const rawData = await collectData(collectorToken.token);
// Agent 2: Data processor (read + write intermediate)
const processorToken = await mcpTools.agent_login.execute({
userId,
scopes: ['read:raw_data', 'write:processed_data'],
agentId: 'data-processor',
expiresIn: '30m'
});
const processedData = await processData(processorToken.token, rawData);
// Agent 3: Report generator (read processed + write final)
const reporterToken = await mcpTools.agent_login.execute({
userId,
scopes: ['read:processed_data', 'write:reports'],
agentId: 'report-generator',
expiresIn: '30m'
});
const report = await generateReport(reporterToken.token, processedData);
// Clean up all tokens
await Promise.all([
mcpTools.revoke_token.execute({ token: collectorToken.token }),
mcpTools.revoke_token.execute({ token: processorToken.token }),
mcpTools.revoke_token.execute({ token: reporterToken.token })
]);
return report;
}admin:* includes all admin scopes)Protecting Express routes
import express from 'express';
import { createMCPTools } from '@warpy-auth-sdk/core';
const app = express();
const mcpTools = createMCPTools({ secret: process.env.AUTH_SECRET! });
// Middleware to verify agent tokens
const requireAgent = async (req, res, next) => {
const authHeader = req.headers.authorization;
if (!authHeader?.startsWith('Bearer ')) {
return res.status(401).json({ error: 'Unauthorized' });
}
const token = authHeader.substring(7);
const verification = await mcpTools.get_session.execute({ token });
if (!verification.success) {
return res.status(401).json({ error: 'Invalid token' });
}
req.agentSession = verification.session;
next();
};
// Protected route
app.get('/api/agent/data', requireAgent, async (req, res) => {
const { userId, agentId, scopes } = req.agentSession;
console.log(`Agent ${agentId} accessing data for user ${userId}`);
if (!scopes.includes('read:data')) {
return res.status(403).json({ error: 'Insufficient permissions' });
}
const data = await fetchUserData(userId);
res.json(data);
});Using MCP tools in Next.js
// app/api/agent/profile/route.ts
import { createMCPTools } from '@warpy-auth-sdk/core';
import { NextRequest, NextResponse } from 'next/server';
const mcpTools = createMCPTools({
secret: process.env.AUTH_SECRET!
});
export async function GET(request: NextRequest) {
// Extract and verify token
const authHeader = request.headers.get('authorization');
if (!authHeader?.startsWith('Bearer ')) {
return NextResponse.json(
{ error: 'Unauthorized' },
{ status: 401 }
);
}
const token = authHeader.substring(7);
const verification = await mcpTools.get_session.execute({ token });
if (!verification.success || !verification.session) {
return NextResponse.json(
{ error: 'Invalid token' },
{ status: 401 }
);
}
const { userId, scopes } = verification.session;
// Check required scope
if (!scopes.includes('read:profile')) {
return NextResponse.json(
{ error: 'Insufficient permissions' },
{ status: 403 }
);
}
// Fetch user profile
const profile = await db.user.findUnique({
where: { id: userId },
select: { id: true, email: true, name: true, avatar: true }
});
return NextResponse.json(profile);
}