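Graph RAG Code Library

⚙️ Minimum Requirements

- Node.js 22.x with the `@aws-sdk/client-bedrock-runtime`, `@aws-sdk/client-bedrock-agent-runtime`, `@aws-sdk/lib-dynamodb`, and `ioredis` packages. The TypeScript snippets below also import `@aws-sdk/client-dynamodb`, `@aws-sdk/client-secrets-manager`, `@aws-sdk/signature-v4`, `@aws-crypto/sha256-js`, `@aws-sdk/credential-provider-node`, `aws-cdk-lib`, and `redis` (the cache client in section 7 uses node-redis `createClient`, not `ioredis`).
- Python 3.11+ with the `boto3`, `redis`, `psycopg2-binary`, `pytest`, and `aws-requests-auth` packages.
- AWS credentials: an IAM role or profile with `bedrock:InvokeModel`, `bedrock:RetrieveAndGenerate` and `bedrock:Retrieve` (Bedrock agent-runtime actions use the `bedrock:` IAM prefix), `neptune-db:ReadDataViaQuery`, and `dynamodb:GetItem`. The code below additionally calls `dynamodb:Query`, `dynamodb:PutItem`, and `secretsmanager:GetSecretValue`; the integration tests also use `bedrock:GetKnowledgeBase` and `dynamodb:DescribeTable`.
- Environment variables: `NEPTUNE_ENDPOINT`, `REDIS_URL`, `KB_ID=BXJGG7PIPS`, and `AWS_REGION=us-east-1` must be configured.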
1. Agentic Orchestrator (TypeScript)
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
import { BedrockAgentRuntimeClient, RetrieveAndGenerateCommand } from '@aws-sdk/client-bedrock-agent-runtime';
import { DynamoDBDocumentClient, GetCommand, QueryCommand } from '@aws-sdk/lib-dynamodb';
import * as crypto from 'crypto';
/** Request accepted by agenticOrchestrator. */
interface AgentQuery {
  /** The user's free-text question. */
  query: string;
  /** Caller identity plus optional incident and conversation scoping. */
  context: {
    tenantId: string;
    userId: string;
    incidentId?: string;
    sessionId?: string;
  };
}

/** Answer produced by agenticOrchestrator, with provenance and timing metadata. */
interface AgentResponse {
  /** Generated answer text. */
  answer: string;
  /** Source references reported by the generation step. */
  sources: string[];
  /** Intent label assigned by classification. */
  intent: string;
  /** True when the answer was served from the response cache. */
  fromCache: boolean;
  /** Wall-clock time spent handling the request, in milliseconds. */
  latencyMs: number;
  /** Bedrock model id that generated the answer. */
  modelUsed: string;
}
/**
 * Answers a user query end to end for the Graph RAG agent.
 *
 * Flow: exact-match Redis cache -> intent classification -> parallel retrieval
 * (Neptune graph for graph intents, Bedrock KB, exemplars, procedural memory) ->
 * context assembly -> model routing -> generation -> background cache write,
 * audit log and session-history update.
 *
 * Retrieval sources run under Promise.allSettled, so a failing source is passed on as
 * null instead of failing the request. Errors from classification, session-history
 * lookup, model routing and generation propagate to the caller.
 *
 * @param input - query text plus tenant/user (and optional incident/session) context
 * @returns the answer with sources, intent, model and latency; `fromCache` marks cache hits
 */
export async function agenticOrchestrator(input: AgentQuery): Promise<AgentResponse> {
  const startTime = Date.now();
  const { tenantId, sessionId } = input.context;

  // The answer depends on tenant-specific prompt templates, exemplars, procedural memory
  // and model routing, so the cache key is scoped by tenant; keying on the query text
  // alone would serve one tenant's cached answer to another tenant.
  const cacheKey = crypto
    .createHash('sha256')
    .update(JSON.stringify([tenantId, input.query.toLowerCase().trim()]))
    .digest('hex');
  const redisKey = `sf:cache:${cacheKey}`;

  // Step 1: exact-match response cache on the normalized query text. cacheGet reports
  // Redis failures as a miss, so an unavailable cache never fails the request.
  const cached = await cacheGet(redisKey);
  if (cached) {
    try {
      const hit = JSON.parse(cached) as AgentResponse;
      return { ...hit, fromCache: true, latencyMs: Date.now() - startTime };
    } catch {
      // A corrupt entry is treated as a miss and overwritten below.
      console.warn(`Ignoring unparseable cache entry ${redisKey}`);
    }
  }

  // Step 2: Classify intent using Claude Haiku (fast + cheap)
  const { intent, entities } = await classifyIntent(input.query, tenantId);

  // Step 3: Parallel retrieval from all relevant sources
  const [graphResult, kbResult, exemplarResult, memoryResult] = await Promise.allSettled([
    ['cmdb', 'topology', 'blast_radius', 'dependency'].includes(intent)
      ? queryNeptuneGraph(entities, intent) : Promise.resolve(null),
    queryBedrockKB(input.query, intent),
    retrieveExemplars(input.query, intent, tenantId),
    lookupProceduralMemory(intent, entities, tenantId),
  ]);

  // Step 4: Assemble context window (max 8000 tokens)
  const contextWindow = assembleContext({
    graph: graphResult.status === 'fulfilled' ? graphResult.value : null,
    kb: kbResult.status === 'fulfilled' ? kbResult.value : null,
    exemplars: exemplarResult.status === 'fulfilled' ? exemplarResult.value : null,
    memory: memoryResult.status === 'fulfilled' ? memoryResult.value : null,
    session: await getSessionHistory(sessionId),
  });

  // Step 5: Select model via router
  const { modelId, maxTokens, temperature } = await getModelRouterDecision(intent, tenantId);

  // Step 6: Generate response via Bedrock
  const response = await generateResponse(input.query, contextWindow, intent, modelId, maxTokens, temperature);

  const result: AgentResponse = {
    answer: response.text,
    sources: response.sources,
    intent,
    fromCache: false,
    latencyMs: Date.now() - startTime,
    modelUsed: modelId,
  };

  // Step 7: cache and audit in the background; failures are logged, never thrown.
  runInBackground('Cache write', () => cacheSet(redisKey, JSON.stringify(result), 3600));
  // AuditEntry expects tenantId/userId at the top level, so flatten the request context
  // (spreading `input` would nest them under `context` and produce pk "undefined#undefined").
  const auditEntry = { ...input.context, query: input.query, ...result, cacheKey };
  runInBackground('Audit log', () => logToAuditTable(auditEntry));

  // Step 8: Update session history
  if (sessionId) {
    runInBackground('Session history update', () =>
      updateSessionHistory(sessionId, input.query, response.text));
  }
  return result;
}

/**
 * Starts a non-critical task without awaiting it. A synchronous throw or a rejection is
 * logged rather than surfacing as an unhandled rejection (which terminates Node 22 by default).
 */
function runInBackground(label: string, task: () => unknown): void {
  void Promise.resolve()
    .then(task)
    .catch((err: unknown) => console.warn(`${label} failed (non-fatal):`, err));
}
/** Bedrock runtime client reused across classifications; created on first use. */
let intentBedrockClient: BedrockRuntimeClient | null = null;

/** Returns the shared Bedrock runtime client, creating it on the first call. */
function getIntentBedrockClient(): BedrockRuntimeClient {
  if (!intentBedrockClient) {
    intentBedrockClient = new BedrockRuntimeClient({ region: process.env.AWS_REGION ?? 'us-east-1' });
  }
  return intentBedrockClient;
}

/**
 * Classifies a user query into an intent label plus extracted entity names with Claude 3 Haiku.
 *
 * The tenant's `intent-classify-v1` prompt template is filled with the query, and the model
 * is expected to reply with JSON `{"intent": string, "entities": string[]}`. If the reply
 * cannot be parsed into that shape, a warning is logged and `{ intent: 'general', entities: [] }`
 * is returned instead of failing the whole request. Errors from template lookup or from
 * the Bedrock call itself still propagate.
 *
 * @param query - raw user query
 * @param tenantId - tenant whose prompt template is used
 */
async function classifyIntent(query: string, tenantId: string): Promise<{ intent: string; entities: string[] }> {
  const template = await getPromptTemplate('intent-classify-v1', tenantId);
  // split/join substitutes every placeholder literally. String.replace would replace only
  // the first one and would interpret `$&`, `$'`, ... in the user's text as replacement patterns.
  const prompt = template.split('{{query}}').join(query);
  const cmd = new InvokeModelCommand({
    modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
    body: JSON.stringify({
      anthropic_version: 'bedrock-2023-05-31',
      max_tokens: 256,
      temperature: 0,
      messages: [{ role: 'user', content: prompt }],
    }),
    contentType: 'application/json',
    accept: 'application/json',
  });
  const res = await getIntentBedrockClient().send(cmd);
  try {
    const parsed = JSON.parse(new TextDecoder().decode(res.body)) as { content?: { text?: unknown }[] };
    return parseIntentClassification(parsed.content?.[0]?.text);
  } catch (err: unknown) {
    console.warn('Intent classification output unusable; falling back to "general":', err);
    return { intent: 'general', entities: [] };
  }
}

/** Extracts and validates the `{intent, entities}` object from the model's reply text. */
function parseIntentClassification(text: unknown): { intent: string; entities: string[] } {
  if (typeof text !== 'string') {
    throw new Error('model reply has no text content');
  }
  // Models sometimes wrap the JSON in prose or code fences; take the outermost {...} span.
  const start = text.indexOf('{');
  const end = text.lastIndexOf('}');
  if (start === -1 || end < start) {
    throw new Error('model reply contains no JSON object');
  }
  const data: unknown = JSON.parse(text.slice(start, end + 1));
  if (typeof data !== 'object' || data === null) {
    throw new Error('model reply JSON is not an object');
  }
  const { intent, entities } = data as { intent?: unknown; entities?: unknown };
  if (typeof intent !== 'string' || intent.length === 0) {
    throw new Error('model reply has no intent');
  }
  return {
    intent,
    entities: Array.isArray(entities) ? entities.filter((e): e is string => typeof e === 'string') : [],
  };
}
2. Neptune Query Helper (TypeScript)
import { SignatureV4 } from '@aws-sdk/signature-v4';
import { Sha256 } from '@aws-crypto/sha256-js';
import { defaultProvider } from '@aws-sdk/credential-provider-node';
/**
 * Neptune cluster endpoint hostname (no scheme or port), read from NEPTUNE_ENDPOINT at module load.
 * The non-null assertion is type-level only: if the variable is unset this is `undefined` at runtime.
 */
const NEPTUNE_HOST: string = process.env['NEPTUNE_ENDPOINT']!;
/** Result of a successful Neptune lookup: flattened elements plus the script that produced them. */
interface GraphQueryResult {
  topology: Record<string, unknown>[];
  entityCount: number;
  queryUsed: string;
}

/**
 * Runs the Gremlin script for a graph-backed intent against Neptune over SigV4-signed HTTPS.
 *
 * @param entities - entity names from intent classification; only the first is used
 * @param intent - `blast_radius`, `topology` or `dependency`; other intents yield null
 * @returns flattened topology, or null when no script applies or Neptune returns a non-2xx status
 * @throws on network errors, the 15 s timeout, credential resolution failure, or a non-JSON body
 */
export async function queryNeptuneGraph(entities: string[], intent: string): Promise<GraphQueryResult | null> {
  const query = buildGremlinQuery(entities, intent);
  if (!query) return null;

  const port = 8182;
  const host = `${NEPTUNE_HOST}:${port}`;
  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region: process.env.AWS_REGION ?? 'us-east-1',
    service: 'neptune-db',
    sha256: Sha256,
  });
  const body = JSON.stringify({ gremlin: query });
  // SigV4 signs the Host header, so it must match what fetch sends for this URL (host:port).
  const signedReq = await signer.sign({
    method: 'POST',
    protocol: 'https:',
    hostname: NEPTUNE_HOST,
    port,
    path: '/gremlin',
    headers: {
      'Content-Type': 'application/json',
      'Content-Length': Buffer.byteLength(body).toString(),
      Host: host,
    },
    body,
  });

  const response = await fetch(`https://${host}/gremlin`, {
    method: 'POST',
    headers: signedReq.headers,
    body,
    signal: AbortSignal.timeout(15000),
  });
  if (!response.ok) {
    console.error(`Neptune error: ${response.status} -- ${await response.text()}`);
    return null;
  }

  const data = (await response.json()) as { result?: { data?: { '@value'?: unknown } } };
  const raw = data?.result?.data?.['@value'];
  const vertices: unknown[] = Array.isArray(raw) ? raw : [];
  // NOTE(review): `values(...)` (dependency intent) returns plain strings, which formatVertex
  // flattens to {} — confirm the expected response shape per intent.
  return {
    topology: vertices.map((v) => formatVertex(v)),
    entityCount: vertices.length,
    queryUsed: query,
  };
}
/**
 * Builds the Gremlin script for a graph-backed intent, anchored on the first extracted entity.
 *
 * - `blast_radius`: CIs reachable via 1-3 outgoing DEPENDS_ON hops, deduplicated and projected.
 * - `topology`: up to 50 vertices with a RUNS_ON edge into the named Service.
 * - `dependency`: names/types of the CI's direct DEPENDS_ON targets.
 *
 * @returns the script, or null when there is no usable entity or the intent has no graph query
 */
function buildGremlinQuery(entities: string[], intent: string): string | null {
  const ciName = entities[0];
  if (typeof ciName !== 'string' || ciName === '') return null;
  // Entity names come from LLM output derived from user text, so they must not be able to
  // close the string literal and append their own steps (e.g. `x').drop().V('`).
  const name = escapeGremlinString(ciName);
  switch (intent) {
    case 'blast_radius':
      // emit() keeps vertices from every hop; repeat().times(3) alone returns only vertices
      // exactly three hops away.
      // NOTE(review): if A-DEPENDS_ON->B means "A is impacted when B fails", blast radius
      // would walk in('DEPENDS_ON') instead — confirm edge direction.
      return `g.V().has('CI','name','${name}').repeat(out('DEPENDS_ON')).emit().times(3).dedup().project('id','name','type','criticality').by('ciId').by('name').by('type').by('criticality')`;
    case 'topology':
      return `g.V().has('Service','name','${name}').in('RUNS_ON').project('name','type','status').by('name').by('type').by('status').limit(50)`;
    case 'dependency':
      return `g.V().has('CI','name','${name}').out('DEPENDS_ON').values('name','type')`;
    default:
      return null;
  }
}

/** Escapes a value for safe use inside a single-quoted Gremlin string literal. */
function escapeGremlinString(value: string): string {
  return value
    .replace(/\\/g, '\\\\')
    .replace(/'/g, "\\'")
    .replace(/\r/g, '\\r')
    .replace(/\n/g, '\\n');
}
/**
 * Flattens one GraphSON element from a Neptune `/gremlin` response into a key -> value map.
 *
 * Reads a flat [key, value, key, value, ...] list at `v['@value'][1]['@value']` and, for each
 * value, takes `['@value'][0]['@value'].value`. Values that do not match that shape become ''.
 * Input without such a list yields {}.
 * NOTE(review): project()/values() responses may not have this shape — confirm against real payloads.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any -- untyped GraphSON payload
function formatVertex(v: any): Record<string, unknown> {
  const props: unknown = v?.['@value']?.[1]?.['@value'];
  const result: Record<string, unknown> = {};
  if (!Array.isArray(props)) return result;
  for (let i = 0; i < props.length; i += 2) {
    const key = String(props[i]);
    result[key] = props[i + 1]?.['@value']?.[0]?.['@value']?.value ?? '';
  }
  return result;
}
3. Bedrock KB Retrieve (Python)
import boto3
import time
from typing import Optional
# Module-level Bedrock Agent Runtime client (us-east-1), shared by every
# query_bedrock_kb call in this module.
bedrock_agent = boto3.client(service_name='bedrock-agent-runtime', region_name='us-east-1')
def query_bedrock_kb(
    query: str,
    intent: str = 'general',
    num_results: int = 5,
    session_id: Optional[str] = None,
    max_retries: int = 3
) -> dict:
    """Answer ``query`` from the StackFlow Bedrock knowledge base via RetrieveAndGenerate.

    The generation model is chosen from ``intent``: Claude 3 Haiku for incident triage,
    Claude 3.5 Sonnet for KB search and policy checks, and Claude 3 Sonnet otherwise.
    Retrieval uses hybrid search over ``num_results`` chunks.

    Args:
        query: User question passed to the knowledge base.
        intent: Intent label; used only to pick the generation model.
        num_results: Number of chunks to retrieve.
        session_id: Existing RetrieveAndGenerate session to continue, if any.
        max_retries: Total attempts when Bedrock throttles (values below 1 count as 1).

    Returns:
        dict with ``answer`` (generated text), ``citations`` (one dict per retrieved
        reference: first 300 chars of its text, its S3 URI or '', and its score or 0),
        and ``session_id`` (the session id Bedrock returned, if any).

    Raises:
        The client's ThrottlingException if every attempt is throttled; any other
        client or response-parsing error immediately, after printing it.
    """
    model_map = {
        'incident_triage': 'anthropic.claude-3-haiku-20240307-v1:0',
        'kb_search': 'anthropic.claude-3-5-sonnet-20241022-v2:0',
        'policy_check': 'anthropic.claude-3-5-sonnet-20241022-v2:0',
        'default': 'anthropic.claude-3-sonnet-20240229-v1:0',
    }
    model_arn = f"arn:aws:bedrock:us-east-1::foundation-model/{model_map.get(intent, model_map['default'])}"
    config = {
        'type': 'KNOWLEDGE_BASE',
        'knowledgeBaseConfiguration': {
            'knowledgeBaseId': 'BXJGG7PIPS',
            'modelArn': model_arn,
            'retrievalConfiguration': {
                'vectorSearchConfiguration': {
                    'numberOfResults': num_results,
                    'overrideSearchType': 'HYBRID',
                }
            },
            'generationConfiguration': {
                'promptTemplate': {
                    # $search_results$ and $query$ are Bedrock's prompt-template placeholders.
                    'textPromptTemplate': (
                        'You are a StackFlow ITSM expert. Answer based on the provided context only. '
                        'Reference specific resource names and CLI commands when available.\n\n'
                        'Context:\n'
                        '$search_results$\n\n'
                        'Question: $query$\n\n'
                        'Answer:'
                    )
                },
                'inferenceConfig': {
                    'textInferenceConfig': {'maxTokens': 1024, 'temperature': 0.1, 'topP': 0.9}
                },
            },
        }
    }
    request = {'input': {'text': query}, 'retrieveAndGenerateConfiguration': config}
    if session_id:
        # sessionId is a top-level RetrieveAndGenerate parameter; it is not a member of
        # knowledgeBaseConfiguration, where boto3 parameter validation would reject it.
        request['sessionId'] = session_id

    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            response = bedrock_agent.retrieve_and_generate(**request)
            return {
                'answer': response['output']['text'],
                'citations': [
                    {
                        'text': ref.get('content', {}).get('text', '')[:300],
                        'uri': ref.get('location', {}).get('s3Location', {}).get('uri', ''),
                        'score': ref.get('score', 0),
                    }
                    for citation in response.get('citations', [])
                    for ref in citation.get('retrievedReferences', [])
                ],
                'session_id': response.get('sessionId'),
            }
        except bedrock_agent.exceptions.ThrottlingException:
            if attempt < attempts - 1:
                # Exponential backoff: 1.5s, 2.5s, 4.5s, ...
                wait = (2 ** attempt) + 0.5
                print(f'Bedrock throttled, retry {attempt+1} in {wait}s')
                time.sleep(wait)
            else:
                raise
        except Exception as e:
            print(f'Bedrock KB error: {e}')
            raise
4. Pattern Cluster Matching (Python)
import boto3
import json
import hashlib
from decimal import Decimal
# Module-level DynamoDB resource (us-east-1) and the pattern-cluster table handle
# read by match_pattern_cluster.
ddb = boto3.resource(service_name='dynamodb', region_name='us-east-1')
pattern_table = ddb.Table('StackFlow_PatternCluster')
def match_pattern_cluster(incident_text: str, tenant_id: str) -> dict | None:
"""Find the best matching pattern cluster for an incident description."""
# Generate embedding for the incident text
bedrock = boto3.client('bedrock-runtime', region_name='us-east-1')
embed_response = bedrock.invoke_model(
modelId='amazon.titan-embed-text-v2:0',
body=json.dumps({'inputText': incident_text, 'dimensions': 1024}),
contentType='application/json', accept='application/json'
)
query_embedding = json.loads(embed_response['body'].read())['embedding']
# Query all clusters for the tenant (small dataset -- fetch all and score in memory)
response = pattern_table.query(
IndexName='tenantId-index',
KeyConditionExpression='tenantId = :t',
ExpressionAttributeValues={':t': tenant_id},
FilterExpression='#s = :active',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':t': tenant_id, ':active': 'active'}
)
clusters = response.get('Items', [])
if not clusters:
return None
# Cosine similarity against cluster centroids
best_cluster = None
best_score = -1.0
for cluster in clusters:
centroid = [float(x) for x in cluster.get('centroidEmbedding', [])]
if len(centroid) != 1024:
continue
score = cosine_similarity(query_embedding, centroid)
if score > best_score:
best_score = score
best_cluster = cluster
if best_score < 0.65: # Similarity threshold
return None
return {
'clusterId': best_cluster['clusterId'],
'category': best_cluster['category'],
'suggestedPriority': best_cluster.get('suggestedPriority', 'P3'),
'assignmentGroup': best_cluster.get('defaultAssignmentGroup'),
'confidence': round(best_score, 3),
'sampleSize': int(best_cluster.get('sampleCount', 0)),
}
def cosine_similarity(a: list, b: list) -> float:
    """Cosine similarity of two numeric vectors.

    The dot product pairs elements positionally (surplus trailing elements of the longer
    vector are ignored), while each norm covers its whole vector. Returns 0.0 when either
    vector has zero magnitude.
    """
    def _norm(vec: list) -> float:
        return sum(component**2 for component in vec) ** 0.5

    numerator = sum(left * right for left, right in zip(a, b))
    denominator = _norm(a) * _norm(b)
    if denominator > 0:
        return numerator / denominator
    return 0.0
5. Exemplar Retrieval (TypeScript)
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb';
/**
 * DynamoDB DocumentClient used by the exemplar, procedural-memory and audit helpers.
 * removeUndefinedValues lets callers write items with unset optional fields; by default
 * the DocumentClient rejects `undefined` attribute values.
 */
const ddb = DynamoDBDocumentClient.from(
  new DynamoDBClient({ region: process.env.AWS_REGION ?? 'us-east-1' }),
  { marshallOptions: { removeUndefinedValues: true } },
);
/**
 * Returns up to `limit` approved exemplars for `intent` and `tenantId` with qualityScore >= 0.7,
 * highest qualityScore first (via the `intentType-qualityScore-index` GSI).
 *
 * `query` is currently unused: exemplars are ranked by stored qualityScore, not by
 * similarity to the query text.
 *
 * @returns at most `limit` exemplars; an empty array when `limit` <= 0 or nothing matches
 */
export async function retrieveExemplars(
  query: string, intent: string, tenantId: string, limit = 3
): Promise<Exemplar[]> {
  if (limit <= 0) return []; // DynamoDB rejects Limit < 1
  const maxPages = 5; // bounds read cost when the tenant owns few exemplars for this intent
  const matches: Record<string, any>[] = [];
  let startKey: Record<string, any> | undefined;
  for (let page = 0; page < maxPages; page++) {
    const result = await ddb.send(new QueryCommand({
      TableName: 'StackFlow_AIExemplar',
      IndexName: 'intentType-qualityScore-index',
      // qualityScore is this index's sort key, and DynamoDB rejects key attributes in a
      // FilterExpression, so the minimum-score bound belongs in the key condition.
      KeyConditionExpression: 'intentType = :i AND qualityScore >= :minScore',
      FilterExpression: 'tenantId = :t AND #s = :active',
      ExpressionAttributeValues: {
        ':i': intent, ':t': tenantId,
        ':minScore': 0.7, ':active': 'approved',
      },
      ExpressionAttributeNames: { '#s': 'status' },
      ScanIndexForward: false, // Descending by qualityScore
      Limit: limit * 3, // page size; Limit is applied before the tenant/status filter
      ExclusiveStartKey: startKey,
    }));
    matches.push(...(result.Items ?? []));
    startKey = result.LastEvaluatedKey;
    if (!startKey || matches.length >= limit) break;
  }
  return matches.slice(0, limit).map((item): Exemplar => ({
    exemplarId: item.exemplarId,
    queryText: item.queryText,
    resolution: item.resolution,
    qualityScore: item.qualityScore,
    category: item.category,
    tags: item.tags ?? [],
  }));
}
/** Few-shot example returned by retrieveExemplars. */
interface Exemplar {
  exemplarId: string;
  /** Original question text of the example. */
  queryText: string;
  /** Resolution text used as the example answer. */
  resolution: string;
  /** Stored quality score used for ranking. */
  qualityScore: number;
  category: string;
  tags: string[];
}
6. Procedural Memory Lookup (TypeScript)
import { DynamoDBDocumentClient, QueryCommand } from '@aws-sdk/lib-dynamodb';
/**
 * Finds the active stored procedure for `intent` and `tenantId` whose entity list overlaps
 * most with `entities` (case-insensitive; on ties the earlier item in query order wins).
 *
 * @returns the best procedure, or null when no active procedure is found
 */
export async function lookupProceduralMemory(
  intent: string, entities: string[], tenantId: string
): Promise<ProceduralMemory | null> {
  // DynamoDB applies Limit before FilterExpression, so a single small page can consist
  // entirely of inactive procedures. Page until enough active candidates are collected.
  const maxCandidates = 25;
  const maxPages = 5;
  const candidates: Record<string, any>[] = [];
  let startKey: Record<string, any> | undefined;
  for (let page = 0; page < maxPages; page++) {
    const result = await ddb.send(new QueryCommand({
      TableName: 'StackFlow_ProceduralMemory',
      IndexName: 'intentType-index',
      // NOTE(review): a key condition on tenantId is only valid if tenantId is this index's
      // sort key; otherwise it must move to FilterExpression. Confirm against the table schema.
      KeyConditionExpression: 'intentType = :i AND tenantId = :t',
      FilterExpression: '#s = :active',
      ExpressionAttributeValues: { ':i': intent, ':t': tenantId, ':active': 'active' },
      ExpressionAttributeNames: { '#s': 'status' },
      ScanIndexForward: false,
      Limit: maxCandidates,
      ExclusiveStartKey: startKey,
    }));
    candidates.push(...(result.Items ?? []));
    startKey = result.LastEvaluatedKey;
    if (!startKey || candidates.length >= maxCandidates) break;
  }
  if (candidates.length === 0) return null;

  // Find best match by entity overlap
  const best = candidates.reduce((prev, curr) =>
    countEntityOverlap(entities, toStringList(curr.entities)) >
    countEntityOverlap(entities, toStringList(prev.entities)) ? curr : prev);
  return {
    procedureId: best.procedureId,
    title: best.title,
    steps: best.steps as string[],
    estimatedMinutes: best.estimatedMinutes,
    successRate: best.successRate,
    lastUsed: best.lastUsedAt,
  };
}

/** Normalizes a stored entity attribute (list, or string set unmarshalled as a Set) to string[]. */
function toStringList(value: unknown): string[] {
  const items: unknown[] = value instanceof Set ? [...value] : Array.isArray(value) ? value : [];
  return items.filter((x): x is string => typeof x === 'string');
}
/**
 * Counts how many entries of `a` also appear in `b`, ignoring case.
 * Duplicates in `a` are each counted; duplicates in `b` are not.
 */
function countEntityOverlap(a: string[], b: string[]): number {
  const known = new Set<string>();
  for (const entry of b) {
    known.add(entry.toLowerCase());
  }
  let matches = 0;
  for (const candidate of a) {
    if (known.has(candidate.toLowerCase())) {
      matches += 1;
    }
  }
  return matches;
}
/** Stored runbook-style procedure returned by lookupProceduralMemory. */
interface ProceduralMemory {
  procedureId: string;
  title: string;
  /** Ordered procedure steps. */
  steps: string[];
  estimatedMinutes: number;
  successRate: number;
  /** Value of the stored `lastUsedAt` attribute. */
  lastUsed: string;
}
7. Redis Response Cache Client (TypeScript)
import { createClient, RedisClientType } from 'redis';
import { SecretsManagerClient, GetSecretValueCommand } from '@aws-sdk/client-secrets-manager';
/** Connected client shared by all callers in this module; null until the first successful connect. */
let redisClient: RedisClientType | null = null;
/** In-flight connection attempt, so concurrent first callers share one connection instead of racing. */
let redisConnecting: Promise<RedisClientType> | null = null;

/**
 * Returns a ready node-redis client for the StackFlow ElastiCache cluster, connecting on first use.
 *
 * Concurrent callers during a (re)connect await the same attempt. A failed attempt is not
 * cached, so the next call retries.
 *
 * @throws if the auth-token secret cannot be read or parsed, or the connection fails
 */
export async function getRedisClient(): Promise<RedisClientType> {
  if (redisClient?.isReady) return redisClient;
  if (!redisConnecting) {
    redisConnecting = connectRedisClient().finally(() => {
      redisConnecting = null;
    });
  }
  return redisConnecting;
}

/** Reads the AUTH token from Secrets Manager, opens a TLS connection, and replaces any stale client. */
async function connectRedisClient(): Promise<RedisClientType> {
  const sm = new SecretsManagerClient({ region: process.env.AWS_REGION ?? 'us-east-1' });
  const secret = await sm.send(new GetSecretValueCommand({ SecretId: 'stackflow/redis/auth-token' }));
  if (!secret.SecretString) {
    throw new Error('Secret stackflow/redis/auth-token has no SecretString');
  }
  const { auth_token } = JSON.parse(secret.SecretString) as { auth_token: string };
  const client = createClient({
    url: process.env.REDIS_URL ?? 'rediss://master.stackflow-redis-prod.mnzfvx.use1.cache.amazonaws.com:6379',
    password: auth_token,
    socket: {
      tls: true,
      // Certificate verification stays on (Node's default): ElastiCache TLS certificates
      // chain to public Amazon roots, and rejectUnauthorized: false would let any
      // man-in-the-middle capture the AUTH token.
      connectTimeout: 5000,
    },
    // `lazyConnect`/`commandTimeout` were ioredis options that node-redis's createClient
    // does not define; node-redis only connects when connect() is called.
  }) as RedisClientType;
  client.on('error', (err) => console.error('Redis Client Error:', err.message));
  await client.connect();

  const stale = redisClient;
  redisClient = client;
  if (stale && stale !== client) {
    // Release the previous, no-longer-ready client instead of leaking its socket.
    void Promise.resolve()
      .then(() => stale.disconnect())
      .catch(() => undefined);
  }
  return client;
}
/**
 * Reads `key` from Redis, treating any Redis or connection failure as a cache miss.
 *
 * @returns the stored string, or null on a miss or error
 */
export async function cacheGet(key: string): Promise<string | null> {
  try {
    const client = await getRedisClient();
    return await client.get(key);
  } catch (e: unknown) {
    console.warn('Cache get failed (non-fatal):', e);
    return null;
  }
}
/**
 * Stores `value` under `key` with a TTL (SETEX). Failures are logged and swallowed so
 * caching never breaks the caller.
 *
 * @param ttlSeconds - expiry in seconds (default one hour)
 */
export async function cacheSet(key: string, value: string, ttlSeconds = 3600): Promise<void> {
  try {
    const client = await getRedisClient();
    await client.setEx(key, ttlSeconds, value);
  } catch (e: unknown) {
    console.warn('Cache set failed (non-fatal):', e);
  }
}
8. AI Audit Logging (TypeScript)
import { DynamoDBDocumentClient, PutCommand } from '@aws-sdk/lib-dynamodb';
import { createHash, randomUUID } from 'crypto';
/** One AI interaction as recorded by logToAuditTable. */
interface AuditEntry {
  // Who asked
  tenantId: string;
  userId: string;
  // What was asked and answered
  query: string;
  answer: string;
  intent: string;
  // How it was served
  latencyMs: number;
  modelUsed: string;
  fromCache: boolean;
  // Optional extras
  sources?: string[];
  error?: string;
  incidentId?: string;
  sessionId?: string;
}
/**
 * Writes one AI interaction record to the `stackflow-ai-audit-log` DynamoDB table.
 *
 * Key layout: pk = `<tenantId>#<userId>`, sk = `<epochSeconds>#<uuid>`. Also stores an ISO
 * timestamp, an `expiresAt` epoch 90 days ahead (intended for DynamoDB TTL — assumes TTL
 * is enabled on that attribute; confirm table config), the source count, and the first 16
 * hex chars of SHA-256 of the normalized query.
 *
 * Best-effort: any failure is logged and swallowed so auditing never fails a user request.
 */
export async function logToAuditTable(entry: AuditEntry): Promise<void> {
  const TTL_DAYS = 90;
  const nowDate = new Date();
  const now = Math.floor(nowDate.getTime() / 1000);
  try {
    // The DocumentClient rejects `undefined` attribute values unless its client was built with
    // removeUndefinedValues, and optional fields are often present but undefined, so drop them here.
    const definedFields = Object.fromEntries(
      Object.entries(entry).filter(([, value]) => value !== undefined),
    );
    await ddb.send(new PutCommand({
      TableName: 'stackflow-ai-audit-log',
      Item: {
        pk: `${entry.tenantId}#${entry.userId}`,
        sk: `${now}#${randomUUID()}`,
        ...definedFields,
        timestamp: nowDate.toISOString(),
        expiresAt: now + TTL_DAYS * 86400,
        sourceCount: (entry.sources ?? []).length,
        queryHash: createHash('sha256')
          .update(entry.query.toLowerCase().trim())
          .digest('hex')
          .substring(0, 16),
      },
    }));
  } catch (e: unknown) {
    // Audit logging is non-critical -- log but don't fail the request
    console.error('Audit log write failed:', e);
  }
}
9. End-to-End Integration Test (Python)
#!/usr/bin/env python3
"""pytest integration tests for the Graph RAG pipeline"""
import boto3
import pytest
import json
import hashlib
import redis
import os
from urllib.parse import urlparse

# Targets default to the production StackFlow resources and can be overridden with the
# KB_ID / NEPTUNE_ENDPOINT / REDIS_URL environment variables listed in the requirements.
KB_ID = os.environ.get('KB_ID', 'BXJGG7PIPS')
NEPTUNE_ENDPOINT = os.environ.get(
    'NEPTUNE_ENDPOINT',
    'stackflow-knowledge-graph.cluster-c6pq0smgmlri.us-east-1.neptune.amazonaws.com',
)
# REDIS_URL is a full URL (e.g. rediss://host:6379); only its hostname is used. A value
# without a scheme has no parseable hostname and falls back to the default.
REDIS_HOST = (
    urlparse(os.environ.get('REDIS_URL', '')).hostname
    or 'master.stackflow-redis-prod.mnzfvx.use1.cache.amazonaws.com'
)
@pytest.fixture(scope='module')
def bedrock_client():
    """Module-scoped Bedrock Agent Runtime client (us-east-1) shared by the RAG tests."""
    client = boto3.client(service_name='bedrock-agent-runtime', region_name='us-east-1')
    return client
@pytest.fixture(scope='module')
def redis_client():
    """Authenticated TLS connection to REDIS_HOST for this module, closed after the last test.

    The AUTH token is read from the ``stackflow/redis/auth-token`` secret (JSON key
    ``auth_token``). The connection is verified with PING before it is handed out.
    """
    sm = boto3.client('secretsmanager', region_name='us-east-1')
    creds = json.loads(sm.get_secret_value(SecretId='stackflow/redis/auth-token')['SecretString'])
    # Certificate verification is left at redis-py's default (ssl_cert_reqs='required');
    # ssl_cert_reqs='none' would accept any certificate and expose the AUTH token.
    r = redis.Redis(host=REDIS_HOST, port=6379, password=creds['auth_token'],
                    ssl=True, decode_responses=True)
    r.ping()  # Verify connection
    yield r
    r.close()
def test_bedrock_kb_is_active():
ba = boto3.client('bedrock-agent', region_name='us-east-1')
response = ba.get_knowledge_base(knowledgeBaseId=KB_ID)
assert response['knowledgeBase']['status'] == 'ACTIVE', "KB must be ACTIVE"
def test_bedrock_kb_returns_results(bedrock_client):
response = bedrock_client.retrieve(
knowledgeBaseId=KB_ID,
retrievalQuery={'text': 'Aurora connection pool exhausted'},
retrievalConfiguration={'vectorSearchConfiguration': {'numberOfResults': 3}}
)
results = response['retrievalResults']
assert len(results) > 0, "KB must return at least one result"
assert results[0]['score'] > 0.3, f"Top result score too low: {results[0]['score']}"
def test_redis_connectivity(redis_client):
test_key = 'sf:test:ping'
redis_client.setex(test_key, 30, 'pong')
assert redis_client.get(test_key) == 'pong'
redis_client.delete(test_key)
def test_cache_key_determinism(redis_client=None):
    """Case and surrounding whitespace must not change the normalized cache-key hash.

    This is a pure hashing check. ``redis_client`` has a default so pytest does not
    inject the live-Redis fixture (arguments with defaults are not treated as fixtures),
    letting the test run without a Redis connection; the parameter is unused.
    """
    q1 = 'how do I reset cognito password'
    q2 = ' How do I reset Cognito Password '
    key1 = hashlib.sha256(q1.lower().strip().encode()).hexdigest()
    key2 = hashlib.sha256(q2.lower().strip().encode()).hexdigest()
    assert key1 == key2, "Normalized queries must produce identical cache keys"
def test_ai_audit_log_table_exists():
ddb = boto3.client('dynamodb', region_name='us-east-1')
response = ddb.describe_table(TableName='stackflow-ai-audit-log')
assert response['Table']['TableStatus'] == 'ACTIVE'
def test_pattern_cluster_table_exists():
ddb = boto3.client('dynamodb', region_name='us-east-1')
response = ddb.describe_table(TableName='StackFlow_PatternCluster')
assert response['Table']['TableStatus'] == 'ACTIVE'
def test_procedural_memory_table_exists():
ddb = boto3.client('dynamodb', region_name='us-east-1')
response = ddb.describe_table(TableName='StackFlow_ProceduralMemory')
assert response['Table']['TableStatus'] == 'ACTIVE'
def test_full_rag_pipeline(bedrock_client):
"""End-to-end test of retrieve-and-generate"""
response = bedrock_client.retrieve_and_generate(
input={'text': 'How do I troubleshoot Redis AUTH failures in StackFlow?'},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': KB_ID,
'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-3-haiku-20240307-v1:0',
'retrievalConfiguration': {'vectorSearchConfiguration': {'numberOfResults': 3}}
}
}
)
assert 'output' in response
assert len(response['output']['text']) > 50, "Response must be substantive"
print(f"RAG response ({len(response['output']['text'])} chars): {response['output']['text'][:200]}")
10. CloudWatch Dashboard (CDK)
import * as cdk from 'aws-cdk-lib';
import * as cloudwatch from 'aws-cdk-lib/aws-cloudwatch';
/**
 * Adds the `StackFlow-GraphRAG-Monitoring` CloudWatch dashboard to `scope`.
 *
 * Six 8x6 graph widgets, each plotting one metric at a 5-minute period:
 * - `StackFlow/AI` custom metrics: CacheHitRate (avg), AvgLatencyMs (avg), ModelInvocations (sum)
 * - Neptune cluster `stackflow-knowledge-graph`: GremlinRequestsPerSec (avg), CPUUtilization (avg)
 * - ElastiCache node `stackflow-redis-prod-0001-001`: CacheHits (sum)
 *
 * @param scope - stack that owns the dashboard construct
 * @returns the created dashboard
 */
export function createGraphRagDashboard(scope: cdk.Stack): cloudwatch.Dashboard {
  const dashboard = new cloudwatch.Dashboard(scope, 'StackFlowGraphRagDashboard', {
    dashboardName: 'StackFlow-GraphRAG-Monitoring',
  });

  const period = cdk.Duration.minutes(5);
  const neptuneCluster = { DBClusterIdentifier: 'stackflow-knowledge-graph' };
  const redisNode = { CacheClusterId: 'stackflow-redis-prod-0001-001' };

  // Every metric on this dashboard shares the same 5-minute period.
  const metric = (
    namespace: string,
    metricName: string,
    statistic: string,
    dimensionsMap?: Record<string, string>,
  ): cloudwatch.Metric =>
    new cloudwatch.Metric({
      namespace,
      metricName,
      statistic,
      period,
      ...(dimensionsMap ? { dimensionsMap } : {}),
    });

  // Every widget is a single-metric 8x6 graph.
  const graph = (title: string, plotted: cloudwatch.IMetric): cloudwatch.GraphWidget =>
    new cloudwatch.GraphWidget({ title, left: [plotted], width: 8, height: 6 });

  dashboard.addWidgets(
    graph('AI Cache Hit Rate (%)', metric('StackFlow/AI', 'CacheHitRate', 'Average')),
    graph('AI Response Latency (ms)', metric('StackFlow/AI', 'AvgLatencyMs', 'Average')),
    graph('Bedrock Invocations / 5min', metric('StackFlow/AI', 'ModelInvocations', 'Sum')),
    graph('Neptune Gremlin Requests/sec', metric('AWS/Neptune', 'GremlinRequestsPerSec', 'Average', neptuneCluster)),
    graph('Neptune CPU Utilization (%)', metric('AWS/Neptune', 'CPUUtilization', 'Average', neptuneCluster)),
    graph('Redis Cache Hits / 5min', metric('AWS/ElastiCache', 'CacheHits', 'Sum', redisNode)),
  );
  return dashboard;
}