Integration Testing Skill
Comprehensive guide for testing integration points across the Muni system.
Overview
Muni has complex integration points that require end-to-end testing:
Integration Points:
-
Rover ↔ Depot: WebSocket (teleop, discovery, dispatch), UDP (metrics)
-
CAN Bus: VESC motor controllers, MCU peripherals
-
Database: PostgreSQL (dispatch service)
-
File System: Session recording, sync, playback
-
External Services: GPS/RTK, video streaming
Testing Approaches:
Component Test Type Tools Location
Rust firmware Integration tests tokio::test , mocks tests/ directory
TypeScript console Component/E2E tests Vitest, Testing Library src/**/*.test.tsx
Depot services Integration tests tokio::test , Docker tests/ directory
Python workers Unit/integration pytest tests/ directory
System-wide Manual/scripted Docker Compose integration-tests/
Rust Integration Testing
Test Structure
Unit tests (in same file):
// src/lib.rs or src/main.rs #[cfg(test)] mod tests { use super::*;
#[test]
fn test_function() {
assert_eq!(2 + 2, 4);
}
}
Integration tests (separate directory):
crate/ ├── src/ │ └── lib.rs ├── tests/ │ ├── common/ │ │ └── mod.rs # Shared test utilities │ ├── integration_test_1.rs │ └── integration_test_2.rs └── Cargo.toml
Async Testing with Tokio
// tests/websocket_test.rs use tokio::net::TcpListener; use futures_util::{SinkExt, StreamExt};
#[tokio::test] async fn test_websocket_echo() { // Start test server let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
let (stream, _) = listener.accept().await.unwrap();
// Handle WebSocket...
});
// Connect as client
let (ws_stream, _) = tokio_tungstenite::connect_async(
format!("ws://{}", addr)
).await.unwrap();
let (mut write, mut read) = ws_stream.split();
// Send message
write.send(tungstenite::Message::Text("hello".into())).await.unwrap();
// Receive response
let msg = read.next().await.unwrap().unwrap();
assert_eq!(msg, tungstenite::Message::Text("hello".into()));
}
Test Fixtures and Setup
Shared test utilities:
// tests/common/mod.rs use std::sync::Arc; use tokio::sync::RwLock;
pub struct TestContext { pub state: Arc<AppState>, pub addr: SocketAddr, }
impl TestContext { pub async fn new() -> Self { // Setup test state let state = Arc::new(AppState::new());
// Start test server
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
tokio::spawn(async move {
axum::serve(listener, create_router(state)).await.unwrap();
});
Self { state, addr }
}
pub fn url(&self, path: &str) -> String {
format!("http://{}{}", self.addr, path)
}
}
Using in tests:
// tests/api_test.rs mod common;
#[tokio::test] async fn test_health_endpoint() { let ctx = common::TestContext::new().await;
let response = reqwest::get(&ctx.url("/health")).await.unwrap();
assert_eq!(response.status(), 200);
let body: serde_json::Value = response.json().await.unwrap();
assert_eq!(body["status"], "ok");
}
Database Testing
In-memory database (SQLite):
use sqlx::SqlitePool;
async fn setup_test_db() -> SqlitePool { let pool = SqlitePool::connect(":memory:").await.unwrap();
// Run migrations
sqlx::migrate!("./migrations")
.run(&pool)
.await
.unwrap();
pool
}
#[tokio::test] async fn test_database_query() { let pool = setup_test_db().await;
// Insert test data
sqlx::query("INSERT INTO zones (name) VALUES ($1)")
.bind("Test Zone")
.execute(&pool)
.await
.unwrap();
// Query
let zones: Vec<Zone> = sqlx::query_as("SELECT * FROM zones")
.fetch_all(&pool)
.await
.unwrap();
assert_eq!(zones.len(), 1);
assert_eq!(zones[0].name, "Test Zone");
}
Test database (PostgreSQL via Docker):
// tests/common/mod.rs use testcontainers::{clients::Cli, images::postgres::Postgres, Container};
pub struct TestDb<'a> { _container: Container<'a, Postgres>, pub pool: PgPool, }
impl<'a> TestDb<'a> { pub async fn new(docker: &'a Cli) -> Self { let container = docker.run(Postgres::default()); let port = container.get_host_port_ipv4(5432);
let pool = PgPoolOptions::new()
.connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", port))
.await
.unwrap();
// Run migrations
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
Self {
_container: container,
pool,
}
}
}
#[tokio::test] async fn test_with_postgres() { let docker = Cli::default(); let db = TestDb::new(&docker).await;
// Use db.pool for queries...
}
Mocking CAN Bus
Mock CAN interface:
// tests/mock_can.rs use std::sync::{Arc, Mutex};
#[derive(Clone)] pub struct MockCan { sent_frames: Arc<Mutex<Vec<CanFrame>>>, }
impl MockCan { pub fn new() -> Self { Self { sent_frames: Arc::new(Mutex::new(Vec::new())), } }
pub fn send(&self, frame: CanFrame) {
self.sent_frames.lock().unwrap().push(frame);
}
pub fn recv(&self) -> Option<CanFrame> {
// Return pre-configured frames or generate on-demand
Some(CanFrame::new(0x001, &[0x01, 0x02, 0x03]))
}
pub fn get_sent_frames(&self) -> Vec<CanFrame> {
self.sent_frames.lock().unwrap().clone()
}
pub fn clear(&self) {
self.sent_frames.lock().unwrap().clear();
}
}
#[tokio::test] async fn test_motor_command() { let can = MockCan::new();
// Send motor command
send_motor_command(&can, MotorId::FrontLeft, 100).await;
// Verify frame was sent
let frames = can.get_sent_frames();
assert_eq!(frames.len(), 1);
assert_eq!(frames[0].id(), 0x001); // VESC ID
}
Trait-based abstraction (for real/mock CAN):
#[async_trait] pub trait CanInterface: Send + Sync { async fn send(&self, frame: CanFrame) -> Result<()>; async fn recv(&self) -> Result<CanFrame>; }
// Real implementation pub struct RealCan { socket: CanSocket, }
#[async_trait] impl CanInterface for RealCan { async fn send(&self, frame: CanFrame) -> Result<()> { self.socket.write_frame(&frame).await }
async fn recv(&self) -> Result<CanFrame> {
self.socket.read_frame().await
}
}
// Mock implementation pub struct MockCan { // ... as above }
#[async_trait] impl CanInterface for MockCan { async fn send(&self, frame: CanFrame) -> Result<()> { self.sent_frames.lock().unwrap().push(frame); Ok(()) }
async fn recv(&self) -> Result<CanFrame> {
Ok(CanFrame::new(0x001, &[0x01, 0x02, 0x03]))
}
}
// Usage in tests #[tokio::test] async fn test_with_mock_can() { let can: Arc<dyn CanInterface> = Arc::new(MockCan::new()); // Test code uses CanInterface trait... }
Testing WebSocket Protocols
WebSocket test client:
use tokio_tungstenite::{connect_async, tungstenite::Message};
#[tokio::test] async fn test_dispatch_protocol() { // Start service let ctx = TestContext::new().await; let ws_url = format!("ws://{}/ws", ctx.addr);
// Connect
let (ws_stream, _) = connect_async(&ws_url).await.unwrap();
let (mut write, mut read) = ws_stream.split();
// Send registration
let register = serde_json::json!({
"type": "register",
"rover_id": "test-rover"
});
write.send(Message::Text(register.to_string().into())).await.unwrap();
// Receive task assignment
let msg = read.next().await.unwrap().unwrap();
let task: TaskMessage = serde_json::from_str(&msg.to_text().unwrap()).unwrap();
assert_eq!(task.r#type, "task");
assert!(!task.waypoints.is_empty());
// Send progress
let progress = serde_json::json!({
"type": "progress",
"task_id": task.task_id,
"progress": 50
});
write.send(Message::Text(progress.to_string().into())).await.unwrap();
// Cleanup
write.close().await.unwrap();
}
Test Timeouts
use tokio::time::{timeout, Duration};
#[tokio::test] async fn test_with_timeout() { let result = timeout(Duration::from_secs(5), async { // Test code that might hang expensive_operation().await }).await;
assert!(result.is_ok(), "Test timed out");
}
Parallel Test Execution
cargo-nextest (faster test runner):
Install
cargo install cargo-nextest
Run tests in parallel
cargo nextest run
Run specific test
cargo nextest run test_name
Show output
cargo nextest run --nocapture
Test isolation:
// Use unique ports for each test use std::sync::atomic::{AtomicU16, Ordering};
static NEXT_PORT: AtomicU16 = AtomicU16::new(50000);
fn get_test_port() -> u16 { NEXT_PORT.fetch_add(1, Ordering::SeqCst) }
#[tokio::test] async fn test_parallel_1() { let port = get_test_port(); let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await.unwrap(); // ... }
#[tokio::test] async fn test_parallel_2() { let port = get_test_port(); let listener = TcpListener::bind(format!("127.0.0.1:{}", port)).await.unwrap(); // ... }
TypeScript/React Testing
Test Setup (Vitest)
vitest.config.ts:
import { defineConfig } from 'vitest/config'; import react from '@vitejs/plugin-react';
export default defineConfig({ plugins: [react()], test: { environment: 'jsdom', setupFiles: ['./src/test/setup.ts'], globals: true, }, });
test/setup.ts:
import '@testing-library/jest-dom'; import { cleanup } from '@testing-library/react'; import { afterEach } from 'vitest';
// Cleanup after each test afterEach(() => { cleanup(); });
Component Testing
// src/components/RoverCard.test.tsx import { render, screen } from '@testing-library/react'; import { describe, it, expect } from 'vitest'; import { RoverCard } from './RoverCard';
describe('RoverCard', () => { it('renders rover information', () => { const rover = { id: 'rover-1', name: 'Frog Zero', online: true, batteryVoltage: 48.5, };
render(<RoverCard rover={rover} />);
expect(screen.getByText('Frog Zero')).toBeInTheDocument();
expect(screen.getByText('Online')).toBeInTheDocument();
expect(screen.getByText('48.5V')).toBeInTheDocument();
});
it('shows offline status when rover is offline', () => { const rover = { id: 'rover-1', name: 'Frog Zero', online: false, batteryVoltage: 0, };
render(<RoverCard rover={rover} />);
expect(screen.getByText('Offline')).toBeInTheDocument();
}); });
Testing Hooks
// src/hooks/useWebSocket.test.ts import { renderHook, waitFor } from '@testing-library/react'; import { describe, it, expect, vi } from 'vitest'; import { useWebSocket } from './useWebSocket'; import WS from 'jest-websocket-mock';
describe('useWebSocket', () => { it('connects to WebSocket server', async () => { const server = new WS('ws://localhost:1234');
const { result } = renderHook(() =>
useWebSocket('ws://localhost:1234')
);
await waitFor(() => {
expect(result.current.status).toBe('connected');
});
server.close();
});
it('receives messages', async () => { const server = new WS('ws://localhost:1234'); const onMessage = vi.fn();
renderHook(() =>
useWebSocket('ws://localhost:1234', { onMessage })
);
server.send({ type: 'update', data: 'test' });
await waitFor(() => {
expect(onMessage).toHaveBeenCalledWith({ type: 'update', data: 'test' });
});
server.close();
}); });
Mocking Zustand Store
// src/test/mocks/store.ts import { create } from 'zustand'; import { AppState } from '@/store';
export const createMockStore = (initialState?: Partial<AppState>) => { return create<AppState>(() => ({ rovers: [], selectedRover: null, teleopActive: false, ...initialState, // Actions setRovers: vi.fn(), selectRover: vi.fn(), startTeleop: vi.fn(), })); };
// In tests import { createMockStore } from '@/test/mocks/store';
it('displays selected rover', () => { const mockStore = createMockStore({ selectedRover: { id: 'rover-1', name: 'Frog Zero', online: true, }, });
render( <StoreProvider store={mockStore}> <RoverDetails /> </StoreProvider> );
expect(screen.getByText('Frog Zero')).toBeInTheDocument(); });
E2E Testing with Playwright
playwright.config.ts:
import { defineConfig } from '@playwright/test';
export default defineConfig({ testDir: './e2e', use: { baseURL: 'http://localhost:5173', screenshot: 'only-on-failure', video: 'retain-on-failure', }, webServer: { command: 'npm run dev', port: 5173, reuseExistingServer: !process.env.CI, }, });
e2e/teleop.spec.ts:
import { test, expect } from '@playwright/test';
test('teleop flow', async ({ page }) => { await page.goto('/');
// Select rover await page.click('[data-testid="rover-card-rover-1"]'); await expect(page).toHaveURL(//rover/rover-1/);
// Start teleop await page.click('[data-testid="start-teleop-button"]'); await expect(page.locator('[data-testid="teleop-active"]')).toBeVisible();
// Send command (keyboard) await page.keyboard.press('KeyW'); await page.waitForTimeout(100);
// Stop teleop await page.click('[data-testid="stop-teleop-button"]'); await expect(page.locator('[data-testid="teleop-active"]')).not.toBeVisible(); });
Docker Test Environments
Test Compose File
docker-compose.test.yml:
version: '3.8'
services: postgres-test: image: postgres:16 environment: POSTGRES_PASSWORD: test POSTGRES_DB: test_db ports: - "5433:5432" tmpfs: - /var/lib/postgresql/data # In-memory (fast, ephemeral)
redis-test: image: redis:7 ports: - "6380:6379"
test-runner: build: context: . dockerfile: Dockerfile.test depends_on: - postgres-test - redis-test environment: DATABASE_URL: postgres://postgres:test@postgres-test:5432/test_db REDIS_URL: redis://redis-test:6379 command: cargo test
Running tests:
Run tests in Docker
docker compose -f docker-compose.test.yml up --abort-on-container-exit
Cleanup
docker compose -f docker-compose.test.yml down -v
Test Fixtures with Docker
// tests/common/docker.rs use testcontainers::{clients::Cli, Container, RunnableImage}; use testcontainers::images::postgres::Postgres;
pub struct TestEnv<'a> { pub postgres: Container<'a, Postgres>, pub pool: PgPool, }
impl<'a> TestEnv<'a> { pub async fn new(docker: &'a Cli) -> Self { let postgres = docker.run(Postgres::default()); let port = postgres.get_host_port_ipv4(5432);
let pool = PgPoolOptions::new()
.connect(&format!("postgres://postgres:postgres@localhost:{}/postgres", port))
.await
.unwrap();
sqlx::migrate!("./migrations").run(&pool).await.unwrap();
Self { postgres, pool }
}
pub async fn seed_data(&self) {
sqlx::query("INSERT INTO zones (name) VALUES ('Test Zone')")
.execute(&self.pool)
.await
.unwrap();
}
}
Python Testing (splat-worker)
pytest Setup
tests/conftest.py:
import pytest import asyncio
@pytest.fixture def event_loop(): """Create event loop for async tests.""" loop = asyncio.get_event_loop_policy().new_event_loop() yield loop loop.close()
@pytest.fixture def sample_point_cloud(): """Load sample point cloud data.""" import numpy as np points = np.random.rand(1000, 3) colors = np.random.rand(1000, 3) return points, colors
tests/test_splat.py:
import pytest from splat_worker import process_session
@pytest.mark.asyncio async def test_process_session(sample_point_cloud): points, colors = sample_point_cloud
result = await process_session(points, colors)
assert result is not None
assert result.splat_count > 0
assert result.output_path.exists()
Rerun Recording Validation
Validate Recording Structure
// tests/recording_test.rs use rerun::RecordingStream;
#[tokio::test] async fn test_session_recording() { let rec = RecordingStream::new("test", rerun::default_store()).unwrap();
// Log some data
rec.log("world/robot/position", &rerun::Position3D::new(1.0, 2.0, 3.0)).unwrap();
// Save recording
rec.save("test.rrd").unwrap();
// Validate file exists and is not empty
let metadata = std::fs::metadata("test.rrd").unwrap();
assert!(metadata.len() > 0);
// Cleanup
std::fs::remove_file("test.rrd").unwrap();
}
Playback Validation
Play back recording
rerun test.rrd
Validate with Python
python -c "import rerun as rr; rr.load('test.rrd'); print('Valid')"
CI/CD Integration
GitHub Actions
. github/workflows/test.yml:
name: Tests
on: [push, pull_request]
jobs: test-rust: runs-on: ubuntu-latest services: postgres: image: postgres:16 env: POSTGRES_PASSWORD: test options: >- --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 ports: - 5432:5432
steps:
- uses: actions/checkout@v4
- name: Setup Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
- name: Run tests
run: cargo test
env:
DATABASE_URL: postgres://postgres:test@localhost/postgres
test-typescript: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4
- name: Setup Node
uses: actions/setup-node@v4
with:
node-version: '20'
- name: Install dependencies
run: npm ci
working-directory: depot/console
- name: Run tests
run: npm test
working-directory: depot/console
Testing Checklist
Before Committing
-
All unit tests pass: cargo test
-
Integration tests pass: cargo test --test '*'
-
TypeScript tests pass: npm test
-
Linting passes: cargo clippy , npm run lint
-
Format is correct: cargo fmt --check
Before Deploying
-
All tests pass in CI
-
Manual smoke tests completed
-
Database migrations tested
-
Rollback procedure verified
-
Monitoring dashboards checked
Test Coverage Goals
-
Critical paths: 100% (safety systems, CAN communication)
-
Core functionality: >80% (state machine, control logic)
-
Utilities: >60% (helpers, converters)
-
UI components: >50% (interactive elements)
Common Testing Patterns
Testing Error Handling
#[tokio::test] async fn test_error_handling() { let result = function_that_fails().await;
assert!(result.is_err());
assert_eq!(
result.unwrap_err().to_string(),
"Expected error message"
);
}
Testing with Timeouts
use tokio::time::{timeout, Duration};
#[tokio::test] async fn test_operation_completes_in_time() { let result = timeout( Duration::from_secs(5), slow_operation() ).await;
assert!(result.is_ok(), "Operation timed out");
}
Snapshot Testing
use insta::assert_debug_snapshot;
#[test] fn test_serialization() { let data = ComplexStruct { /* ... */ }; assert_debug_snapshot!(data); }
References
-
Tokio testing guide
-
SQLx testing
-
Testing Library docs
-
Vitest documentation
-
Playwright documentation
-
testcontainers-rs