WebSocket Crate

The mockforge-ws crate provides comprehensive WebSocket protocol support for MockForge, featuring replay capabilities, proxy functionality, and AI-powered event generation. It enables realistic WebSocket interaction simulation for testing and development.

Architecture Overview

graph TD
    A[WebSocket Server] --> B[Connection Handler]
    B --> C{Operation Mode}
    C --> D[Replay Mode]
    C --> E[Proxy Mode]
    C --> F[Interactive Mode]
    C --> G[AI Event Mode]

    D --> H[Replay File Parser]
    H --> I[JSONL Processor]
    I --> J[Template Expansion]

    E --> K[Proxy Handler]
    K --> L[Upstream Connection]
    L --> M[Message Forwarding]

    F --> N[Message Router]
    N --> O[Pattern Matching]
    O --> P[Response Generation]

    G --> Q[AI Event Generator]
    Q --> R[LLM Integration]
    R --> S[Event Stream]

Core Components

Connection Management

WebSocket Router

The crate provides multiple router configurations for different use cases:

#![allow(unused)]
fn main() {
// Basic WebSocket router
let app = router();

// With latency simulation
let latency_injector = LatencyInjector::new(profile, Default::default());
let app = router_with_latency(latency_injector);

// With proxy support
let proxy_handler = WsProxyHandler::new(proxy_config);
let app = router_with_proxy(proxy_handler);
}

Connection Lifecycle

  • Establishment: Connection tracking and metrics collection
  • Message Handling: Bidirectional message processing
  • Error Handling: Graceful error recovery and logging
  • Termination: Connection cleanup and statistics recording

Operational Modes

1. Replay Mode

Scripted message playback from JSONL files:

{"ts":0,"dir":"out","text":"HELLO {{uuid}}","waitFor":"^CLIENT_READY$"}
{"ts":10,"dir":"out","text":"{\"type\":\"welcome\",\"sessionId\":\"{{uuid}}\"}"}
{"ts":20,"dir":"out","text":"{\"data\":{{randInt 1 100}}}","waitFor":"^ACK$"}

Features:

  • Timestamp-based message sequencing
  • Template expansion ({{uuid}}, {{now}}, {{randInt min max}})
  • Conditional waiting with regex/JSONPath patterns
  • Deterministic replay for testing

2. Proxy Mode

Forward messages to upstream WebSocket servers:

#![allow(unused)]
fn main() {
let proxy_config = WsProxyConfig {
    upstream_url: "wss://api.example.com/ws".to_string(),
    should_proxy: true,
    message_transform: Some(transform_fn),
};
}

Features:

  • Transparent message forwarding
  • Optional message transformation
  • Connection pooling and reuse
  • Error handling and fallback

3. Interactive Mode

Dynamic responses based on client messages:

#![allow(unused)]
fn main() {
// Echo mode (default)
while let Some(msg) = socket.recv().await {
    if let Ok(Message::Text(text)) = msg {
        let response = format!("echo: {}", text);
        socket.send(Message::Text(response.into())).await?;
    }
}
}

Features:

  • Pattern-based response matching
  • JSONPath query support
  • State-aware conversations
  • Custom response logic

4. AI Event Mode

LLM-powered event stream generation:

#![allow(unused)]
fn main() {
let ai_config = WebSocketAiConfig {
    enabled: true,
    narrative: "Simulate 5 minutes of live stock market trading".to_string(),
    event_count: 20,
    replay: Some(ReplayAugmentationConfig {
        provider: "openai".to_string(),
        model: "gpt-3.5-turbo".to_string(),
        ..Default::default()
    }),
};

let generator = AiEventGenerator::new(ai_config);
generator.stream_events(socket, Some(20)).await;
}

Message Processing

Template Expansion

Rich templating system for dynamic content:

#![allow(unused)]
fn main() {
// UUID generation
"session_{{uuid}}" → "session_550e8400-e29b-41d4-a716-446655440000"

// Timestamp manipulation
"{{now}}" → "2024-01-15T10:30:00Z"
"{{now+1h}}" → "2024-01-15T11:30:00Z"

// Random values
"{{randInt 1 100}}" → "42"
}

JSONPath Matching

Sophisticated message pattern matching:

// Wait for specific message types
{"waitFor": "$.type", "text": "Type received: {{$.type}}"}

// Match nested object properties
{"waitFor": "$.user.id", "text": "User {{$.user.name}} authenticated"}

// Array element matching
{"waitFor": "$.items[0].status", "text": "First item status: {{$.items[0].status}}"}

AI Integration

Event Generation

Narrative-driven event stream creation:

#![allow(unused)]
fn main() {
pub struct AiEventGenerator {
    engine: Arc<RwLock<ReplayAugmentationEngine>>,
}

impl AiEventGenerator {
    pub async fn stream_events(&self, socket: WebSocket, max_events: Option<usize>) {
        // Generate contextual events based on narrative
        let events = self.engine.write().await.generate_stream().await?;
        // Stream events to client with configurable rate
    }
}
}

Replay Augmentation

Enhance existing replay files with AI-generated content:

#![allow(unused)]
fn main() {
let augmentation_config = ReplayAugmentationConfig {
    narrative: "Add realistic user interactions to chat replay".to_string(),
    augmentation_points: vec!["user_message".to_string()],
    provider: "openai".to_string(),
    model: "gpt-4".to_string(),
};
}

Observability

Metrics Collection

Comprehensive WebSocket metrics:

#![allow(unused)]
fn main() {
let registry = get_global_registry();
registry.record_ws_connection_established();
registry.record_ws_message_received();
registry.record_ws_message_sent();
registry.record_ws_connection_closed(duration, status);
}

