# Developing Pipeline Step Modules โญ **DEVELOPER GUIDE** ## Introduction Pipeline Step modules are the **core building blocks** of the J4F Assistant's NATS-powered streaming pipeline. They are async generator functions that process streaming data in real-time, currently powering our **chat streaming** workflow with markdown normalization and content processing. **๐ŸŽฏ Current Implementation**: Chat stream โ†’ NATS Pipeline โ†’ Real-time processing โ†’ Frontend delivery **๐Ÿš€ Future Vision**: Multi-input pipelines, tool execution, AI-to-AI communication, and complex workflow orchestration **๐Ÿ“š What You'll Learn**: - Step module architecture and patterns for chat streaming - Real-world examples with the current chat pipeline - Advanced techniques for LLM response processing - Future pipeline capabilities and roadmap - Testing and debugging strategies ## ๐Ÿš€ Quick Start: Your First Pipeline Step ### Current Pipeline Flow ``` User Message โ†’ LLM โ†’ Chat Stream โ†’ NATS Pipeline โ†’ Processed Stream โ†’ Frontend ``` ### 1. Create a Chat Processing Step ```javascript // src/pipeline/steps/myFirstStep.js export async function* myFirstStep(input, config = {}) { const { prefix = '[PROCESSED]' } = config; for await (const chunk of input) { // Transform LLM chat chunks in real-time yield `${prefix} ${chunk}`; } } export const stepName = 'myFirstStep'; ``` ### 2. Register the Step ```javascript // src/pipeline/stepRegistry.js import { myFirstStep } from './steps/myFirstStep.js'; export const stepRegistry = { // ...existing steps (markdownNormalizer, cogitoStreamingNormalizer) myFirstStep: myFirstStep }; ``` ### 3. 
Test Your Step in Chat ```bash # Chat with your step active curl -X POST http://localhost:3000/api/unified/stream \ -H "Content-Type: application/json" \ -d '{"message": "Tell me about space", "pipeline": ["myFirstStep"]}' ``` **Result**: Every chunk from the LLM will be prefixed with `[PROCESSED]` before reaching the frontend ## ๐Ÿ“– Current Implementation Templates ### Template 1: Chat Stream Processor ```javascript // Perfect for: processing LLM responses, markdown cleanup, content filtering export async function* chatStreamProcessor(input, config = {}) { const { removeEmojis = false, addTimestamps = false, maxChunkLength = 1000 } = config; for await (const chunk of input) { let processed = chunk; // Remove emojis if requested if (removeEmojis) { processed = processed.replace(/[\u{1F600}-\u{1F64F}]|[\u{1F300}-\u{1F5FF}]|[\u{1F680}-\u{1F6FF}]|[\u{1F1E0}-\u{1F1FF}]/gu, ''); } // Add timestamps if (addTimestamps && chunk.trim()) { processed = `[${new Date().toISOString()}] ${processed}`; } // Chunk size management for streaming if (processed.length > maxChunkLength) { const chunks = processed.match(new RegExp(`.{1,${maxChunkLength}}`, 'g')); for (const subChunk of chunks) { yield subChunk; } } else { yield processed; } } } ``` ### Template 2: AI-to-AI Communication (Future Feature) ```javascript // ๐Ÿš€ ROADMAP FEATURE: Steps that call additional LLMs export async function* aiToAiProcessor(input, config = {}) { const { secondaryModel = 'gpt-4o-mini', prompt = 'Enhance this text:', threshold = 100 // Only process chunks over this length } = config; let buffer = ''; for await (const chunk of input) { buffer += chunk; // When we have enough content, send to secondary AI if (buffer.length >= threshold) { try { // ๐Ÿš€ FUTURE: This will call another LLM via the pipeline const enhanced = await callSecondaryLLM(prompt + buffer, secondaryModel); yield enhanced; buffer = ''; } catch (error) { // Fallback to original content yield buffer; buffer = ''; } } } // Process 
remaining buffer if (buffer) { yield buffer; } } ``` ### Template 3: Tool Call Executor (Future Feature) ```javascript // ๐Ÿš€ ROADMAP FEATURE: Parse and execute tool calls within the pipeline export async function* toolCallExecutor(input, config = {}) { const { enabledTools = [], sandbox = true } = config; let textBuffer = ''; for await (const chunk of input) { textBuffer += chunk; // Look for tool call patterns (e.g., ...) const toolCallPattern = /(.*?)<\/tool_call>/gs; let lastIndex = 0; let match; while ((match = toolCallPattern.exec(textBuffer)) !== null) { // Yield text before tool call yield textBuffer.slice(lastIndex, match.index); const [fullMatch, toolName, toolArgs] = match; if (enabledTools.includes(toolName)) { try { // ๐Ÿš€ FUTURE: Execute tool and inject result const result = await executeTool(toolName, toolArgs, { sandbox }); yield `**Tool Result (${toolName}):** ${result}\n\n`; } catch (error) { yield `**Tool Error (${toolName}):** ${error.message}\n\n`; } } else { // Tool not enabled, return as-is yield fullMatch; } lastIndex = match.index + fullMatch.length; } // Keep unprocessed part in buffer textBuffer = textBuffer.slice(lastIndex); } // Yield remaining text if (textBuffer) { yield textBuffer; } } ``` ## ๐ŸŽฏ Current Implementation vs. 
Future Roadmap ### โœ… **Currently Available (v1.0)** **Chat Stream Processing** - โœ… LLM response streaming through NATS pipeline - โœ… Real-time markdown normalization (`markdownNormalizer`, `cogitoStreamingNormalizer`) - โœ… Custom text transformation steps - โœ… Configurable processing chains - โœ… Frontend streaming with word-by-word animation **Working Pipeline Steps** ```javascript // These are production-ready examples 'markdownNormalizer' // Basic markdown cleanup 'cogitoStreamingNormalizer' // Optimized for cogito:3b model 'simpleStreamingNormalizer' // General-purpose text processing ``` ### ๐Ÿš€ **Planned Features (Roadmap)** **Multi-Input Pipelines** - ๐Ÿ”ฒ `` tag processing endpoint - ๐Ÿ”ฒ Tool call parsing and execution - ๐Ÿ”ฒ File upload processing pipelines - ๐Ÿ”ฒ Webhook input processing **AI-to-AI Communication** - ๐Ÿ”ฒ Secondary LLM calls within pipeline steps - ๐Ÿ”ฒ Multi-model consensus processing - ๐Ÿ”ฒ AI critique and enhancement workflows - ๐Ÿ”ฒ Cross-model translation and adaptation **Advanced Features** - ๐Ÿ”ฒ Conditional pipeline branching - ๐Ÿ”ฒ Parallel processing steps - ๐Ÿ”ฒ External API integration steps - ๐Ÿ”ฒ Database integration and storage steps - ๐Ÿ”ฒ Real-time collaboration features ### ๐Ÿ’ก **Step Ideas for Current Implementation** Since we're working with chat streams, here are practical step ideas you can build today: ```javascript // Content enhancement 'sentimentAnalyzer' // Add emotional context to responses 'factChecker' // Flag potentially incorrect information 'languageDetector' // Identify and tag language switches 'codeExtractor' // Pull out and format code blocks 'linkEnhancer' // Convert plain URLs to rich links // User experience 'readabilityOptimizer' // Improve text readability scores 'responseTimer' // Add processing time metadata 'chunkSizeOptimizer' // Optimize chunk sizes for smooth streaming 'errorRecovery' // Handle and recover from processing errors // Developer tools 'debugLogger' // Log processing 
details for debugging 'performanceProfiler' // Track processing performance 'contentValidator' // Validate output against rules 'rateLimiter' // Control processing speed ``` ## Real-World Implementation: Building on Chat Streaming Let's build a practical step that enhances the current chat experience: ### Example: Code Block Enhancer for Chat Streams ```javascript // src/pipeline/steps/codeBlockEnhancer.js export async function* codeBlockEnhancer(input, config = {}) { const { addCopyButton = true, showLanguage = true, addLineNumbers = false } = config; let buffer = ''; let inCodeBlock = false; let codeLanguage = ''; for await (const chunk of input) { buffer += chunk; // Detect code block start const startMatch = buffer.match(/```(\w+)?\n/); if (startMatch && !inCodeBlock) { const beforeCode = buffer.substring(0, startMatch.index); codeLanguage = startMatch[1] || 'text'; // Yield text before code block if (beforeCode) { yield beforeCode; } // Start enhanced code block let enhancement = `
`; if (showLanguage) { enhancement += `
${codeLanguage}
`; } if (addCopyButton) { enhancement += ``; } enhancement += '
';
      yield enhancement;
      
      // Update buffer and state
      buffer = buffer.substring(startMatch.index + startMatch[0].length);
      inCodeBlock = true;
    }
    
    // Detect code block end
    const endMatch = buffer.match(/```/);
    if (endMatch && inCodeBlock) {
      const codeContent = buffer.substring(0, endMatch.index);
      const afterCode = buffer.substring(endMatch.index + 3);
      
      // Yield code content
      if (codeContent) {
        yield escapeHtml(codeContent);
      }
      
      // Close enhanced code block
      yield '
'; // Yield text after code block if (afterCode) { yield afterCode; } // Reset state buffer = ''; inCodeBlock = false; codeLanguage = ''; } else if (!inCodeBlock && buffer.length > 500) { // Yield non-code content when buffer gets large yield buffer; buffer = ''; } } // Handle remaining buffer if (buffer) { if (inCodeBlock) { yield escapeHtml(buffer) + ''; } else { yield buffer; } } } function escapeHtml(text) { return text.replace(/[&<>"']/g, (char) => { const escape = { '&': '&', '<': '<', '>': '>', '"': '"', "'": ''' }; return escape[char]; }); } export const stepName = 'codeBlockEnhancer'; ``` ### Example: Response Quality Analyzer ```javascript // src/pipeline/steps/responseQualityAnalyzer.js export async function* responseQualityAnalyzer(input, config = {}) { const { addQualityScore = true, highlightKeyPoints = true, detectConfidence = true } = config; let completeResponse = ''; let chunks = []; for await (const chunk of input) { chunks.push(chunk); completeResponse += chunk; // Stream chunks normally while building complete response yield chunk; // Analyze every 10 chunks for real-time feedback if (chunks.length % 10 === 0) { const partialAnalysis = analyzePartialResponse(completeResponse); if (partialAnalysis.confidence === 'low') { yield '\n\n*[Assistant note: This response may require verification]*\n\n'; } } } // Final analysis if (addQualityScore) { const analysis = analyzeCompleteResponse(completeResponse); yield `\n\n---\n**Response Quality**: ${analysis.score}/10 | **Confidence**: ${analysis.confidence}`; if (analysis.keyPoints.length > 0) { yield `\n**Key Points**: ${analysis.keyPoints.join(', ')}`; } } } function analyzePartialResponse(text) { // Simple confidence detection based on language patterns const uncertainPhrases = ['might be', 'possibly', 'I think', 'maybe', 'not sure']; const uncertaintyCount = uncertainPhrases.reduce((count, phrase) => count + (text.toLowerCase().includes(phrase) ? 
1 : 0), 0); return { confidence: uncertaintyCount > 2 ? 'low' : 'medium' }; } function analyzeCompleteResponse(text) { const wordCount = text.split(/\s+/).length; const hasCode = text.includes('```'); const hasLists = /[\*\-\d]\s/.test(text); const hasHeadings = text.includes('#'); // Simple scoring algorithm let score = 5; // Base score if (wordCount > 50) score += 1; if (hasCode) score += 1; if (hasLists) score += 1; if (hasHeadings) score += 1; if (wordCount > 200) score += 1; return { score: Math.min(score, 10), confidence: score > 7 ? 'high' : score > 5 ? 'medium' : 'low', keyPoints: extractKeyPoints(text) }; } function extractKeyPoints(text) { // Extract sentences that might be key points const sentences = text.split(/[.!?]+/).filter(s => s.trim().length > 20); return sentences .filter(s => /^[A-Z]/.test(s.trim())) // Starts with capital .slice(0, 3) // Max 3 key points .map(s => s.trim().substring(0, 50) + (s.length > 50 ? '...' : '')); } export const stepName = 'responseQualityAnalyzer'; ``` ## Step Module Architecture Every pipeline step is an **async generator function** that: - Accepts an async iterable input stream - Processes chunks one by one - Yields transformed output chunks - Maintains state across chunks if needed ```javascript // Basic step module template export async function* myCustomStep(input, config = {}) { // Initialize any state/configuration const { option1 = 'default', option2 = false } = config; for await (const chunk of input) { // Process the chunk const processedChunk = transform(chunk, option1, option2); // Yield the result yield processedChunk; } // Optional: flush any remaining state // yield finalData; } // Export step name for registry export const stepName = 'myCustomStep'; ``` ### Real Example: Simple Text Transformation ```javascript // src/pipeline/steps/uppercaseStep.js export async function* uppercaseStep(input, config = {}) { const { preserveCodeBlocks = true } = config; let inCodeBlock = false; for await (const chunk of 
input) { // Handle code block detection if (preserveCodeBlocks && chunk.includes('```')) { inCodeBlock = !inCodeBlock; } // Transform based on context const transformed = inCodeBlock ? chunk : chunk.toUpperCase(); yield transformed; } } export const stepName = 'uppercaseStep'; ``` ## Step Module Types ### 1. **Stateless Steps** Simple transformations that don't need to remember previous chunks. ```javascript // Character replacement example export async function* replaceCharsStep(input, config = {}) { const { from = '', to = '' } = config; for await (const chunk of input) { yield chunk.replace(new RegExp(from, 'g'), to); } } ``` ### 2. **Stateful Steps** More complex transformations that accumulate state across chunks. ```javascript // Buffer-based processing example export async function* bufferLinesStep(input, config = {}) { const { bufferSize = 3 } = config; const buffer = []; for await (const chunk of input) { buffer.push(chunk); // Process when buffer is full if (buffer.length >= bufferSize) { const processed = processLines(buffer); yield processed; buffer.length = 0; // Clear buffer } } // Flush remaining buffer if (buffer.length > 0) { yield processLines(buffer); } } ``` ### 3. **Complex Pattern Matching Steps** Advanced steps that understand content structure. ```javascript // Markdown link processing example export async function* linkProcessorStep(input, config = {}) { const { baseUrl = '', target = '_blank' } = config; let textBuffer = ''; for await (const chunk of input) { textBuffer += chunk; // Look for complete markdown links const linkPattern = /\[([^\]]+)\]\(([^)]+)\)/g; let lastIndex = 0; let match; while ((match = linkPattern.exec(textBuffer)) !== null) { // Yield text before the link yield textBuffer.slice(lastIndex, match.index); // Process and yield the link const [fullMatch, text, url] = match; const fullUrl = url.startsWith('http') ? 
url : baseUrl + url; yield `${text}`; lastIndex = match.index + fullMatch.length; } // Keep unprocessed part in buffer textBuffer = textBuffer.slice(lastIndex); } // Yield any remaining text if (textBuffer) { yield textBuffer; } } ``` ## Advanced Step Patterns ### 1. **Multi-Chunk Processing** For operations that need multiple chunks to make decisions: ```javascript export async function* smartLineBreakStep(input, config = {}) { const { lookAhead = 2 } = config; const chunkQueue = []; for await (const chunk of input) { chunkQueue.push(chunk); // Process when we have enough context if (chunkQueue.length > lookAhead) { const currentChunk = chunkQueue.shift(); const context = chunkQueue.join(''); // Make intelligent decisions based on context const processed = processWithContext(currentChunk, context); yield processed; } } // Flush remaining chunks while (chunkQueue.length > 0) { yield chunkQueue.shift(); } } ``` ### 2. **Conditional Processing** Steps that apply different logic based on content analysis: ```javascript export async function* conditionalFormatterStep(input, config = {}) { for await (const chunk of input) { // Detect content type if (isCodeBlock(chunk)) { yield formatAsCode(chunk); } else if (isList(chunk)) { yield formatAsList(chunk); } else if (isTable(chunk)) { yield formatAsTable(chunk); } else { yield formatAsText(chunk); } } } function isCodeBlock(chunk) { return chunk.includes('```') || /^\s{4,}/.test(chunk); } function isList(chunk) { return /^\s*[\d\-\*\+][\.\)\s]/.test(chunk); } ``` ### 3. 
**Error Handling and Recovery** Robust steps with proper error handling: ```javascript export async function* robustProcessorStep(input, config = {}) { const { fallbackMode = 'passthrough', logErrors = true } = config; for await (const chunk of input) { try { // Attempt complex processing const result = await complexTransformation(chunk); yield result; } catch (error) { if (logErrors) { console.error(`[robustProcessorStep] Error processing chunk:`, error); } // Fallback behavior switch (fallbackMode) { case 'passthrough': yield chunk; // Return original break; case 'skip': // Skip this chunk continue; case 'placeholder': yield '[Processing Error]'; break; default: yield chunk; } } } } ``` ## Production Example: Enhanced Markdown Normalizer Here's how we built the `cogitoStreamingNormalizer` - a real step that's currently in production: ```javascript // src/pipeline/steps/cogitoStreamingNormalizer.js export async function* cogitoStreamingNormalizer(input, config = {}) { const { debug = false } = config; let buffer = ''; let isCodeBlock = false; let justProcessedHeading = false; for await (const chunk of input) { if (debug) console.log('[Cogito] Processing chunk:', JSON.stringify(chunk)); buffer += chunk; // Track code blocks to avoid processing markdown inside them if (chunk.includes('```')) { isCodeBlock = !isCodeBlock; } if (!isCodeBlock) { // Process headings const headingMatch = buffer.match(/^(#{1,6})\s*(.+)$/m); if (headingMatch) { const [fullMatch, hashes, title] = headingMatch; const beforeHeading = buffer.substring(0, buffer.indexOf(fullMatch)); const afterHeading = buffer.substring(buffer.indexOf(fullMatch) + fullMatch.length); // Ensure proper spacing around headings let normalizedHeading = ''; if (beforeHeading && !beforeHeading.endsWith('\n\n')) { normalizedHeading += '\n\n'; } normalizedHeading += `${hashes} ${title.trim()}\n\n`; yield beforeHeading + normalizedHeading; buffer = afterHeading; justProcessedHeading = true; continue; } // Process numbered 
lists const listMatch = buffer.match(/^(\d+)\.\s*(.+)$/m); if (listMatch && !justProcessedHeading) { const [fullMatch, number, content] = listMatch; const beforeList = buffer.substring(0, buffer.indexOf(fullMatch)); const afterList = buffer.substring(buffer.indexOf(fullMatch) + fullMatch.length); let normalizedList = ''; if (beforeList && !beforeList.endsWith('\n')) { normalizedList += '\n'; } normalizedList += `${number}. ${content.trim()}\n`; yield beforeList + normalizedList; buffer = afterList; justProcessedHeading = false; continue; } } justProcessedHeading = false; // Yield buffer when it gets large enough or contains complete sentences if (buffer.length > 100 || /[.!?]\s/.test(buffer)) { const sentences = buffer.split(/([.!?]\s+)/); if (sentences.length > 2) { const completeContent = sentences.slice(0, -1).join(''); yield completeContent; buffer = sentences[sentences.length - 1]; } } } // Yield any remaining content if (buffer.trim()) { yield buffer; } } export const stepName = 'cogitoStreamingNormalizer'; ``` **Why This Works in Production:** - โœ… Handles fragmented markdown from LLM streaming - โœ… Preserves code blocks without modification - โœ… Ensures proper spacing for readability - โœ… Optimized for cogito:3b output patterns - โœ… Streams smoothly without blocking ## Step Registration and Configuration ### 1. **Register Your Step** Add your step to the step registry: ```javascript // src/pipeline/stepRegistry.js import { syntaxHighlighterStep, stepName as syntaxHighlighterName } from './steps/syntaxHighlighterStep.js'; export const stepRegistry = { // ...existing steps [syntaxHighlighterName]: syntaxHighlighterStep, }; ``` ### 2. 
**Configure Step Parameters** Steps can be configured when creating pipelines: ```javascript // Configure via API const pipelineConfig = { name: 'enhanced-markdown-pipeline', steps: [ { name: 'markdownNormalizer', config: { preserveCodeBlocks: true } }, { name: 'syntaxHighlighterStep', config: { theme: 'github-dark', showLineNumbers: true, languages: ['javascript', 'python', 'typescript'] } } ] }; // Run pipeline await PipelineManager.runPipelineForConversation( modelStream, uuid, userId, pipelineConfig ); ``` ## Testing Your Step Modules ### 1. **Unit Testing with Chat Stream Simulation** ```javascript // tests/pipeline/steps/codeBlockEnhancer.test.js import { codeBlockEnhancer } from '../../../src/pipeline/steps/codeBlockEnhancer.js'; async function* createChatStream(chunks) { for (const chunk of chunks) { yield chunk; // Simulate realistic chat streaming delay await new Promise(resolve => setTimeout(resolve, 10)); } } describe('codeBlockEnhancer', () => { test('enhances JavaScript code blocks in chat stream', async () => { // Simulate how an LLM might stream code const chatChunks = [ 'Here is a function:\n\n', '```javascript\n', 'function hello() {\n', ' console.log("Hello!");\n', '}\n', '```\n\n', 'This function logs a message.' 
]; const results = []; const stream = codeBlockEnhancer(createChatStream(chatChunks), { addCopyButton: true, showLanguage: true }); for await (const chunk of stream) { results.push(chunk); } const output = results.join(''); expect(output).toContain('enhanced-code-block'); expect(output).toContain('data-language="javascript"'); expect(output).toContain('copy-btn'); expect(output).toContain('function hello'); }); test('handles fragmented code blocks correctly', async () => { // Test with chunks that break in the middle of code markers const fragmentedChunks = [ 'Some text ```java', 'script\ncons', 'ole.log("test");', '\n``` more text' ]; const results = []; const stream = codeBlockEnhancer(createChatStream(fragmentedChunks)); for await (const chunk of stream) { results.push(chunk); } const output = results.join(''); expect(output).toContain('enhanced-code-block'); expect(output).toContain('console.log("test");'); }); }); ``` ### 2. **Integration Testing with Real Pipeline** ```javascript // tests/pipeline/integration/chatPipelineFlow.test.js import { PipelineManager } from '../../../src/pipeline/PipelineManager.js'; async function* mockLLMResponse(chunks) { for (const chunk of chunks) { yield chunk; await new Promise(resolve => setTimeout(resolve, 50)); // Realistic delay } } test('complete chat pipeline with custom steps', async () => { const llmResponse = [ '# Battle Plan\n\n', '1. Deploy forces\n', '2. Secure perimeter\n', '```bash\n', 'deploy --force marine-squad\n', '```\n', 'Execute immediately.' ]; // Test the full pipeline as used in production const pipeline = ['cogitoStreamingNormalizer', 'codeBlockEnhancer']; const results = []; const stream = await PipelineManager.runPipelineForTest( mockLLMResponse(llmResponse), pipeline ); for await (const chunk of stream) { results.push(chunk); } const finalOutput = results.join(''); // Verify markdown was normalized expect(finalOutput).toMatch(/# Battle Plan\n\n/); expect(finalOutput).toMatch(/1\. 
Deploy forces\n/); // Verify code was enhanced expect(finalOutput).toContain('enhanced-code-block'); expect(finalOutput).toContain('data-language="bash"'); }); ``` ### 3. **Live Testing with the Web Interface** Create a test HTML page to verify your steps work with real chat: ```html Test My Pipeline Step
``` ## Best Practices ### 1. **Performance Optimization** ```javascript // โœ… Good: Efficient chunk processing export async function* efficientStep(input) { for await (const chunk of input) { // Process immediately and yield yield processChunk(chunk); } } // โŒ Bad: Accumulating all data before processing export async function* inefficientStep(input) { const allChunks = []; for await (const chunk of input) { allChunks.push(chunk); // Memory leak for large streams } yield processAllChunks(allChunks); } ``` ### 2. **Memory Management** ```javascript // โœ… Good: Bounded buffers with cleanup export async function* memoryEfficientStep(input) { const buffer = []; const MAX_BUFFER_SIZE = 1000; for await (const chunk of input) { buffer.push(chunk); if (buffer.length >= MAX_BUFFER_SIZE) { yield processBuffer(buffer); buffer.length = 0; // Clear buffer } } } ``` ### 3. **Error Resilience** ```javascript // โœ… Good: Graceful error handling export async function* resilientStep(input, config = {}) { const { onError = 'passthrough' } = config; for await (const chunk of input) { try { yield processChunk(chunk); } catch (error) { console.error('[resilientStep] Processing error:', error); if (onError === 'passthrough') { yield chunk; // Return original } // Continue processing next chunks } } } ``` ### 4. **Configuration Validation** ```javascript // โœ… Good: Validate and provide defaults export async function* configuredStep(input, config = {}) { // Validate configuration const { threshold = 10, mode = 'normal', enabled = true } = config; if (threshold < 0) { throw new Error('threshold must be non-negative'); } if (!['normal', 'strict', 'loose'].includes(mode)) { throw new Error('mode must be one of: normal, strict, loose'); } if (!enabled) { // Pass through unchanged for await (const chunk of input) { yield chunk; } return; } // Normal processing for await (const chunk of input) { yield processWithConfig(chunk, { threshold, mode }); } } ``` ## Debugging and Monitoring ### 1. 
**Debug Utilities** ```javascript // src/pipeline/utils/debugUtils.js export function createDebugStep(stepName) { return async function* debugWrapper(input, config = {}) { console.log(`[DEBUG] ${stepName} starting with config:`, config); let chunkCount = 0; for await (const chunk of input) { chunkCount++; console.log(`[DEBUG] ${stepName} chunk ${chunkCount}:`, chunk.substring(0, 50)); yield chunk; } console.log(`[DEBUG] ${stepName} processed ${chunkCount} chunks`); }; } // Usage const debugNormalizer = createDebugStep('markdownNormalizer'); ``` ### 2. **Performance Monitoring** ```javascript // Timing wrapper for steps export function createTimedStep(stepFunction, stepName) { return async function* timedWrapper(input, config = {}) { const startTime = Date.now(); let chunkCount = 0; for await (const chunk of stepFunction(input, config)) { chunkCount++; yield chunk; } const duration = Date.now() - startTime; console.log(`[PERF] ${stepName}: ${chunkCount} chunks in ${duration}ms`); }; } ``` ## Common Step Patterns Library ### 1. **Text Accumulator Pattern** ```javascript export function createAccumulatorStep(processFunction, flushTrigger) { return async function* accumulatorStep(input, config = {}) { let accumulator = ''; for await (const chunk of input) { accumulator += chunk; if (flushTrigger(accumulator, config)) { yield processFunction(accumulator, config); accumulator = ''; } } if (accumulator) { yield processFunction(accumulator, config); } }; } // Usage const sentenceProcessor = createAccumulatorStep( (text) => text.trim() + '. ', (acc) => acc.includes('.') ); ``` ### 2. 
**State Machine Pattern** ```javascript export function createStateMachineStep(states, initialState) { return async function* stateMachineStep(input, config = {}) { let currentState = initialState; for await (const chunk of input) { const stateHandler = states[currentState]; const { output, nextState } = stateHandler(chunk, config); if (output !== null) { yield output; } if (nextState) { currentState = nextState; } } }; } ``` ## ๐Ÿš€ Future Pipeline Features (Coming Soon) ### Multi-Input Pipeline Support ```javascript // ๐Ÿ”ฎ COMING SOON: Multiple input sources export async function* multiInputProcessor(inputs, config = {}) { const { chatStream, thinkingStream, toolCalls } = inputs; // Process multiple streams simultaneously for await (const [source, chunk] of combineStreams(chatStream, thinkingStream)) { if (source === 'thinking') { // Process internal reasoning yield ``; } else if (source === 'chat') { // Process user-facing content yield chunk; } } // Execute any tool calls found for (const toolCall of toolCalls) { const result = await executeTool(toolCall); yield `\n\n**Tool Result:** ${result}\n\n`; } } ``` ### AI-to-AI Pipeline Steps ```javascript // ๐Ÿ”ฎ COMING SOON: Chain multiple AI models export async function* aiChainProcessor(input, config = {}) { const { models = ['gpt-4o-mini', 'claude-3-haiku'] } = config; let content = ''; // Collect complete response for await (const chunk of input) { content += chunk; yield chunk; // Pass through original } // Send to secondary AI for enhancement for (const model of models) { const enhanced = await callAI(model, `Improve this response: ${content}`); yield `\n\n---\n**Enhanced by ${model}:**\n${enhanced}`; } } ``` ### Tool Execution Pipeline ```javascript // ๐Ÿ”ฎ COMING SOON: Execute tools within the pipeline export async function* toolExecutor(input, config = {}) { const { allowedTools = [], sandbox = true } = config; for await (const chunk of input) { // Look for tool call syntax: @tool:name(args) const toolMatch 
= chunk.match(/@(\w+):(\w+)\(([^)]*)\)/); if (toolMatch && allowedTools.includes(toolMatch[2])) { const [, category, tool, args] = toolMatch; try { const result = await executeTool(category, tool, args, { sandbox }); yield chunk.replace(toolMatch[0], `**[${tool} executed]** ${result}`); } catch (error) { yield chunk.replace(toolMatch[0], `**[${tool} failed]** ${error.message}`); } } else { yield chunk; } } } ``` ## ๐ŸŽจ Using the Visual Pipeline Editor ### Production-Ready Visual Interface The J4F Assistant includes a **complete visual pipeline editor** for creating and managing processing pipelines without code. #### Accessing the Pipeline Editor 1. **From Main Interface**: Look for the "Pipeline Editor" section in the sidebar 2. **Current Pipeline Display**: Shows the active pipeline configuration 3. **Create New**: Use "Create Pipeline" button to start building #### Building Your First Visual Pipeline ``` Step 1: Open Pipeline Editor โ”œโ”€โ”€ Current Pipeline: Shows active configuration โ”œโ”€โ”€ Available Steps: Drag-and-drop library โ””โ”€โ”€ Create Pipeline: Start new pipeline Step 2: Add Processing Steps โ”œโ”€โ”€ Drag "Simple Streaming Normalizer" to canvas โ”œโ”€โ”€ Drag "Markdown Normalizer" to canvas โ””โ”€โ”€ Connect: Input โ†’ Step 1 โ†’ Step 2 โ†’ Output Step 3: Configure Steps โ”œโ”€โ”€ Click on each step to open configuration โ”œโ”€โ”€ Set parameters (e.g., normalize headings: true) โ””โ”€โ”€ Preview configuration in real-time Step 4: Test & Deploy โ”œโ”€โ”€ Use built-in testing functionality โ”œโ”€โ”€ Save pipeline configuration โ””โ”€โ”€ Activate for chat processing ``` #### Visual Editor Features - **๐ŸŽฏ Node-Based Interface**: Each step is a visual node with input/output connections - **โš™๏ธ Configuration Panel**: Click any step to configure its parameters - **๐Ÿ”— Connection System**: Visual lines show data flow between steps - **๐Ÿ“Š Real-time Status**: See pipeline status (Steps: 2, Connections: 3, Status: Ready) - **๏ฟฝ Persistence**: 
Save and load pipeline configurations - **๐Ÿงช Testing**: Built-in pipeline testing without writing code #### Available Steps in Visual Editor From the screenshots, the editor includes: - **Markdown Normalizer**: Normalizes markdown formatting and structure - **Cogito Streaming Normalizer**: Optimized for cogito:3b model - **Simple Streaming Normalizer**: General-purpose processing - **Custom Steps**: Any steps you develop appear automatically #### Step Configuration Options When you click on a step, the configuration panel shows: - **Basic Properties**: Step ID, Display Name, Position - **Step Configuration**: Custom parameters for each step type - **Connection Management**: Input/output connection handling - **Live Preview**: See configuration changes in real-time ### Integration with Development #### Testing Your Custom Steps When you develop a new step using the code templates, it automatically appears in the visual editor: ```bash # 1. Develop your step (as shown in previous sections) export async function* myCustomStep(input, config = {}) { // ... step logic } # 2. Register the step # src/pipeline/stepRegistry.js export const stepRegistry = { myCustomStep: myCustomStep // Automatically appears in visual editor }; # 3. Use in visual editor # The step now appears in the sidebar and can be dragged onto the canvas ``` #### Visual to Code Workflow ``` Visual Editor โ†’ Generate Configuration โ†’ Apply to Chat Pipeline โ†“ โ†“ โ†“ Drag & Drop JSON Config Production Use Steps { "steps": [...] 
      }            Real-time Chat
```

The visual editor generates the same JSON configuration that you can use programmatically:

```javascript
// Generated from the visual editor
const visualPipelineConfig = {
  "name": "My Visual Pipeline",
  "steps": [
    {
      "name": "simpleStreamingNormalizer",
      "config": { "normalizeHeadings": true }
    },
    {
      "name": "markdownNormalizer",
      "config": { "preserveCodeBlocks": true }
    }
  ]
};

// Use in API calls
fetch('/api/unified/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    message: "Hello world",
    pipelineConfig: visualPipelineConfig
  })
});
```

## ๐Ÿ› ๏ธ Contributing to the Pipeline System

### Development Workflow with Visual Editor

1. **Code Development**: Create your step using the templates above
2. **Auto-Registration**: The step appears in the visual editor automatically
3. **Visual Testing**: Use drag-and-drop to test your step
4. **Configuration Tuning**: Use the visual interface to find optimal settings
5. **Production Deployment**: Apply the tested configuration to live chat

### Getting Started with Development

1. **Set up your development environment:**

   ```bash
   git clone <repository-url>
   cd j4f-assistant
   npm install
   npm run dev
   ```

2. **Create your first step:**

   ```bash
   # Create step file
   touch src/pipeline/steps/myAwesomeStep.js

   # Add to registry
   # Edit src/pipeline/stepRegistry.js

   # Test it
   npm test -- --grep "myAwesomeStep"
   ```

3. **Test with live chat:**

   ```bash
   # Start the server
   npm run dev

   # Open browser to test page
   open test-my-pipeline-step.html
   ```

### Step Development Checklist

- [ ] **Function signature:** `async function* stepName(input, config = {})`
- [ ] **Export step name:** `export const stepName = 'myStep'`
- [ ] **Handle streaming:** Process chunks as they arrive; don't accumulate everything
- [ ] **Memory management:** Use bounded buffers and clean up state
- [ ] **Error handling:** Provide graceful fallbacks; don't break the pipeline
- [ ] **Configuration:** Support customization via the config object
- [ ] **Testing:** Unit tests + integration tests + live testing
- [ ] **Documentation:** Clear examples and use cases
- [ ] **Performance:** Optimize for real-time streaming

### Ideas for Community Contributions

**Beginner-Friendly Steps:**

- Text formatting (bold/italic detection)
- Emoji replacement/standardization
- Language translation hints
- Reading time estimation
- Word count tracking

**Intermediate Steps:**

- Sentiment analysis integration
- Content summarization
- Link preview generation
- Image description generation
- Table formatting enhancement

**Advanced Steps:**

- Multi-model consensus
- Real-time fact checking
- Code execution sandboxing
- External API integration
- Complex workflow orchestration

## Related Documentation

- [Pipeline System Overview](./Overview.md) - Architecture and concepts
- [Backend Streaming](../backend/LangChain-Streaming.md) - How streaming works
- [API Reference](../api/Reference.md) - Available endpoints
- [Contributing Guidelines](../CONTRIBUTING.md) - How to contribute

---

**Ready to build your first pipeline step?** Start with a simple text transformation and work your way up to more complex processing. The pipeline system is designed to be powerful yet approachable - your custom step could be running in production within minutes!

## Visual Editor Best Practices

### When to Use Visual vs. Code

**Use Visual Editor For:**

- โœ… Quick pipeline prototyping and testing
- โœ… Non-technical users creating processing workflows
- โœ… Experimenting with step combinations and configurations
- โœ… Teaching pipeline concepts to new team members
- โœ… Debugging pipeline flow and connection issues

**Use Code Configuration For:**

- โœ… Production deployments with version control
- โœ… Complex conditional logic and branching
- โœ… Integration with CI/CD pipelines
- โœ… Automated testing and configuration management
- โœ… Advanced pipeline features not yet in the visual editor

### Visual Editor Limitations (Current)

- ๐Ÿ”ฒ Conditional branching (roadmap feature)
- ๐Ÿ”ฒ Parallel processing paths (roadmap feature)
- ๐Ÿ”ฒ External API integrations (roadmap feature)
- ๐Ÿ”ฒ Complex error handling workflows (roadmap feature)

### Hybrid Workflow

```
Visual Editor โ†’ Export Config โ†’ Version Control โ†’ Production
      โ†“               โ†“               โ†“               โ†“
  Prototype      JSON Config      Git Commit     Auto-Deploy
   & Test        Generation       & Review        & Monitor
```

Many teams use this hybrid approach:

1. **Prototype** in the visual editor
2. **Export** the configuration as JSON
3. **Version control** the JSON configuration
4. **Deploy** via automated systems
5. **Monitor** performance and adjust
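The version-control and deploy steps of this hybrid workflow are a natural place for an automated sanity check: reject an exported pipeline config before it is committed if it references steps that don't exist. As a minimal sketch (the `validatePipelineConfig` helper is hypothetical, and the registry keys below merely stand in for the real exports of `src/pipeline/stepRegistry.js`), a CI-side check might look like:

```javascript
// Stand-in for the real step registry; keys are assumed, not confirmed.
const stepRegistry = {
  markdownNormalizer: true,
  cogitoStreamingNormalizer: true,
  simpleStreamingNormalizer: true
};

// Return a list of problems; an empty list means the config is safe to commit.
function validatePipelineConfig(config) {
  const errors = [];
  if (!config || !Array.isArray(config.steps) || config.steps.length === 0) {
    errors.push('pipeline must define a non-empty "steps" array');
    return errors;
  }
  for (const step of config.steps) {
    if (typeof step.name !== 'string' || !(step.name in stepRegistry)) {
      // Catches typos and steps removed from the registry since export
      errors.push(`unknown step: ${step && step.name}`);
    }
    if (step.config !== undefined && typeof step.config !== 'object') {
      errors.push(`step "${step.name}" has a non-object config`);
    }
  }
  return errors;
}

// Example: check a config exported from the visual editor.
const exported = {
  name: 'My Visual Pipeline',
  steps: [{ name: 'markdownNormalizer', config: { preserveCodeBlocks: true } }]
};
console.log(validatePipelineConfig(exported)); // []
```

Running a check like this in a pre-commit hook or CI job keeps step 4 ("Deploy via automated systems") from shipping a config that the chat pipeline cannot execute.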