# Pipeline System Overview ⭐ **FLAGSHIP FEATURE**

## Introduction

The J4F Assistant features a **sophisticated, enterprise-grade pipeline processing system** built on NATS streaming that enables modular, configurable, and scalable data processing workflows. This system is particularly powerful for real-time streaming operations like chat message normalization, but it can be extended to any data transformation pipeline.

**🎯 Perfect for**: Real-time AI response processing, markdown normalization, content filtering, syntax highlighting, multi-format output generation, and custom data transformations.

**⚡ Performance**: Zero-buffering, sub-millisecond chunk processing with full backpressure handling and conversation isolation.

## 🚀 Getting Started

### Quick Example: Custom Text Processor

````javascript
// 1. Create a custom step in src/pipeline/steps/myProcessor.js
export async function* uppercaseProcessor(input, config = {}) {
  const { preserveCodeBlocks = true } = config;
  let inCodeBlock = false;
  for await (const chunk of input) {
    // Toggle code-block state whenever a fence marker passes through
    if (preserveCodeBlocks && chunk.includes('```')) {
      inCodeBlock = !inCodeBlock;
    }
    // Leave code blocks untouched; uppercase everything else
    const processed = inCodeBlock ? chunk : chunk.toUpperCase();
    yield processed;
  }
}

// 2. Register in src/pipeline/stepRegistry.js
import { uppercaseProcessor } from './steps/myProcessor.js';

export const stepRegistry = {
  // ...existing steps
  uppercaseProcessor: uppercaseProcessor
};

// 3. Use in API calls
const pipeline = ['markdownNormalizer', 'uppercaseProcessor'];
await PipelineManager.runPipelineForConversation(stream, uuid, userId, pipeline);
````

### 📚 Documentation Links

- **[Step Development Guide](./Step-Development-Guide.md)** - Complete guide to creating custom pipeline steps
- **[Architecture Overview](../structure/Architecture.md)** - System design and component interaction
- **[Backend Streaming](../backend/LangChain-Streaming.md)** - LangChain integration details

## Key Features

### 🚀 **NATS-Powered Streaming**

- **Real-time Processing**: Live streaming through NATS message queues
- **Scalable Architecture**: Distributed processing with backpressure handling
- **Isolation**: User- and conversation-specific subjects (`conversation-{userId}-{uuid}`)
- **Reliability**: Built-in error handling and EOF markers

### 🔧 **Modular Pipeline Steps**

- **Composable**: Mix and match processing steps as needed
- **Reusable**: Steps can be used across multiple pipelines
- **Configurable**: Each step supports custom configuration
- **Extensible**: Easy to add new processing steps

### 📊 **Dynamic Pipeline Configuration**

- **Database-Driven**: Pipeline definitions stored in MongoDB
- **Runtime Composition**: Pipelines built and executed dynamically (see the sketch after this list)
- **Multiple Contexts**: Different pipelines for different application sections
- **Version Control**: Support for pipeline versioning and rollback
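Under the hood, composability falls out of every step sharing the same async-generator shape: each step consumes the previous step's output as its input, so a pipeline is just nested generators. Below is a minimal sketch of that chaining; `composeSteps` and `configs` are illustrative names written for this page, not the actual `runner` API.

```javascript
// Minimal composition sketch, assuming every registered step is an
// async generator with the signature (input, config) => AsyncIterable.
// `composeSteps` is a hypothetical helper, not the project's runner.
import { stepRegistry } from './stepRegistry.js';

export function composeSteps(source, stepNames, configs = {}) {
  // Each step wraps the previous stream; nothing runs until iterated.
  return stepNames.reduce(
    (stream, name) => stepRegistry[name](stream, configs[name] ?? {}),
    source
  );
}

// Usage sketch: chain two registered steps over any async-iterable source.
// for await (const chunk of composeSteps(modelStream, ['markdownNormalizer', 'bulletNormalizer'])) {
//   send(chunk);
// }
```

Because async generators are lazy and pull-based, this style of chaining is also what makes the backpressure behavior described above possible: a slow consumer naturally slows every upstream step.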
## Architecture Components

### Core Components

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Data Source   │───▶│ PipelineManager │───▶│   NATS Stream   │
│ (Model Stream)  │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                ▲
                                │
                       ┌─────────────────┐
                       │  Step Registry  │
                       │ & Configuration │
                       └─────────────────┘

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   NATS Stream   │───▶│ Pipeline Runner │───▶│    Frontend     │
│                 │    │                 │    │    (SSE/WS)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘
```

### Key Classes

1. **`PipelineManager`** - Main orchestrator for pipeline execution
2. **`stepRegistry`** - Registry of available processing steps
3. **`buildInterfaceRegistry`** - Interface resolver for input/output streams
4. **`runner`** - Core pipeline execution engine

## Current Implementation: Markdown Normalization

### Example Pipeline Flow

```javascript
// 1. Model generates streaming text
const modelStream = await langChainManager.streamResponse(message, userId);

// 2. Pipeline processes the stream through normalization steps
const pipeline = [
  'markdownNormalizer',  // Normalize markdown syntax
  'headingNormalizer',   // Fix heading formatting
  'listNormalizer',      // Standardize list formatting
  'bulletNormalizer'     // Convert Unicode bullets to dashes
];

// 3. Results streamed to frontend via NATS
await PipelineManager.runPipelineForConversation(
  modelStream,
  conversationUuid,
  userId,
  pipeline
);
```

### Pipeline Steps

| Step | Purpose | Input | Output |
|------|---------|-------|--------|
| `markdownNormalizer` | Base markdown cleanup | Raw text stream | Normalized markdown |
| `headingNormalizer` | Fix heading spacing | Markdown text | Properly spaced headings |
| `listNormalizer` | Standardize lists | Markdown text | Consistent list formatting |
| `bulletNormalizer` | Convert Unicode bullets | Markdown text | ASCII bullet points |

## NATS Subject Architecture

### Subject Naming Convention

```
# Pipeline Processing
normalizer-{uuid}-{step}       # Individual processing steps
conversation-{userId}-{uuid}   # Final output for frontend consumption

# Special Markers
__NATS_EOF__                   # End-of-stream marker
__STREAM_ERROR__:{message}     # Error notifications
```

### Message Flow

1. **Model Output** → `normalizer-{uuid}-initial`
2. **Step Processing** → `normalizer-{uuid}-{stepName}`
3. **Final Output** → `conversation-{userId}-{uuid}`
4. **Frontend Consumption** → SSE/WebSocket streaming
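For a concrete picture of this flow, here is a hedged sketch of publishing processed chunks, plus the EOF and error markers, to the final conversation subject using the `nats` client library. The server URL, function name, and error handling are assumptions for illustration; the real wiring lives inside `PipelineManager`.

```javascript
// Illustrative only: push a processed stream to the frontend-facing
// subject and signal completion with the __NATS_EOF__ marker.
import { connect, StringCodec } from 'nats';

const sc = StringCodec();

export async function publishToConversation(chunks, userId, uuid) {
  // Server address is an assumption, not project configuration.
  const nc = await connect({ servers: 'nats://localhost:4222' });
  const subject = `conversation-${userId}-${uuid}`;
  try {
    for await (const chunk of chunks) {
      nc.publish(subject, sc.encode(chunk));
    }
    // Tell subscribers the stream is complete.
    nc.publish(subject, sc.encode('__NATS_EOF__'));
  } catch (err) {
    // Surface failures using the error-marker convention above.
    nc.publish(subject, sc.encode(`__STREAM_ERROR__:${err.message}`));
  } finally {
    await nc.drain();
  }
}
```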
## Configuration & Management

### Pipeline Definition Format

```json
{
  "name": "markdown-normalization-pipeline",
  "input": "ChatOutputStream",
  "output": "FEInput",
  "steps": [
    {
      "name": "markdownNormalizer",
      "input": "ChatOutputStream",
      "output": "FEInput",
      "config": {
        "preserveCodeBlocks": true,
        "normalizeHeadings": true
      }
    }
  ]
}
```

### Step Registry

```javascript
// src/pipeline/stepRegistry.js
export const stepRegistry = {
  markdownNormalizer: markdownNormalizer,
  headingNormalizer: normalizeHeadings,
  listNormalizer: normalizeLists,
  bulletNormalizer: normalizeBullets,
  // ... additional steps
};
```

## API Integration

### Streaming Endpoints

- **`/api/message/stream`** - Basic streaming with pipeline processing
- **`/api/unified/stream`** - Advanced streaming with full pipeline support
- **`/api/markdown/stream`** - Markdown-specific streaming pipeline

### Frontend Integration

```javascript
// Frontend receives processed chunks via SSE
const eventSource = new EventSource('/api/unified/stream');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'markdown_chunk') {
    // Render pre-processed markdown
    renderChunk(data.data);
  }
};
```

## Performance & Monitoring

### Built-in Debugging

- **Timestamp Logging**: Each chunk is processed with timing information
- **Subject Tracking**: Monitor NATS subject activity
- **Step Profiling**: Performance metrics for individual pipeline steps
- **Error Tracking**: Comprehensive error reporting and recovery

### Monitoring Points

```javascript
// Debug passthrough for timing analysis
export async function* debugChunkPassthrough(input) {
  for await (const chunk of input) {
    console.debug(`[Pipeline] ${Date.now()} Processing chunk:`, chunk);
    yield chunk;
  }
}
```

## Extension Points

### Adding New Pipeline Steps

1. **Create Step Function**:

   ```javascript
   // src/pipeline/steps/customStep.js
   export async function* customStep(input) {
     for await (const chunk of input) {
       // Process chunk
       const processed = processChunk(chunk);
       yield processed;
     }
   }
   ```

2. **Register Step**:

   ```javascript
   // src/pipeline/stepRegistry.js
   import { customStep } from './steps/customStep.js';

   export const stepRegistry = {
     // ... existing steps
     customStep: customStep
   };
   ```

3. **Configure Pipeline**:

   ```json
   {
     "steps": [
       {
         "name": "customStep",
         "config": { "option": "value" }
       }
     ]
   }
   ```

### Creating New Pipeline Types

- **Message Enrichment**: Add metadata and context
- **Content Filtering**: Apply content policies and filtering (see the sketch below)
- **Analytics**: Collect metrics and usage data
- **Multi-format Output**: Generate multiple output formats simultaneously
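To make the content-filtering idea concrete, here is a hypothetical redaction step in the standard step shape; the step name, default pattern, and replacement token are invented for this example.

```javascript
// Hypothetical content-filtering step; not part of the shipped registry.
export async function* contentFilter(input, config = {}) {
  const {
    // Example default: redact SSN-like digit groups.
    patterns = [/\b\d{3}-\d{2}-\d{4}\b/g],
    replacement = '[redacted]'
  } = config;

  for await (const chunk of input) {
    let filtered = chunk;
    for (const pattern of patterns) {
      filtered = filtered.replace(pattern, replacement);
    }
    // Note: a match that spans two chunks would escape this filter;
    // a production step would need a small carry-over buffer.
    yield filtered;
  }
}
```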
## 🎨 Visual Pipeline Editor

### ✅ **Already Available - Production Ready**

The J4F Assistant includes a **sophisticated visual pipeline editor** with drag-and-drop functionality for creating and managing processing pipelines.

#### Key Features

- **🎯 Drag-and-Drop Interface**: Intuitive node-based pipeline construction
- **📊 Real-time Visualization**: See your pipeline flow as you build it
- **⚙️ Step Configuration**: Configure each step with custom parameters
- **🔗 Connection Management**: Visual connections between pipeline steps
- **💾 Pipeline Persistence**: Save and load pipeline configurations
- **🧪 Live Testing**: Test pipelines directly from the editor

#### Available Pipeline Steps

The editor includes all production-ready steps:

- **Markdown Normalizer**: Basic markdown formatting and cleanup
- **Cogito Streaming Normalizer**: Optimized for cogito:3b model output
- **Simple Streaming Normalizer**: General-purpose text processing
- **Custom Steps**: Any steps you develop are automatically available

#### Pipeline Editor Interface

```
📁 Pipeline Sidebar         🎨 Visual Editor Canvas    ⚙️ Configuration Panel
├── Available Steps         ├── Model Input Node       ├── Step Properties
├── Markdown Normalizer     ├── Processing Steps       ├── Configuration
├── Cogito Normalizer       ├── Connection Lines       ├── Parameter Tuning
├── Simple Normalizer       └── NATS Output Node       └── Live Preview
└── Custom Steps
```

#### How to Use

1. **Create Pipeline**: Click "Create Pipeline" in the interface
2. **Add Steps**: Drag steps from the sidebar to the canvas
3. **Connect Steps**: Draw connections between input/output nodes
4. **Configure**: Click on steps to modify their configuration
5. **Test**: Use the built-in testing to verify pipeline behavior
6. **Save**: Persist your pipeline for production use

#### Screenshots and Visual Guides

> **📸 Documentation Enhancement**: Include screenshots showing:
> - Pipeline editor interface with sidebar, canvas, and configuration panel
> - Step configuration dialogs with parameter settings
> - Connection drawing between pipeline steps
> - Real-time pipeline status and monitoring
> - Example pipelines for common use cases

*Screenshots can be added to this documentation section to provide visual guidance for users.*

## Future Enhancements

### 🚀 **Planned Enhancements**

1. **Pipeline Versioning** - Support for pipeline version management
2. **Conditional Logic** - Branching and conditional processing
3. **External Integrations** - Webhooks and external service calls
4. **Performance Optimization** - Parallel processing and caching
5. **Pipeline Templates** - Pre-built pipeline templates for common use cases

### Roadmap

- [ ] Graph-based pipeline execution (DAG support)
- [ ] Pipeline templates and marketplace
- [ ] Real-time pipeline monitoring dashboard
- [ ] A/B testing for pipeline configurations
- [ ] Machine learning pipeline integration
- [ ] Collaborative editing features

## Related Documentation

- [Backend Overview](../backend/Overview.md)
- [Streaming Implementation](../backend/LangChain-Streaming.md)
- [API Reference](../api/Reference.md)
- [Architecture Overview](../structure/Architecture.md)