Tracing Integration

Distributed tracing for WebSocket connections:

#![allow(unused)]
fn main() {
let span = create_ws_connection_span(&request);
let _guard = span.enter();
// Connection handling with tracing context
record_ws_message_success(&span, message_size);
}

Logging

Structured logging for connection lifecycle and message flow:

#![allow(unused)]
fn main() {
info!("WebSocket connection established from {}", peer_addr);
debug!("Received message: {} bytes", message.len());
error!("WebSocket error: {}", error);
}

Performance Features

Connection Pooling

Efficient management of upstream connections in proxy mode:

#![allow(unused)]
fn main() {
// Connection reuse for proxy operations
let connection = pool.get_connection(upstream_url).await?;
connection.forward_message(message).await?;
}

Message Buffering

Optimized message processing with buffering:

#![allow(unused)]
fn main() {
// Stream processing for large message volumes
while let Some(batch) = message_buffer.next_batch().await {
    for message in batch {
        process_message(message).await?;
    }
}
}

Rate Limiting

Configurable message rate limits:

#![allow(unused)]
fn main() {
let rate_limiter = RateLimiter::new(1000, Duration::from_secs(60)); // 1000 msg/min
if rate_limiter.check_limit().await {
    process_message(message).await?;
}
}

Configuration

WebSocket Server Config

#![allow(unused)]
fn main() {
pub struct WsConfig {
    pub port: u16,
    pub max_connections: usize,
    pub max_message_size: usize,
    pub heartbeat_interval: Duration,
    pub replay_file: Option<PathBuf>,
    pub proxy_config: Option<WsProxyConfig>,
    pub ai_config: Option<WebSocketAiConfig>,
}
}

Proxy Configuration

#![allow(unused)]
fn main() {
pub struct WsProxyConfig {
    pub upstream_url: String,
    pub should_proxy: bool,
    pub message_transform: Option<TransformFn>,
    pub connection_pool_size: usize,
    pub retry_attempts: u32,
}
}

AI Configuration

#![allow(unused)]
fn main() {
pub struct WebSocketAiConfig {
    pub enabled: bool,
    pub narrative: String,
    pub event_count: usize,
    pub events_per_second: f64,
    pub replay: Option<ReplayAugmentationConfig>,
}
}

Testing Support

Integration Tests

End-to-end WebSocket testing:

#![allow(unused)]
fn main() {
#[tokio::test]
async fn test_websocket_replay() {
    // Start WebSocket server with replay file
    let server = TestServer::new(router()).await;

    // Connect test client
    let (ws_stream, _) = connect_async(server.url()).await?;

    // Verify replay sequence
    let msg = ws_stream.next().await.unwrap()?;
    assert_eq!(msg, Message::Text("HELLO test-session".into()));
}
}

Replay File Validation

Automated validation of replay configurations:

#![allow(unused)]
fn main() {
#[test]
fn test_replay_file_parsing() {
    let replay_data = r#"{"ts":0,"text":"hello","waitFor":"ready"}"#;
    let entry: ReplayEntry = serde_json::from_str(replay_data)?;
    assert_eq!(entry.ts, 0);
    assert_eq!(entry.text, "hello");
}
}

Error Handling

Connection Errors

Graceful handling of connection failures:

#![allow(unused)]
fn main() {
match socket.recv().await {
    Ok(Some(Message::Close(frame))) => {
        info!("Client closed connection: {:?}", frame);
        break;
    }
    Err(e) => {
        error!("WebSocket error: {}", e);
        record_ws_error();
        break;
    }
    _ => continue,
}
}

Message Processing Errors

Robust message parsing and transformation:

#![allow(unused)]
fn main() {
match serde_json::from_str::<Value>(&text) {
    Ok(json) => process_json_message(json).await,
    Err(e) => {
        warn!("Invalid JSON message: {}", e);
        send_error_response("Invalid JSON format").await?;
    }
}
}

Usage Examples

Basic WebSocket Server

use mockforge_ws::start_with_latency;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Start with default latency profile
    start_with_latency(3001, Some(LatencyProfile::normal())).await?;
    Ok(())
}

Replay Mode

# Set environment variable for replay file
export MOCKFORGE_WS_REPLAY_FILE=./replay.jsonl

# Start server
mockforge serve --ws-port 3001

Proxy Mode

#![allow(unused)]
fn main() {
use mockforge_ws::router_with_proxy;
use mockforge_core::{WsProxyConfig, WsProxyHandler};

let proxy_config = WsProxyConfig {
    upstream_url: "wss://api.example.com/ws".to_string(),
    should_proxy: true,
};

let proxy = WsProxyHandler::new(proxy_config);
let app = router_with_proxy(proxy);
}

AI Event Generation

#![allow(unused)]
fn main() {
use mockforge_ws::{AiEventGenerator, WebSocketAiConfig};

let config = WebSocketAiConfig {
    enabled: true,
    narrative: "Simulate real-time chat conversation".to_string(),
    event_count: 50,
    events_per_second: 2.0,
};

let generator = AiEventGenerator::new(config)?;
generator.stream_events_with_rate(socket, None, 2.0).await;
}

Future Enhancements

  • Advanced Pattern Matching: Complex event correlation and state machines
  • Load Testing: Built-in WebSocket load testing capabilities
  • Recording Mode: Capture live WebSocket interactions for replay
  • Clustering: Distributed WebSocket session management
  • Protocol Extensions: Support for custom WebSocket subprotocols