MongoDB Atlas Vector Search for AI Applications: Advanced Semantic Similarity and Machine Learning Integration

Modern AI applications require sophisticated vector similarity search capabilities to power recommendation systems, retrieval-augmented generation (RAG), content discovery, and semantic search experiences. Traditional database systems struggle with high-dimensional vector operations, requiring complex integration with specialized vector databases that add architectural complexity, operational overhead, and data consistency challenges across multiple systems.

MongoDB Atlas Vector Search adds native support for high-dimensional vector similarity queries, so AI-powered applications can store, index, and query vector embeddings at scale in the same collections as their operational data, using familiar database operations. Unlike a standalone vector database, which needs separate infrastructure and a data synchronization pipeline, Atlas Vector Search works with existing MongoDB Atlas deployments and provides unified data management for both structured data and AI vector embeddings.
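
As a rough illustration of the query side, here is a minimal sketch of a `$vectorSearch` aggregation pipeline built as a plain JavaScript object. The `buildVectorSearchPipeline` helper, its defaults, and the `title` and `category` fields are assumptions made for illustration; only the `$vectorSearch` stage fields (`index`, `path`, `queryVector`, `numCandidates`, `limit`, `filter`) and the `vectorSearchScore` metadata come from Atlas itself.

```javascript
// Hypothetical helper: builds a minimal $vectorSearch pipeline.
// Index and field names are placeholders, not required values.
function buildVectorSearchPipeline(queryVector, options = {}) {
  const {
    index = 'vector_index_content_embeddings',
    path = 'contentEmbedding',
    limit = 10,
    numCandidates = limit * 10, // candidates considered before the final top-k cut
    filter = null
  } = options;

  const vectorSearch = { index, path, queryVector, numCandidates, limit };

  // Only attach a filter when the caller actually supplied one
  if (filter && Object.keys(filter).length > 0) {
    vectorSearch.filter = filter;
  }

  return [
    { $vectorSearch: vectorSearch },
    // Surface the similarity score alongside a few document fields
    { $project: { title: 1, category: 1, score: { $meta: 'vectorSearchScore' } } }
  ];
}

const pipeline = buildVectorSearchPipeline([0.1, 0.2, 0.3], {
  limit: 5,
  filter: { category: { $in: ['ml'] } }
});
console.log(JSON.stringify(pipeline, null, 2));
```

The resulting array can be passed to `collection.aggregate(pipeline)` on a collection that has a matching vector search index. Keeping the builder a pure function makes it easy to unit test without a database connection.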

The Traditional Vector Search Challenge

Building AI applications with conventional database architectures creates significant technical and operational complexity:

-- Traditional PostgreSQL vector search - requires extensions and complex setup

-- Install pgvector extension (complex setup and maintenance)
CREATE EXTENSION IF NOT EXISTS vector;

-- Vector storage table with limited optimization capabilities
CREATE TABLE document_embeddings (
    document_id BIGSERIAL PRIMARY KEY,
    document_title TEXT NOT NULL,
    document_content TEXT NOT NULL,
    document_category VARCHAR(100),
    document_metadata JSONB,

    -- Vector storage (limited to specific dimensions)
    content_embedding vector(1536),  -- OpenAI ada-002 dimensions
    title_embedding vector(1536),

    -- Metadata for AI processing
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-ada-002',
    embedding_created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Document processing
    word_count INTEGER,
    language_code VARCHAR(10),
    content_hash VARCHAR(64),

    -- System metadata
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Constraints and indexing
    CONSTRAINT valid_content_length CHECK (LENGTH(document_content) > 0),
    CONSTRAINT valid_embedding_dimensions CHECK (vector_dims(content_embedding) = 1536)
);

-- Create vector indexes (limited optimization options)
CREATE INDEX idx_content_embedding_cosine ON document_embeddings 
USING ivfflat (content_embedding vector_cosine_ops) 
WITH (lists = 100);

CREATE INDEX idx_title_embedding_l2 ON document_embeddings 
USING ivfflat (title_embedding vector_l2_ops) 
WITH (lists = 100);

-- Standard indexes for hybrid search
CREATE INDEX idx_category_created ON document_embeddings (document_category, created_at DESC);
CREATE INDEX idx_metadata_gin ON document_embeddings USING GIN (document_metadata);
CREATE INDEX idx_content_hash ON document_embeddings (content_hash);

-- Vector similarity search with limited performance and scalability
WITH vector_search_results AS (
    SELECT 
        document_id,
        document_title,
        document_content,
        document_category,
        document_metadata,

        -- Similarity calculations (computationally expensive)
        1 - (content_embedding <=> $1::vector) as cosine_similarity,
        content_embedding <-> $1::vector as l2_distance,
        (content_embedding <#> $1::vector) * -1 as inner_product,  -- <#> returns the negative inner product

        -- Metadata matching
        word_count,
        language_code,
        created_at

    FROM document_embeddings
    WHERE 
        -- Pre-filtering to reduce vector search scope
        document_category = $2  -- Category filter
        AND language_code = $3  -- Language filter
        AND created_at >= $4    -- Date range filter

        -- Vector similarity threshold (rough filtering)
        -- A cosine distance below 0.7 is the same as a cosine similarity above 0.3
        AND content_embedding <=> $1::vector < 0.7  -- Cosine distance threshold

    ORDER BY content_embedding <=> $1::vector  -- Sort by similarity
    LIMIT 50  -- Limit to manage performance
),

enhanced_results AS (
    SELECT 
        vsr.*,

        -- Additional metadata enrichment (limited capabilities)
        CASE 
            WHEN cosine_similarity >= 0.8 THEN 'highly_relevant'
            WHEN cosine_similarity >= 0.6 THEN 'relevant' 
            WHEN cosine_similarity >= 0.4 THEN 'somewhat_relevant'
            ELSE 'low_relevance'
        END as relevance_category,

        -- Content analysis (basic text processing only)
        LENGTH(document_content) as content_length,
        array_length(string_to_array(document_content, ' '), 1) as estimated_word_count,

        -- Ranking score combination
        (cosine_similarity * 0.7 + 
         CASE WHEN document_metadata->>'priority' = 'high' THEN 0.3 ELSE 0.0 END) as combined_score,

        -- Query metadata
        CURRENT_TIMESTAMP as search_performed_at

    FROM vector_search_results vsr
)

SELECT 
    document_id,
    document_title,
    LEFT(document_content, 200) || '...' as content_preview,
    document_category,

    -- Similarity metrics
    ROUND(cosine_similarity::NUMERIC, 4) as similarity_score,
    relevance_category,
    ROUND(combined_score::NUMERIC, 4) as ranking_score,

    -- Document metadata
    word_count,
    content_length,
    language_code,
    TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI') as document_created,

    -- Search metadata
    search_performed_at

FROM enhanced_results
WHERE cosine_similarity >= 0.3  -- Minimum relevance threshold
ORDER BY combined_score DESC, cosine_similarity DESC
LIMIT 20;

-- Complex RAG (Retrieval-Augmented Generation) implementation
CREATE OR REPLACE FUNCTION execute_rag_query(
    query_embedding vector(1536),
    query_text TEXT,
    context_limit INTEGER DEFAULT 5,
    similarity_threshold NUMERIC DEFAULT 0.4
) RETURNS TABLE (
    context_documents JSONB,
    total_context_length INTEGER,
    average_similarity NUMERIC,
    generated_response TEXT
) AS $$
DECLARE
    context_docs JSONB := '[]'::JSONB;
    total_length INTEGER := 0;
    avg_similarity NUMERIC;
    doc_record RECORD;
    context_text TEXT := '';
BEGIN
    -- Retrieve relevant documents for context
    FOR doc_record IN
        SELECT 
            document_title,
            document_content,
            1 - (content_embedding <=> query_embedding) as similarity,
            LENGTH(document_content) as content_length
        FROM document_embeddings
        WHERE 1 - (content_embedding <=> query_embedding) >= similarity_threshold
        ORDER BY content_embedding <=> query_embedding
        LIMIT context_limit
    LOOP
        -- Build context for generation
        context_docs := context_docs || jsonb_build_object(
            'title', doc_record.document_title,
            'content', LEFT(doc_record.document_content, 1000),
            'similarity', doc_record.similarity,
            'length', doc_record.content_length
        );

        context_text := context_text || E'\n\n' || doc_record.document_title || E':\n' || 
                        LEFT(doc_record.document_content, 1000);
        total_length := total_length + doc_record.content_length;
    END LOOP;

    -- Calculate average similarity
    SELECT AVG((doc->>'similarity')::NUMERIC) INTO avg_similarity
    FROM jsonb_array_elements(context_docs) as doc;

    -- Return context information (actual LLM generation would be external)
    RETURN QUERY SELECT 
        context_docs,
        total_length,
        COALESCE(avg_similarity, 0.0),
        'Generated response would be created by external LLM service using context: ' || 
        LEFT(context_text, 200) || '...' as generated_response;

END;
$$ LANGUAGE plpgsql;

-- Execute RAG query (requires external LLM integration)
SELECT * FROM execute_rag_query(
    $1::vector,  -- Query embedding
    'What are the best practices for machine learning?',  -- Original query
    5,  -- Context documents limit
    0.4 -- Similarity threshold
);

-- Problems with traditional vector search approaches:
-- 1. Limited vector dimensions and performance optimization
-- 2. Complex setup and maintenance of vector extensions
-- 3. Poor integration between vector search and document metadata
-- 4. Limited scaling capabilities for high-dimensional vectors
-- 5. One distance metric per index; supporting several metrics means maintaining several indexes
-- 6. Complex hybrid search combining vector and traditional queries
-- 7. Limited machine learning pipeline integration
-- 8. Expensive computational overhead for similarity calculations
-- 9. No native support for embedding model versioning
-- 10. Difficult operational management of vector indexes
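
The query above mixes three pgvector operators, and all of them return distances rather than similarities: `<=>` is cosine distance, `<->` is Euclidean (L2) distance, and `<#>` is the negative inner product. Atlas Vector Search, shown next, instead reports a normalized score, which for the cosine metric is `(1 + cosine) / 2`. The sketch below spells out these relationships in plain JavaScript so that thresholds can be translated between the two systems; all helper names are hypothetical.

```javascript
// Plain-JavaScript versions of the metrics; helper names are illustrative only.
function dot(a, b) {
  return a.reduce((sum, x, i) => sum + x * b[i], 0);
}

function norm(a) {
  return Math.sqrt(dot(a, a));
}

function cosineSimilarity(a, b) {
  return dot(a, b) / (norm(a) * norm(b));
}

// What the pgvector operators return
const cosineDistance = (a, b) => 1 - cosineSimilarity(a, b);   // <=>
const l2Distance = (a, b) =>
  Math.sqrt(a.reduce((sum, x, i) => sum + (x - b[i]) ** 2, 0)); // <->
const negativeInnerProduct = (a, b) => -dot(a, b);              // <#>

// How Atlas Vector Search normalizes a cosine result into its score
const atlasCosineScore = (a, b) => (1 + cosineSimilarity(a, b)) / 2;

const u = [1, 2];
const v = [3, 4];
console.log({
  cosineSimilarity: cosineSimilarity(u, v),
  cosineDistance: cosineDistance(u, v),
  l2Distance: l2Distance(u, v),
  negativeInnerProduct: negativeInnerProduct(u, v),
  atlasCosineScore: atlasCosineScore(u, v)
});
```

One consequence is that a raw cosine similarity of 0.6 corresponds to an Atlas cosine score of 0.8, so relevance thresholds tuned against one system need to be rescaled before they are reused in the other.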

MongoDB Atlas Vector Search provides native, high-performance vector operations:

// MongoDB Atlas Vector Search - native AI-powered vector similarity with unified data management
const { MongoClient } = require('mongodb');

// Advanced Atlas Vector Search Manager
class AtlasVectorSearchManager {
  constructor(connectionString, vectorConfig = {}) {
    this.connectionString = connectionString;
    this.client = null;
    this.db = null;

    // Vector search configuration
    this.config = {
      // Embedding configuration
      defaultEmbeddingModel: vectorConfig.defaultEmbeddingModel || 'text-embedding-ada-002',
      embeddingDimensions: vectorConfig.embeddingDimensions || 1536,
      embeddingProvider: vectorConfig.embeddingProvider || 'openai',

      // Vector index configuration
      vectorIndexes: vectorConfig.vectorIndexes || {},
      similarityMetrics: vectorConfig.similarityMetrics || ['cosine', 'euclidean', 'dotProduct'],

      // Search optimization
      enableHybridSearch: vectorConfig.enableHybridSearch !== false,
      enableSemanticCaching: vectorConfig.enableSemanticCaching !== false,
      defaultSearchLimit: vectorConfig.defaultSearchLimit || 20,

      // Performance tuning
      numCandidates: vectorConfig.numCandidates || 100,
      searchThreads: vectorConfig.searchThreads || 4,

      // AI integration
      enableRAGPipeline: vectorConfig.enableRAGPipeline !== false,
      enableRecommendations: vectorConfig.enableRecommendations !== false,
      enableSemanticAnalytics: vectorConfig.enableSemanticAnalytics !== false
    };

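    // A constructor cannot await, so keep the promise; callers should `await manager.ready` before querying
    this.ready = this.initializeVectorSearch();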
  }

  async initializeVectorSearch() {
    console.log('Initializing Atlas Vector Search system...');

    try {
      // Connect to MongoDB Atlas
      this.client = new MongoClient(this.connectionString);
      await this.client.connect();
      this.db = this.client.db();

      // Setup collections and vector indexes
      await this.setupVectorInfrastructure();

      // Initialize AI integration services
      await this.setupAIIntegration();

      console.log('Atlas Vector Search system initialized successfully');

    } catch (error) {
      console.error('Error initializing vector search:', error);
      throw error;
    }
  }

  async setupVectorInfrastructure() {
    console.log('Setting up vector search infrastructure...');

    try {
      // Create collections with optimized configuration
      this.collections = {
        documents: this.db.collection('documents'),
        userInteractions: this.db.collection('user_interactions'),
        searchAnalytics: this.db.collection('search_analytics'),
        embeddingCache: this.db.collection('embedding_cache')
      };

      // Create vector search indexes
      await this.createVectorIndexes();

      // Create supporting indexes for hybrid search
      await this.createHybridSearchIndexes();

      console.log('Vector infrastructure setup completed');

    } catch (error) {
      console.error('Error setting up vector infrastructure:', error);
      throw error;
    }
  }

  async createVectorIndexes() {
    console.log('Creating optimized vector search indexes...');

    try {
      // Primary content vector index with multiple similarity metrics
      const contentVectorIndex = {
        name: "vector_index_content_embeddings",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "contentEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"  // Primary similarity metric
            },
            {
              type: "filter",
              path: "category"
            },
            {
              type: "filter", 
              path: "language"
            },
            {
              type: "filter",
              path: "tags"
            },
            {
              type: "filter",
              path: "metadata.contentType"
            },
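            {
              type: "filter",
              path: "isPublished"
            },
            {
              // Indexed because the search pipeline pre-filters on createdAt
              type: "filter",
              path: "createdAt"
            }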
          ]
        }
      };

      // Title/summary vector index for heading-based search
      const titleVectorIndex = {
        name: "vector_index_title_embeddings",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "titleEmbedding", 
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "category"
            },
            {
              type: "filter",
              path: "metadata.priority"
            }
          ]
        }
      };

      // Multi-modal vector index for images and rich content
      const multiModalVectorIndex = {
        name: "vector_index_multimodal_embeddings",
        type: "vectorSearch", 
        definition: {
          fields: [
            {
              type: "vector",
              path: "imageEmbedding",
              numDimensions: 768,  // CLIP model dimensions
              similarity: "cosine"
            },
            {
              type: "vector",
              path: "textEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "mediaType"
            }
          ]
        }
      };

      // User behavior vector index for recommendations
      const userVectorIndex = {
        name: "vector_index_user_preferences",
        type: "vectorSearch",
        definition: {
          fields: [
            {
              type: "vector",
              path: "preferenceEmbedding",
              numDimensions: this.config.embeddingDimensions,
              similarity: "cosine"
            },
            {
              type: "filter",
              path: "userSegment"
            },
            {
              type: "filter",
              path: "isActive"
            }
          ]
        }
      };

      // Create the vector indexes
      const indexCreationTasks = [
        this.createCollectionIndex('documents', contentVectorIndex),
        this.createCollectionIndex('documents', titleVectorIndex),
        this.createCollectionIndex('documents', multiModalVectorIndex),
        this.createCollectionIndex('userInteractions', userVectorIndex)
      ];

      await Promise.all(indexCreationTasks);

      console.log('Vector indexes created successfully');

    } catch (error) {
      console.error('Error creating vector indexes:', error);
      throw error;
    }
  }

  async createCollectionIndex(collectionName, indexDefinition) {
    try {
      const collection = this.collections[collectionName];
      // Vector indexes are Atlas Search indexes, so they are created with
      // createSearchIndex() rather than createIndex(). The { name, type, definition }
      // object built above is passed through as-is (the `type` field needs a recent
      // Node.js driver release).
      await collection.createSearchIndex(indexDefinition);
    } catch (error) {
      // Logged rather than rethrown, so one failed index does not abort the others
      console.error(`Error creating index ${indexDefinition.name}:`, error);
    }
  }

  async performAdvancedVectorSearch(searchConfig) {
    console.log('Performing advanced vector search...');

    const searchStartTime = Date.now();

    try {
      // Collect only the filters the caller supplied. An empty $and array is not a
      // valid filter, so the filter key is omitted entirely when nothing is set.
      const filterConditions = [
        ...(searchConfig.categoryFilter ? [{ category: { $in: searchConfig.categoryFilter } }] : []),
        ...(searchConfig.languageFilter ? [{ language: searchConfig.languageFilter }] : []),
        ...(searchConfig.dateRange ? [{
          createdAt: {
            $gte: searchConfig.dateRange.start,
            $lte: searchConfig.dateRange.end
          }
        }] : []),
        ...(searchConfig.tagsFilter ? [{ tags: { $in: searchConfig.tagsFilter } }] : []),
        ...(searchConfig.customFilters || [])
      ];

      // Build comprehensive vector search aggregation pipeline
      const vectorSearchPipeline = [
        // Stage 1: Vector similarity search
        {
          $vectorSearch: {
            index: searchConfig.indexName || "vector_index_content_embeddings",
            path: searchConfig.vectorPath || "contentEmbedding",
            queryVector: searchConfig.queryVector,
            numCandidates: searchConfig.numCandidates || this.config.numCandidates,
            limit: searchConfig.limit || this.config.defaultSearchLimit,

            // Pre-filtering for hybrid search; every filtered path must be indexed as type "filter"
            ...(filterConditions.length > 0 ? { filter: { $and: filterConditions } } : {})
          }
        },

        // Stage 2: Add similarity score and metadata enrichment
        {
          $addFields: {
            // Similarity score reported by $vectorSearch ("vectorSearchRank" is not a $meta keyword)
            searchScore: { $meta: "vectorSearchScore" },

            // Content analysis and metadata
            contentLength: { $strLenCP: "$content" },
            wordCount: {
              $size: {
                $split: ["$content", " "]
              }
            },

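            // Relevance classification. For the cosine metric Atlas reports a normalized
            // score, (1 + cosine) / 2, so these thresholds are not raw cosine similarities.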
            relevanceCategory: {
              $switch: {
                branches: [
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.8] },
                    then: "highly_relevant"
                  },
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.6] },
                    then: "relevant"
                  },
                  {
                    case: { $gte: [{ $meta: "vectorSearchScore" }, 0.4] },
                    then: "somewhat_relevant"
                  }
                ],
                default: "low_relevance"
              }
            },

            // Enhanced ranking with business logic
            enhancedScore: {
              $add: [
                { $meta: "vectorSearchScore" },

                // Boost for high-priority content
                {
                  $cond: [
                    { $eq: ["$metadata.priority", "high"] },
                    0.1,
                    0
                  ]
                },

                // Boost for recent content
                {
                  $cond: [
                    {
                      $gte: [
                        "$createdAt",
                        { $subtract: [new Date(), 7 * 24 * 60 * 60 * 1000] }
                      ]
                    },
                    0.05,
                    0
                  ]
                },

                // Boost for popular content
                {
                  $multiply: [
                    { $divide: [{ $ifNull: ["$analytics.viewCount", 0] }, 1000] },
                    0.02
                  ]
                }
              ]
            },

            // Search metadata
            searchMetadata: {
              queryProcessedAt: new Date(),
              indexUsed: searchConfig.indexName || "vector_index_content_embeddings",
              numCandidatesSearched: searchConfig.numCandidates || this.config.numCandidates
            }
          }
        },

        // Stage 3: Content enrichment and analysis
        {
          $lookup: {
            from: "user_interactions",
            let: { docId: "$_id" },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ["$documentId", "$$docId"] },
                      { $gte: ["$interactionDate", { $subtract: [new Date(), 30 * 24 * 60 * 60 * 1000] }] }
                    ]
                  }
                }
              },
              {
                $group: {
                  _id: null,
                  totalInteractions: { $sum: 1 },
                  averageRating: { $avg: "$rating" },
                  interactionTypes: { $addToSet: "$interactionType" },
                  uniqueUsers: { $addToSet: "$userId" }
                }
              }
            ],
            as: "recentInteractions"
          }
        },

        // Stage 4: Related content discovery
        {
          $lookup: {
            from: "documents",
            let: { 
              currentCategory: "$category",
              currentTags: "$tags",
              currentId: "$_id"
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $ne: ["$_id", "$$currentId"] },
                      { 
                        $or: [
                          { $eq: ["$category", "$$currentCategory"] },
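                          // True when the two tag arrays share at least one tag
                          {
                            $gt: [
                              {
                                $size: {
                                  $setIntersection: [
                                    { $ifNull: ["$$currentTags", []] },
                                    { $ifNull: ["$tags", []] }
                                  ]
                                }
                              },
                              0
                            ]
                          }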
                        ]
                      }
                    ]
                  }
                }
              },
              {
                $sample: { size: 3 }
              },
              {
                $project: {
                  _id: 1,
                  title: 1,
                  category: 1,
                  tags: 1
                }
              }
            ],
            as: "relatedContent"
          }
        },

        // Stage 5: Final enrichment and formatting
        {
          $addFields: {
            // Interaction analytics
            interactionMetrics: {
              $cond: [
                { $gt: [{ $size: "$recentInteractions" }, 0] },
                {
                  totalInteractions: { $arrayElemAt: ["$recentInteractions.totalInteractions", 0] },
                  averageRating: { $arrayElemAt: ["$recentInteractions.averageRating", 0] },
                  uniqueUserCount: { $size: { $arrayElemAt: ["$recentInteractions.uniqueUsers", 0] } }
                },
                {
                  totalInteractions: 0,
                  averageRating: null,
                  uniqueUserCount: 0
                }
              ]
            },

            // Content summary for preview ($substrCP counts code points, so multi-byte text is not cut mid-character)
            contentPreview: {
              $concat: [
                { $substrCP: ["$content", 0, 200] },
                "..."
              ]
            },

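            // Final ranking score incorporating all factors.
            // interactionMetrics is defined in this same stage and is not visible to
            // sibling expressions yet, so the boosts read from recentInteractions directly.
            finalScore: {
              $add: [
                "$enhancedScore",

                // Interaction quality boost
                {
                  $multiply: [
                    { $ifNull: [{ $arrayElemAt: ["$recentInteractions.averageRating", 0] }, 0] },
                    0.02
                  ]
                },

                // Engagement boost
                {
                  $multiply: [
                    {
                      $divide: [
                        { $ifNull: [{ $arrayElemAt: ["$recentInteractions.totalInteractions", 0] }, 0] },
                        100
                      ]
                    },
                    0.03
                  ]
                }
              ]
            }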
          }
        },

        // Stage 6: Final projection and cleanup
        {
          $project: {
            // Core content information
            title: 1,
            contentPreview: 1,
            category: 1,
            tags: 1,
            author: 1,
            createdAt: 1,
            language: 1,

            // Search relevance metrics
            searchScore: { $round: ["$searchScore", 4] },
            enhancedScore: { $round: ["$enhancedScore", 4] },
            finalScore: { $round: ["$finalScore", 4] },
            relevanceCategory: 1,

            // Content metrics
            contentLength: 1,
            wordCount: 1,

            // Engagement metrics
            interactionMetrics: 1,

            // Related content
            relatedContent: 1,

            // Metadata
            metadata: 1,
            searchMetadata: 1
          }
        },

        // Stage 7: Final sorting and ranking
        {
          $sort: {
            finalScore: -1,
            searchScore: -1,
            createdAt: -1
          }
        }
      ];

      // Execute the comprehensive vector search
      const searchResults = await this.collections.documents
        .aggregate(vectorSearchPipeline, {
          allowDiskUse: true,
          maxTimeMS: 30000
        })
        .toArray();

      const searchLatency = Date.now() - searchStartTime;

      // Log search analytics
      await this.logSearchAnalytics({
        queryVector: searchConfig.queryVector,
        resultsCount: searchResults.length,
        searchLatency: searchLatency,
        searchConfig: searchConfig,
        timestamp: new Date()
      });

      console.log(`Vector search completed: ${searchResults.length} results in ${searchLatency}ms`);

      return {
        success: true,
        results: searchResults,
        searchMetadata: {
          latency: searchLatency,
          resultsCount: searchResults.length,
          indexUsed: searchConfig.indexName || "vector_index_content_embeddings",
          numCandidatesSearched: searchConfig.numCandidates || this.config.numCandidates
        }
      };

    } catch (error) {
      console.error('Error performing vector search:', error);
      return {
        success: false,
        error: error.message,
        searchMetadata: {
          latency: Date.now() - searchStartTime
        }
      };
    }
  }

  async executeRAGPipeline(queryText, searchConfig = {}) {
    console.log('Executing RAG (Retrieval-Augmented Generation) pipeline...');

    try {
      // Generate query embedding (in production, this would call embedding API)
      const queryEmbedding = await this.generateEmbedding(queryText);

      // Perform vector search for relevant context
      const contextSearch = await this.performAdvancedVectorSearch({
        ...searchConfig,
        queryVector: queryEmbedding,
        limit: searchConfig.contextLimit || 5,
        numCandidates: searchConfig.numCandidates || 50
      });

      if (!contextSearch.success) {
        throw new Error(`Context retrieval failed: ${contextSearch.error}`);
      }

      // Build context for generation
      const contextDocuments = contextSearch.results;
      const contextText = contextDocuments
        .map((doc, index) => {
          return `Document ${index + 1} (Relevance: ${doc.relevanceCategory}):\nTitle: ${doc.title}\nContent: ${doc.contentPreview}`;
        })
        .join('\n\n');

      // Calculate context quality metrics
      const contextMetrics = {
        documentCount: contextDocuments.length,
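        // Guard against an empty result set, which would otherwise produce NaN
        averageRelevance: contextDocuments.length > 0
          ? contextDocuments.reduce((sum, doc) => sum + doc.searchScore, 0) / contextDocuments.length
          : 0,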
        totalContextLength: contextText.length,
        categories: [...new Set(contextDocuments.map(doc => doc.category))],
        languages: [...new Set(contextDocuments.map(doc => doc.language))]
      };

      // In production, this would call LLM API for generation
      const generatedResponse = await this.simulateResponseGeneration(queryText, contextText, contextMetrics);

      // Store RAG execution for analytics
      await this.logRAGExecution({
        query: queryText,
        contextMetrics: contextMetrics,
        responseGenerated: true,
        executionTime: Date.now(),
        contextDocuments: contextDocuments.map(doc => ({
          id: doc._id,
          title: doc.title,
          relevanceScore: doc.searchScore
        }))
      });

      return {
        success: true,
        query: queryText,
        contextDocuments: contextDocuments,
        contextMetrics: contextMetrics,
        generatedResponse: generatedResponse,
        searchMetadata: contextSearch.searchMetadata
      };

    } catch (error) {
      console.error('Error executing RAG pipeline:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  async generateRecommendations(userId, recommendationConfig = {}) {
    console.log(`Generating recommendations for user: ${userId}`);

    try {
      // Get user interaction history and preferences
      const userProfile = await this.buildUserProfile(userId);

      if (!userProfile.success) {
        throw new Error(`Unable to build user profile: ${userProfile.error}`);
      }

      // Generate user preference embedding
      const userEmbedding = await this.generateUserPreferenceEmbedding(userProfile.data);

      // Find similar content based on user preferences
      const recommendationSearch = await this.performAdvancedVectorSearch({
        queryVector: userEmbedding,
        indexName: "vector_index_content_embeddings",
        limit: recommendationConfig.limit || 10,
        numCandidates: recommendationConfig.numCandidates || 100,

        // Filter out already interacted content
        customFilters: [
          {
            _id: {
              $nin: userProfile.data.interactedDocuments || []
            }
          }
        ]
      });

      // Find similar users for collaborative filtering
      const similarUsers = await this.findSimilarUsers(userId, userEmbedding);

      // Combine content-based and collaborative filtering
      const hybridRecommendations = await this.combineRecommendationStrategies(
        recommendationSearch.results,
        similarUsers,
        userProfile.data
      );

      return {
        success: true,
        userId: userId,
        recommendations: hybridRecommendations,
        recommendationMetadata: {
          contentBasedCount: recommendationSearch.results.length,
          collaborativeSignals: similarUsers.length,
          userProfileStrength: userProfile.data.profileStrength
        }
      };

    } catch (error) {
      console.error('Error generating recommendations:', error);
      return {
        success: false,
        error: error.message
      };
    }
  }

  async buildUserProfile(userId) {
    try {
      // Aggregate user interaction data
      const userInteractionPipeline = [
        {
          $match: {
            userId: userId,
            interactionDate: {
              $gte: new Date(Date.now() - 90 * 24 * 60 * 60 * 1000) // Last 90 days
            }
          }
        },
        {
          $lookup: {
            from: "documents",
            localField: "documentId",
            foreignField: "_id",
            as: "document"
          }
        },
        {
          $unwind: "$document"
        },
        {
          $group: {
            _id: "$userId",

            // Interaction patterns
            totalInteractions: { $sum: 1 },
            categories: { $addToSet: "$document.category" },
            tags: { $addToSet: "$document.tags" },
            languages: { $addToSet: "$document.language" },

            // Preference indicators
            averageRating: { $avg: "$rating" },
            favoriteCategories: {
              $push: {
                category: "$document.category",
                rating: "$rating",
                interactionType: "$interactionType"
              }
            },

            // Content characteristics
            preferredContentLength: { $avg: { $strLenCP: "$document.content" } },
            interactionTypes: { $addToSet: "$interactionType" },

            // Temporal patterns
            interactedDocuments: { $addToSet: "$documentId" },
            recentInteractions: {
              $push: {
                documentId: "$documentId",
                rating: "$rating",
                interactionDate: "$interactionDate"
              }
            }
          }
        }
      ];

      const userProfileData = await this.collections.userInteractions
        .aggregate(userInteractionPipeline)
        .toArray();

      if (userProfileData.length === 0) {
        return {
          success: false,
          error: 'No user interaction data found'
        };
      }

      const profile = userProfileData[0];

      // Calculate profile strength
      profile.profileStrength = Math.min(
        (profile.totalInteractions / 50) * 0.4 +
        (profile.categories.length / 10) * 0.3 +
        // $addToSet stores whole tag arrays, so flatten and de-duplicate before counting
        (new Set(profile.tags.flat()).size / 20) * 0.3,
        1.0
      );

      return {
        success: true,
        data: profile
      };

    } catch (error) {
      return {
        success: false,
        error: error.message
      };
    }
  }

  async generateEmbedding(text) {
    // In production, this would call OpenAI or other embedding API
    // For demonstration, return a mock embedding
    return Array.from({ length: this.config.embeddingDimensions }, () => Math.random() - 0.5);
  }

  async generateUserPreferenceEmbedding(userProfile) {
    // In production, this would create embeddings based on user preferences
    // For demonstration, return a mock embedding based on user categories
    const categoryWeights = userProfile.favoriteCategories.reduce((acc, item) => {
      acc[item.category] = (acc[item.category] || 0) + (item.rating || 3);
      return acc;
    }, {});

    // categoryWeights shows where preference signals would feed into a real
    // embedding; this mock ignores it and returns a random vector
    return Array.from({ length: this.config.embeddingDimensions }, () => Math.random() - 0.5);
  }

  async simulateResponseGeneration(queryText, contextText, contextMetrics) {
    // In production, this would call ChatGPT or another LLM
    return `Based on ${contextMetrics.documentCount} relevant documents with average relevance of ${contextMetrics.averageRelevance.toFixed(3)}, here's a comprehensive response to "${queryText}": [Generated response would appear here using the provided context from ${contextMetrics.categories.join(', ')} categories]`;
  }

  async logSearchAnalytics(searchData) {
    try {
      await this.collections.searchAnalytics.insertOne({
        ...searchData,
        createdAt: new Date()
      });
    } catch (error) {
      console.error('Error logging search analytics:', error);
    }
  }

  async logRAGExecution(ragData) {
    try {
      await this.collections.searchAnalytics.insertOne({
        type: 'rag_execution',
        ...ragData,
        createdAt: new Date()
      });
    } catch (error) {
      console.error('Error logging RAG execution:', error);
    }
  }

  async shutdown() {
    console.log('Shutting down Atlas Vector Search manager...');

    try {
      if (this.client) {
        await this.client.close();
      }
      console.log('Atlas Vector Search manager shutdown completed');
    } catch (error) {
      console.error('Error during shutdown:', error);
    }
  }
}

// Benefits of MongoDB Atlas Vector Search:
// - Native vector similarity search with automatic optimization
// - Seamless integration with existing MongoDB data and operations  
// - Advanced hybrid search combining vector similarity with traditional queries
// - Multiple similarity metrics (cosine, euclidean, dot product) in single platform
// - Automatic scaling and performance optimization for vector workloads
// - Built-in support for multi-modal embeddings and AI model integration
// - Near-real-time vector search; search indexes update asynchronously, so results are eventually consistent with writes
// - SQL-compatible vector operations through QueryLeaf integration
// - Enterprise-ready security, monitoring, and operational management
// - Native support for RAG pipelines and recommendation systems

module.exports = {
  AtlasVectorSearchManager
};
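
The mock `generateUserPreferenceEmbedding` above builds `categoryWeights` but returns random numbers. One common alternative, shown here only as a sketch, is a rating-weighted average of the embeddings of documents the user interacted with, normalized to unit length. The helper name `weightedAverageEmbedding` and the `{ embedding, rating }` input shape are assumptions for illustration and are not part of the class above.

```javascript
// Hypothetical helper: rating-weighted mean of document embeddings, L2-normalized.
// Each interaction is assumed to look like { embedding: number[], rating?: number }.
function weightedAverageEmbedding(interactions, defaultRating = 3) {
  if (interactions.length === 0) {
    throw new Error('At least one interaction is required');
  }

  const dimensions = interactions[0].embedding.length;
  const sum = new Array(dimensions).fill(0);
  let totalWeight = 0;

  for (const { embedding, rating } of interactions) {
    if (embedding.length !== dimensions) {
      throw new Error('All embeddings must have the same number of dimensions');
    }
    // Fall back to a neutral rating when none was recorded
    const weight = rating ?? defaultRating;
    totalWeight += weight;
    for (let i = 0; i < dimensions; i++) {
      sum[i] += embedding[i] * weight;
    }
  }

  if (totalWeight === 0) {
    throw new Error('Total weight must be non-zero');
  }

  const mean = sum.map((value) => value / totalWeight);
  const norm = Math.hypot(...mean);

  // A zero vector cannot be normalized; return it unchanged
  return norm === 0 ? mean : mean.map((value) => value / norm);
}
```

Unit-length vectors make cosine and dot-product similarity rank results identically, which keeps the choice of index metric flexible.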

Understanding Atlas Vector Search Architecture

Advanced AI Integration Patterns

MongoDB Atlas Vector Search enables sophisticated AI application architectures with native vector operations:

// Enterprise AI Application Architecture with Atlas Vector Search
class EnterpriseAIVectorPlatform extends AtlasVectorSearchManager {
  constructor(connectionString, aiConfig) {
    super(connectionString, aiConfig);

    this.aiConfig = {
      ...aiConfig,
      enableMultiModalSearch: true,
      enableRealtimeRecommendations: true,
      enableSemanticAnalytics: true,
      enableContentGeneration: true,
      enableKnowledgeGraphs: true
    };

    this.setupEnterpriseAICapabilities();
  }

  async implementAdvancedAIWorkflows() {
    console.log('Implementing enterprise AI workflows...');

    const aiWorkflows = {
      // Multi-modal content processing
      multiModalProcessing: {
        textEmbeddings: true,
        imageEmbeddings: true,
        audioEmbeddings: true,
        videoEmbeddings: true
      },

      // Advanced recommendation engines
      recommendationSystems: {
        contentBasedFiltering: true,
        collaborativeFiltering: true,
        hybridRecommendations: true,
        realTimePersonalization: true
      },

      // Knowledge management and RAG
      knowledgeManagement: {
        documentRetrieval: true,
        contextGeneration: true,
        responseGeneration: true,
        knowledgeGraphIntegration: true
      },

      // Semantic search and discovery
      semanticCapabilities: {
        intentRecognition: true,
        entityExtraction: true,
        topicModeling: true,
        conceptualSearch: true
      }
    };

    return await this.deployAIWorkflows(aiWorkflows);
  }

  async setupRealtimePersonalization() {
    console.log('Setting up real-time personalization engine...');

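    // $vectorSearch must be the first stage of a pipeline, and queryVector must be
    // a literal array of numbers rather than a field reference. Recent behavior
    // events are therefore read first, and one vector search is built per event.
    const recentBehaviorPipeline = [
      {
        $match: {
          timestamp: { $gte: new Date(Date.now() - 60000) } // Last minute
        }
      }
    ];

    // Runs against the user_profiles collection, where the index is assumed to live
    const buildPreferenceSearchPipeline = (behaviorEmbedding) => [
      {
        $vectorSearch: {
          index: "vector_index_user_preferences",
          path: "preferenceEmbedding",
          queryVector: behaviorEmbedding,
          numCandidates: 100,
          limit: 20
        }
      }
    ];

    return await this.deployPersonalizationEngine({
      recentBehaviorPipeline,
      buildPreferenceSearchPipeline
    });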
  }
}
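
A `$vectorSearch` stage with a pre-filter and a projected similarity score can be assembled by a small builder. The sketch below is illustrative: `buildVectorSearchPipeline` and its option names are hypothetical, while `index`, `path`, `queryVector`, `numCandidates`, `limit`, `filter`, and `{ $meta: 'vectorSearchScore' }` are fields of the stage itself. It assumes any field used in `filter` is indexed as a filter field in the vector index.

```javascript
// Hypothetical builder for a $vectorSearch aggregation pipeline.
function buildVectorSearchPipeline({
  index,
  path,
  queryVector,
  limit = 20,
  numCandidates = 100,
  filter,
  projectFields = ['title', 'category']
}) {
  // queryVector has to be concrete numbers, not a "$field" reference
  if (!Array.isArray(queryVector) || queryVector.some((v) => typeof v !== 'number')) {
    throw new TypeError('queryVector must be an array of numbers');
  }
  // The stage rejects numCandidates smaller than limit
  if (numCandidates < limit) {
    throw new RangeError('numCandidates must be greater than or equal to limit');
  }

  const vectorSearch = { index, path, queryVector, numCandidates, limit };
  if (filter) {
    vectorSearch.filter = filter; // pre-filter applied during the vector search
  }

  const projection = Object.fromEntries(projectFields.map((field) => [field, 1]));
  projection.score = { $meta: 'vectorSearchScore' };

  // $vectorSearch is placed first because it must open the pipeline
  return [{ $vectorSearch: vectorSearch }, { $project: projection }];
}
```

The returned array can be passed to `collection.aggregate(pipeline).toArray()` on the collection that owns the index.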

SQL-Style Vector Search Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB Atlas Vector Search operations:

-- QueryLeaf advanced vector search operations with SQL-familiar syntax

-- Configure Atlas Vector Search capabilities  
CONFIGURE VECTOR_SEARCH
SET provider = 'mongodb_atlas',
    default_embedding_model = 'text-embedding-ada-002',
    embedding_dimensions = 1536,
    similarity_metrics = ['cosine', 'euclidean', 'dot_product'],
    enable_hybrid_search = true,
    enable_semantic_caching = true,
    default_num_candidates = 100,
    default_search_limit = 20;

-- Create vector-optimized table for AI applications
CREATE TABLE ai_documents (
    document_id UUID PRIMARY KEY,
    title TEXT NOT NULL,
    content TEXT NOT NULL,
    category VARCHAR(100),
    language VARCHAR(10) DEFAULT 'en',

    -- Vector embeddings for semantic search
    content_embedding VECTOR(1536),
    title_embedding VECTOR(1536),
    summary_embedding VECTOR(1536),

    -- Multi-modal embeddings
    image_embedding VECTOR(768),   -- CLIP embeddings
    audio_embedding VECTOR(512),   -- Audio model embeddings

    -- Metadata for hybrid search
    tags TEXT[],
    author VARCHAR(200),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Content analysis
    word_count INTEGER,
    readability_score DECIMAL(5,2),
    sentiment_score DECIMAL(3,2),

    -- Engagement metrics
    view_count BIGINT DEFAULT 0,
    like_count BIGINT DEFAULT 0,
    share_count BIGINT DEFAULT 0,
    average_rating DECIMAL(3,2),

    -- AI metadata
    embedding_model VARCHAR(100) DEFAULT 'text-embedding-ada-002',
    embedding_version VARCHAR(20) DEFAULT 'v1',
    last_embedding_update TIMESTAMP DEFAULT CURRENT_TIMESTAMP,

    -- Vector search optimization
    is_published BOOLEAN DEFAULT true,
    content_type VARCHAR(50),
    priority_level INTEGER DEFAULT 1

) WITH (
    vector_indexes = [
        {
            name: 'idx_content_vector_search',
            path: 'content_embedding', 
            similarity: 'cosine',
            dimensions: 1536,
            filters: ['category', 'language', 'is_published', 'content_type']
        },
        {
            name: 'idx_title_vector_search',
            path: 'title_embedding',
            similarity: 'cosine', 
            dimensions: 1536,
            filters: ['category', 'priority_level']
        },
        {
            name: 'idx_multimodal_vector_search',
            path: ['content_embedding', 'image_embedding'],
            similarity: 'cosine',
            filters: ['content_type', 'language']
        }
    ]
);

-- Advanced semantic search with hybrid filtering
WITH semantic_search_results AS (
  SELECT 
    document_id,
    title,
    content,
    category,
    language,
    tags,
    author,
    created_at,
    word_count,
    view_count,
    like_count,
    share_count,
    average_rating,
    priority_level,

    -- Vector similarity scoring
    VECTOR_SIMILARITY(content_embedding, $1, 'cosine') as content_similarity,
    VECTOR_SIMILARITY(title_embedding, $1, 'cosine') as title_similarity,
    VECTOR_SIMILARITY(summary_embedding, $1, 'cosine') as summary_similarity,

    -- Distance metrics for different use cases
    VECTOR_DISTANCE(content_embedding, $1, 'euclidean') as euclidean_distance,
    VECTOR_DISTANCE(content_embedding, $1, 'manhattan') as manhattan_distance,

    -- Hybrid ranking factors
    LOG(view_count + 1) as popularity_score,
    COALESCE(average_rating, 3.0) as quality_score,

    -- Temporal relevance
    EXTRACT(DAYS FROM (CURRENT_TIMESTAMP - created_at)) as days_old,
    CASE 
      WHEN created_at >= CURRENT_TIMESTAMP - INTERVAL '7 days' THEN 0.1
      WHEN created_at >= CURRENT_TIMESTAMP - INTERVAL '30 days' THEN 0.05
      ELSE 0.0
    END as recency_boost

  FROM ai_documents
  WHERE 
    -- Vector search with advanced filtering
    VECTOR_SEARCH(
      content_embedding,
      $1,  -- Query embedding vector
      similarity_metric => 'cosine',
      num_candidates => 100,
      filters => {
        'category': $2,        -- Category filter
        'language': $3,        -- Language filter  
        'is_published': true,
        'content_type': $4     -- Content type filter
      }
    )
    AND created_at >= $5       -- Date range filter
    AND word_count >= $6       -- Minimum content length
    AND (tags && $7 OR $7 IS NULL)  -- Tag overlap filter
),

enhanced_ranking AS (
  SELECT 
    ssr.*,

    -- Multi-factor ranking calculation
    (
      content_similarity * 0.4 +           -- Primary semantic similarity
      title_similarity * 0.2 +             -- Title relevance
      summary_similarity * 0.1 +           -- Summary relevance
      (popularity_score / 10.0) * 0.1 +    -- Engagement factor
      (quality_score / 5.0) * 0.1 +        -- Quality factor
      recency_boost +                       -- Temporal relevance
      CASE 
        WHEN priority_level >= 5 THEN 0.1   -- Priority boost
        ELSE 0.0 
      END
    ) as composite_relevance_score,

    -- Content analysis and categorization
    CASE 
      WHEN content_similarity >= 0.8 THEN 'highly_relevant'
      WHEN content_similarity >= 0.6 THEN 'relevant'
      WHEN content_similarity >= 0.4 THEN 'somewhat_relevant'
      ELSE 'marginally_relevant'
    END as relevance_category,

    -- Semantic clustering for diverse results
    NTILE(5) OVER (ORDER BY content_similarity DESC) as relevance_tier,

    -- Content quality indicators
    CASE 
      WHEN word_count >= 2000 AND average_rating >= 4.0 THEN 'comprehensive_high_quality'
      WHEN word_count >= 1000 AND average_rating >= 3.5 THEN 'detailed_good_quality'
      WHEN word_count >= 500 AND average_rating >= 3.0 THEN 'standard_quality'
      ELSE 'basic_content'
    END as content_quality_tier,

    -- Engagement performance metrics
    (view_count * 0.3 + like_count * 0.4 + share_count * 0.3) as engagement_score,

    -- Search result preview
    LEFT(content, 300) || CASE WHEN LENGTH(content) > 300 THEN '...' ELSE '' END as content_preview

  FROM semantic_search_results ssr
),

diversity_optimization AS (
  SELECT 
    er.*,

    -- Category diversity to prevent over-concentration
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY composite_relevance_score DESC) as category_rank,

    -- Author diversity for varied perspectives  
    ROW_NUMBER() OVER (PARTITION BY author ORDER BY composite_relevance_score DESC) as author_rank,

    -- Temporal diversity for balanced timeline coverage
    ROW_NUMBER() OVER (
      PARTITION BY DATE_TRUNC('month', created_at) 
      ORDER BY composite_relevance_score DESC
    ) as temporal_rank,

    -- Content length diversity
    CASE 
      WHEN word_count <= 500 THEN 'short'
      WHEN word_count <= 1500 THEN 'medium' 
      WHEN word_count <= 3000 THEN 'long'
      ELSE 'comprehensive'
    END as content_length_category,

    -- Similarity to previous results (prevents near-duplicates)
    LAG(content_similarity, 1) OVER (ORDER BY composite_relevance_score DESC) as prev_result_similarity

  FROM enhanced_ranking er
)

-- Final search results with comprehensive analytics
SELECT 
  document_id,
  title,
  content_preview,
  category,
  language,
  author,
  TO_CHAR(created_at, 'YYYY-MM-DD HH24:MI') as published_date,

  -- Relevance and ranking metrics
  ROUND(content_similarity::NUMERIC, 4) as semantic_similarity,
  ROUND(composite_relevance_score::NUMERIC, 4) as final_relevance_score,
  relevance_category,
  relevance_tier,

  -- Content characteristics
  word_count,
  content_quality_tier,
  content_length_category,

  -- Engagement and quality indicators
  view_count,
  average_rating,
  ROUND(engagement_score::NUMERIC, 1) as engagement_score,

  -- Diversity indicators
  category_rank,
  author_rank,
  temporal_rank,

  -- Metadata
  tags,
  ROUND(euclidean_distance::NUMERIC, 4) as euclidean_distance,
  days_old,

  -- Search quality indicators
  CASE 
    WHEN ABS(content_similarity - COALESCE(prev_result_similarity, 0)) < 0.05 THEN 'potential_duplicate'
    WHEN composite_relevance_score >= 0.8 THEN 'excellent_match'
    WHEN composite_relevance_score >= 0.6 THEN 'good_match'
    WHEN composite_relevance_score >= 0.4 THEN 'fair_match'
    ELSE 'weak_match'
  END as search_quality_assessment,

  -- Performance metadata
  CURRENT_TIMESTAMP as search_executed_at

FROM diversity_optimization
WHERE 
  -- Quality thresholds
  composite_relevance_score >= 0.3
  AND content_similarity >= 0.2

  -- Diversity constraints (ensure balanced results)
  AND category_rank <= 3        -- Max 3 results per category
  AND author_rank <= 2          -- Max 2 results per author
  AND temporal_rank <= 2        -- Max 2 results per month

ORDER BY 
  composite_relevance_score DESC,
  content_similarity DESC,
  engagement_score DESC
LIMIT 20;

-- Real-time recommendation engine with collaborative filtering
CREATE MATERIALIZED VIEW user_recommendation_profiles AS
WITH user_interaction_patterns AS (
  SELECT 
    user_id,

    -- Interaction behavior analysis
    COUNT(*) as total_interactions,
    COUNT(DISTINCT document_id) as unique_documents_viewed,
    COUNT(DISTINCT category) as categories_explored,
    AVG(interaction_rating) as average_rating,

    -- Preference extraction from interactions
    -- Aggregates cannot be nested, so categories are collected without frequency ordering
    ARRAY_AGG(DISTINCT category) as preferred_categories,
    ARRAY_AGG(DISTINCT tags) as interacted_tags,

    -- Temporal patterns
    AVG(EXTRACT(HOUR FROM interaction_timestamp)) as preferred_interaction_hour,
    MODE() WITHIN GROUP (ORDER BY EXTRACT(DOW FROM interaction_timestamp)) as preferred_day_of_week,

    -- Content preferences
    AVG(word_count) as preferred_content_length,
    AVG(readability_score) as preferred_readability,

    -- Engagement patterns
    SUM(CASE WHEN interaction_type = 'like' THEN 1 ELSE 0 END) as likes_given,
    SUM(CASE WHEN interaction_type = 'share' THEN 1 ELSE 0 END) as shares_made,
    SUM(CASE WHEN interaction_type = 'bookmark' THEN 1 ELSE 0 END) as bookmarks_created

  FROM user_interactions ui
  JOIN ai_documents ad ON ui.document_id = ad.document_id  
  WHERE ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '90 days'
    AND ui.interaction_rating IS NOT NULL
  GROUP BY user_id
  HAVING COUNT(*) >= 5  -- Minimum interaction threshold
),

user_preference_vectors AS (
  SELECT 
    uip.user_id,
    uip.total_interactions,
    uip.preferred_categories,
    uip.average_rating,

    -- Generate user preference embedding from interaction patterns
    VECTOR_AGGREGATE(
      ad.content_embedding,
      weights => ui.interaction_rating,
      aggregation_method => 'weighted_average'
    ) as preference_embedding,

    -- Category preference strengths
    JSONB_OBJECT_AGG(
      ad.category,
      AVG(ui.interaction_rating)
    ) as category_preference_scores,

    -- Content characteristics preferences
    uip.preferred_content_length,
    uip.preferred_readability,

    -- Profile completeness and reliability
    LEAST(uip.total_interactions / 50.0, 1.0) as profile_completeness,
    CURRENT_TIMESTAMP as profile_generated_at

  FROM user_interaction_patterns uip
  JOIN user_interactions ui ON uip.user_id = ui.user_id
  JOIN ai_documents ad ON ui.document_id = ad.document_id
  WHERE ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '90 days'
  GROUP BY 
    uip.user_id, uip.total_interactions, uip.preferred_categories, 
    uip.average_rating, uip.preferred_content_length, uip.preferred_readability
)
SELECT * FROM user_preference_vectors;

-- Advanced recommendation generation with multiple strategies
WITH target_user_profile AS (
  SELECT * FROM user_recommendation_profiles 
  WHERE user_id = $1  -- Target user for recommendations
),

content_based_recommendations AS (
  SELECT 
    ad.document_id,
    ad.title,
    ad.category,
    LEFT(ad.content, 300) as content_preview,
    ad.author,
    ad.created_at,
    ad.average_rating,
    ad.view_count,

    -- Content similarity to user preferences
    VECTOR_SIMILARITY(
      ad.content_embedding, 
      tup.preference_embedding, 
      'cosine'
    ) as content_similarity,

    -- Category preference alignment
    COALESCE(
      (tup.category_preference_scores->>ad.category)::NUMERIC,
      tup.average_rating
    ) as category_preference_score,

    -- Content characteristic matching
    ABS(ad.word_count - tup.preferred_content_length) / 1000.0 as length_mismatch,
    ABS(ad.readability_score - tup.preferred_readability) as readability_mismatch,

    'content_based' as recommendation_strategy

  FROM ai_documents ad
  CROSS JOIN target_user_profile tup
  WHERE ad.is_published = true
    AND ad.document_id NOT IN (
      -- Exclude already interacted content
      SELECT DISTINCT document_id 
      FROM user_interactions 
      WHERE user_id = $1
      AND interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '30 days'
    )
  ORDER BY 
    content_similarity DESC,
    category_preference_score DESC
  LIMIT 50
),

collaborative_filtering AS (
  -- Find similar users based on preference vectors
  WITH similar_users AS (
    SELECT 
      urp.user_id as similar_user_id,
      VECTOR_SIMILARITY(
        urp.preference_embedding,
        tup.preference_embedding,
        'cosine'
      ) as user_similarity,
      urp.profile_completeness

    FROM user_recommendation_profiles urp
    CROSS JOIN target_user_profile tup
    WHERE urp.user_id != tup.user_id
      AND urp.profile_completeness >= 0.3  -- Reliable profiles only
    ORDER BY user_similarity DESC
    LIMIT 20  -- Top similar users
  ),

  collaborative_recommendations AS (
    SELECT 
      ad.document_id,
      ad.title,
      ad.category,
      LEFT(ad.content, 300) as content_preview,
      ad.author,
      ad.created_at,
      ad.average_rating,
      ad.view_count,

      -- Weighted recommendation score from similar users
      AVG(ui.interaction_rating * su.user_similarity) as collaborative_score,
      COUNT(*) as similar_user_interactions,
      'collaborative_filtering' as recommendation_strategy

    FROM similar_users su
    JOIN user_interactions ui ON su.similar_user_id = ui.user_id
    JOIN ai_documents ad ON ui.document_id = ad.document_id
    WHERE ad.is_published = true
      AND ui.interaction_rating >= 3  -- Positive interactions only
      AND ui.interaction_timestamp >= CURRENT_TIMESTAMP - INTERVAL '60 days'
      AND ad.document_id NOT IN (
        -- Exclude target user's interactions
        SELECT DISTINCT document_id 
        FROM user_interactions 
        WHERE user_id = $1
      )
    GROUP BY 
      ad.document_id, ad.title, ad.category, ad.content,
      ad.author, ad.created_at, ad.average_rating, ad.view_count
    HAVING COUNT(*) >= 2  -- Multiple similar users recommended
    ORDER BY collaborative_score DESC
    LIMIT 30
  )

  SELECT * FROM collaborative_recommendations
),

hybrid_recommendations AS (
  -- Combine content-based and collaborative filtering
  SELECT 
    COALESCE(cb.document_id, cf.document_id) as document_id,
    COALESCE(cb.title, cf.title) as title,
    COALESCE(cb.category, cf.category) as category,
    COALESCE(cb.content_preview, cf.content_preview) as content_preview,
    COALESCE(cb.author, cf.author) as author,
    COALESCE(cb.created_at, cf.created_at) as created_at,
    COALESCE(cb.average_rating, cf.average_rating) as average_rating,
    COALESCE(cb.view_count, cf.view_count) as view_count,

    -- Hybrid scoring
    COALESCE(cb.content_similarity, 0.0) * 0.6 +
    COALESCE(cf.collaborative_score, 0.0) * 0.4 as hybrid_score,

    cb.content_similarity,
    cf.collaborative_score,
    cf.similar_user_interactions,

    -- Recommendation diversity factors
    ROW_NUMBER() OVER (PARTITION BY COALESCE(cb.category, cf.category) ORDER BY 
      (COALESCE(cb.content_similarity, 0.0) * 0.6 + COALESCE(cf.collaborative_score, 0.0) * 0.4) DESC
    ) as category_rank,

    -- Final recommendation strategy
    CASE 
      WHEN cb.document_id IS NOT NULL AND cf.document_id IS NOT NULL THEN 'hybrid'
      WHEN cb.document_id IS NOT NULL THEN 'content_based'
      WHEN cf.document_id IS NOT NULL THEN 'collaborative'
    END as recommendation_source

  FROM content_based_recommendations cb
  FULL OUTER JOIN collaborative_filtering cf ON cb.document_id = cf.document_id
)

-- Final personalized recommendations
SELECT 
  document_id,
  title,
  content_preview,
  category,
  author,
  TO_CHAR(created_at, 'YYYY-MM-DD') as published_date,

  -- Recommendation scoring
  ROUND(hybrid_score::NUMERIC, 4) as recommendation_score,
  ROUND(content_similarity::NUMERIC, 4) as content_match,
  ROUND(collaborative_score::NUMERIC, 4) as social_signal,
  recommendation_source,

  -- Content quality indicators
  average_rating,
  view_count,

  -- Diversity indicators
  category_rank,

  -- Confidence metrics
  CASE 
    WHEN recommendation_source = 'hybrid' AND hybrid_score >= 0.7 THEN 'high_confidence'
    WHEN hybrid_score >= 0.5 THEN 'medium_confidence'
    WHEN hybrid_score >= 0.3 THEN 'low_confidence'
    ELSE 'experimental'
  END as confidence_level,

  -- Recommendation explanation
  CASE recommendation_source
    WHEN 'content_based' THEN 'Recommended based on your content preferences'
    WHEN 'collaborative' THEN 'Recommended by users with similar interests'
    WHEN 'hybrid' THEN 'Recommended based on content analysis and user behavior'
  END as recommendation_explanation,

  CURRENT_TIMESTAMP as recommended_at

FROM hybrid_recommendations
WHERE 
  hybrid_score >= 0.2  -- Minimum recommendation threshold
  AND category_rank <= 2  -- Max 2 recommendations per category for diversity
ORDER BY 
  hybrid_score DESC,
  average_rating DESC NULLS LAST
LIMIT 15;

-- RAG (Retrieval-Augmented Generation) pipeline for question answering
CREATE FUNCTION execute_rag_pipeline(
    query_text TEXT,
    context_limit INTEGER DEFAULT 5,
    similarity_threshold DECIMAL DEFAULT 0.4,
    language_preference VARCHAR DEFAULT 'en'
) RETURNS TABLE (
    context_documents JSONB,
    context_summary TEXT,
    generated_response TEXT,
    confidence_score DECIMAL,
    sources_cited INTEGER,
    processing_metadata JSONB
) AS $$
DECLARE
    query_embedding VECTOR(1536);
    context_docs JSONB := '[]'::JSONB;
    context_text TEXT := '';
    total_context_length INTEGER := 0;
    avg_relevance DECIMAL;
    processing_start_time TIMESTAMP := CURRENT_TIMESTAMP;
BEGIN
    -- Generate embedding for the query (in production, call embedding API)
    query_embedding := GENERATE_EMBEDDING(query_text);

    -- Retrieve relevant context using vector search
    WITH context_retrieval AS (
        SELECT 
            document_id,
            title,
            content,
            category,
            author,
            created_at,
            average_rating,
            VECTOR_SIMILARITY(content_embedding, query_embedding, 'cosine') as relevance_score,
            word_count

        FROM ai_documents
        WHERE VECTOR_SEARCH(
            content_embedding,
            query_embedding,
            similarity_metric => 'cosine',
            num_candidates => 50,
            filters => {
                'language': language_preference,
                'is_published': true
            }
        )
        AND VECTOR_SIMILARITY(content_embedding, query_embedding, 'cosine') >= similarity_threshold
        ORDER BY relevance_score DESC
        LIMIT context_limit
    )

    -- Build context for generation
    SELECT 
        JSONB_AGG(
            JSONB_BUILD_OBJECT(
                'document_id', document_id,
                'title', title,
                'content', LEFT(content, 1000),
                'category', category,
                'author', author,
                'relevance_score', relevance_score,
                'word_count', word_count
            ) ORDER BY relevance_score DESC
        ),
        STRING_AGG(
            title || E':\n' || LEFT(content, 800) || E'\n\n',
            '' ORDER BY relevance_score DESC
        ),
        SUM(LENGTH(content)),
        AVG(relevance_score)
    INTO context_docs, context_text, total_context_length, avg_relevance
    FROM context_retrieval;

    -- Generate response (in production, call LLM API)
    -- For demonstration, return structured information
    RETURN QUERY SELECT 
        COALESCE(context_docs, '[]'::JSONB) as context_documents,
        CONCAT(
            'Based on ', COALESCE(JSONB_ARRAY_LENGTH(context_docs), 0), 
            ' relevant documents from categories: ',
            STRING_AGG(DISTINCT category, ', ')
        ) as context_summary,
        CONCAT(
            'Generated response to "', query_text, 
            '" based on the retrieved context. Average relevance: ',
            ROUND(COALESCE(avg_relevance, 0), 3)
        ) as generated_response,
        COALESCE(avg_relevance, 0.0) as confidence_score,
        COALESCE(JSONB_ARRAY_LENGTH(context_docs), 0) as sources_cited,
        JSONB_BUILD_OBJECT(
            -- clock_timestamp() advances during execution; CURRENT_TIMESTAMP stays fixed at transaction start
            'processing_time_ms', EXTRACT(MILLISECONDS FROM (clock_timestamp() - processing_start_time)),
            'context_length', total_context_length,
            'language_used', language_preference,
            'similarity_threshold', similarity_threshold,
            'timestamp', CURRENT_TIMESTAMP
        ) as processing_metadata
    FROM (
        SELECT DISTINCT category 
        FROM JSONB_TO_RECORDSET(context_docs) AS x(category TEXT)
    ) cat;

END;
$$ LANGUAGE plpgsql;

-- Execute RAG pipeline for question answering
SELECT * FROM execute_rag_pipeline(
    'What are the best practices for implementing microservices architecture?',
    5,    -- context_limit
    0.4,  -- similarity_threshold  
    'en'  -- language_preference
);

-- QueryLeaf provides comprehensive Atlas Vector Search capabilities:
-- 1. Native SQL syntax for vector similarity operations and embedding management
-- 2. Advanced hybrid search combining vector similarity with traditional filters
-- 3. Multi-modal vector search supporting text, image, and audio embeddings
-- 4. Intelligent recommendation systems with content-based and collaborative filtering
-- 5. Production-ready RAG pipeline implementation with context optimization
-- 6. Real-time personalization based on user behavior and preference vectors
-- 7. Comprehensive analytics and performance monitoring for AI applications
-- 8. Enterprise-ready vector indexing with automatic scaling and optimization
-- 9. SQL-familiar semantic search operations accessible to traditional database teams
-- 10. Native integration with MongoDB Atlas infrastructure and security features
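
The `hybrid_recommendations` step in the SQL above merges the two candidate lists and weights content similarity at 0.6 and the collaborative score at 0.4, then caps results per category. The same merge can be sketched in application code. `mergeHybridRecommendations` is a hypothetical helper, and the input objects are assumed to mirror the CTE columns (`documentId`, `category`, `contentSimilarity`, `collaborativeScore`).

```javascript
// Hypothetical merge of content-based and collaborative candidates,
// equivalent in spirit to a FULL OUTER JOIN on document id.
function mergeHybridRecommendations(contentBased, collaborative, options = {}) {
  const {
    contentWeight = 0.6,
    collaborativeWeight = 0.4,
    maxPerCategory = 2,
    minScore = 0.2
  } = options;

  const merged = new Map();
  for (const doc of contentBased) {
    merged.set(doc.documentId, { ...doc, collaborativeScore: null, source: 'content_based' });
  }
  for (const doc of collaborative) {
    const existing = merged.get(doc.documentId);
    if (existing) {
      existing.collaborativeScore = doc.collaborativeScore;
      existing.source = 'hybrid';
    } else {
      merged.set(doc.documentId, { ...doc, contentSimilarity: null, source: 'collaborative' });
    }
  }

  // Missing signals count as 0, like COALESCE(..., 0.0) in the SQL
  const scored = [...merged.values()]
    .map((doc) => ({
      ...doc,
      hybridScore:
        (doc.contentSimilarity ?? 0) * contentWeight +
        (doc.collaborativeScore ?? 0) * collaborativeWeight
    }))
    .sort((a, b) => b.hybridScore - a.hybridScore);

  // Keep at most maxPerCategory results per category, best first
  const perCategory = new Map();
  return scored.filter((doc) => {
    if (doc.hybridScore < minScore) return false;
    const count = (perCategory.get(doc.category) ?? 0) + 1;
    perCategory.set(doc.category, count);
    return count <= maxPerCategory;
  });
}
```

One design point worth checking before reusing these weights: `collaborative_score` as defined above is an average of rating multiplied by similarity, so it is not bounded to the 0 to 1 range the way cosine similarity is, and normalizing it first may give a more balanced mix.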

Best Practices for Production Vector Search Implementation

Vector Index Design and Optimization

Essential practices for effective Atlas Vector Search deployment:

  1. Embedding Strategy: Choose appropriate embedding models and dimensions based on use case requirements and performance constraints
  2. Index Configuration: Design vector indexes with optimal similarity metrics, candidate limits, and filter combinations for query patterns
  3. Hybrid Search Architecture: Combine vector similarity with traditional database filtering so that results are both semantically relevant and constrained by metadata such as category, language, or publication status
  4. Performance Optimization: Monitor search latency, throughput, and resource utilization while optimizing for production workloads
  5. Embedding Management: Establish versioning, caching, and update strategies for embedding vectors and model evolution
  6. Quality Assurance: Implement relevance testing, search quality metrics, and continuous improvement processes
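
As a sketch of the index configuration point, an Atlas Vector Search index definition lists each embedding as a `vector` field (with `numDimensions` and `similarity`) and each pre-filterable field as a `filter` field. The helper name `buildVectorIndexDefinition` is hypothetical, and the field names are borrowed from the `ai_documents` example earlier in this article.

```javascript
// Similarity functions accepted by Atlas Vector Search indexes
const SUPPORTED_SIMILARITIES = ['cosine', 'euclidean', 'dotProduct'];

// Hypothetical helper that assembles a vector search index description.
function buildVectorIndexDefinition({ name, vectorFields, filterPaths = [] }) {
  if (!Array.isArray(vectorFields) || vectorFields.length === 0) {
    throw new Error('At least one vector field is required');
  }

  for (const { path, numDimensions, similarity } of vectorFields) {
    if (!Number.isInteger(numDimensions) || numDimensions <= 0) {
      throw new RangeError(`numDimensions for "${path}" must be a positive integer`);
    }
    if (!SUPPORTED_SIMILARITIES.includes(similarity)) {
      throw new RangeError(`Unsupported similarity "${similarity}" for "${path}"`);
    }
  }

  return {
    name,
    type: 'vectorSearch',
    definition: {
      fields: [
        // One entry per embedding field
        ...vectorFields.map(({ path, numDimensions, similarity }) => ({
          type: 'vector',
          path,
          numDimensions,
          similarity
        })),
        // One entry per field that queries will pre-filter on
        ...filterPaths.map((path) => ({ type: 'filter', path }))
      ]
    }
  };
}
```

With a recent Node.js driver, an object of this shape can be passed to `collection.createSearchIndex(...)`; support for `type: 'vectorSearch'` depends on the driver version, so it is worth confirming against the version in use.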

Enterprise AI Application Architecture

Design vector search systems for enterprise-scale AI applications:

  1. Multi-Modal Integration: Support diverse content types including text, images, audio, and video with appropriate embedding strategies
  2. Real-Time Personalization: Implement dynamic user preference modeling with real-time recommendation updates
  3. Knowledge Management: Design comprehensive RAG pipelines for question answering, document retrieval, and content generation
  4. Scalability Planning: Design the architecture for growing vector collections, user bases, and query volumes
  5. Security and Governance: Implement access controls, data privacy, and audit capabilities for enterprise compliance
  6. Monitoring and Analytics: Establish comprehensive observability for search performance, user satisfaction, and business impact
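
For the RAG item, one recurring step is turning ranked search hits into a bounded prompt context. The sketch below filters by a similarity threshold, keeps the top hits, and truncates each document, using the same defaults as the `execute_rag_pipeline` example (threshold 0.4, 5 documents, 800 characters each). `buildRagContext` and the `{ title, content, category, score }` hit shape are assumptions for illustration.

```javascript
// Hypothetical helper that assembles LLM context from vector search hits.
function buildRagContext(hits, options = {}) {
  const { similarityThreshold = 0.4, contextLimit = 5, maxCharsPerDoc = 800 } = options;

  // Keep only sufficiently relevant hits, best first, up to the limit
  const selected = hits
    .filter((hit) => hit.score >= similarityThreshold)
    .sort((a, b) => b.score - a.score)
    .slice(0, contextLimit);

  // Truncate each document so the prompt stays within a predictable size
  const contextText = selected
    .map((hit) => `${hit.title}:\n${hit.content.slice(0, maxCharsPerDoc)}`)
    .join('\n\n');

  const averageRelevance =
    selected.length === 0
      ? 0
      : selected.reduce((total, hit) => total + hit.score, 0) / selected.length;

  return {
    contextText,
    documentCount: selected.length,
    averageRelevance,
    categories: [...new Set(selected.map((hit) => hit.category))]
  };
}
```

The returned `documentCount`, `averageRelevance`, and `categories` match the fields that `simulateResponseGeneration` reads from `contextMetrics`, so the result could be passed to it directly.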

Conclusion

MongoDB Atlas Vector Search provides semantic similarity search through native vector operations, hybrid search, and integration with existing MongoDB infrastructure. These capabilities support recommendation systems, knowledge management platforms, and content discovery applications. The unified platform approach removes the need to run and synchronize a separate vector database while retaining MongoDB's existing security and operational tooling.

Key Atlas Vector Search benefits include:

  • Native Vector Operations: High-performance similarity search with multiple metrics integrated directly into MongoDB Atlas infrastructure
  • Hybrid Search Excellence: Sophisticated combination of semantic similarity with traditional database filtering for comprehensive search experiences
  • AI Application Ready: Built-in support for RAG pipelines, recommendation engines, and real-time personalization systems
  • Multi-Modal Capability: Native support for text, image, audio, and video embeddings within unified document structures
  • Enterprise Integration: Seamless integration with existing MongoDB security, monitoring, and operational infrastructure
  • SQL Accessibility: Familiar SQL-style vector operations through QueryLeaf for accessible AI application development

Whether you're building intelligent search platforms, recommendation systems, knowledge management applications, or conversational AI interfaces, MongoDB Atlas Vector Search with QueryLeaf's familiar SQL interface provides the foundation for scalable, high-performance AI applications.

QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB Atlas Vector Search operations while providing SQL-familiar syntax for semantic similarity, recommendation algorithms, and RAG pipeline construction. Advanced vector operations, hybrid search strategies, and AI application patterns are seamlessly accessible through familiar SQL constructs, making sophisticated AI development approachable for SQL-oriented development teams.

The combination of Atlas Vector Search's powerful similarity capabilities with SQL-style AI operations makes it an ideal platform for applications requiring both advanced semantic understanding and familiar database interaction patterns, ensuring your AI applications can scale efficiently while delivering intelligent, personalized user experiences.

MongoDB Geospatial Indexing and Location-Based Queries: Building High-Performance GIS Applications with Advanced Spatial Analysis and SQL-Compatible Operations

Modern location-aware applications require sophisticated geospatial data management capabilities to deliver real-time proximity searches, route optimization, geofencing, and spatial analytics at massive scale. Traditional relational databases struggle with the complex geometric calculations, multi-dimensional indexing requirements, and performance demands of location-based services, often requiring expensive third-party GIS extensions or external spatial processing systems.

MongoDB provides native geospatial indexing and query capabilities that enable applications to efficiently store, index, and query location data using industry-standard GeoJSON formats. Unlike traditional database approaches that require complex extensions or specialized spatial databases, MongoDB's built-in geospatial features deliver high-performance spatial operations, intelligent indexing strategies, and comprehensive query capabilities designed for modern mapping, logistics, and location-aware applications.
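
As a minimal sketch of the GeoJSON format mentioned above, the helper below converts a latitude/longitude pair into a GeoJSON Point. The function name `toGeoJsonPoint` is hypothetical; the point it illustrates is that GeoJSON stores coordinates as `[longitude, latitude]`, which is a frequent source of swapped-coordinate bugs.

```javascript
// Hypothetical helper: converts latitude/longitude into a GeoJSON Point.
function toGeoJsonPoint(latitude, longitude) {
  if (!Number.isFinite(latitude) || latitude < -90 || latitude > 90) {
    throw new RangeError(`latitude out of range: ${latitude}`);
  }
  if (!Number.isFinite(longitude) || longitude < -180 || longitude > 180) {
    throw new RangeError(`longitude out of range: ${longitude}`);
  }
  // GeoJSON positions are [longitude, latitude], the reverse of the common "lat, lng" order.
  return { type: 'Point', coordinates: [longitude, latitude] };
}

const sanFrancisco = toGeoJsonPoint(37.7749, -122.4194);
console.log(JSON.stringify(sanFrancisco));
// {"type":"Point","coordinates":[-122.4194,37.7749]}
```

A field holding such a value can then be indexed with `collection.createIndex({ location: '2dsphere' })`.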

Traditional Geospatial Data Challenges

Managing location data with conventional database approaches creates significant performance, complexity, and scalability challenges:

-- Traditional PostgreSQL geospatial implementation (complex setup and limited performance)

-- Requires PostGIS extension for spatial capabilities
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE EXTENSION IF NOT EXISTS postgis_topology;

-- Location-based application schema with complex spatial types
CREATE TABLE business_locations (
    business_id BIGSERIAL PRIMARY KEY,
    business_name VARCHAR(200) NOT NULL,
    category VARCHAR(100) NOT NULL,
    address TEXT NOT NULL,

    -- Complex spatial column requiring PostGIS
    location GEOMETRY(POINT, 4326) NOT NULL, -- WGS84 coordinate system
    service_area GEOMETRY(POLYGON, 4326),    -- Service boundary polygon

    -- Business metadata
    phone VARCHAR(20),
    email VARCHAR(100),
    website VARCHAR(200),
    operating_hours JSONB,
    rating NUMERIC(3,2),
    price_level INTEGER CHECK (price_level BETWEEN 1 AND 4),

    -- Operational data
    is_active BOOLEAN DEFAULT true,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Spatial indexes (PostGIS-specific syntax)
CREATE INDEX idx_business_location_gist ON business_locations 
    USING GIST (location);
CREATE INDEX idx_business_service_area_gist ON business_locations 
    USING GIST (service_area);

-- Customer location tracking table
CREATE TABLE customer_locations (
    location_id BIGSERIAL PRIMARY KEY,
    customer_id BIGINT NOT NULL,
    location GEOMETRY(POINT, 4326) NOT NULL,
    location_accuracy_meters NUMERIC(8,2),
    location_timestamp TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    -- Location context
    location_type VARCHAR(20) DEFAULT 'gps', -- 'gps', 'network', 'manual'
    address_geocoded TEXT,
    city VARCHAR(100),
    state VARCHAR(50),
    country VARCHAR(50),
    postal_code VARCHAR(20)
);

CREATE INDEX idx_customer_location_gist ON customer_locations 
    USING GIST (location);
CREATE INDEX idx_customer_location_timestamp ON customer_locations 
    (customer_id, location_timestamp DESC);

-- Delivery routes and logistics
CREATE TABLE delivery_routes (
    route_id BIGSERIAL PRIMARY KEY,
    driver_id INTEGER NOT NULL,
    vehicle_id INTEGER NOT NULL,
    route_date DATE NOT NULL,

    -- Route geometry as LineString
    route_path GEOMETRY(LINESTRING, 4326),
    planned_stops GEOMETRY(MULTIPOINT, 4326),

    -- Route metrics
    estimated_distance_km NUMERIC(10,3),
    estimated_duration_minutes INTEGER,
    actual_distance_km NUMERIC(10,3),
    actual_duration_minutes INTEGER,

    -- Route status
    status VARCHAR(20) DEFAULT 'planned', -- 'planned', 'in_progress', 'completed', 'cancelled'
    started_at TIMESTAMP WITH TIME ZONE,
    completed_at TIMESTAMP WITH TIME ZONE
);

CREATE INDEX idx_delivery_route_path_gist ON delivery_routes 
    USING GIST (route_path);

-- Complex proximity search with PostGIS functions
WITH nearby_businesses AS (
    SELECT 
        bl.business_id,
        bl.business_name,
        bl.category,
        bl.address,
        bl.rating,
        bl.price_level,

        -- Spatial calculations using PostGIS functions
        ST_Distance(
            bl.location::geography, 
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)::geography
        ) as distance_meters,

        -- Convert geometry to GeoJSON for application consumption
        ST_AsGeoJSON(bl.location) as location_geojson,

        -- Additional spatial analysis
        ST_Within(
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326),
            bl.service_area
        ) as within_service_area,

        -- Bearing from the user to the business (ST_Azimuth measures from the first point to the second)
        ST_Azimuth(
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326),
            bl.location
        ) * 180 / PI() as bearing_degrees

    FROM business_locations bl
    WHERE 
        bl.is_active = true

        -- Radius filter; the ::geography cast only benefits from a GiST index built on (location::geography)
        AND ST_DWithin(
            bl.location::geography,
            ST_SetSRID(ST_MakePoint(-122.4194, 37.7749), 4326)::geography,
            5000  -- 5km radius
        )
),

ranked_results AS (
    SELECT 
        nb.*,

        -- Complex scoring algorithm
        (
            -- Distance component (closer is better)
            (1.0 - (distance_meters / 5000.0)) * 0.4 +

            -- Rating component
            (COALESCE(rating, 0) / 5.0) * 0.3 +

            -- Service area bonus
            CASE WHEN within_service_area THEN 0.2 ELSE 0 END +

            -- Category relevance (hardcoded for example)
            CASE 
                WHEN category = 'restaurant' THEN 0.1
                WHEN category = 'retail' THEN 0.05
                ELSE 0
            END
        ) as relevance_score,

        -- Categorize distance for user display
        CASE 
            WHEN distance_meters <= 500 THEN 'Very Close'
            WHEN distance_meters <= 1000 THEN 'Walking Distance'
            WHEN distance_meters <= 2000 THEN 'Short Drive'
            ELSE 'Moderate Distance'
        END as distance_category

    FROM nearby_businesses nb
)

SELECT 
    business_id,
    business_name,
    category,
    address,
    ROUND(distance_meters::numeric, 0) as distance_meters,
    distance_category,
    rating,
    price_level,
    ROUND(relevance_score::numeric, 3) as relevance_score,
    ROUND(bearing_degrees::numeric, 1) as bearing_from_user,
    within_service_area,
    location_geojson

FROM ranked_results
WHERE distance_meters <= 5000  -- 5km maximum distance
ORDER BY relevance_score DESC, distance_meters ASC
LIMIT 20;

-- Problems with PostGIS approach:
-- 1. Complex extension setup and maintenance requirements
-- 2. Specialized spatial syntax different from standard SQL
-- 3. Performance challenges with complex spatial calculations
-- 4. Limited integration with application development workflows
-- 5. Complex data type management and coordinate system handling
-- 6. Difficult debugging and query optimization for spatial operations
-- 7. Additional infrastructure and operational cost for large or managed deployments
-- 8. GeoJSON is a conversion format (ST_AsGeoJSON) rather than the native storage format
-- 9. Complex backup and replication handling for spatial indexes
-- 10. Steep learning curve for developers without GIS background
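
To make the weighting in the scoring expression above concrete, here is a small sketch that reproduces the same arithmetic in plain JavaScript. The function name and the shape of its input object are assumptions for illustration; the weights (40% distance, 30% rating, 20% service area, up to 10% category) are taken from the query.

```javascript
// Mirrors the weights in the SQL scoring expression; maxDistance matches the 5 km radius.
function relevanceScore({ distanceMeters, rating, withinServiceArea, category }, maxDistance = 5000) {
  const distancePart = (1 - distanceMeters / maxDistance) * 0.4;   // closer is better
  const ratingPart = ((rating ?? 0) / 5) * 0.3;                    // missing rating counts as 0
  const servicePart = withinServiceArea ? 0.2 : 0;
  const categoryPart = category === 'restaurant' ? 0.1 : category === 'retail' ? 0.05 : 0;
  return distancePart + ratingPart + servicePart + categoryPart;
}

const score = relevanceScore({
  distanceMeters: 1000,
  rating: 4.5,
  withinServiceArea: true,
  category: 'restaurant'
});
console.log(score.toFixed(3)); // 0.890
```

A restaurant 1 km away with a 4.5 rating scores 0.32 + 0.27 + 0.2 + 0.1 = 0.89, while an unrated business at the edge of the radius with no service area or category bonus scores 0.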

MongoDB provides native, high-performance geospatial capabilities:

// MongoDB native geospatial operations - powerful and developer-friendly
const { MongoClient } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017');
const db = client.db('geospatial_app');

// Advanced Geospatial Application Manager
class MongoGeospatialManager {
  constructor(db) {
    this.db = db;
    this.collections = new Map();
    this.spatialIndexes = new Map();
    this.geoQueryCache = new Map();
  }

  async initializeGeospatialCollections() {
    console.log('Initializing geospatial collections with spatial indexes...');

    // Business locations collection
    const businessCollection = this.db.collection('business_locations');
    await this.createBusinessLocationIndexes(businessCollection);

    // Customer tracking collection
    const customerCollection = this.db.collection('customer_locations');
    await this.createCustomerLocationIndexes(customerCollection);

    // Delivery routes collection
    const routesCollection = this.db.collection('delivery_routes');
    await this.createDeliveryRouteIndexes(routesCollection);

    // Geofences and zones collection
    const geofencesCollection = this.db.collection('geofences');
    await this.createGeofenceIndexes(geofencesCollection);

    this.collections.set('businesses', businessCollection);
    this.collections.set('customers', customerCollection);
    this.collections.set('routes', routesCollection);
    this.collections.set('geofences', geofencesCollection);

    console.log('✅ Geospatial collections initialized with optimized indexes');
    return this.collections;
  }

  async createBusinessLocationIndexes(collection) {
    console.log('Creating business location spatial indexes...');

    // 2dsphere index for location-based queries (GeoJSON format)
    await collection.createIndexes([
      {
        key: { "location": "2dsphere" },
        name: "idx_business_location_2dsphere",
        background: true
      },
      {
        key: { "service_area": "2dsphere" },
        name: "idx_business_service_area_2dsphere", 
        background: true
      },
      {
        key: { "category": 1, "location": "2dsphere" },
        name: "idx_category_location_compound",
        background: true
      },
      {
        key: { "rating": -1, "location": "2dsphere" },
        name: "idx_rating_location_compound",
        background: true
      },
      {
        key: { "price_level": 1, "category": 1, "location": "2dsphere" },
        name: "idx_price_category_location_compound",
        background: true
      }
    ]);

    console.log('✅ Business location indexes created');
  }

  async createCustomerLocationIndexes(collection) {
    console.log('Creating customer location tracking indexes...');

    await collection.createIndexes([
      {
        key: { "location": "2dsphere" },
        name: "idx_customer_location_2dsphere",
        background: true
      },
      {
        key: { "customer_id": 1, "location_timestamp": -1 },
        name: "idx_customer_timeline",
        background: true
      },
      {
        key: { "location_timestamp": -1, "location": "2dsphere" },
        name: "idx_timeline_location_compound",
        background: true
      }
    ]);

    console.log('✅ Customer location indexes created');
  }

  async createDeliveryRouteIndexes(collection) {
    console.log('Creating delivery route spatial indexes...');

    await collection.createIndexes([
      {
        key: { "route_path": "2dsphere" },
        name: "idx_route_path_2dsphere",
        background: true
      },
      {
        key: { "planned_stops": "2dsphere" },
        name: "idx_planned_stops_2dsphere",
        background: true
      },
      {
        key: { "driver_id": 1, "route_date": -1 },
        name: "idx_driver_date",
        background: true
      },
      {
        key: { "status": 1, "route_date": -1 },
        name: "idx_status_date",
        background: true
      }
    ]);

    console.log('✅ Delivery route indexes created');
  }

  async createGeofenceIndexes(collection) {
    console.log('Creating geofence spatial indexes...');

    await collection.createIndexes([
      {
        key: { "boundary": "2dsphere" },
        name: "idx_geofence_boundary_2dsphere",
        background: true
      },
      {
        key: { "fence_type": 1, "boundary": "2dsphere" },
        name: "idx_fence_type_boundary_compound",
        background: true
      }
    ]);

    console.log('✅ Geofence indexes created');
  }

  async insertBusinessLocations(businesses) {
    console.log(`Inserting ${businesses.length} business locations...`);

    const businessCollection = this.collections.get('businesses');
    const businessDocuments = businesses.map(business => ({
      business_name: business.name,
      category: business.category,
      address: business.address,

      // GeoJSON Point format for location
      location: {
        type: "Point",
        coordinates: [business.longitude, business.latitude]  // [lng, lat]
      },

      // Optional service area as GeoJSON Polygon
      service_area: business.service_radius ? this.createCirclePolygon(
        [business.longitude, business.latitude], 
        business.service_radius
      ) : null,

      // Business metadata
      contact: {
        phone: business.phone,
        email: business.email,
        website: business.website
      },

      operating_hours: business.hours || {},
      rating: business.rating || 0,
      price_level: business.price_level || 1,

      // Operational data
      is_active: business.is_active !== false,
      created_at: new Date(),
      updated_at: new Date(),

      // Additional location context
      location_metadata: {
        address_components: business.address_components || {},
        geocoding_accuracy: business.geocoding_accuracy || 'high',
        timezone: business.timezone,
        locale: business.locale || 'en-US'
      }
    }));

    const result = await businessCollection.insertMany(businessDocuments, {
      ordered: false
    });

    console.log(`✅ Inserted ${result.insertedCount} business locations`);
    return result;
  }

  async findNearbyBusinesses(userLocation, options = {}) {
    console.log(`Finding nearby businesses around [${userLocation.longitude}, ${userLocation.latitude}]...`);

    const {
      maxDistance = 5000,        // 5km default radius
      category = null,
      minRating = 0,
      priceLevel = null,
      limit = 20,
      sortBy = 'distance'        // 'distance', 'rating', 'relevance'
    } = options;

    const businessCollection = this.collections.get('businesses');
    const userPoint = [userLocation.longitude, userLocation.latitude];

    try {
      const searchPipeline = [
        // Stage 1: Geospatial proximity filter
        {
          $geoNear: {
            near: {
              type: "Point",
              coordinates: userPoint
            },
            key: "location",  // Required because the collection has more than one 2dsphere index
            distanceField: "distance_meters",
            maxDistance: maxDistance,
            spherical: true,
            query: {
              is_active: true,
              ...(category && { category: category }),
              ...(minRating > 0 && { rating: { $gte: minRating } }),
              ...(priceLevel && { price_level: priceLevel })
            }
          }
        },

        // Stage 2: Add computed fields for analysis
        {
          $addFields: {
            // Distance categorization
            distance_category: {
              $switch: {
                branches: [
                  { case: { $lte: ["$distance_meters", 500] }, then: "very_close" },
                  { case: { $lte: ["$distance_meters", 1000] }, then: "walking_distance" },
                  { case: { $lte: ["$distance_meters", 2000] }, then: "short_drive" },
                  { case: { $lte: ["$distance_meters", 5000] }, then: "moderate_distance" }
                ],
                default: "far"
              }
            },

            // Simplified placeholder: true when the business defines a service area.
            // A real point-in-polygon test needs a $geoIntersects match on service_area.
            within_service_area: { $gt: ["$service_area", null] },

            // Bearing from user to business, using native aggregation trigonometry
            // operators (MongoDB 4.2+) instead of server-side JavaScript
            bearing_degrees: {
              $let: {
                vars: {
                  lat1: { $degreesToRadians: userPoint[1] },
                  lat2: { $degreesToRadians: { $arrayElemAt: ["$location.coordinates", 1] } },
                  deltaLng: {
                    $degreesToRadians: {
                      $subtract: [{ $arrayElemAt: ["$location.coordinates", 0] }, userPoint[0]]
                    }
                  }
                },
                in: {
                  // atan2 yields -180..180 degrees; normalize to 0..360
                  $mod: [
                    {
                      $add: [
                        {
                          $radiansToDegrees: {
                            $atan2: [
                              { $multiply: [{ $sin: "$$deltaLng" }, { $cos: "$$lat2" }] },
                              {
                                $subtract: [
                                  { $multiply: [{ $cos: "$$lat1" }, { $sin: "$$lat2" }] },
                                  { $multiply: [{ $sin: "$$lat1" }, { $cos: "$$lat2" }, { $cos: "$$deltaLng" }] }
                                ]
                              }
                            ]
                          }
                        },
                        360
                      ]
                    },
                    360
                  ]
                }
              }
            },

            // Relevance score calculation
            relevance_score: {
              $add: [
                // Distance component (closer is better) - 40% weight
                {
                  $multiply: [
                    { $subtract: [1, { $divide: ["$distance_meters", maxDistance] }] },
                    0.4
                  ]
                },

                // Rating component - 30% weight
                { $multiply: [{ $divide: [{ $ifNull: ["$rating", 0] }, 5] }, 0.3] },

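                // Service area bonus - 20% weight (evaluated inline because fields added
                // in this same $addFields stage are not visible to each other)
                { $cond: [{ $gt: ["$service_area", null] }, 0.2, 0] },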

                // Category relevance bonus - 10% weight
                {
                  $switch: {
                    branches: [
                      { case: { $eq: ["$category", "restaurant"] }, then: 0.1 },
                      { case: { $eq: ["$category", "retail"] }, then: 0.05 }
                    ],
                    default: 0
                  }
                }
              ]
            }
          }
        },

        // Stage 3: Add directional information
        {
          $addFields: {
            direction_compass: {
              $switch: {
                branches: [
                  { case: { $or: [{ $gte: ["$bearing_degrees", 337.5] }, { $lt: ["$bearing_degrees", 22.5] }] }, then: "N" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 22.5] }, { $lt: ["$bearing_degrees", 67.5] }] }, then: "NE" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 67.5] }, { $lt: ["$bearing_degrees", 112.5] }] }, then: "E" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 112.5] }, { $lt: ["$bearing_degrees", 157.5] }] }, then: "SE" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 157.5] }, { $lt: ["$bearing_degrees", 202.5] }] }, then: "S" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 202.5] }, { $lt: ["$bearing_degrees", 247.5] }] }, then: "SW" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 247.5] }, { $lt: ["$bearing_degrees", 292.5] }] }, then: "W" },
                  { case: { $and: [{ $gte: ["$bearing_degrees", 292.5] }, { $lt: ["$bearing_degrees", 337.5] }] }, then: "NW" }
                ],
                default: "N"
              }
            },

            // Human-readable distance
            distance_display: {
              $switch: {
                branches: [
                  { 
                    case: { $lt: ["$distance_meters", 1000] },
                    then: { $concat: [{ $toString: { $round: ["$distance_meters", 0] } }, "m"] }
                  },
                  {
                    case: { $lt: ["$distance_meters", 10000] },
                    then: { $concat: [{ $toString: { $round: [{ $divide: ["$distance_meters", 1000] }, 1] } }, "km"] }
                  }
                ],
                default: { $concat: [{ $toString: { $round: [{ $divide: ["$distance_meters", 1000] }, 0] } }, "km"] }
              }
            }
          }
        },

        // Stage 4: Sort based on user preference
        {
          $sort: sortBy === 'rating' ? { rating: -1, distance_meters: 1 } :
                 sortBy === 'relevance' ? { relevance_score: -1, distance_meters: 1 } :
                 { distance_meters: 1 }
        },

        // Stage 5: Limit results
        { $limit: limit },

        // Stage 6: Project final output
        {
          $project: {
            business_name: 1,
            category: 1,
            address: 1,
            location: 1,
            contact: 1,
            rating: 1,
            price_level: 1,

            // Calculated fields
            distance_meters: { $round: ["$distance_meters", 0] },
            distance_display: 1,
            distance_category: 1,
            bearing_degrees: { $round: ["$bearing_degrees", 1] },
            direction_compass: 1,
            relevance_score: { $round: ["$relevance_score", 3] },
            within_service_area: 1,

            // Metadata
            created_at: 1,
            location_metadata: 1
          }
        }
      ];

      const startTime = Date.now();
      const nearbyBusinesses = await businessCollection.aggregate(searchPipeline, {
        allowDiskUse: true
      }).toArray();
      const queryTime = Date.now() - startTime;

      console.log(`✅ Found ${nearbyBusinesses.length} nearby businesses in ${queryTime}ms`);

      return {
        user_location: userLocation,
        search_params: options,
        query_time_ms: queryTime,
        total_results: nearbyBusinesses.length,
        businesses: nearbyBusinesses
      };

    } catch (error) {
      console.error('Error finding nearby businesses:', error);
      throw error;
    }
  }

  async implementGeofencingSystem() {
    console.log('Setting up advanced geofencing system...');

    const geofencesCollection = this.collections.get('geofences');

    // Create various types of geofences
    const sampleGeofences = [
      {
        fence_id: 'delivery_zone_downtown',
        fence_name: 'Downtown Delivery Zone',
        fence_type: 'delivery_boundary',

        // GeoJSON Polygon for complex delivery zone
        boundary: {
          type: "Polygon",
          coordinates: [[
            [-122.4194, 37.7749],  // San Francisco downtown area
            [-122.4094, 37.7849],
            [-122.3994, 37.7849],
            [-122.3994, 37.7649],
            [-122.4194, 37.7649],
            [-122.4194, 37.7749]   // Close the polygon
          ]]
        },

        // Geofence properties
        properties: {
          delivery_fee: 2.99,
          estimated_delivery_time: 30,
          service_level: 'premium',
          operating_hours: {
            monday: { start: '08:00', end: '22:00' },
            tuesday: { start: '08:00', end: '22:00' },
            wednesday: { start: '08:00', end: '22:00' },
            thursday: { start: '08:00', end: '22:00' },
            friday: { start: '08:00', end: '23:00' },
            saturday: { start: '09:00', end: '23:00' },
            sunday: { start: '10:00', end: '21:00' }
          }
        },

        is_active: true,
        created_at: new Date()
      },

      {
        fence_id: 'high_demand_area_financial',
        fence_name: 'Financial District High Demand Zone',
        fence_type: 'pricing_zone',

        // Circular geofence using buffered point
        boundary: {
          type: "Polygon",
          coordinates: [this.createCirclePolygon([-122.4000, 37.7900], 1000).coordinates[0]]
        },

        properties: {
          surge_multiplier: 1.5,
          priority_processing: true,
          rush_hour_bonus: true
        },

        is_active: true,
        created_at: new Date()
      }
    ];

    await geofencesCollection.insertMany(sampleGeofences);
    console.log('✅ Geofencing system configured with sample zones');
  }

  async checkGeofenceEntries(location, customer_id) {
    console.log(`Checking geofence entries for customer ${customer_id}...`);

    const geofencesCollection = this.collections.get('geofences');
    const point = [location.longitude, location.latitude];

    try {
      // Find all geofences containing the location
      const containingGeofences = await geofencesCollection.find({
        is_active: true,
        boundary: {
          $geoIntersects: {
            $geometry: {
              type: "Point",
              coordinates: point
            }
          }
        }
      }).toArray();

      const geofenceEvents = [];

      for (const geofence of containingGeofences) {
        // Check if this is a new entry (simplified logic)
        const event = {
          customer_id: customer_id,
          geofence_id: geofence.fence_id,
          geofence_name: geofence.fence_name,
          fence_type: geofence.fence_type,
          event_type: 'entry',
          event_timestamp: new Date(),
          location: {
            type: "Point",
            coordinates: point
          },
          properties: geofence.properties
        };

        geofenceEvents.push(event);

        // Trigger appropriate business logic based on geofence type
        await this.handleGeofenceEvent(event);
      }

      console.log(`✅ Processed ${geofenceEvents.length} geofence events`);
      return geofenceEvents;

    } catch (error) {
      console.error('Error checking geofence entries:', error);
      throw error;
    }
  }

  async handleGeofenceEvent(event) {
    console.log(`Handling geofence event: ${event.event_type} for ${event.geofence_name}`);

    // Store geofence event
    await this.db.collection('geofence_events').insertOne(event);

    // Business logic based on geofence type
    switch (event.fence_type) {
      case 'delivery_boundary':
        await this.handleDeliveryZoneEntry(event);
        break;
      case 'pricing_zone':
        await this.handlePricingZoneEntry(event);
        break;
      default:
        console.log(`No specific handler for fence type: ${event.fence_type}`);
    }
  }

  async handleDeliveryZoneEntry(event) {
    console.log(`Customer entered delivery zone: ${event.geofence_name}`);

    // Update customer delivery preferences
    await this.db.collection('customer_profiles').updateOne(
      { customer_id: event.customer_id },
      {
        $set: {
          current_delivery_zone: event.geofence_id,
          delivery_fee: event.properties.delivery_fee,
          estimated_delivery_time: event.properties.estimated_delivery_time
        },
        $push: {
          zone_history: {
            zone_id: event.geofence_id,
            entered_at: event.event_timestamp,
            properties: event.properties
          }
        }
      },
      { upsert: true }
    );
  }

  async handlePricingZoneEntry(event) {
    console.log(`Customer entered high-demand pricing zone: ${event.geofence_name}`);

    // Apply dynamic pricing
    await this.db.collection('pricing_adjustments').insertOne({
      customer_id: event.customer_id,
      zone_id: event.geofence_id,
      surge_multiplier: event.properties.surge_multiplier,
      applied_at: event.event_timestamp,
      expires_at: new Date(Date.now() + 30 * 60 * 1000) // 30 minutes
    });
  }

  async optimizeDeliveryRoutes(deliveries, startLocation) {
    console.log(`Optimizing delivery route for ${deliveries.length} stops...`);

    const routesCollection = this.collections.get('routes');

    try {
      // Simple nearest-neighbor route optimization
      let currentLocation = startLocation;
      const optimizedRoute = [];
      const remainingDeliveries = [...deliveries];

      while (remainingDeliveries.length > 0) {
        // Find nearest delivery location
        let nearestIndex = 0;
        let shortestDistance = Number.MAX_VALUE;

        for (let i = 0; i < remainingDeliveries.length; i++) {
          const delivery = remainingDeliveries[i];
          const distance = this.calculateDistance(
            currentLocation,
            delivery.location.coordinates
          );

          if (distance < shortestDistance) {
            shortestDistance = distance;
            nearestIndex = i;
          }
        }

        // Add nearest delivery to route
        const nextDelivery = remainingDeliveries.splice(nearestIndex, 1)[0];
        optimizedRoute.push({
          ...nextDelivery,
          distance_from_previous: shortestDistance,
          estimated_travel_time: Math.ceil((shortestDistance / 1000) / 30 * 60) // Minutes; distance is in meters, assuming a 30 km/h average
        });

        currentLocation = nextDelivery.location.coordinates;
      }

      // Calculate total route metrics
      const totalDistance = optimizedRoute.reduce((sum, stop) => sum + stop.distance_from_previous, 0);
      const totalTime = optimizedRoute.reduce((sum, stop) => sum + stop.estimated_travel_time, 0);

      // Create route path as LineString
      const routePath = {
        type: "LineString",
        coordinates: [
          [startLocation[0], startLocation[1]], // Start point
          ...optimizedRoute.map(stop => stop.location.coordinates)
        ]
      };

      // Store optimized route
      const routeDocument = {
        route_id: `route_${Date.now()}`,
        driver_id: null, // To be assigned
        vehicle_id: null, // To be assigned
        route_date: new Date(),

        route_path: routePath,
        planned_stops: {
          type: "MultiPoint",
          coordinates: optimizedRoute.map(stop => stop.location.coordinates)
        },

        deliveries: optimizedRoute,

        metrics: {
          total_distance_km: Math.round(totalDistance / 1000 * 100) / 100,
          estimated_duration_minutes: totalTime,
          stop_count: optimizedRoute.length,
          optimization_algorithm: 'nearest_neighbor'
        },

        status: 'planned',
        created_at: new Date()
      };

      const result = await routesCollection.insertOne(routeDocument);

      console.log(`✅ Route optimized: ${optimizedRoute.length} stops, ${Math.round(totalDistance/1000*10)/10}km, ${totalTime}min`);

      return {
        route_id: result.insertedId,
        optimized_route: optimizedRoute,
        route_path: routePath,
        metrics: routeDocument.metrics
      };

    } catch (error) {
      console.error('Error optimizing delivery route:', error);
      throw error;
    }
  }

  async performSpatialAnalytics(analysisType, parameters = {}) {
    console.log(`Performing spatial analysis: ${analysisType}`);

    const businessCollection = this.collections.get('businesses');

    try {
      switch (analysisType) {
        case 'density_analysis':
          return await this.performDensityAnalysis(parameters);
        case 'coverage_analysis':
          return await this.performCoverageAnalysis(parameters);
        case 'competition_analysis':
          return await this.performCompetitionAnalysis(parameters);
        default:
          throw new Error(`Unknown analysis type: ${analysisType}`);
      }
    } catch (error) {
      console.error(`Error performing ${analysisType}:`, error);
      throw error;
    }
  }

  async performDensityAnalysis(parameters) {
    const { 
      center, 
      radius = 5000, 
      gridSize = 1000,
      category = null 
    } = parameters;

    const businessCollection = this.collections.get('businesses');

    // Create analysis grid around center point
    const densityPipeline = [
      // Find businesses within analysis area
      {
        $geoNear: {
          near: {
            type: "Point",
            coordinates: [center.longitude, center.latitude]
          },
          distanceField: "distance",
          maxDistance: radius,
          spherical: true,
          query: {
            is_active: true,
            ...(category && { category: category })
          }
        }
      },

      // Group businesses into grid cells
      {
        $group: {
          _id: {
            // Simple grid cell calculation
            grid_x: {
              $floor: {
                $divide: [
                  { $multiply: [
                    { $subtract: [{ $arrayElemAt: ["$location.coordinates", 0] }, center.longitude] },
                    111320 * Math.cos(center.latitude * Math.PI / 180)  // Approximate meters per degree longitude at the center latitude
                  ]},
                  gridSize
                ]
              }
            },
            grid_y: {
              $floor: {
                $divide: [
                  { $multiply: [
                    { $subtract: [{ $arrayElemAt: ["$location.coordinates", 1] }, center.latitude] },
                    110540  // Approximate meters per degree latitude
                  ]},
                  gridSize
                ]
              }
            }
          },
          business_count: { $sum: 1 },
          avg_rating: { $avg: "$rating" },
          business_types: { $addToSet: "$category" },
          businesses: { $push: {
            name: "$business_name",
            rating: "$rating",
            location: "$location"
          }}
        }
      },

      // Calculate density metrics
      {
        $addFields: {
          density_per_km2: {
            $multiply: [
              "$business_count",
              { $divide: [1000000, { $multiply: [gridSize, gridSize] }] }
            ]
          },
          diversity_index: { $size: "$business_types" }
        }
      },

      // Sort by density
      {
        $sort: { business_count: -1 }
      }
    ];

    const densityResults = await businessCollection.aggregate(densityPipeline).toArray();

    return {
      analysis_type: 'density_analysis',
      parameters: parameters,
      grid_size_meters: gridSize,
      total_grid_cells: densityResults.length,
      density_results: densityResults
    };
  }

  // Utility methods
  createCirclePolygon(center, radiusMeters) {
    const points = 64; // Number of points in circle
    const coordinates = [];

    for (let i = 0; i < points; i++) {
      const angle = (i * 2 * Math.PI) / points;
      const dx = radiusMeters * Math.cos(angle);
      const dy = radiusMeters * Math.sin(angle);

      // Convert meters to degrees (approximate)
      const deltaLat = dy / 110540;
      const deltaLng = dx / (111320 * Math.cos(center[1] * Math.PI / 180));

      coordinates.push([
        center[0] + deltaLng,
        center[1] + deltaLat
      ]);
    }

    // Close the ring explicitly: GeoJSON requires identical first and last positions,
    // and sin(2π) is not exactly 0 in floating point
    coordinates.push([...coordinates[0]]);

    return {
      type: "Polygon",
      coordinates: [coordinates]
    };
  }

  calculateDistance(point1, point2) {
    // Haversine formula for calculating distance between two points
    const R = 6371e3; // Earth's radius in meters
    const lat1 = point1[1] * Math.PI / 180;
    const lat2 = point2[1] * Math.PI / 180;
    const deltaLat = (point2[1] - point1[1]) * Math.PI / 180;
    const deltaLng = (point2[0] - point1[0]) * Math.PI / 180;

    const a = Math.sin(deltaLat/2) * Math.sin(deltaLat/2) +
              Math.cos(lat1) * Math.cos(lat2) *
              Math.sin(deltaLng/2) * Math.sin(deltaLng/2);
    const c = 2 * Math.atan2(Math.sqrt(a), Math.sqrt(1-a));

    return R * c;
  }

  async generateSpatialReport() {
    console.log('Generating comprehensive spatial analytics report...');

    const report = {
      generated_at: new Date(),
      collections: {}
    };

    const collectionNames = ['business_locations', 'customer_locations', 'delivery_routes', 'geofences'];

    for (const collectionName of collectionNames) {
      try {
        const collection = this.db.collection(collectionName);

        // Get basic collection statistics
        const stats = await this.db.command({ collStats: collectionName });

        // Get spatial index statistics
        const indexes = await collection.listIndexes().toArray();
        const spatialIndexes = indexes.filter(idx => 
          Object.values(idx.key || {}).includes('2dsphere') || 
          Object.values(idx.key || {}).includes('2d')
        );

        // Get document count and sample
        const documentCount = await collection.countDocuments();
        const sampleDocs = await collection.find({}).limit(3).toArray();

        report.collections[collectionName] = {
          document_count: documentCount,
          storage_size: stats.storageSize,
          avg_document_size: stats.avgObjSize,
          spatial_indexes: spatialIndexes.length,
          spatial_index_details: spatialIndexes.map(idx => ({
            name: idx.name,
            key: idx.key,
            sparse: idx.sparse || false
          })),
          sample_documents: sampleDocs.map(doc => {
            // Remove sensitive data for reporting
            const { _id, location, ...metadata } = doc;
            return { location, metadata: Object.keys(metadata) };
          })
        };
      } catch (error) {
        report.collections[collectionName] = { error: error.message };
      }
    }

    return report;
  }

  async shutdown() {
    console.log('Shutting down geospatial manager...');
    await this.client.close();
    console.log('Geospatial manager shutdown completed');
  }
}

// Export the geospatial manager
module.exports = { MongoGeospatialManager };

// MongoDB Geospatial Benefits:
// - Native GeoJSON support with industry-standard spatial data formats
// - High-performance 2dsphere indexes optimized for spherical geometry calculations
// - Comprehensive spatial query operators for proximity, intersection, and containment
// - Efficient geospatial aggregation pipelines for spatial analytics
// - Built-in support for complex geometries: Point, LineString, Polygon, MultiPolygon
// - Real-time geofencing capabilities with change streams integration
// - Seamless integration with mapping libraries and GIS applications
// - SQL-compatible spatial operations through QueryLeaf integration
// - Automatic spatial index optimization for query performance
// - Scalable architecture supporting massive location datasets
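
The route optimizer above records `optimization_algorithm: 'nearest_neighbor'` and relies on the Haversine helper `calculateDistance`. As a rough illustration of how those two pieces fit together, here is a minimal standalone sketch. The names `haversineMeters` and `nearestNeighborOrder`, and the `{ id, coordinates }` stop shape, are assumptions made for this example and are not part of the manager class.

```javascript
// Haversine great-circle distance in meters between two [lng, lat] pairs.
function haversineMeters(a, b) {
  const R = 6371e3; // mean Earth radius in meters
  const toRad = (deg) => (deg * Math.PI) / 180;
  const dLat = toRad(b[1] - a[1]);
  const dLng = toRad(b[0] - a[0]);
  const h =
    Math.sin(dLat / 2) ** 2 +
    Math.cos(toRad(a[1])) * Math.cos(toRad(b[1])) * Math.sin(dLng / 2) ** 2;
  return 2 * R * Math.atan2(Math.sqrt(h), Math.sqrt(1 - h));
}

// Greedy nearest-neighbor ordering: repeatedly visit the closest unvisited stop.
// Hypothetical helper; `stops` is an array of { id, coordinates: [lng, lat] }.
function nearestNeighborOrder(start, stops) {
  const remaining = [...stops];
  const ordered = [];
  let current = start;

  while (remaining.length > 0) {
    let bestIndex = 0;
    let bestDistance = Infinity;

    remaining.forEach((stop, i) => {
      const d = haversineMeters(current, stop.coordinates);
      if (d < bestDistance) {
        bestDistance = d;
        bestIndex = i;
      }
    });

    const [next] = remaining.splice(bestIndex, 1);
    ordered.push({ ...next, distance_from_previous: bestDistance });
    current = next.coordinates;
  }

  return ordered;
}

// Three stops along the same latitude, given out of order
const route = nearestNeighborOrder([-122.4194, 37.7749], [
  { id: 'c', coordinates: [-122.3894, 37.7749] },
  { id: 'a', coordinates: [-122.4184, 37.7749] },
  { id: 'b', coordinates: [-122.4094, 37.7749] }
]);

console.log(route.map((s) => s.id).join(' -> ')); // a -> b -> c
```

Nearest neighbor is a greedy heuristic: it is cheap to compute but does not guarantee the shortest overall route, so treat its output as a starting point when stop counts grow.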

Understanding MongoDB Geospatial Architecture

Advanced Location-Based Query Patterns

MongoDB's geospatial capabilities enable sophisticated location-based application patterns:

// Advanced geospatial query patterns for real-world applications
class AdvancedGeospatialQueries {
  constructor(db) {
    this.db = db;
    this.queryCache = new Map();
  }

  async implementAdvancedSpatialQueries() {
    console.log('Demonstrating advanced geospatial query patterns...');

    // Pattern 1: Multi-criteria proximity search
    await this.multiCriteriaProximitySearch();

    // Pattern 2: Route intersection analysis
    await this.routeIntersectionAnalysis();

    // Pattern 3: Spatial clustering and heat map generation
    await this.spatialClusteringAnalysis();

    // Pattern 4: Dynamic geofence management
    await this.dynamicGeofenceManagement();

    console.log('Advanced geospatial patterns demonstrated');
  }

  async multiCriteriaProximitySearch() {
    console.log('Performing multi-criteria proximity search...');

    const businessCollection = this.db.collection('business_locations');

    // Complex search combining multiple spatial and business criteria
    const complexSearchPipeline = [
      {
        $geoNear: {
          near: {
            type: "Point",
            coordinates: [-122.4194, 37.7749] // San Francisco
          },
          distanceField: "distance_meters",
          maxDistance: 3000,
          spherical: true,
          query: {
            is_active: true,
            rating: { $gte: 4.0 }
          }
        }
      },

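      // Add time-based availability filtering
      // Note: $function requires server-side JavaScript (deprecated as of MongoDB 8.0),
      // and new Date() below is evaluated in the database server's local time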
      {
        $addFields: {
          is_currently_open: {
            $function: {
              body: function(operatingHours) {
                const now = new Date();
                const currentDay = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday'][now.getDay()];
                const currentTime = now.getHours() * 100 + now.getMinutes();

                if (!operatingHours || !operatingHours[currentDay]) {
                  return false;
                }

                const dayHours = operatingHours[currentDay];
                const startTime = parseInt(dayHours.start.replace(':', ''));
                const endTime = parseInt(dayHours.end.replace(':', ''));

                return currentTime >= startTime && currentTime <= endTime;
              },
              args: ["$operating_hours"],
              lang: "js"
            }
          }
        }
      },

      // Service area intersection check
      {
        $addFields: {
          provides_delivery_to_user: {
            $cond: {
              if: { $ne: ["$service_area", null] },
              then: {
                $function: {
                  body: function(serviceArea, userLocation) {
                    // Simplified point-in-polygon check
                    // In production, use more sophisticated algorithms
                    return serviceArea != null;
                  },
                  args: ["$service_area", [-122.4194, 37.7749]],
                  lang: "js"
                }
              },
              else: false
            }
          }
        }
      },

      // Calculate composite score
      {
        $addFields: {
          composite_score: {
            $add: [
              // Distance component (40%)
              { $multiply: [
                { $subtract: [1, { $divide: ["$distance_meters", 3000] }] },
                0.4
              ]},

              // Rating component (30%)
              { $multiply: [{ $divide: ["$rating", 5] }, 0.3] },

              // Current availability bonus (20%)
              { $cond: ["$is_currently_open", 0.2, 0] },

              // Delivery service bonus (10%)
              { $cond: ["$provides_delivery_to_user", 0.1, 0] }
            ]
          }
        }
      },

      // Filter and sort by composite score
      {
        $match: {
          composite_score: { $gte: 0.5 } // Minimum quality threshold
        }
      },

      {
        $sort: { composite_score: -1, distance_meters: 1 }
      },

      { $limit: 15 },

      {
        $project: {
          business_name: 1,
          category: 1,
          rating: 1,
          distance_meters: { $round: ["$distance_meters", 0] },
          is_currently_open: 1,
          provides_delivery_to_user: 1,
          composite_score: { $round: ["$composite_score", 3] },
          location: 1
        }
      }
    ];

    const results = await businessCollection.aggregate(complexSearchPipeline).toArray();
    console.log(`✅ Found ${results.length} businesses matching complex criteria`);
    return results;
  }

  async routeIntersectionAnalysis() {
    console.log('Analyzing route intersections with geofences...');

    const routesCollection = this.db.collection('delivery_routes');
    const geofencesCollection = this.db.collection('geofences');

    // Find routes that intersect with specific geofences
    const intersectionPipeline = [
      {
        $match: {
          status: 'in_progress',
          route_path: { $exists: true }
        }
      },

      // Lookup intersecting geofences
      {
        $lookup: {
          from: 'geofences',
          let: { route_path: '$route_path' },
          pipeline: [
            {
              $match: {
                $expr: {
                  $and: [
                    { $eq: ['$is_active', true] },
                    {
                      $function: {
                        body: function(geofenceBoundary, routePath) {
                          // Simplified intersection logic
                          // In production, use proper geometric intersection algorithms
                          return geofenceBoundary != null && routePath != null;
                        },
                        args: ['$boundary', '$$route_path'],
                        lang: 'js'
                      }
                    }
                  ]
                }
              }
            }
          ],
          as: 'intersecting_geofences'
        }
      },

      // Filter routes with intersections
      {
        $match: {
          'intersecting_geofences.0': { $exists: true }
        }
      },

      // Calculate intersection impact
      {
        $addFields: {
          intersection_analysis: {
            $map: {
              input: '$intersecting_geofences',
              as: 'geofence',
              in: {
                fence_id: '$$geofence.fence_id',
                fence_type: '$$geofence.fence_type',
                impact_type: {
                  $switch: {
                    branches: [
                      { case: { $eq: ['$$geofence.fence_type', 'pricing_zone'] }, then: 'cost_increase' },
                      { case: { $eq: ['$$geofence.fence_type', 'restricted_area'] }, then: 'route_restriction' },
                      { case: { $eq: ['$$geofence.fence_type', 'priority_zone'] }, then: 'priority_handling' }
                    ],
                    default: 'monitoring'
                  }
                },
                properties: '$$geofence.properties'
              }
            }
          }
        }
      },

      {
        $project: {
          route_id: 1,
          driver_id: 1,
          route_date: 1,
          status: 1,
          intersection_count: { $size: '$intersecting_geofences' },
          intersection_analysis: 1,
          estimated_impact: {
            $reduce: {
              input: '$intersection_analysis',
              initialValue: { cost_multiplier: 1.0, priority_boost: 0 },
              in: {
                cost_multiplier: {
                  $cond: [
                    { $eq: ['$$this.impact_type', 'cost_increase'] },
                    { $multiply: ['$$value.cost_multiplier', { $ifNull: ['$$this.properties.surge_multiplier', 1] }] },
                    '$$value.cost_multiplier'
                  ]
                },
                priority_boost: {
                  $cond: [
                    { $eq: ['$$this.impact_type', 'priority_handling'] },
                    { $add: ['$$value.priority_boost', 1] },
                    '$$value.priority_boost'
                  ]
                }
              }
            }
          }
        }
      }
    ];

    const intersectionResults = await routesCollection.aggregate(intersectionPipeline).toArray();
    console.log(`✅ Analyzed ${intersectionResults.length} routes with geofence intersections`);
    return intersectionResults;
  }

  async spatialClusteringAnalysis() {
    console.log('Performing spatial clustering analysis...');

    const businessCollection = this.db.collection('business_locations');

    // Grid-based clustering for business locations (fixed cells, not DBSCAN-style density clustering)
    const clusteringPipeline = [
      {
        $match: {
          is_active: true,
          location: { $exists: true }
        }
      },

      // Create spatial grid for clustering
      {
        $addFields: {
          grid_cell: {
            x: {
              $floor: {
                $multiply: [
                  { $arrayElemAt: ['$location.coordinates', 0] },
                  1000  // Grid precision: 0.001° cells (east-west width shrinks away from the equator)
                ]
              }
            },
            y: {
              $floor: {
                $multiply: [
                  { $arrayElemAt: ['$location.coordinates', 1] },
                  1000  // Grid precision: 0.001° cells (roughly 111 m north-south)
                ]
              }
            }
          }
        }
      },

      // Group by grid cells
      {
        $group: {
          _id: '$grid_cell',
          business_count: { $sum: 1 },
          categories: { $addToSet: '$category' },
          avg_rating: { $avg: '$rating' },
          businesses: { $push: {
            business_id: '$_id',
            business_name: '$business_name',
            category: '$category',
            location: '$location',
            rating: '$rating'
          }},

          // Calculate cluster center
          center_longitude: { $avg: { $arrayElemAt: ['$location.coordinates', 0] } },
          center_latitude: { $avg: { $arrayElemAt: ['$location.coordinates', 1] } }
        }
      },

      // Filter significant clusters
      {
        $match: {
          business_count: { $gte: 3 }  // Minimum cluster size
        }
      },

      // Add cluster analysis
      {
        $addFields: {
          cluster_center: {
            type: 'Point',
            coordinates: ['$center_longitude', '$center_latitude']
          },
          diversity_index: { $size: '$categories' },
          cluster_density: '$business_count', // Simplified density metric

          cluster_characteristics: {
            $switch: {
              branches: [
                {
                  case: { $gte: ['$business_count', 10] },
                  then: 'high_density_commercial'
                },
                {
                  case: { $and: [
                    { $gte: ['$business_count', 5] },
                    // diversity_index is defined in this same stage, so it is not visible here yet
                    { $gte: [{ $size: '$categories' }, 4] }
                  ]},
                  then: 'diverse_business_district'
                },
                {
                  case: { $eq: [{ $size: '$categories' }, 1] },
                  then: 'specialized_cluster'
                }
              ],
              default: 'mixed_commercial'
            }
          }
        }
      },

      // Sort by cluster significance
      {
        $sort: { business_count: -1, diversity_index: -1 }
      }
    ];

    const clusterResults = await businessCollection.aggregate(clusteringPipeline).toArray();

    // Generate heat map data
    const heatMapData = clusterResults.map(cluster => ({
      lat: cluster.center_latitude,
      lng: cluster.center_longitude,
      intensity: Math.min(cluster.business_count / 10, 1), // Normalized intensity
      business_count: cluster.business_count,
      characteristics: cluster.cluster_characteristics
    }));

    console.log(`✅ Identified ${clusterResults.length} business clusters`);
    return {
      clusters: clusterResults,
      heat_map_data: heatMapData
    };
  }

  async dynamicGeofenceManagement() {
    console.log('Implementing dynamic geofence management...');

    const geofencesCollection = this.db.collection('geofences');
    const eventsCollection = this.db.collection('geofence_events');

    // Analyze geofence performance and adjust boundaries
    const performancePipeline = [
      {
        $match: {
          event_timestamp: { 
            $gte: new Date(Date.now() - 24 * 60 * 60 * 1000) // Last 24 hours
          }
        }
      },

      // Group by geofence
      {
        $group: {
          _id: '$geofence_id',
          total_events: { $sum: 1 },
          unique_customers: { $addToSet: '$customer_id' },
          event_types: { $addToSet: '$event_type' },
          avg_dwell_time: { $avg: '$dwell_time_minutes' },

          // Collect event locations for boundary analysis
          event_locations: { $push: '$location' },

          latest_properties: { $last: '$properties' }
        }
      },

      // Calculate performance metrics
      {
        $addFields: {
          unique_customer_count: { $size: '$unique_customers' },
          event_rate_per_hour: { $divide: ['$total_events', 24] },

          // Analyze spatial distribution of events
          boundary_efficiency: {
            $function: {
              body: function(eventLocations) {
                // Simplified efficiency calculation
                // In production, analyze point distribution within geofence
                return eventLocations.length > 10 ? 0.8 : 0.6;
              },
              args: ['$event_locations'],
              lang: 'js'
            }
          }
        }
      },

      // Identify geofences needing adjustment
      {
        $addFields: {
          needs_adjustment: {
            $or: [
              { $lt: ['$boundary_efficiency', 0.7] },
              { $lt: ['$event_rate_per_hour', 1] },
              { $gt: ['$event_rate_per_hour', 20] }
            ]
          },

          adjustment_type: {
            $switch: {
              branches: [
                { 
                  case: { $lt: ['$event_rate_per_hour', 1] },
                  then: 'expand_boundary'
                },
                {
                  case: { $gt: ['$event_rate_per_hour', 20] },
                  then: 'contract_boundary'
                },
                {
                  case: { $lt: ['$boundary_efficiency', 0.7] },
                  then: 'reshape_boundary'
                }
              ],
              default: 'no_change'
            }
          }
        }
      },

      // Filter geofences that need updates
      {
        $match: {
          needs_adjustment: true
        }
      }
    ];

    const adjustmentCandidates = await eventsCollection.aggregate(performancePipeline).toArray();

    // Apply recommended adjustments
    for (const candidate of adjustmentCandidates) {
      await this.applyGeofenceAdjustment(candidate);
    }

    console.log(`✅ Analyzed ${adjustmentCandidates.length} geofences for dynamic adjustment`);
    return adjustmentCandidates;
  }

  async applyGeofenceAdjustment(adjustmentCandidate) {
    const geofencesCollection = this.db.collection('geofences');
    const geofenceId = adjustmentCandidate._id;

    console.log(`Applying ${adjustmentCandidate.adjustment_type} to geofence ${geofenceId}`);

    // Create adjustment record
    const adjustment = {
      geofence_id: geofenceId,
      adjustment_type: adjustmentCandidate.adjustment_type,
      reason: `Performance optimization - ${adjustmentCandidate.adjustment_type}`,
      applied_at: new Date(),
      previous_metrics: {
        event_rate_per_hour: adjustmentCandidate.event_rate_per_hour,
        boundary_efficiency: adjustmentCandidate.boundary_efficiency,
        unique_customer_count: adjustmentCandidate.unique_customer_count
      }
    };

    // Store adjustment history
    await this.db.collection('geofence_adjustments').insertOne(adjustment);

    // Update geofence properties based on adjustment type
    const updateDoc = {
      $set: {
        last_adjusted: new Date()
      },
      // Append to the history instead of overwriting earlier adjustments
      $push: {
        adjustment_history: adjustment
      }
    };

    switch (adjustmentCandidate.adjustment_type) {
      case 'expand_boundary':
        // Implement boundary expansion logic
        updateDoc.$inc = { 'properties.expansion_factor': 0.1 };
        break;
      case 'contract_boundary':
        // Implement boundary contraction logic
        updateDoc.$inc = { 'properties.contraction_factor': 0.1 };
        break;
      case 'reshape_boundary':
        // Implement boundary reshaping logic
        updateDoc.$set['properties.needs_manual_review'] = true;
        break;
    }

    await geofencesCollection.updateOne(
      { fence_id: geofenceId },
      updateDoc
    );
  }
}

// Export the advanced queries class
module.exports = { AdvancedGeospatialQueries };
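
The `$function` bodies in `multiCriteriaProximitySearch` and `routeIntersectionAnalysis` above stub out the actual geometry test ("Simplified point-in-polygon check"). One way to fill in the point-in-polygon case is the classic ray-casting (even-odd) test, sketched below. The function name `pointInPolygon` and the sample `serviceArea` polygon are made up for this example. The sketch also assumes planar math on `[lng, lat]` pairs, looks only at the outer ring (holes are ignored), and does not handle polygons that cross the antimeridian or the poles.

```javascript
// Ray-casting (even-odd rule) point-in-polygon test on a GeoJSON Polygon.
// Assumptions: planar math, outer ring only, no antimeridian or pole crossings.
function pointInPolygon(point, polygon) {
  if (
    !polygon ||
    polygon.type !== 'Polygon' ||
    !Array.isArray(polygon.coordinates) ||
    polygon.coordinates.length === 0
  ) {
    return false;
  }

  const [x, y] = point;
  const ring = polygon.coordinates[0];
  let inside = false;

  for (let i = 0, j = ring.length - 1; i < ring.length; j = i++) {
    const [xi, yi] = ring[i];
    const [xj, yj] = ring[j];

    // Does the edge (j -> i) straddle the horizontal line through the point,
    // and does it cross that line to the right of the point?
    const straddles = (yi > y) !== (yj > y);
    if (straddles && x < ((xj - xi) * (y - yi)) / (yj - yi) + xi) {
      inside = !inside;
    }
  }

  return inside;
}

// Hypothetical rectangular service area around downtown San Francisco
const serviceArea = {
  type: 'Polygon',
  coordinates: [[
    [-122.43, 37.76], [-122.40, 37.76], [-122.40, 37.79],
    [-122.43, 37.79], [-122.43, 37.76]
  ]]
};

console.log(pointInPolygon([-122.4194, 37.7749], serviceArea)); // true
console.log(pointInPolygon([-122.3900, 37.7749], serviceArea)); // false
```

Where the polygon field has a 2dsphere index, it is usually simpler to let the server do this work with a `$geoIntersects` filter on that field rather than shipping JavaScript into the pipeline. The client-side version is mainly useful for unit tests or post-processing.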

SQL-Style Geospatial Operations with QueryLeaf

QueryLeaf enables familiar SQL syntax for MongoDB geospatial operations:

-- QueryLeaf geospatial operations with SQL-familiar syntax

-- Create geospatial table with spatial column
CREATE TABLE business_locations (
  business_id SERIAL PRIMARY KEY,
  business_name VARCHAR(200) NOT NULL,
  category VARCHAR(100) NOT NULL,
  address TEXT NOT NULL,
  location POINT NOT NULL,  -- GeoJSON Point stored as POINT type
  service_area POLYGON,     -- GeoJSON Polygon for service boundaries
  rating DECIMAL(3,2) DEFAULT 0,
  price_level INTEGER CHECK (price_level BETWEEN 1 AND 4),
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"location": "2dsphere", "service_area": "2dsphere"}'
);

-- Insert geospatial data using standard SQL syntax
INSERT INTO business_locations (
  business_name, category, address, location, service_area, rating, price_level
) VALUES 
  ('Downtown Cafe', 'restaurant', '123 Main St', ST_Point(-122.4194, 37.7749), ST_Buffer(ST_Point(-122.4194, 37.7749), 0.01), 4.5, 2),
  ('Tech Bookstore', 'retail', '456 Tech Ave', ST_Point(-122.4094, 37.7849), ST_Buffer(ST_Point(-122.4094, 37.7849), 0.015), 4.2, 3),
  ('Local Grocery', 'grocery', '789 Local Rd', ST_Point(-122.3994, 37.7649), ST_Buffer(ST_Point(-122.3994, 37.7649), 0.008), 3.8, 1);

-- Proximity-based queries with familiar SQL spatial functions
WITH nearby_businesses AS (
  SELECT 
    business_id,
    business_name,
    category,
    address,
    rating,
    price_level,

    -- Calculate distance using SQL spatial functions
    ST_Distance(location, ST_Point(-122.4150, 37.7750)) as distance_meters,

    -- Check if user location is within service area
    ST_Within(ST_Point(-122.4150, 37.7750), service_area) as within_service_area,

    -- Calculate bearing from user to business
    ST_Azimuth(ST_Point(-122.4150, 37.7750), location) * 180 / PI() as bearing_degrees,

    -- Convert geometry to GeoJSON for application use
    ST_AsGeoJSON(location) as location_geojson

  FROM business_locations
  WHERE 
    is_active = true

    -- Spatial proximity filter (5km radius)
    AND ST_DWithin(location, ST_Point(-122.4150, 37.7750), 5000)
),

scored_results AS (
  SELECT 
    nb.*,

    -- Multi-criteria scoring algorithm
    (
      -- Distance component (40% weight) - closer is better
      (1.0 - (distance_meters / 5000.0)) * 0.4 +

      -- Rating component (30% weight)
      (rating / 5.0) * 0.3 +

      -- Service area coverage bonus (20% weight)
      CASE WHEN within_service_area THEN 0.2 ELSE 0 END +

      -- Category preference bonus (10% weight)
      CASE 
        WHEN category = 'restaurant' THEN 0.1
        WHEN category = 'grocery' THEN 0.05
        ELSE 0
      END
    ) as relevance_score,

    -- Categorize distance for user-friendly display
    CASE 
      WHEN distance_meters <= 500 THEN 'Very Close'
      WHEN distance_meters <= 1000 THEN 'Walking Distance'
      WHEN distance_meters <= 2000 THEN 'Short Drive'
      WHEN distance_meters <= 5000 THEN 'Moderate Distance'
      ELSE 'Far'
    END as distance_category,

    -- Convert bearing to compass direction
    CASE 
      WHEN bearing_degrees >= 337.5 OR bearing_degrees < 22.5 THEN 'North'
      WHEN bearing_degrees >= 22.5 AND bearing_degrees < 67.5 THEN 'Northeast'
      WHEN bearing_degrees >= 67.5 AND bearing_degrees < 112.5 THEN 'East'
      WHEN bearing_degrees >= 112.5 AND bearing_degrees < 157.5 THEN 'Southeast'
      WHEN bearing_degrees >= 157.5 AND bearing_degrees < 202.5 THEN 'South'
      WHEN bearing_degrees >= 202.5 AND bearing_degrees < 247.5 THEN 'Southwest'
      WHEN bearing_degrees >= 247.5 AND bearing_degrees < 292.5 THEN 'West'
      ELSE 'Northwest'
    END as direction_compass

  FROM nearby_businesses nb
)

SELECT 
  business_id,
  business_name,
  category,
  address,
  ROUND(distance_meters::NUMERIC, 0) as distance_meters,
  distance_category,
  direction_compass,
  ROUND(bearing_degrees::NUMERIC, 1) as bearing_degrees,
  rating,
  price_level,
  within_service_area,
  ROUND(relevance_score::NUMERIC, 3) as relevance_score,
  location_geojson

FROM scored_results
WHERE 
  distance_meters <= 5000  -- 5km maximum distance
  AND relevance_score >= 0.3  -- Minimum relevance threshold

ORDER BY relevance_score DESC, distance_meters ASC
LIMIT 20;

-- Geofencing and spatial containment analysis
CREATE TABLE geofences (
  fence_id VARCHAR(50) PRIMARY KEY,
  fence_name VARCHAR(200) NOT NULL,
  fence_type VARCHAR(50) NOT NULL,  -- 'delivery_zone', 'pricing_zone', 'restricted_area'
  boundary POLYGON NOT NULL,
  properties JSONB DEFAULT '{}',
  is_active BOOLEAN DEFAULT true,
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"boundary": "2dsphere"}'
);

-- Insert geofence boundaries
INSERT INTO geofences (fence_id, fence_name, fence_type, boundary, properties) VALUES 
  ('downtown_delivery', 'Downtown Delivery Zone', 'delivery_zone', 
   ST_GeomFromGeoJSON('{"type":"Polygon","coordinates":[[[-122.42,37.77],[-122.40,37.78],[-122.39,37.76],[-122.42,37.77]]]}'),
   '{"delivery_fee": 2.99, "estimated_time": 30}'),
  ('high_demand_pricing', 'Financial District Surge Zone', 'pricing_zone',
   ST_Buffer(ST_Point(-122.4000, 37.7900), 0.01),
   '{"surge_multiplier": 1.5, "peak_hours": ["08:00-10:00", "17:00-19:00"]}');

-- Check which geofences contain a specific location
WITH location_analysis AS (
  SELECT 
    ST_Point(-122.4100, 37.7800) as user_location
),

geofence_containment AS (
  SELECT 
    gf.fence_id,
    gf.fence_name,
    gf.fence_type,
    gf.properties,

    -- Check if user location is within geofence
    ST_Within(la.user_location, gf.boundary) as user_inside_fence,

    -- Calculate distance to geofence boundary
    ST_Distance(la.user_location, ST_Boundary(gf.boundary)) as distance_to_boundary,

    -- Calculate area of geofence
    ST_Area(gf.boundary) as fence_area_sq_degrees

  FROM geofences gf
  CROSS JOIN location_analysis la
  WHERE gf.is_active = true
)

SELECT 
  fence_id,
  fence_name,
  fence_type,
  user_inside_fence,
  CASE 
    WHEN user_inside_fence THEN 'Inside geofence'
    WHEN distance_to_boundary <= 0.001 THEN 'Near boundary'
    ELSE 'Outside geofence'
  END as proximity_status,
  ROUND(distance_to_boundary::NUMERIC * 111000, 0) as distance_to_boundary_meters,
  properties

FROM geofence_containment
WHERE 
  user_inside_fence = true 
  OR distance_to_boundary <= 0.005  -- Within ~500m of boundary

ORDER BY distance_to_boundary ASC;

-- Route optimization and path analysis
CREATE TABLE delivery_routes (
  route_id VARCHAR(50) PRIMARY KEY,
  driver_id INTEGER NOT NULL,
  route_date DATE NOT NULL,
  route_path LINESTRING NOT NULL,  -- Path as LineString geometry
  planned_stops MULTIPOINT NOT NULL,  -- Stop locations as MultiPoint
  total_distance_km DECIMAL(10,3),
  estimated_duration_minutes INTEGER,
  status VARCHAR(20) DEFAULT 'planned',
  created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
) WITH (
  spatial_indexes = '{"route_path": "2dsphere", "planned_stops": "2dsphere"}'
);

-- Analyze route intersections with geofences
WITH route_geofence_analysis AS (
  SELECT 
    dr.route_id,
    dr.driver_id,
    dr.route_date,
    dr.status,

    -- Find intersecting geofences
    ARRAY_AGG(
      CASE 
        WHEN ST_Intersects(dr.route_path, gf.boundary) 
        THEN JSON_BUILD_OBJECT(
          'fence_id', gf.fence_id,
          'fence_type', gf.fence_type,
          'properties', gf.properties
        )
        ELSE NULL
      END
    ) FILTER (WHERE ST_Intersects(dr.route_path, gf.boundary)) as intersecting_geofences,

    -- Calculate route metrics
    ST_Length(dr.route_path) * 111000 as route_length_meters,  -- Convert to meters
    ST_NumGeometries(dr.planned_stops) as stop_count,  -- Count points in the MultiPoint

    -- Check if route passes through restricted areas
    BOOL_OR(
      CASE 
        WHEN gf.fence_type = 'restricted_area' AND ST_Intersects(dr.route_path, gf.boundary)
        THEN true 
        ELSE false 
      END
    ) as passes_through_restricted_area

  FROM delivery_routes dr
  LEFT JOIN geofences gf ON ST_Intersects(dr.route_path, gf.boundary)
  WHERE 
    dr.route_date = CURRENT_DATE
    AND dr.status IN ('planned', 'in_progress')
  GROUP BY dr.route_id, dr.driver_id, dr.route_date, dr.status, dr.route_path, dr.planned_stops
),

route_impact_analysis AS (
  SELECT 
    rga.*,

    -- Calculate impact of geofence intersections
    CASE 
      WHEN passes_through_restricted_area THEN 'Route requires rerouting'
      WHEN ARRAY_LENGTH(intersecting_geofences, 1) > 0 THEN 'Route has cost/time implications'
      ELSE 'Route clear'
    END as route_status,

    -- Estimate cost impact
    COALESCE(
      (
        SELECT SUM(
          CASE 
            WHEN fence->>'fence_type' = 'pricing_zone' 
            THEN (fence->'properties'->>'surge_multiplier')::NUMERIC - 1
            ELSE 0
          END
        )
        FROM UNNEST(intersecting_geofences) as fence
      ), 0
    ) as estimated_cost_increase_multiplier

  FROM route_geofence_analysis rga
)

SELECT 
  route_id,
  driver_id,
  route_date,
  status,
  ROUND(route_length_meters::NUMERIC, 0) as route_length_meters,
  stop_count,
  route_status,
  passes_through_restricted_area,
  ARRAY_LENGTH(intersecting_geofences, 1) as geofence_intersection_count,
  ROUND(estimated_cost_increase_multiplier::NUMERIC, 2) as cost_multiplier,
  intersecting_geofences

FROM route_impact_analysis
ORDER BY 
  passes_through_restricted_area DESC,
  estimated_cost_increase_multiplier DESC,
  route_length_meters ASC;

-- Spatial analytics and density analysis
CREATE VIEW business_density_analysis AS
WITH spatial_grid AS (
  -- Create analysis grid for density calculation
  SELECT 
    grid_x,
    grid_y,
    ST_MakeBox2D(
      ST_Point(grid_x * 0.01 - 122.5, grid_y * 0.01 + 37.7),
      ST_Point((grid_x + 1) * 0.01 - 122.5, (grid_y + 1) * 0.01 + 37.7)
    ) as grid_cell
  FROM 
    GENERATE_SERIES(0, 50) as grid_x,
    GENERATE_SERIES(0, 50) as grid_y
),

grid_business_counts AS (
  SELECT 
    sg.grid_x,
    sg.grid_y,
    sg.grid_cell,

    -- Count businesses in each grid cell
    COUNT(bl.business_id) as business_count,
    ARRAY_AGG(DISTINCT bl.category) as categories,  -- Distinct so diversity counts unique categories
    AVG(bl.rating) as avg_rating,

    -- Calculate grid cell center point
    ST_Centroid(sg.grid_cell) as cell_center

  FROM spatial_grid sg
  LEFT JOIN business_locations bl ON ST_Within(bl.location, sg.grid_cell)
  WHERE bl.is_active = true OR bl.business_id IS NULL
  GROUP BY sg.grid_x, sg.grid_y, sg.grid_cell
),

density_analysis AS (
  SELECT 
    gbc.*,

    -- Calculate density metrics
    -- Each 0.01-degree cell is about 1.11 km by 0.88 km (roughly 0.98 km2) near 37.7N
    business_count / 0.98 as businesses_per_km2,  -- Approximate conversion
    ARRAY_LENGTH(ARRAY_REMOVE(categories, NULL), 1) as category_diversity,

    -- Classify density level
    CASE 
      WHEN business_count >= 10 THEN 'high_density'
      WHEN business_count >= 5 THEN 'medium_density'
      WHEN business_count >= 1 THEN 'low_density'
      ELSE 'no_businesses'
    END as density_classification,

    -- Generate GeoJSON for mapping
    ST_AsGeoJSON(cell_center) as center_geojson,
    ST_AsGeoJSON(grid_cell) as cell_boundary_geojson

  FROM grid_business_counts gbc
  WHERE business_count > 0  -- Only include cells with businesses
)

SELECT 
  grid_x,
  grid_y,
  business_count,
  ROUND(businesses_per_km2::NUMERIC, 1) as businesses_per_km2,
  category_diversity,
  density_classification,
  ROUND(avg_rating::NUMERIC, 2) as avg_rating,
  categories,
  center_geojson,
  cell_boundary_geojson

FROM density_analysis
ORDER BY business_count DESC, category_diversity DESC;

-- QueryLeaf provides comprehensive geospatial capabilities:
-- 1. Standard SQL spatial data types (POINT, POLYGON, LINESTRING)
-- 2. Familiar spatial functions (ST_Distance, ST_Within, ST_Buffer, etc.)
-- 3. Geospatial indexing with MongoDB's 2dsphere indexes
-- 4. Complex proximity searches with multi-criteria scoring
-- 5. Geofencing and spatial containment analysis
-- 6. Route optimization and intersection analysis
-- 7. Spatial analytics and density calculations
-- 8. Integration with GeoJSON for web mapping libraries
-- 9. Performance-optimized spatial queries
-- 10. Seamless conversion between SQL spatial syntax and MongoDB operations

Best Practices for Geospatial Implementation

Collection Design and Index Optimization

Essential practices for production geospatial deployments:

  1. Coordinate System: Use WGS84 (EPSG:4326) coordinate system for global compatibility
  2. GeoJSON Standards: Store location data in standard GeoJSON format for interoperability
  3. Index Strategy: Create 2dsphere indexes on location fields for optimal query performance
  4. Compound Indexes: Combine spatial indexes with business logic fields for efficient filtering
  5. Data Validation: Implement proper validation for coordinate ranges and geometry types
  6. Precision Management: Choose appropriate precision levels for coordinate storage and calculations
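
The validation and precision points above can be sketched as a small pre-insert check. This is a minimal illustration, not part of MongoDB or QueryLeaf: `validateGeoJsonPoint` and `roundCoordinates` are hypothetical helper names, and the 6-decimal default is an assumption (about 0.11 m of latitude), not a recommendation from the article.

```javascript
// Hypothetical helper: checks a GeoJSON Point against WGS84 coordinate ranges
// before it is stored. GeoJSON order is [longitude, latitude].
function validateGeoJsonPoint(point) {
  const errors = [];

  if (!point || point.type !== 'Point') {
    errors.push('type must be "Point"');
    return { valid: false, errors };
  }

  const coords = point.coordinates;
  if (!Array.isArray(coords) || coords.length < 2) {
    errors.push('coordinates must be [longitude, latitude]');
    return { valid: false, errors };
  }

  const [lng, lat] = coords;
  if (!Number.isFinite(lng) || lng < -180 || lng > 180) {
    errors.push('longitude must be a number between -180 and 180');
  }
  if (!Number.isFinite(lat) || lat < -90 || lat > 90) {
    errors.push('latitude must be a number between -90 and 90');
  }

  return { valid: errors.length === 0, errors };
}

// Hypothetical helper: rounds coordinates to a fixed number of decimal places
// so stored precision matches what the application actually needs.
function roundCoordinates(point, decimals = 6) {
  const factor = 10 ** decimals;
  return {
    type: 'Point',
    coordinates: point.coordinates
      .slice(0, 2)
      .map(c => Math.round(c * factor) / factor)
  };
}
```

A common mistake this catches is swapped coordinates: `[37.7749, -122.4194]` fails because -122.4194 is not a valid latitude. Swaps where both values happen to fall within the valid ranges still pass, so a range check complements careful input handling rather than replacing it.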

Performance and Scalability

Optimize geospatial operations for high-throughput location-based applications:

  1. Query Optimization: Use $geoNear for proximity searches with distance-based sorting
  2. Bounding Box Filtering: Apply initial bounding box filters before complex spatial calculations
  3. Aggregation Pipelines: Leverage aggregation frameworks for complex spatial analytics
  4. Caching Strategies: Implement intelligent caching for frequently accessed location data
  5. Data Modeling: Design schemas that align with common geospatial query patterns
  6. Sharding Considerations: Plan geospatial sharding strategies for global applications
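
As a rough sketch of the first two points, the helpers below build (but do not run) a `$geoNear` pipeline and compute an approximate bounding box around a point. The function names and the document fields (`location`, `category`, `is_active`, `name`) are assumptions made for illustration. The bounding box uses a flat approximation of about 111,320 m per degree and becomes unreliable near the poles.

```javascript
// Hypothetical helper: builds an aggregation pipeline for a proximity search.
// $geoNear must be the first stage and requires a geospatial index on the
// queried field; results come back sorted from nearest to farthest.
function buildNearbyPipeline({ lng, lat, maxDistanceMeters, category, limit = 20 }) {
  return [
    {
      $geoNear: {
        near: { type: 'Point', coordinates: [lng, lat] },
        distanceField: 'distance_meters',
        maxDistance: maxDistanceMeters,
        spherical: true,
        key: 'location', // assumed name of the indexed GeoJSON field
        query: { is_active: true, ...(category ? { category } : {}) }
      }
    },
    { $limit: limit },
    { $project: { name: 1, category: 1, distance_meters: 1 } }
  ];
}

// Hypothetical helper: approximate bounding box around a point, useful as a
// cheap first-pass filter or as a cache key before exact spatial checks.
function boundingBox(lng, lat, radiusMeters) {
  const metersPerDegree = 111320;
  const latDelta = radiusMeters / metersPerDegree;
  const lngDelta = radiusMeters / (metersPerDegree * Math.cos((lat * Math.PI) / 180));
  return {
    minLng: lng - lngDelta,
    minLat: lat - latDelta,
    maxLng: lng + lngDelta,
    maxLat: lat + latDelta
  };
}
```

With the Node.js driver the pipeline would be passed to `collection.aggregate(...)`. Keeping the builder separate from execution makes the query shape easy to unit test without a database.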

Conclusion

MongoDB's native geospatial capabilities cover the core needs of location-based applications without the separate extensions and tooling that traditional GIS database setups often require. The combination of 2dsphere spatial indexing, query operators for proximity, intersection, and containment, and native GeoJSON storage supports high-performance location-aware applications that scale with growing user bases and data volumes.

Key MongoDB Geospatial benefits include:

  • Native GeoJSON Support: Industry-standard spatial data formats with seamless web integration
  • High-Performance Indexing: 2dsphere indexes optimized for spherical geometry calculations
  • Comprehensive Query Operators: Complete set of spatial operations for proximity, intersection, and containment
  • Scalable Architecture: Efficient handling of massive location datasets with intelligent partitioning
  • Real-time Capabilities: Change streams enable immediate geofence and location event processing
  • SQL Compatibility: Familiar spatial query patterns for existing SQL development teams

Whether you're building ride-sharing platforms, delivery logistics systems, real estate applications, location-based social networks, or any geospatial application requiring sophisticated spatial analysis, MongoDB's geospatial features with QueryLeaf's SQL-familiar interface provide the foundation for modern location-based services that remain both powerful and approachable for traditional SQL development teams.

QueryLeaf Integration: QueryLeaf automatically leverages MongoDB's geospatial capabilities while providing familiar SQL spatial functions and syntax. Complex proximity searches, geofencing operations, and spatial analytics are seamlessly accessible through standard SQL spatial constructs, making sophisticated geospatial development both efficient and maintainable for SQL-oriented development teams.

The integration of enterprise-grade geospatial capabilities with SQL-style operations makes MongoDB an ideal platform for location-based applications that require both high-performance spatial processing and familiar development patterns, ensuring your geospatial solutions remain both effective and maintainable as they scale to global deployments.

MongoDB Time Series Collections for IoT and Real-Time Analytics: High-Performance Sensor Data Management and Stream Processing

Modern IoT applications generate massive volumes of time-stamped sensor data that require specialized storage and query optimization strategies. Traditional relational databases struggle with the volume, velocity, and analytical requirements of IoT workloads, particularly when dealing with millions of data points per second from distributed sensor networks, real-time alerting systems, and complex analytical queries across historical time ranges.

MongoDB time series collections provide purpose-built storage and query optimization specifically designed for time-stamped data, offering automatic data organization, specialized compression algorithms, and optimized aggregation pipelines that can handle high-velocity IoT data ingestion while supporting real-time analytics and historical trend analysis at scale.
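
To make "automatic data organization" more concrete, the sketch below groups readings by their metadata and a fixed time window, which is the general idea behind bucketed storage. It is a deliberately simplified model: MongoDB's actual bucketing is internal and more involved, and the `bucketReadings` function and the `timestamp`, `device`, and `value` field names are assumptions for this example.

```javascript
// Simplified illustration only, not MongoDB's real implementation.
// Readings that share the same metadata and fall in the same time window
// end up in one bucket, so they can be stored and scanned together.
function bucketReadings(readings, windowSeconds = 3600) {
  const windowMs = windowSeconds * 1000;
  const buckets = new Map();

  for (const reading of readings) {
    // Align the reading to the start of its window (epoch-aligned)
    const windowStart = Math.floor(reading.timestamp.getTime() / windowMs) * windowMs;
    const key = `${JSON.stringify(reading.device)}|${windowStart}`;

    if (!buckets.has(key)) {
      buckets.set(key, {
        device: reading.device,
        windowStart: new Date(windowStart),
        values: []
      });
    }
    buckets.get(key).values.push(reading.value);
  }

  return [...buckets.values()];
}
```

Grouping this way is one reason the choice of metadata field matters: metadata that changes on every reading would put each reading in its own bucket and lose most of the benefit.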

The IoT Data Challenge

Traditional approaches to storing and analyzing time series data face significant scalability and performance limitations:

-- Traditional PostgreSQL time series approach - performance bottlenecks
CREATE TABLE sensor_readings (
    reading_id BIGSERIAL,
    device_id VARCHAR(50) NOT NULL,
    sensor_type VARCHAR(50) NOT NULL,
    timestamp TIMESTAMPTZ NOT NULL,
    value DECIMAL(15,4) NOT NULL,
    unit VARCHAR(20),
    location_lat DECIMAL(10,8),
    location_lng DECIMAL(11,8),

    -- Basic metadata
    device_status VARCHAR(20) DEFAULT 'online',
    data_quality INTEGER DEFAULT 100,

    CONSTRAINT valid_quality CHECK (data_quality BETWEEN 0 AND 100),

    -- A partitioned table's primary key must include the partition key
    PRIMARY KEY (reading_id, timestamp)
) PARTITION BY RANGE (timestamp);

-- Partitioning by time (complex maintenance)
CREATE TABLE sensor_readings_2025_01 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');

CREATE TABLE sensor_readings_2025_02 PARTITION OF sensor_readings
FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');

-- Multiple indexes required for different query patterns
CREATE INDEX idx_sensor_device_time ON sensor_readings (device_id, timestamp);
CREATE INDEX idx_sensor_type_time ON sensor_readings (sensor_type, timestamp);
CREATE INDEX idx_sensor_location ON sensor_readings (location_lat, location_lng);
CREATE INDEX idx_sensor_timestamp ON sensor_readings (timestamp);

-- Complex aggregation queries for analytics
WITH hourly_averages AS (
    SELECT 
        device_id,
        sensor_type,
        DATE_TRUNC('hour', timestamp) as hour_bucket,
        AVG(value) as avg_value,
        COUNT(*) as reading_count,
        MIN(value) as min_value,
        MAX(value) as max_value,
        STDDEV(value) as stddev_value
    FROM sensor_readings
    WHERE timestamp >= NOW() - INTERVAL '24 hours'
      AND device_status = 'online'
      AND data_quality > 80
    GROUP BY device_id, sensor_type, DATE_TRUNC('hour', timestamp)
),

device_statistics AS (
    SELECT 
        device_id,
        sensor_type,
        COUNT(*) as total_hours,
        AVG(avg_value) as daily_average,
        MAX(max_value) as daily_peak,
        MIN(min_value) as daily_low,
        AVG(reading_count) as avg_readings_per_hour,

        -- Calculate trend using linear regression approximation
        CASE 
            WHEN COUNT(*) > 1 THEN
                (COUNT(*) * SUM(EXTRACT(EPOCH FROM hour_bucket) * avg_value) - 
                 SUM(EXTRACT(EPOCH FROM hour_bucket)) * SUM(avg_value)) /
                (COUNT(*) * SUM(POWER(EXTRACT(EPOCH FROM hour_bucket), 2)) - 
                 POWER(SUM(EXTRACT(EPOCH FROM hour_bucket)), 2))
            ELSE 0
        END as trend_slope
    FROM hourly_averages
    GROUP BY device_id, sensor_type
)

-- Final aggregation (performance intensive)
SELECT 
    ds.device_id,
    ds.sensor_type,
    ROUND(ds.daily_average, 2) as avg_24h,
    ROUND(ds.daily_peak, 2) as peak_24h,
    ROUND(ds.daily_low, 2) as low_24h,
    ROUND(ds.avg_readings_per_hour, 0) as readings_per_hour,

    -- Trend analysis
    CASE 
        WHEN ds.trend_slope > 0.1 THEN 'rising'
        WHEN ds.trend_slope < -0.1 THEN 'falling'
        ELSE 'stable'
    END as trend_direction,

    -- Alert conditions
    CASE 
        WHEN ds.daily_peak > 100 THEN 'high_alert'
        WHEN ds.daily_low < 10 THEN 'low_alert'
        WHEN ds.avg_readings_per_hour < 5 THEN 'connectivity_alert'
        ELSE 'normal'
    END as alert_status,

    ds.total_hours

FROM device_statistics ds
WHERE ds.total_hours >= 20  -- At least 20 hours of data
ORDER BY ds.device_id, ds.sensor_type;

-- Challenges with traditional time series approaches:
-- 1. Storage overhead - separate tables and partition management
-- 2. Index explosion - multiple indexes needed for various query patterns
-- 3. Query complexity - complex CTEs and window functions for basic analytics
-- 4. Maintenance burden - manual partition creation and cleanup
-- 5. Limited compression - basic storage compression insufficient for time series patterns
-- 6. Scaling bottlenecks - horizontal scaling requires complex sharding strategies
-- 7. Real-time constraints - difficult to optimize for both writes and analytics
-- 8. Data lifecycle management - complex procedures for archiving and cleanup

-- Real-time ingestion performance issues
INSERT INTO sensor_readings (
    device_id, sensor_type, timestamp, value, unit, 
    location_lat, location_lng, device_status, data_quality
)
SELECT 
    'device_' || (random() * 1000)::int,
    CASE (random() * 5)::int 
        WHEN 0 THEN 'temperature'
        WHEN 1 THEN 'humidity'
        WHEN 2 THEN 'pressure'
        WHEN 3 THEN 'light'
        ELSE 'motion'
    END,
    NOW() - (random() * interval '1 hour'),
    random() * 100,
    CASE (random() * 5)::int 
        WHEN 0 THEN 'celsius'
        WHEN 1 THEN 'percent'
        WHEN 2 THEN 'pascal'
        WHEN 3 THEN 'lux'
        ELSE 'boolean'
    END,
    40.7128 + (random() - 0.5) * 0.1,
    -74.0060 + (random() - 0.5) * 0.1,
    CASE WHEN random() > 0.1 THEN 'online' ELSE 'offline' END,
    80 + (random() * 20)::int
FROM generate_series(1, 10000) -- 10K inserts - already showing performance issues
ON CONFLICT DO NOTHING;

-- Problems:
-- 1. Linear performance degradation with data volume
-- 2. Index maintenance overhead during high-velocity writes
-- 3. Lock contention during concurrent analytics and writes
-- 4. Complex query optimization required for time-range queries
-- 5. Storage bloat due to lack of time-series specific compression
-- 6. Difficult to implement real-time alerting efficiently
-- 7. Complex setup for distributed deployments across geographic regions
-- 8. Limited built-in support for time-series specific operations

MongoDB Time Series Collections address these limitations with purpose-built bucketing, compression, and expiry for time-stamped data:

// MongoDB Time Series Collections - optimized for IoT and analytics workloads
const { MongoClient, ObjectId } = require('mongodb');

const client = new MongoClient('mongodb://localhost:27017/?replicaSet=rs0');
const db = client.db('iot_platform');

class MongoDBTimeSeriesManager {
  constructor(db) {
    this.db = db;

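    // Time series collections with automatic optimization
    this.sensorData = null;
    this.deviceMetrics = null;
    this.analyticsCache = null;

    // A constructor cannot await, so setup is not started here. Callers must run
    // `await manager.initializeTimeSeriesCollections()` before using the manager;
    // otherwise setup errors would surface as unhandled promise rejections.
  }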

  async initializeTimeSeriesCollections() {
    console.log('Setting up optimized time series collections...');

    try {
      // Primary sensor data collection
      await this.db.createCollection('sensor_readings', {
        timeseries: {
          timeField: 'timestamp',
          metaField: 'device',
          // 'seconds' suits high-frequency IoT data and caps each bucket at a 1-hour span.
          // Custom bucketMaxSpanSeconds/bucketRoundingSeconds (MongoDB 6.3+) must be equal
          // to each other and cannot be combined with granularity.
          granularity: 'seconds'
        },
        expireAfterSeconds: 31536000, // 1 year retention
        storageEngine: { wiredTiger: { configString: 'block_compressor=zstd' } }
      });

      // Device analytics and metrics collection
      await this.db.createCollection('device_metrics', {
        timeseries: {
          timeField: 'timestamp',
          metaField: 'device_info',
          // 'minutes' suits aggregated data points and caps each bucket at a 24-hour span
          granularity: 'minutes'
        },
        expireAfterSeconds: 94608000 // 3 year retention for analytics
      });

      // Real-time analytics cache for dashboard queries
      await this.db.createCollection('analytics_cache', {
        timeseries: {
          timeField: 'computed_at',
          metaField: 'computation_type',
          granularity: 'minutes'
        },
        expireAfterSeconds: 604800 // 1 week cache retention
      });

      this.sensorData = this.db.collection('sensor_readings');
      this.deviceMetrics = this.db.collection('device_metrics');
      this.analyticsCache = this.db.collection('analytics_cache');

      // Create specialized indexes for time series queries
      await this.createOptimizedIndexes();

      console.log('Time series collections initialized successfully');

    } catch (error) {
      console.error('Error initializing time series collections:', error);
      throw error;
    }
  }

  async createOptimizedIndexes() {
    console.log('Creating optimized time series indexes...');

    // Sensor data indexes
    await this.sensorData.createIndexes([
      // Compound index for device + time range queries
      { 
        key: { 'device.id': 1, timestamp: 1 },
        name: 'device_time_optimal'
      },

      // Sensor type + time for analytics
      { 
        key: { 'device.sensor_type': 1, timestamp: 1 },
        name: 'sensor_type_time'
      },

      // Geospatial index for location-based queries
      { 
        key: { 'device.location': '2dsphere' },
        name: 'device_location_geo'
      },

      // Value range index for threshold queries. Measurements are stored keyed by
      // type (see normalizeMeasurements), so index a concrete path; temperature is
      // shown here as an example.
      { 
        key: { 'measurements.temperature.value': 1, timestamp: 1 },
        name: 'temperature_value_threshold_time'
      }
    ]);

    // Device metrics indexes
    await this.deviceMetrics.createIndexes([
      {
        key: { 'device_info.id': 1, timestamp: -1 },
        name: 'device_metrics_latest'
      },

      {
        key: { 'device_info.facility': 1, timestamp: 1 },
        name: 'facility_time_series'
      }
    ]);

    console.log('Time series indexes created successfully');
  }

  async ingestSensorData(deviceId, sensorType, measurements, metadata = {}) {
    const timestamp = new Date();

    try {
      const document = {
        timestamp: timestamp,

        // Metadata field for efficient bucketing
        device: {
          id: deviceId,
          sensor_type: sensorType,
          facility: metadata.facility || 'default',
          zone: metadata.zone || 'unspecified',
          location: metadata.location ? {
            type: 'Point',
            coordinates: [metadata.location.lng, metadata.location.lat]
          } : null,

          // Device characteristics
          model: metadata.model || 'unknown',
          firmware_version: metadata.firmware_version || '1.0',
          installation_date: metadata.installation_date,

          // Network information
          network_info: {
            connection_type: metadata.connection_type || 'wifi',
            signal_strength: metadata.signal_strength || -50,
            gateway_id: metadata.gateway_id
          }
        },

        // Time series measurements
        measurements: this.normalizeMeasurements(measurements),

        // Data quality and status
        quality_metrics: {
          data_quality_score: this.calculateDataQuality(measurements, metadata),
          sensor_health: metadata.sensor_health || 'normal',
          calibration_status: metadata.calibration_status || 'valid',
          measurement_accuracy: metadata.measurement_accuracy || 0.95
        },

        // Processing metadata
        processing_info: {
          ingestion_timestamp: timestamp,
          processing_latency_ms: 0,
          source: metadata.source || 'sensor_direct',
          batch_id: metadata.batch_id,
          schema_version: '2.0'
        }
      };

      const result = await this.sensorData.insertOne(document);

      // Trigger real-time processing if enabled
      if (metadata.enable_realtime_processing !== false) {
        await this.processRealTimeAnalytics(document);
      }

      return {
        success: true,
        documentId: result.insertedId,
        timestamp: timestamp,
        bucketed: true, // Time series collections automatically bucket
        processing_time_ms: Date.now() - timestamp.getTime()
      };

    } catch (error) {
      console.error('Sensor data ingestion failed:', error);
      return {
        success: false,
        error: error.message,
        timestamp: timestamp
      };
    }
  }

  normalizeMeasurements(rawMeasurements) {
    // Normalize different measurement formats into consistent structure
    const normalized = {};

    if (Array.isArray(rawMeasurements)) {
      // Handle array of measurement objects
      rawMeasurements.forEach(measurement => {
        if (measurement.type && measurement.value !== undefined) {
          normalized[measurement.type] = {
            value: Number(measurement.value),
            unit: measurement.unit || '',
            precision: measurement.precision || 2,
            range: measurement.range || { min: null, max: null }
          };
        }
      });
    } else if (rawMeasurements && typeof rawMeasurements === 'object') {
      // Handle object with measurement properties
      Object.entries(rawMeasurements).forEach(([key, value]) => {
        if (typeof value === 'number') {
          normalized[key] = {
            value: value,
            unit: '',
            precision: 2,
            range: { min: null, max: null }
          };
        } else if (value && typeof value === 'object' && value.value !== undefined) {
          normalized[key] = {
            value: Number(value.value),
            unit: value.unit || '',
            precision: value.precision || 2,
            range: value.range || { min: null, max: null }
          };
        }
      });
    }

    return normalized;
  }

  calculateDataQuality(measurements, metadata) {
    let qualityScore = 100;

    // Check signal strength impact
    if (metadata.signal_strength < -80) {
      qualityScore -= 20;
    } else if (metadata.signal_strength < -70) {
      qualityScore -= 10;
    }

    // Check measurement consistency
    Object.values(measurements).forEach(measurement => {
      if (measurement && typeof measurement === 'object' && measurement.value !== undefined) {
        const value = Number(measurement.value);
        const range = measurement.range;

        if (range && range.min !== null && range.max !== null) {
          if (value < range.min || value > range.max) {
            qualityScore -= 15; // Out of expected range
          }
        }

        // Check for anomalous readings
        if (isNaN(value) || !isFinite(value)) {
          qualityScore -= 30;
        }
      }
    });

    return Math.max(0, qualityScore);
  }

  async processRealTimeAnalytics(document) {
    const deviceId = document.device.id;
    const timestamp = document.timestamp;

    // Real-time threshold monitoring
    await this.checkAlertThresholds(document);

    // Update device status and health metrics
    await this.updateDeviceHealthMetrics(deviceId, document);

    // Calculate rolling averages for dashboard
    await this.updateRollingAverages(deviceId, document);
  }

  async checkAlertThresholds(document) {
    const measurements = document.measurements;
    const deviceId = document.device.id;
    const sensorType = document.device.sensor_type;

    // Define threshold rules (could be stored in configuration collection)
    const thresholds = {
      temperature: { min: -10, max: 60, critical: 80 },
      humidity: { min: 0, max: 100, critical: 95 },
      pressure: { min: 900, max: 1100, critical: 1200 },
      light: { min: 0, max: 100000, critical: 120000 },
      motion: { min: 0, max: 1, critical: null }
    };

    const sensorThresholds = thresholds[sensorType];
    if (!sensorThresholds) return;

    // Use for...of rather than forEach(async ...) so each alert write is awaited
    // and errors propagate to the caller instead of becoming unhandled rejections.
    for (const [measurementType, measurement] of Object.entries(measurements)) {
      const value = measurement.value;
      const threshold = sensorThresholds;

      let alertLevel = null;
      let alertMessage = null;

      if (threshold.critical !== null && value > threshold.critical) {
        alertLevel = 'critical';
        alertMessage = `Critical ${measurementType} level: ${value} (threshold: ${threshold.critical})`;
      } else if (value > threshold.max) {
        alertLevel = 'high';
        alertMessage = `High ${measurementType} level: ${value} (max: ${threshold.max})`;
      } else if (value < threshold.min) {
        alertLevel = 'low';
        alertMessage = `Low ${measurementType} level: ${value} (min: ${threshold.min})`;
      }

      if (alertLevel) {
        await this.createAlert({
          device_id: deviceId,
          sensor_type: sensorType,
          measurement_type: measurementType,
          alert_level: alertLevel,
          message: alertMessage,
          value: value,
          threshold: threshold,
          timestamp: document.timestamp,
          location: document.device.location
        });
      }
    }
  }

  async createAlert(alertData) {
    const alertsCollection = this.db.collection('alerts');

    const alert = {
      _id: new ObjectId(),
      ...alertData,
      created_at: new Date(),
      status: 'active',
      acknowledged: false,
      acknowledged_by: null,
      acknowledged_at: null,
      resolved: false,
      resolved_at: null,
      escalation_level: 0,

      // Alert metadata
      correlation_id: `${alertData.device_id}_${alertData.sensor_type}_${alertData.measurement_type}`,
      alert_hash: this.calculateAlertHash(alertData)
    };

    // Check for duplicate recent alerts (deduplication)
    const recentAlert = await alertsCollection.findOne({
      alert_hash: alert.alert_hash,
      created_at: { $gte: new Date(Date.now() - 300000) }, // Last 5 minutes
      status: 'active'
    });

    if (!recentAlert) {
      await alertsCollection.insertOne(alert);

      // Trigger real-time notifications
      await this.sendAlertNotification(alert);
    } else {
      // Update escalation level for repeated alerts
      await alertsCollection.updateOne(
        { _id: recentAlert._id },
        { 
          $inc: { escalation_level: 1 },
          $set: { last_occurrence: new Date() }
        }
      );
    }
  }

  calculateAlertHash(alertData) {
    const crypto = require('crypto');
    const hashString = `${alertData.device_id}:${alertData.sensor_type}:${alertData.measurement_type}:${alertData.alert_level}`;
    return crypto.createHash('md5').update(hashString).digest('hex');
  }

  async sendAlertNotification(alert) {
    // Implementation would integrate with notification systems
    console.log(`ALERT [${alert.alert_level.toUpperCase()}]: ${alert.message}`);

    // Here you would integrate with:
    // - Email/SMS services
    // - Slack/Teams webhooks  
    // - PagerDuty/OpsGenie
    // - Custom notification APIs
  }

  async updateDeviceHealthMetrics(deviceId, document) {
    const now = new Date();

    // Calculate device health score based on multiple factors
    const healthMetrics = {
      timestamp: now,
      device_info: {
        id: deviceId,
        facility: document.device.facility,
        zone: document.device.zone
      },

      health_indicators: {
        data_quality_score: document.quality_metrics.data_quality_score,
        signal_strength: document.device.network_info.signal_strength,
        sensor_health: document.quality_metrics.sensor_health,
        measurement_frequency: await this.calculateMeasurementFrequency(deviceId),
        last_communication: now,

        // Calculated health score
        overall_health_score: this.calculateOverallHealthScore(document),

        // Status indicators
        is_online: true,
        is_responsive: true,
        calibration_valid: document.quality_metrics.calibration_status === 'valid'
      },

      performance_metrics: {
        uptime_percentage: await this.calculateUptimePercentage(deviceId),
        average_response_time_ms: document.processing_info.processing_latency_ms,
        data_completeness_percentage: 100, // Could be calculated based on expected vs actual measurements
        error_rate_percentage: 0 // Could be calculated from failed measurements
      }
    };

    await this.deviceMetrics.insertOne(healthMetrics);
  }

  calculateOverallHealthScore(document) {
    let score = 100;

    // Factor in data quality
    score = score * (document.quality_metrics.data_quality_score / 100);

    // Factor in signal strength
    const signalStrength = document.device.network_info.signal_strength;
    if (signalStrength < -80) {
      score *= 0.8;
    } else if (signalStrength < -70) {
      score *= 0.9;
    }

    // Factor in sensor health
    if (document.quality_metrics.sensor_health !== 'normal') {
      score *= 0.7;
    }

    return Math.round(score);
  }

  async calculateMeasurementFrequency(deviceId, windowMinutes = 60) {
    const windowStart = new Date(Date.now() - windowMinutes * 60 * 1000);

    const count = await this.sensorData.countDocuments({
      'device.id': deviceId,
      timestamp: { $gte: windowStart }
    });

    return count / windowMinutes; // Measurements per minute
  }

  async calculateUptimePercentage(deviceId, windowHours = 24) {
    const windowStart = new Date(Date.now() - windowHours * 60 * 60 * 1000);

    // Get expected measurement intervals (assuming every minute)
    const expectedMeasurements = windowHours * 60;

    const actualMeasurements = await this.sensorData.countDocuments({
      'device.id': deviceId,
      timestamp: { $gte: windowStart }
    });

    return Math.min(100, (actualMeasurements / expectedMeasurements) * 100);
  }

  async updateRollingAverages(deviceId, document) {
    // Update cached analytics for dashboard performance
    const measurementTypes = Object.keys(document.measurements);

    for (const measurementType of measurementTypes) {
      const value = document.measurements[measurementType].value;

      // Calculate rolling averages for different time windows
      const timeWindows = [
        { name: '5min', minutes: 5 },
        { name: '1hour', minutes: 60 },
        { name: '24hour', minutes: 1440 }
      ];

      for (const window of timeWindows) {
        await this.updateWindowAverage(deviceId, measurementType, value, window);
      }
    }
  }

  async updateWindowAverage(deviceId, measurementType, currentValue, window) {
    const windowStart = new Date(Date.now() - window.minutes * 60 * 1000);

    // Calculate average for the time window using aggregation
    const pipeline = [
      {
        $match: {
          'device.id': deviceId,
          timestamp: { $gte: windowStart },
          [`measurements.${measurementType}`]: { $exists: true }
        }
      },
      {
        $group: {
          _id: null,
          average: { $avg: `$measurements.${measurementType}.value` },
          count: { $sum: 1 },
          min: { $min: `$measurements.${measurementType}.value` },
          max: { $max: `$measurements.${measurementType}.value` },
          stddev: { $stdDevPop: `$measurements.${measurementType}.value` }
        }
      }
    ];

    const result = await this.sensorData.aggregate(pipeline).next();

    if (result) {
      const cacheDocument = {
        computed_at: new Date(),
        computation_type: {
          type: 'rolling_average',
          device_id: deviceId,
          measurement_type: measurementType,
          window: window.name
        },

        statistics: {
          average: result.average,
          count: result.count,
          min: result.min,
          max: result.max,
          stddev: result.stddev || 0,
          current_value: currentValue,

          // Trend calculation
          trend: currentValue > result.average ? 'rising' : 
                 currentValue < result.average ? 'falling' : 'stable',
          // Guard against a zero or missing average to avoid NaN/Infinity
          deviation_percentage: result.average
            ? Math.abs((currentValue - result.average) / result.average * 100)
            : 0
        }
      };

      await this.analyticsCache.replaceOne(
        { 
          'computation_type.type': 'rolling_average',
          'computation_type.device_id': deviceId,
          'computation_type.measurement_type': measurementType,
          'computation_type.window': window.name
        },
        cacheDocument,
        { upsert: true }
      );
    }
  }

  async getDeviceAnalytics(deviceId, options = {}) {
    const timeRange = options.timeRange || '24h';
    const measurementTypes = options.measurementTypes || null;
    const includeAggregations = options.includeAggregations !== false;

    try {
      // Parse time range
      const timeRangeMs = this.parseTimeRange(timeRange);
      const startTime = new Date(Date.now() - timeRangeMs);

      // Build aggregation pipeline
      const pipeline = [
        {
          $match: {
            'device.id': deviceId,
            timestamp: { $gte: startTime }
          }
        }
      ];

      // Add measurement type filtering if specified
      if (measurementTypes && measurementTypes.length > 0) {
        // Keep documents that contain at least one of the requested measurement types
        pipeline.push({
          $match: {
            $or: measurementTypes.map(type => ({
              [`measurements.${type}`]: { $exists: true }
            }))
          }
        });
      }

      if (includeAggregations) {
        // Add aggregation stages for comprehensive analytics
        pipeline.push(
          {
            $addFields: {
              hour_bucket: {
                $dateTrunc: { date: '$timestamp', unit: 'hour' }
              }
            }
          },
          {
            $group: {
              _id: {
                hour: '$hour_bucket',
                sensor_type: '$device.sensor_type'
              },

              // Time and count metrics
              measurement_count: { $sum: 1 },
              first_measurement: { $min: '$timestamp' },
              last_measurement: { $max: '$timestamp' },

              // Data quality metrics
              avg_quality_score: { $avg: '$quality_metrics.data_quality_score' },
              min_quality_score: { $min: '$quality_metrics.data_quality_score' },

              // Network metrics
              avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

              // Measurement statistics (dynamic based on available measurements)
              measurements: { $push: '$measurements' }
            }
          },
          {
            $addFields: {
              // Process measurements to calculate statistics for each type
              measurement_stats: {
                $reduce: {
                  input: '$measurements',
                  initialValue: {},
                  in: {
                    $mergeObjects: [
                      '$$value',
                      {
                        $arrayToObject: {
                          $map: {
                            input: { $objectToArray: '$$this' },
                            in: {
                              k: '$$this.k',
                              v: {
                                values: { $concatArrays: [{ $ifNull: [{ $getField: { field: 'values', input: { $getField: { field: '$$this.k', input: '$$value' } } } }, []] }, ['$$this.v.value']] },
                                unit: '$$this.v.unit'
                              }
                            }
                          }
                        }
                      }
                    ]
                  }
                }
              }
            }
          },
          {
            $addFields: {
              // Calculate final statistics for each measurement type
              final_measurement_stats: {
                $arrayToObject: {
                  $map: {
                    input: { $objectToArray: '$measurement_stats' },
                    in: {
                      k: '$$this.k',
                      v: {
                        count: { $size: '$$this.v.values' },
                        average: { $avg: '$$this.v.values' },
                        min: { $min: '$$this.v.values' },
                        max: { $max: '$$this.v.values' },
                        stddev: { $stdDevPop: '$$this.v.values' },
                        unit: '$$this.v.unit'
                      }
                    }
                  }
                }
              }
            }
          },
          {
            $sort: { '_id.hour': 1 }
          }
        );
      } else {
        // Simple data retrieval without aggregations
        pipeline.push(
          {
            $sort: { timestamp: -1 }
          },
          {
            $limit: options.limit || 1000
          }
        );
      }

      const results = await this.sensorData.aggregate(pipeline).toArray();

      // Get cached analytics for quick dashboard metrics
      const cachedAnalytics = await this.getCachedAnalytics(deviceId, measurementTypes);

      return {
        success: true,
        device_id: deviceId,
        time_range: timeRange,
        query_timestamp: new Date(),
        data_points: results,
        cached_analytics: cachedAnalytics,

        summary: {
          total_measurements: results.reduce((sum, item) => sum + (item.measurement_count || 1), 0),
          time_span: {
            start: startTime,
            end: new Date()
          },
          measurement_types: measurementTypes || 'all'
        }
      };

    } catch (error) {
      console.error('Error retrieving device analytics:', error);
      return {
        success: false,
        error: error.message,
        device_id: deviceId
      };
    }
  }

  async getCachedAnalytics(deviceId, measurementTypes = null) {
    const query = {
      'computation_type.device_id': deviceId
    };

    if (measurementTypes && measurementTypes.length > 0) {
      query['computation_type.measurement_type'] = { $in: measurementTypes };
    }

    const cachedResults = await this.analyticsCache.find(query)
      .sort({ computed_at: -1 })
      .toArray();

    // Organize cached results by measurement type and window
    const organized = {};

    cachedResults.forEach(result => {
      const measurementType = result.computation_type.measurement_type;
      const window = result.computation_type.window;

      if (!organized[measurementType]) {
        organized[measurementType] = {};
      }

      organized[measurementType][window] = {
        ...result.statistics,
        computed_at: result.computed_at
      };
    });

    return organized;
  }

  parseTimeRange(timeRange) {
    const ranges = {
      '5m': 5 * 60 * 1000,
      '15m': 15 * 60 * 1000,
      '1h': 60 * 60 * 1000,
      '6h': 6 * 60 * 60 * 1000,
      '24h': 24 * 60 * 60 * 1000,
      '7d': 7 * 24 * 60 * 60 * 1000,
      '30d': 30 * 24 * 60 * 60 * 1000
    };

    return ranges[timeRange] || ranges['24h'];
  }

  async getFacilityOverview(facility, options = {}) {
    const timeRange = options.timeRange || '24h';
    const timeRangeMs = this.parseTimeRange(timeRange);
    const startTime = new Date(Date.now() - timeRangeMs);

    try {
      const pipeline = [
        {
          $match: {
            'device.facility': facility,
            timestamp: { $gte: startTime }
          }
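        },
        // Sort chronologically so the $last accumulators below return the most recent values
        {
          $sort: { timestamp: 1 }
        },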
        {
          $group: {
            _id: {
              device_id: '$device.id',
              zone: '$device.zone',
              sensor_type: '$device.sensor_type'
            },

            latest_timestamp: { $max: '$timestamp' },
            measurement_count: { $sum: 1 },
            avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
            avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

            // Latest measurements for current values
            latest_measurements: { $last: '$measurements' },

            // Device info
            device_info: { $last: '$device' },
            latest_quality_metrics: { $last: '$quality_metrics' }
          }
        },
        {
          $addFields: {
            // Calculate device status
            device_status: {
              $switch: {
                branches: [
                  {
                    case: { $lt: ['$latest_timestamp', { $subtract: [new Date(), 300000] }] }, // 5 minutes
                    then: 'offline'
                  },
                  {
                    case: { $lt: ['$avg_data_quality', 50] },
                    then: 'degraded'
                  },
                  {
                    case: { $lt: ['$avg_signal_strength', -80] },
                    then: 'poor_connectivity'
                  }
                ],
                default: 'online'
              }
            },

            // Time since last measurement
            minutes_since_last_measurement: {
              $divide: [
                { $subtract: [new Date(), '$latest_timestamp'] },
                60000
              ]
            }
          }
        },
        {
          $group: {
            _id: '$_id.zone',

            device_count: { $sum: 1 },

            // Status distribution
            online_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'online'] }, 1, 0] }
            },
            offline_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'offline'] }, 1, 0] }
            },
            degraded_devices: {
              $sum: { $cond: [{ $eq: ['$device_status', 'degraded'] }, 1, 0] }
            },

            // Performance metrics
            avg_data_quality: { $avg: '$avg_data_quality' },
            avg_signal_strength: { $avg: '$avg_signal_strength' },
            total_measurements: { $sum: '$measurement_count' },

            // Sensor type distribution
            sensor_types: { $addToSet: '$_id.sensor_type' },

            // Device details
            devices: {
              $push: {
                device_id: '$_id.device_id',
                sensor_type: '$_id.sensor_type',
                status: '$device_status',
                last_seen: '$latest_timestamp',
                data_quality: '$avg_data_quality',
                signal_strength: '$avg_signal_strength',
                measurement_count: '$measurement_count',
                latest_measurements: '$latest_measurements'
              }
            }
          }
        },
        {
          $sort: { '_id': 1 }
        }
      ];

      const results = await this.sensorData.aggregate(pipeline).toArray();

      // Calculate facility-wide statistics
      const facilityStats = results.reduce((stats, zone) => {
        stats.total_devices += zone.device_count;
        stats.total_online += zone.online_devices;
        stats.total_offline += zone.offline_devices;
        stats.total_degraded += zone.degraded_devices;
        stats.total_measurements += zone.total_measurements;

        stats.avg_data_quality = (stats.avg_data_quality * stats.zones_processed + zone.avg_data_quality) / (stats.zones_processed + 1);
        stats.avg_signal_strength = (stats.avg_signal_strength * stats.zones_processed + zone.avg_signal_strength) / (stats.zones_processed + 1);
        stats.zones_processed += 1;

        // Collect unique sensor types
        zone.sensor_types.forEach(type => {
          if (!stats.sensor_types.includes(type)) {
            stats.sensor_types.push(type);
          }
        });

        return stats;
      }, {
        total_devices: 0,
        total_online: 0,
        total_offline: 0,
        total_degraded: 0,
        total_measurements: 0,
        avg_data_quality: 0,
        avg_signal_strength: 0,
        sensor_types: [],
        zones_processed: 0
      });

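      // Calculate health score on a 0-100 scale (quality and signal components clamped to [0, 1])
      const clamp01 = (x) => Math.min(1, Math.max(0, x));
      const healthScore = Math.round(
        (
          (facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 0.6 +
          clamp01(facilityStats.avg_data_quality / 100) * 0.3 +
          clamp01((facilityStats.avg_signal_strength + 100) / 50) * 0.1
        ) * 100
      );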

      return {
        success: true,
        facility: facility,
        time_range: timeRange,
        generated_at: new Date(),

        facility_overview: {
          total_devices: facilityStats.total_devices,
          online_devices: facilityStats.total_online,
          offline_devices: facilityStats.total_offline,
          degraded_devices: facilityStats.total_degraded,

          uptime_percentage: Math.round((facilityStats.total_online / Math.max(facilityStats.total_devices, 1)) * 100),
          avg_data_quality: Math.round(facilityStats.avg_data_quality),
          avg_signal_strength: Math.round(facilityStats.avg_signal_strength),
          facility_health_score: healthScore,

          sensor_types: facilityStats.sensor_types,
          total_measurements_today: facilityStats.total_measurements,

          zones: results
        }
      };

    } catch (error) {
      console.error('Error generating facility overview:', error);
      return {
        success: false,
        error: error.message,
        facility: facility
      };
    }
  }

  async performBatchIngestion(batchData, options = {}) {
    const batchSize = options.batchSize || 1000;
    const enableValidation = options.enableValidation !== false;
    const startTime = Date.now();

    console.log(`Starting batch ingestion of ${batchData.length} records...`);

    try {
      const results = {
        total_records: batchData.length,
        processed_records: 0,
        failed_records: 0,
        batches_processed: 0,
        processing_time_ms: 0,
        errors: []
      };

      // Process in batches for optimal performance
      for (let i = 0; i < batchData.length; i += batchSize) {
        const batch = batchData.slice(i, i + batchSize);
        const batchStartTime = Date.now();

        // Prepare documents for insertion
        const documents = batch.map((record, indexInBatch) => {
          try {
            if (enableValidation) {
              this.validateBatchRecord(record);
            }

            return {
              timestamp: new Date(record.timestamp),

              device: {
                id: record.device_id,
                sensor_type: record.sensor_type,
                facility: record.facility || 'unknown',
                zone: record.zone || 'unspecified',
                location: record.location ? {
                  type: 'Point',
                  coordinates: [record.location.lng, record.location.lat]
                } : null,
                model: record.device_model || 'unknown',
                firmware_version: record.firmware_version || '1.0',
                network_info: {
                  connection_type: record.connection_type || 'unknown',
                  // ?? keeps a legitimate 0 instead of replacing it with the default
                  signal_strength: record.signal_strength ?? -50,
                  gateway_id: record.gateway_id
                }
              },

              measurements: this.normalizeMeasurements(record.measurements || record.values),

              quality_metrics: {
                data_quality_score: record.data_quality_score ?? 95,
                sensor_health: record.sensor_health || 'normal',
                calibration_status: record.calibration_status || 'valid',
                measurement_accuracy: record.measurement_accuracy ?? 0.95
              },

              processing_info: {
                ingestion_timestamp: new Date(),
                processing_latency_ms: 0,
                source: 'batch_import',
                batch_id: options.batchId || `batch_${Date.now()}`,
                schema_version: '2.0'
              }
            };
          } catch (validationError) {
            // Count records rejected during preparation as failures
            results.failed_records += 1;
            results.errors.push({
              // Use the map index; indexOf is O(n) and picks the wrong entry for repeated references
              record_index: i + indexInBatch,
              error: validationError.message,
              record: record
            });
            return null;
          }
        }).filter(doc => doc !== null);

        if (documents.length > 0) {
          try {
            await this.sensorData.insertMany(documents, { 
              ordered: false,
              writeConcern: { w: 'majority', j: true }
            });

            results.processed_records += documents.length;
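          } catch (insertError) {
            // With ordered: false, some documents may have been written before the error.
            // The driver's bulk write error reports insertedCount; fall back to 0 if it is absent.
            const insertedCount = insertError.insertedCount ?? 0;
            results.processed_records += insertedCount;
            results.failed_records += documents.length - insertedCount;
            results.errors.push({
              batch_index: results.batches_processed,
              error: insertError.message,
              documents_count: documents.length,
              inserted_count: insertedCount
            });
          }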
        }

        results.batches_processed += 1;
        const batchTime = Date.now() - batchStartTime;

        console.log(`Batch ${results.batches_processed} processed: ${documents.length} records in ${batchTime}ms`);
      }

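      results.processing_time_ms = Date.now() - startTime;
      // Guard against empty input and zero elapsed time to avoid NaN/Infinity
      results.success_rate = results.total_records > 0
        ? (results.processed_records / results.total_records) * 100
        : 0;
      results.throughput_records_per_second = results.processing_time_ms > 0
        ? Math.round(results.processed_records / (results.processing_time_ms / 1000))
        : 0;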

      console.log(`Batch ingestion completed: ${results.processed_records}/${results.total_records} records processed in ${results.processing_time_ms}ms`);

      return {
        success: true,
        results: results
      };

    } catch (error) {
      console.error('Batch ingestion failed:', error);
      return {
        success: false,
        error: error.message,
        processing_time_ms: Date.now() - startTime
      };
    }
  }

  validateBatchRecord(record) {
    if (!record.device_id) {
      throw new Error('device_id is required');
    }

    if (!record.sensor_type) {
      throw new Error('sensor_type is required');
    }

    if (!record.timestamp) {
      throw new Error('timestamp is required');
    }

    if (!record.measurements && !record.values) {
      throw new Error('measurements or values are required');
    }

    // Validate timestamp format
    const timestamp = new Date(record.timestamp);
    if (isNaN(timestamp.getTime())) {
      throw new Error('Invalid timestamp format');
    }

    // Validate timestamp is not in the future
    if (timestamp > new Date()) {
      throw new Error('Timestamp cannot be in the future');
    }

    // Validate timestamp is not too old (more than 1 year)
    const oneYearAgo = new Date(Date.now() - 365 * 24 * 60 * 60 * 1000);
    if (timestamp < oneYearAgo) {
      throw new Error('Timestamp too old (more than 1 year)');
    }
  }
}

module.exports = { MongoDBTimeSeriesManager };
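
The facility health score in getFacilityOverview weights three components: share of online devices (0.6), average data quality (0.3), and average signal strength (0.1). The following is a minimal standalone sketch of that weighting, scaled to 0-100. The helper names `clamp01` and `computeFacilityHealthScore` are hypothetical and not part of MongoDBTimeSeriesManager, and the clamping of each component to [0, 1] is an assumption made here so that strong signals (above -50 dBm) cannot push the score past 100.

```javascript
// Hypothetical helpers for illustration only; not part of MongoDBTimeSeriesManager.

// Clamp a ratio into the [0, 1] range.
function clamp01(x) {
  return Math.min(1, Math.max(0, x));
}

// Weighted 0-100 health score. The weights mirror getFacilityOverview;
// the -100..-50 dBm mapping for signal strength is taken from the same formula.
function computeFacilityHealthScore({ totalDevices, onlineDevices, avgDataQuality, avgSignalStrength }) {
  const onlineRatio = totalDevices > 0 ? onlineDevices / totalDevices : 0;
  const qualityRatio = clamp01(avgDataQuality / 100);
  const signalRatio = clamp01((avgSignalStrength + 100) / 50);

  return Math.round((onlineRatio * 0.6 + qualityRatio * 0.3 + signalRatio * 0.1) * 100);
}

const score = computeFacilityHealthScore({
  totalDevices: 10,
  onlineDevices: 8,
  avgDataQuality: 90,
  avgSignalStrength: -75
});

console.log(score); // 80
```

In this example the components contribute 48, 27, and 5 points respectively. Keeping the calculation in a pure function makes it easy to unit test independently of the aggregation pipeline.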

Advanced Time Series Analytics Patterns

Real-Time Aggregation Pipelines

Implement real-time analytics with MongoDB's aggregation framework:

// Advanced analytics and alerting system
const { ObjectId } = require('mongodb'); // needed by createAnalyticsAlert
class TimeSeriesAnalyticsEngine {
  constructor(timeSeriesManager) {
    this.tsManager = timeSeriesManager;
    this.db = timeSeriesManager.db;
    this.alertRules = new Map();
    this.analyticsCache = new Map();
  }

  async createAdvancedAnalyticsPipeline(analysisConfig) {
    const {
      deviceFilter = {},
      timeRange = '24h',
      aggregationLevel = 'hour',
      analysisTypes = ['trend', 'anomaly', 'correlation'],
      realTimeEnabled = true
    } = analysisConfig;

    try {
      const timeRangeMs = this.tsManager.parseTimeRange(timeRange);
      const startTime = new Date(Date.now() - timeRangeMs);

      // Build comprehensive analytics pipeline
      const pipeline = [
        // Stage 1: Filter data by time and device criteria
        {
          $match: {
            timestamp: { $gte: startTime },
            ...this.buildDeviceFilter(deviceFilter)
          }
        },

        // Stage 2: Add time bucketing for aggregation
        {
          $addFields: {
            time_bucket: this.getTimeBucketExpression(aggregationLevel),
            hour_of_day: { $hour: '$timestamp' },
            day_of_week: { $dayOfWeek: '$timestamp' },
            // Business hours are 08:00-17:59; $hour evaluates in UTC unless a timezone is given
            is_business_hours: {
              $and: [
                { $gte: [{ $hour: '$timestamp' }, 8] },
                { $lt: [{ $hour: '$timestamp' }, 18] }
              ]
            }
          }
        },

        // Stage 3: Unwind measurements for individual analysis
        {
          $addFields: {
            measurement_array: {
              $objectToArray: '$measurements'
            }
          }
        },

        {
          $unwind: '$measurement_array'
        },

        // Stage 4: Group by time bucket, device, and measurement type
        {
          $group: {
            _id: {
              time_bucket: '$time_bucket',
              device_id: '$device.id',
              measurement_type: '$measurement_array.k',
              facility: '$device.facility',
              zone: '$device.zone'
            },

            // Statistical aggregations
            count: { $sum: 1 },
            avg_value: { $avg: '$measurement_array.v.value' },
            min_value: { $min: '$measurement_array.v.value' },
            max_value: { $max: '$measurement_array.v.value' },
            sum_value: { $sum: '$measurement_array.v.value' },
            stddev_value: { $stdDevPop: '$measurement_array.v.value' },

            // Data quality metrics
            avg_data_quality: { $avg: '$quality_metrics.data_quality_score' },
            min_data_quality: { $min: '$quality_metrics.data_quality_score' },

            // Network performance
            avg_signal_strength: { $avg: '$device.network_info.signal_strength' },

            // Time-based metrics
            first_timestamp: { $min: '$timestamp' },
            last_timestamp: { $max: '$timestamp' },

            // Business context
            business_hours_readings: {
              $sum: { $cond: ['$is_business_hours', 1, 0] }
            },

            // Value arrays for advanced calculations
            values: { $push: '$measurement_array.v.value' },
            timestamps: { $push: '$timestamp' },

            // Metadata
            unit: { $last: '$measurement_array.v.unit' },
            device_info: { $last: '$device' }
          }
        },

        // Stage 5: Calculate advanced metrics
        {
          $addFields: {
            // Variance and coefficient of variation
            variance: { $pow: ['$stddev_value', 2] },
            coefficient_of_variation: {
              $cond: [
                { $ne: ['$avg_value', 0] },
                { $divide: ['$stddev_value', '$avg_value'] },
                0
              ]
            },

            // Value range (max - min)
            value_range: { $subtract: ['$max_value', '$min_value'] },

            // Data completeness
            expected_readings: {
              $divide: [
                { $subtract: ['$last_timestamp', '$first_timestamp'] },
                { $multiply: [this.getExpectedInterval(aggregationLevel), 1000] }
              ]
            },

            // Time span coverage
            time_span_hours: {
              $divide: [
                { $subtract: ['$last_timestamp', '$first_timestamp'] },
                3600000
              ]
            },

            // Business hours coverage
            business_hours_percentage: {
              $multiply: [
                { $divide: ['$business_hours_readings', '$count'] },
                100
              ]
            }
          }
        },

        // Stage 6: Calculate trend indicators
        {
          $addFields: {
            // Simple trend approximation
            trend_direction: {
              $switch: {
                branches: [
                  {
                    case: { $gt: ['$coefficient_of_variation', 0.5] },
                    then: 'highly_variable'
                  },
                  {
                    case: { $gt: ['$max_value', { $multiply: ['$avg_value', 1.2] }] },
                    then: 'trending_high'
                  },
                  {
                    case: { $lt: ['$min_value', { $multiply: ['$avg_value', 0.8] }] },
                    then: 'trending_low'
                  }
                ],
                default: 'stable'
              }
            },

            // Anomaly detection flags
            anomaly_indicators: {
              $let: {
                vars: {
                  three_sigma_upper: { $add: ['$avg_value', { $multiply: ['$stddev_value', 3] }] },
                  three_sigma_lower: { $subtract: ['$avg_value', { $multiply: ['$stddev_value', 3] }] }
                },
                in: {
                  has_outliers: {
                    $or: [
                      { $gt: ['$max_value', '$$three_sigma_upper'] },
                      { $lt: ['$min_value', '$$three_sigma_lower'] }
                    ]
                  },
                  outlier_percentage: {
                    $multiply: [
                      {
                        $divide: [
                          {
                            $size: {
                              $filter: {
                                input: '$values',
                                cond: {
                                  $or: [
                                    { $gt: ['$$this', '$$three_sigma_upper'] },
                                    { $lt: ['$$this', '$$three_sigma_lower'] }
                                  ]
                                }
                              }
                            }
                          },
                          '$count'
                        ]
                      },
                      100
                    ]
                  }
                }
              }
            },

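            // Performance indicators: weighted sum of the components, scaled to 0-100
            performance_score: {
              $multiply: [
                {
                  $add: [
                    // Data quality component (40%)
                    { $multiply: [{ $divide: ['$avg_data_quality', 100] }, 0.4] },

                    // Connectivity component (30%)
                    { $multiply: [{ $divide: [{ $add: ['$avg_signal_strength', 100] }, 50] }, 0.3] },

                    // Completeness component (30%); a single-reading bucket has
                    // expected_readings of 0, so guard the division and treat it as complete
                    {
                      $multiply: [
                        {
                          $cond: [
                            { $gt: ['$expected_readings', 0] },
                            { $min: [{ $divide: ['$count', '$expected_readings'] }, 1] },
                            1
                          ]
                        },
                        0.3
                      ]
                    }
                  ]
                },
                100
              ]
            }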
          }
        },

        // Stage 7: Add comparative context
        {
          $lookup: {
            from: 'sensor_readings',
            let: {
              device_id: '$_id.device_id',
              measurement_type: '$_id.measurement_type',
              current_start: '$first_timestamp'
            },
            pipeline: [
              {
                $match: {
                  $expr: {
                    $and: [
                      { $eq: ['$device.id', '$$device_id'] },
                      { $lt: ['$timestamp', '$$current_start'] },
                      { $gte: ['$timestamp', { $subtract: ['$$current_start', timeRangeMs] }] }
                    ]
                  }
                }
              },
              {
                $group: {
                  _id: null,
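                  // Average the numeric `value` of the matching measurement, not the whole sub-document
                  historical_avg: {
                    $avg: {
                      $let: {
                        vars: {
                          measurement: {
                            $first: {
                              $filter: {
                                input: { $objectToArray: '$measurements' },
                                cond: { $eq: ['$$this.k', '$$measurement_type'] }
                              }
                            }
                          }
                        },
                        in: '$$measurement.v.value'
                      }
                    }
                  }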
                }
              }
            ],
            as: 'historical_context'
          }
        },

        // Stage 8: Final calculations and categorization
        {
          $addFields: {
            // Historical comparison
            historical_avg: {
              $ifNull: [
                { $arrayElemAt: ['$historical_context.historical_avg', 0] },
                '$avg_value'
              ]
            },

            // Calculate change from historical baseline
            historical_change_percentage: {
              $let: {
                vars: {
                  historical: {
                    $ifNull: [
                      { $arrayElemAt: ['$historical_context.historical_avg', 0] },
                      '$avg_value'
                    ]
                  }
                },
                in: {
                  $cond: [
                    { $ne: ['$$historical', 0] },
                    {
                      $multiply: [
                        { $divide: [{ $subtract: ['$avg_value', '$$historical'] }, '$$historical'] },
                        100
                      ]
                    },
                    0
                  ]
                }
              }
            },

            // Overall health assessment
            health_status: {
              $switch: {
                branches: [
                  {
                    case: { $lt: ['$performance_score', 50] },
                    then: 'critical'
                  },
                  {
                    case: { $lt: ['$performance_score', 70] },
                    then: 'warning'
                  },
                  {
                    case: { $gt: ['$anomaly_indicators.outlier_percentage', 10] },
                    then: 'anomalous'
                  }
                ],
                default: 'healthy'
              }
            },

            // Analysis timestamp and the length of the analyzed window
            // (the original expression measured the window, not the query duration)
            analyzed_at: new Date(),
            analysis_window_ms: timeRangeMs
          }
        },

        // Stage 9: Sort by relevance
        {
          $sort: {
            performance_score: 1, // Worst performers first
            'anomaly_indicators.outlier_percentage': -1,
            '_id.time_bucket': -1
          }
        }
      ];

      const results = await this.tsManager.sensorData.aggregate(pipeline).toArray();

      // Process results for real-time actions
      if (realTimeEnabled) {
        await this.processAnalyticsForAlerts(results);
      }

      // Cache results for dashboard performance
      const cacheKey = this.generateAnalyticsCacheKey(analysisConfig);
      this.analyticsCache.set(cacheKey, {
        results: results,
        generated_at: new Date(),
        config: analysisConfig
      });

      return {
        success: true,
        analysis_config: analysisConfig,
        results: results,
        summary: this.generateAnalyticsSummary(results),
        generated_at: new Date(),
        cache_key: cacheKey
      };

    } catch (error) {
      console.error('Advanced analytics pipeline failed:', error);
      return {
        success: false,
        error: error.message,
        analysis_config: analysisConfig
      };
    }
  }

  buildDeviceFilter(deviceFilter) {
    const filter = {};

    if (deviceFilter.device_ids && deviceFilter.device_ids.length > 0) {
      filter['device.id'] = { $in: deviceFilter.device_ids };
    }

    if (deviceFilter.facilities && deviceFilter.facilities.length > 0) {
      filter['device.facility'] = { $in: deviceFilter.facilities };
    }

    if (deviceFilter.zones && deviceFilter.zones.length > 0) {
      filter['device.zone'] = { $in: deviceFilter.zones };
    }

    if (deviceFilter.sensor_types && deviceFilter.sensor_types.length > 0) {
      filter['device.sensor_type'] = { $in: deviceFilter.sensor_types };
    }

    if (deviceFilter.location_radius) {
      const { center, radius_meters } = deviceFilter.location_radius;
      filter['device.location'] = {
        $geoWithin: {
          $centerSphere: [[center.lng, center.lat], radius_meters / 6378137] // Earth radius in meters
        }
      };
    }

    return filter;
  }

  getTimeBucketExpression(aggregationLevel) {
    const buckets = {
      minute: { $dateTrunc: { date: '$timestamp', unit: 'minute' } },
      hour: { $dateTrunc: { date: '$timestamp', unit: 'hour' } },
      day: { $dateTrunc: { date: '$timestamp', unit: 'day' } },
      week: { $dateTrunc: { date: '$timestamp', unit: 'week', startOfWeek: 'monday' } },
      month: { $dateTrunc: { date: '$timestamp', unit: 'month' } }
    };

    return buckets[aggregationLevel] || buckets.hour;
  }

  getExpectedInterval(aggregationLevel) {
    const intervals = {
      minute: 60,     // 60 seconds
      hour: 3600,     // 3600 seconds  
      day: 86400,     // 86400 seconds
      week: 604800,   // 604800 seconds
      month: 2592000  // ~30 days in seconds
    };

    return intervals[aggregationLevel] || intervals.hour;
  }

  async processAnalyticsForAlerts(analyticsResults) {
    for (const result of analyticsResults) {
      // Check for alert conditions
      if (result.health_status === 'critical') {
        await this.createAnalyticsAlert('critical_performance', result);
      }

      if (result.anomaly_indicators.outlier_percentage > 15) {
        await this.createAnalyticsAlert('anomaly_detected', result);
      }

      if (Math.abs(result.historical_change_percentage) > 50) {
        await this.createAnalyticsAlert('significant_trend_change', result);
      }

      if (result.performance_score < 30) {
        await this.createAnalyticsAlert('poor_performance', result);
      }
    }
  }

  async createAnalyticsAlert(alertType, analyticsData) {
    const alertsCollection = this.db.collection('analytics_alerts');

    const alert = {
      _id: new ObjectId(),
      alert_type: alertType,
      device_id: analyticsData._id.device_id,
      measurement_type: analyticsData._id.measurement_type,
      facility: analyticsData._id.facility,
      zone: analyticsData._id.zone,

      // Alert details
      severity: this.calculateAlertSeverity(alertType, analyticsData),
      description: this.generateAlertDescription(alertType, analyticsData),

      // Analytics context
      analytics_data: {
        time_bucket: analyticsData._id.time_bucket,
        performance_score: analyticsData.performance_score,
        health_status: analyticsData.health_status,
        anomaly_indicators: analyticsData.anomaly_indicators,
        historical_change_percentage: analyticsData.historical_change_percentage,
        avg_value: analyticsData.avg_value,
        trend_direction: analyticsData.trend_direction
      },

      // Timestamps
      created_at: new Date(),
      acknowledged: false,
      resolved: false
    };

    await alertsCollection.insertOne(alert);
    console.log(`Analytics Alert Created: ${alertType} for device ${analyticsData._id.device_id}`);
  }

  calculateAlertSeverity(alertType, analyticsData) {
    switch (alertType) {
      case 'critical_performance':
        return analyticsData.performance_score < 20 ? 'critical' : 'high';

      case 'anomaly_detected':
        return analyticsData.anomaly_indicators.outlier_percentage > 25 ? 'high' : 'medium';

      case 'significant_trend_change':
        return Math.abs(analyticsData.historical_change_percentage) > 100 ? 'high' : 'medium';

      case 'poor_performance':
        return analyticsData.performance_score < 20 ? 'high' : 'medium';

      default:
        return 'medium';
    }
  }

  generateAlertDescription(alertType, analyticsData) {
    const device = analyticsData._id.device_id;
    const measurement = analyticsData._id.measurement_type;

    switch (alertType) {
      case 'critical_performance':
        return `Critical performance degradation detected for ${device} ${measurement} sensor. Performance score: ${Math.round(analyticsData.performance_score)}%`;

      case 'anomaly_detected':
        return `Anomalous readings detected for ${device} ${measurement}. ${Math.round(analyticsData.anomaly_indicators.outlier_percentage)}% of readings are outliers`;

      case 'significant_trend_change':
        return `Significant trend change for ${device} ${measurement}. ${Math.round(analyticsData.historical_change_percentage)}% change from historical baseline`;

      case 'poor_performance':
        return `Poor performance detected for ${device} ${measurement}. Performance score: ${Math.round(analyticsData.performance_score)}%`;

      default:
        return `Analytics alert for ${device} ${measurement}`;
    }
  }

  generateAnalyticsSummary(results) {
    if (results.length === 0) {
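      // Use the same key names as the non-empty summary so callers can read them unconditionally
      return { total_analyses: 0, unique_devices: 0, unique_measurements: 0, unique_facilities: 0 };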
    }

    const summary = {
      total_analyses: results.length,
      unique_devices: new Set(results.map(r => r._id.device_id)).size,
      unique_measurements: new Set(results.map(r => r._id.measurement_type)).size,
      unique_facilities: new Set(results.map(r => r._id.facility)).size,

      // Health distribution
      health_status_distribution: {},

      // Performance metrics
      avg_performance_score: 0,
      min_performance_score: 100,
      max_performance_score: 0,

      // Anomaly statistics
      anomalous_analyses: 0,
      avg_outlier_percentage: 0,

      // Trend distribution
      trend_distribution: {},

      // Time range
      earliest_bucket: null,
      latest_bucket: null
    };

    // Calculate distributions and averages
    results.forEach(result => {
      // Health status distribution
      const status = result.health_status;
      summary.health_status_distribution[status] = (summary.health_status_distribution[status] || 0) + 1;

      // Performance metrics
      summary.avg_performance_score += result.performance_score;
      summary.min_performance_score = Math.min(summary.min_performance_score, result.performance_score);
      summary.max_performance_score = Math.max(summary.max_performance_score, result.performance_score);

      // Anomaly tracking
      if (result.anomaly_indicators.has_outliers) {
        summary.anomalous_analyses++;
      }
      summary.avg_outlier_percentage += result.anomaly_indicators.outlier_percentage;

      // Trend distribution
      const trend = result.trend_direction;
      summary.trend_distribution[trend] = (summary.trend_distribution[trend] || 0) + 1;

      // Time range
      const bucket = result._id.time_bucket;
      if (!summary.earliest_bucket || bucket < summary.earliest_bucket) {
        summary.earliest_bucket = bucket;
      }
      if (!summary.latest_bucket || bucket > summary.latest_bucket) {
        summary.latest_bucket = bucket;
      }
    });

    // Calculate averages
    summary.avg_performance_score = Math.round(summary.avg_performance_score / results.length);
    summary.avg_outlier_percentage = Math.round(summary.avg_outlier_percentage / results.length);
    summary.anomaly_rate = Math.round((summary.anomalous_analyses / results.length) * 100);

    return summary;
  }

  generateAnalyticsCacheKey(analysisConfig) {
    const keyData = {
      devices: JSON.stringify(analysisConfig.deviceFilter || {}),
      timeRange: analysisConfig.timeRange,
      aggregationLevel: analysisConfig.aggregationLevel,
      analysisTypes: JSON.stringify(analysisConfig.analysisTypes || [])
    };

    const crypto = require('crypto');
    return crypto.createHash('md5').update(JSON.stringify(keyData)).digest('hex');
  }
}

module.exports = { TimeSeriesAnalyticsEngine };
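
One caveat with generateAnalyticsCacheKey above: JSON.stringify depends on property order, so two logically identical configs built in a different key order produce different cache keys. A minimal sketch of one way around this is shown below. The helper names canonicalize and stableCacheKey are hypothetical and not part of the engine above, and SHA-256 is used here simply as a readily available hash.

```javascript
const crypto = require('crypto');

// Recursively sort object keys so logically equal configs serialize identically.
// Arrays keep their order, so ['a', 'b'] and ['b', 'a'] still hash differently.
function canonicalize(value) {
  if (Array.isArray(value)) return value.map(canonicalize);
  if (value instanceof Date) return value.toISOString();
  if (value && typeof value === 'object') {
    return Object.keys(value).sort().reduce((acc, key) => {
      acc[key] = canonicalize(value[key]);
      return acc;
    }, {});
  }
  return value;
}

// Hypothetical helper: hash the canonical form of an analysis config.
function stableCacheKey(analysisConfig) {
  const canonical = JSON.stringify(canonicalize(analysisConfig));
  return crypto.createHash('sha256').update(canonical).digest('hex');
}

const a = stableCacheKey({
  aggregationLevel: 'hour',
  deviceFilter: { zones: ['a'], facilities: ['f1'] }
});
const b = stableCacheKey({
  deviceFilter: { facilities: ['f1'], zones: ['a'] },
  aggregationLevel: 'hour'
});
console.log(a === b); // true
```

If device ID lists can arrive in arbitrary order, sorting those arrays before hashing would be a reasonable further step.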

SQL-Style Time Series Operations with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB time series operations:

-- QueryLeaf Time Series operations with SQL-familiar syntax

-- Insert time series data with automatic optimization
INSERT INTO sensor_readings (
  timestamp,
  device_id,
  sensor_type,
  measurements,
  location,
  quality_metrics
)
VALUES (
  CURRENT_TIMESTAMP,
  'device_001',
  'temperature',
  JSON_BUILD_OBJECT(
    'temperature', JSON_BUILD_OBJECT('value', 23.5, 'unit', 'celsius'),
    'humidity', JSON_BUILD_OBJECT('value', 65.2, 'unit', 'percent')
  ),
  ST_GeomFromText('POINT(-74.0060 40.7128)', 4326),
  JSON_BUILD_OBJECT(
    'data_quality_score', 95,
    'sensor_health', 'normal',
    'signal_strength', -45
  )
);

-- Advanced time series analytics with window functions
WITH hourly_analytics AS (
  SELECT 
    device_id,
    sensor_type,
    DATE_TRUNC('hour', timestamp) as hour_bucket,

    -- Basic statistics
    COUNT(*) as reading_count,
    AVG(measurements->>'temperature'->>'value') as avg_temperature,
    MIN(measurements->>'temperature'->>'value') as min_temperature,
    MAX(measurements->>'temperature'->>'value') as max_temperature,
    STDDEV(measurements->>'temperature'->>'value') as temp_stddev,

    -- Time series specific aggregations
    FIRST_VALUE(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id, DATE_TRUNC('hour', timestamp)
      ORDER BY timestamp ASC
      ROWS UNBOUNDED PRECEDING
    ) as first_reading,

    LAST_VALUE(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id, DATE_TRUNC('hour', timestamp) 
      ORDER BY timestamp ASC
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as last_reading,

    -- Moving averages
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
    ) as moving_avg_6_readings,

    -- Data quality metrics
    AVG(quality_metrics->>'data_quality_score') as avg_data_quality,
    MIN(quality_metrics->>'signal_strength') as min_signal_strength,

    -- Geographical aggregation
    device.facility,
    device.zone,
    ST_AsText(AVG(device.location)) as avg_location

  FROM sensor_readings
  WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
    AND device.sensor_type = 'temperature'
  GROUP BY 
    device_id, 
    sensor_type, 
    DATE_TRUNC('hour', timestamp),
    device.facility,
    device.zone
),

trend_analysis AS (
  -- Calculate trends and changes over time
  SELECT 
    *,

    -- Hour-over-hour trend calculation
    LAG(avg_temperature, 1) OVER (
      PARTITION BY device_id 
      ORDER BY hour_bucket
    ) as prev_hour_temp,

    -- Trend direction
    CASE 
      WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) + temp_stddev THEN 'rising_fast'
      WHEN avg_temperature > LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) THEN 'rising'
      WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) - temp_stddev THEN 'falling_fast'
      WHEN avg_temperature < LAG(avg_temperature, 1) OVER (
        PARTITION BY device_id ORDER BY hour_bucket
      ) THEN 'falling'
      ELSE 'stable'
    END as trend_direction,

    -- Anomaly detection using statistical boundaries
    CASE 
      WHEN ABS(avg_temperature - AVG(avg_temperature) OVER (
        PARTITION BY device_id 
        ORDER BY hour_bucket 
        ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
      )) > 3 * STDDEV(avg_temperature) OVER (
        PARTITION BY device_id 
        ORDER BY hour_bucket 
        ROWS BETWEEN 23 PRECEDING AND CURRENT ROW
      ) THEN true
      ELSE false
    END as is_anomaly,

    -- Performance scoring
    CASE 
      WHEN avg_data_quality >= 90 AND min_signal_strength >= -60 THEN 'excellent'
      WHEN avg_data_quality >= 80 AND min_signal_strength >= -70 THEN 'good'
      WHEN avg_data_quality >= 70 AND min_signal_strength >= -80 THEN 'fair'
      ELSE 'poor'
    END as performance_rating,

    -- Operational status
    CASE 
      WHEN reading_count < 50 THEN 'low_frequency'  -- Expected: 60 readings per hour
      WHEN reading_count > 70 THEN 'high_frequency'
      ELSE 'normal_frequency'
    END as operational_status

  FROM hourly_analytics
),

facility_overview AS (
  -- Facility-level aggregations and insights
  SELECT 
    facility,
    zone,
    hour_bucket,

    -- Device and measurement counts
    COUNT(DISTINCT device_id) as active_devices,
    SUM(reading_count) as total_readings,

    -- Temperature analytics
    AVG(avg_temperature) as facility_avg_temp,
    MIN(min_temperature) as facility_min_temp,
    MAX(max_temperature) as facility_max_temp,

    -- Performance metrics
    AVG(avg_data_quality) as facility_data_quality,
    AVG(min_signal_strength) as facility_avg_signal,

    -- Status distribution
    COUNT(*) FILTER (WHERE performance_rating = 'excellent') as excellent_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'good') as good_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'fair') as fair_devices,
    COUNT(*) FILTER (WHERE performance_rating = 'poor') as poor_devices,

    -- Anomaly and trend insights
    COUNT(*) FILTER (WHERE is_anomaly = true) as anomalous_devices,
    COUNT(*) FILTER (WHERE trend_direction LIKE '%rising%') as rising_trend_devices,
    COUNT(*) FILTER (WHERE trend_direction LIKE '%falling%') as falling_trend_devices,

    -- Operational health
    COUNT(*) FILTER (WHERE operational_status = 'normal_frequency') as normal_operation_devices,
    COUNT(*) FILTER (WHERE operational_status = 'low_frequency') as low_frequency_devices,

    -- Geographic insights (if location data available)
    COUNT(DISTINCT avg_location) as location_diversity

  FROM trend_analysis
  GROUP BY facility, zone, hour_bucket
)

-- Final comprehensive time series analytics dashboard
SELECT 
  f.facility,
  f.zone,
  f.hour_bucket,

  -- Device and data summary
  f.active_devices,
  f.total_readings,
  ROUND(f.total_readings::numeric / NULLIF(f.active_devices, 0), 0) as avg_readings_per_device,

  -- Environmental metrics
  ROUND(f.facility_avg_temp, 2) as avg_temperature,
  ROUND(f.facility_min_temp, 2) as min_temperature,
  ROUND(f.facility_max_temp, 2) as max_temperature,
  ROUND(f.facility_max_temp - f.facility_min_temp, 2) as temperature_range,

  -- Performance assessment
  ROUND(f.facility_data_quality, 1) as data_quality_percentage,
  ROUND(f.facility_avg_signal, 0) as avg_signal_strength,

  -- Health score calculation
  ROUND(
    (f.excellent_devices * 100 + f.good_devices * 80 + f.fair_devices * 60 + f.poor_devices * 40)::numeric
    / NULLIF(f.active_devices, 0), 
    1
  ) as facility_health_score,

  -- Status distribution percentages
  ROUND((f.excellent_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as excellent_percentage,
  ROUND((f.good_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as good_percentage,
  ROUND((f.fair_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as fair_percentage,
  ROUND((f.poor_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as poor_percentage,

  -- Trend and anomaly insights
  f.anomalous_devices,
  ROUND((f.anomalous_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as anomaly_percentage,
  f.rising_trend_devices,
  f.falling_trend_devices,

  -- Operational status
  f.normal_operation_devices,
  f.low_frequency_devices,
  ROUND((f.normal_operation_devices::numeric / NULLIF(f.active_devices, 0)) * 100, 1) as operational_health_percentage,

  -- Alert conditions
  CASE 
    WHEN f.anomalous_devices > (f.active_devices * 0.2) THEN 'high_anomaly_alert'
    WHEN f.poor_devices > (f.active_devices * 0.3) THEN 'poor_performance_alert'
    WHEN f.low_frequency_devices > (f.active_devices * 0.4) THEN 'connectivity_alert'
    WHEN f.facility_avg_temp > 40 OR f.facility_avg_temp < 0 THEN 'environmental_alert'
    ELSE 'normal'
  END as alert_status,

  -- Recommendations
  CASE 
    WHEN f.poor_devices > (f.active_devices * 0.2) THEN 'Investigate device performance issues'
    WHEN f.anomalous_devices > (f.active_devices * 0.1) THEN 'Review anomalous readings for pattern analysis'
    WHEN f.facility_data_quality < 80 THEN 'Improve data quality monitoring and sensor calibration'
    WHEN f.facility_avg_signal < -70 THEN 'Consider network infrastructure improvements'
    ELSE 'System operating within normal parameters'
  END as recommendation,

  -- Metadata
  CURRENT_TIMESTAMP as report_generated_at,
  '24h_analysis' as analysis_type

FROM facility_overview f
WHERE f.hour_bucket >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
ORDER BY 
  f.facility, 
  f.zone, 
  f.hour_bucket DESC;

-- Real-time alerting with time series patterns
WITH real_time_thresholds AS (
  SELECT 
    device_id,
    sensor_type,

    -- Current reading
    measurements->>'temperature'->>'value' as current_temp,
    measurements->>'humidity'->>'value' as current_humidity,
    quality_metrics->>'data_quality_score' as current_quality,

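    -- Historical context: average over the preceding hour, excluding the latest minute.
    -- The WHERE clause below admits only the last 5 minutes of rows; widen it if a full hour of history is required.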
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      RANGE BETWEEN INTERVAL '1 hour' PRECEDING AND INTERVAL '1 minute' PRECEDING
    ) as historical_avg_temp,

    -- Recent trend (last 5 readings)
    AVG(measurements->>'temperature'->>'value') OVER (
      PARTITION BY device_id
      ORDER BY timestamp
      ROWS BETWEEN 4 PRECEDING AND CURRENT ROW
    ) as recent_avg_temp,

    -- Device metadata
    device.facility,
    device.zone,
    device.location,
    timestamp,

    -- Network health
    quality_metrics->>'signal_strength' as signal_strength

  FROM sensor_readings
  WHERE timestamp >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
    AND device.sensor_type = 'temperature'
),

alert_conditions AS (
  SELECT 
    *,

    -- Threshold breaches
    CASE 
      WHEN current_temp > 80 THEN 'critical_high_temperature'
      WHEN current_temp < -10 THEN 'critical_low_temperature'
      WHEN current_temp > 60 THEN 'high_temperature_warning'
      WHEN current_temp < 5 THEN 'low_temperature_warning'
      ELSE null
    END as temperature_alert,

    -- Rapid changes
    CASE 
      WHEN ABS(current_temp - recent_avg_temp) > 10 THEN 'rapid_temperature_change'
      WHEN ABS(current_temp - historical_avg_temp) > 15 THEN 'significant_deviation'
      ELSE null
    END as change_alert,

    -- Data quality issues
    CASE 
      WHEN current_quality < 50 THEN 'critical_data_quality'
      WHEN current_quality < 70 THEN 'poor_data_quality'
      WHEN signal_strength < -90 THEN 'poor_connectivity'
      ELSE null
    END as quality_alert,

    -- Combined severity assessment
    CASE 
      WHEN current_temp > 80 OR current_temp < -10 OR current_quality < 50 THEN 'critical'
      WHEN current_temp > 60 OR current_temp < 5 OR current_quality < 70 OR ABS(current_temp - recent_avg_temp) > 10 THEN 'warning'
      WHEN ABS(current_temp - historical_avg_temp) > 15 OR signal_strength < -80 THEN 'info'
      ELSE 'normal'
    END as overall_severity

  FROM real_time_thresholds
)

-- Generate active alerts with context
SELECT 
  device_id,
  facility,
  zone,
  ST_AsText(location) as device_location,
  timestamp as alert_timestamp,
  overall_severity,

  -- Primary alert
  COALESCE(temperature_alert, change_alert, quality_alert, 'normal') as primary_alert_type,

  -- Alert message
  CASE 
    WHEN temperature_alert IS NOT NULL THEN 
      CONCAT('Temperature alert: ', current_temp, '°C detected')
    WHEN change_alert IS NOT NULL THEN 
      CONCAT('Temperature change alert: ', ROUND(ABS(current_temp - recent_avg_temp), 1), '°C change detected')
    WHEN quality_alert IS NOT NULL THEN 
      CONCAT('Data quality alert: ', current_quality, '% quality score')
    ELSE 'System normal'
  END as alert_message,

  -- Current readings
  ROUND(current_temp, 2) as current_temperature,
  ROUND(current_humidity, 1) as current_humidity,
  current_quality as data_quality_percentage,
  signal_strength as signal_strength_dbm,

  -- Context
  ROUND(historical_avg_temp, 2) as hourly_avg_temperature,
  ROUND(recent_avg_temp, 2) as recent_avg_temperature,
  ROUND(ABS(current_temp - historical_avg_temp), 2) as deviation_from_hourly_avg,

  -- Action required
  CASE overall_severity
    WHEN 'critical' THEN 'IMMEDIATE ACTION REQUIRED'
    WHEN 'warning' THEN 'Investigation recommended'
    WHEN 'info' THEN 'Monitor for trends'
    ELSE 'No action required'
  END as recommended_action,

  -- Contact priority
  CASE overall_severity
    WHEN 'critical' THEN 'Notify operations team immediately'
    WHEN 'warning' THEN 'Escalate to facility manager'
    ELSE 'Log for review'
  END as escalation_level

FROM alert_conditions
WHERE overall_severity IN ('critical', 'warning', 'info')
ORDER BY 
  CASE overall_severity 
    WHEN 'critical' THEN 1 
    WHEN 'warning' THEN 2 
    WHEN 'info' THEN 3 
  END,
  timestamp DESC;

-- Time series data lifecycle management
WITH data_lifecycle_analysis AS (
  SELECT 
    DATE_TRUNC('day', timestamp) as date_bucket,
    device.facility,
    COUNT(*) as daily_record_count,
    AVG(quality_metrics->>'data_quality_score') as avg_daily_quality,

    -- Data size estimation (approximate)
    COUNT(*) * 1024 as estimated_bytes_per_day, -- Rough estimate

    -- Retention category
    CASE 
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '7 days' THEN 'hot_data'
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '90 days' THEN 'warm_data'
      WHEN DATE_TRUNC('day', timestamp) >= CURRENT_DATE - INTERVAL '365 days' THEN 'cold_data'
      ELSE 'archive_data'
    END as retention_category,

    -- Archive recommendations
    CASE 
      WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '2 years' THEN 'candidate_for_deletion'
      WHEN DATE_TRUNC('day', timestamp) < CURRENT_DATE - INTERVAL '1 year' 
           AND AVG(quality_metrics->>'data_quality_score') < 70 THEN 'candidate_for_archive'
      ELSE 'keep_active'
    END as lifecycle_recommendation

  FROM sensor_readings
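  -- Look back past the 2-year deletion threshold (3 years here is an illustrative bound);
  -- a 2-year lower bound would exclude every row that could be flagged 'candidate_for_deletion'
  WHERE timestamp >= CURRENT_DATE - INTERVAL '3 years'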
  GROUP BY DATE_TRUNC('day', timestamp), device.facility
)

SELECT 
  retention_category,
  COUNT(*) as day_buckets,
  SUM(daily_record_count) as total_records,
  ROUND(AVG(avg_daily_quality), 1) as avg_quality_score,

  -- Storage estimates
  -- Divide by a decimal literal so integer division does not truncate the result
  ROUND(SUM(estimated_bytes_per_day) / (1024.0 * 1024), 1) as estimated_mb,
  ROUND(SUM(estimated_bytes_per_day) / (1024.0 * 1024 * 1024), 2) as estimated_gb,

  -- Lifecycle recommendations
  SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_deletion' THEN daily_record_count ELSE 0 END) as records_for_deletion,
  SUM(CASE WHEN lifecycle_recommendation = 'candidate_for_archive' THEN daily_record_count ELSE 0 END) as records_for_archive,
  SUM(CASE WHEN lifecycle_recommendation = 'keep_active' THEN daily_record_count ELSE 0 END) as records_to_keep,

  -- Storage optimization potential
  ROUND(
    SUM(CASE WHEN lifecycle_recommendation IN ('candidate_for_deletion', 'candidate_for_archive') 
             THEN estimated_bytes_per_day ELSE 0 END) / (1024.0 * 1024), 1
  ) as potential_storage_savings_mb

FROM data_lifecycle_analysis
GROUP BY retention_category
ORDER BY 
  CASE retention_category 
    WHEN 'hot_data' THEN 1 
    WHEN 'warm_data' THEN 2 
    WHEN 'cold_data' THEN 3 
    WHEN 'archive_data' THEN 4 
  END;

-- QueryLeaf provides comprehensive time series capabilities:
-- 1. High-performance data ingestion with automatic time series optimization
-- 2. Advanced analytics using familiar SQL window functions and aggregations
-- 3. Real-time alerting and threshold monitoring with SQL expressions
-- 4. Facility and device-level dashboards using complex analytical queries
-- 5. Trend analysis and anomaly detection through statistical SQL functions
-- 6. Data lifecycle management with retention and archiving recommendations
-- 7. Geospatial analytics for location-aware IoT deployments
-- 8. Integration with MongoDB's native time series compression and bucketing
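
For comparison, the per-device hourly statistics at the start of the first query could also be written directly as an aggregation pipeline. The sketch below is hand-written and is not QueryLeaf's actual translation output. The function name buildHourlyTemperaturePipeline and the field path measurements.temperature.value are assumptions; device.id, device.sensor_type, and timestamp follow the JavaScript engine earlier in this post. $dateTrunc requires MongoDB 5.0 or later.

```javascript
// Hand-written sketch: hourly temperature statistics per device.
// Field paths are assumptions about the document shape, not a confirmed schema.
function buildHourlyTemperaturePipeline(now = new Date(), lookbackHours = 24) {
  const since = new Date(now.getTime() - lookbackHours * 60 * 60 * 1000);
  const tempValue = '$measurements.temperature.value';

  return [
    // WHERE timestamp >= now - 24h AND sensor_type = 'temperature'
    { $match: { timestamp: { $gte: since }, 'device.sensor_type': 'temperature' } },

    // GROUP BY device, DATE_TRUNC('hour', timestamp)
    {
      $group: {
        _id: {
          device_id: '$device.id',
          hour_bucket: { $dateTrunc: { date: '$timestamp', unit: 'hour' } }
        },
        reading_count: { $sum: 1 },
        avg_temperature: { $avg: tempValue },
        min_temperature: { $min: tempValue },
        max_temperature: { $max: tempValue },
        temp_stddev: { $stdDevSamp: tempValue } // sample standard deviation
      }
    },

    // ORDER BY device, hour
    { $sort: { '_id.device_id': 1, '_id.hour_bucket': 1 } }
  ];
}

const hourlyPipeline = buildHourlyTemperaturePipeline();
console.log(hourlyPipeline.map(stage => Object.keys(stage)[0]));
// [ '$match', '$group', '$sort' ]
```

With the Node.js driver this would run as `await collection.aggregate(buildHourlyTemperaturePipeline()).toArray()`, where collection is the time series collection holding the readings.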

Best Practices for Production Time Series Deployments

Performance Optimization Strategies

Essential optimization techniques for high-throughput time series workloads:

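  1. Time Series Collection Configuration: Set granularity (or, on MongoDB 6.3 and later, bucketMaxSpanSeconds and bucketRoundingSeconds) to match how often each device reports
  2. Index Strategy: Create compound indexes on the metadata field and the timestamp to serve time-range and per-device queries
  3. Data Retention: Use expireAfterSeconds for automatic expiry and define separate policies for hot, warm, and cold data
  4. Aggregation Performance: Precompute frequently accessed analytics as on-demand materialized views, for example with $merge
  5. Real-Time Processing: Keep alerting queries and triggers low-latency; change streams are not supported on time series collections, so watch a regular collection (such as an alerts collection) instead
  6. Compression Settings: Rely on the built-in compression of time series buckets, and keep metadata field values stable so that buckets fill efficiently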
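
As a rough illustration of the first and third points, the sketch below derives createCollection options from the expected reporting interval and a retention period. The helper name buildTimeSeriesOptions and the interval thresholds are assumptions chosen for illustration; the field names timestamp and device match the documents used earlier in this post.

```javascript
// Hypothetical helper: derive createCollection options from the expected
// seconds between readings per device and a retention period in days.
function buildTimeSeriesOptions({ intervalSeconds, retentionDays }) {
  // Pick the granularity closest to the ingestion rate (rule-of-thumb thresholds).
  let granularity = 'seconds';
  if (intervalSeconds >= 3600) granularity = 'hours';
  else if (intervalSeconds >= 60) granularity = 'minutes';

  return {
    timeseries: { timeField: 'timestamp', metaField: 'device', granularity },
    // Documents are removed automatically once they are older than this.
    expireAfterSeconds: retentionDays * 24 * 60 * 60
  };
}

const options = buildTimeSeriesOptions({ intervalSeconds: 60, retentionDays: 90 });
console.log(options.timeseries.granularity); // minutes
console.log(options.expireAfterSeconds);     // 7776000

// With a connected Node.js driver `db` (collection name is an assumption):
//   await db.createCollection('sensor_data', options);
//   await db.collection('sensor_data').createIndex({ 'device.id': 1, timestamp: -1 });
```

Granularity can be raised later with collMod but not lowered, so starting with the finer setting is usually the safer choice.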

IoT Architecture Design

Design principles for scalable IoT time series systems:

  1. Device Management: Implement device registration, health monitoring, and metadata management
  2. Network Optimization: Design efficient data transmission protocols for IoT constraints
  3. Edge Processing: Implement edge analytics to reduce data transmission and latency
  4. Fault Tolerance: Design robust error handling and offline data synchronization
  5. Security Implementation: Implement device authentication, encryption, and access controls
  6. Scalability Planning: Plan for horizontal scaling across geographic regions and device growth
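
To make the edge processing point concrete, here is a minimal sketch of downsampling on the device or gateway before transmission. The function name downsample and the reading shape ({ timestamp, value }) are assumptions for illustration only.

```javascript
// Collapse raw readings into fixed windows, keeping count/min/max/avg per window.
// Reading shape ({ timestamp: Date, value: number }) is an assumption.
function downsample(readings, windowMs) {
  const windows = new Map();

  for (const { timestamp, value } of readings) {
    // Align each reading to the start of its window.
    const start = Math.floor(timestamp.getTime() / windowMs) * windowMs;
    const w = windows.get(start) ||
      { start, count: 0, sum: 0, min: Infinity, max: -Infinity };
    w.count += 1;
    w.sum += value;
    w.min = Math.min(w.min, value);
    w.max = Math.max(w.max, value);
    windows.set(start, w);
  }

  return [...windows.values()]
    .sort((x, y) => x.start - y.start)
    .map(w => ({
      timestamp: new Date(w.start),
      count: w.count,
      min: w.min,
      max: w.max,
      avg: w.sum / w.count
    }));
}

const raw = [
  { timestamp: new Date('2024-01-01T00:00:05Z'), value: 20 },
  { timestamp: new Date('2024-01-01T00:00:35Z'), value: 22 },
  { timestamp: new Date('2024-01-01T00:01:10Z'), value: 30 }
];
const perMinute = downsample(raw, 60000);
console.log(perMinute.length); // 2
console.log(perMinute[0].avg); // 21
```

Sending one summary document per window instead of every raw reading trades detail for bandwidth, so the window size should reflect how much resolution downstream analytics actually need.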

Conclusion

MongoDB Time Series Collections provide enterprise-grade IoT data management capabilities that address the unique challenges of sensor data ingestion, real-time analytics, and long-term historical analysis. The purpose-built time series optimizations eliminate the complexity and performance limitations of traditional database approaches while delivering sophisticated analytics and monitoring capabilities at IoT scale.

Key MongoDB Time Series advantages include:

  • Optimized Storage: Automatic bucketing and compression specifically designed for time-stamped data
  • High-Velocity Ingestion: Purpose-built write optimization for high-frequency sensor data streams
  • Advanced Analytics: Sophisticated aggregation pipelines for real-time and historical analytics
  • Automatic Lifecycle Management: Built-in data retention and archiving capabilities
  • Scalable Architecture: Horizontal scaling optimized for time series query patterns
  • SQL Accessibility: Familiar time series operations through QueryLeaf's SQL interface

Whether you're building IoT monitoring systems, industrial sensor networks, environmental tracking applications, or real-time analytics platforms, MongoDB Time Series Collections with QueryLeaf's SQL interface provide a foundation for efficient, scalable, and maintainable time series data management. The approach adapts to evolving IoT requirements while keeping database interaction patterns familiar.

QueryLeaf Integration: QueryLeaf automatically optimizes SQL-style time series operations for MongoDB's specialized time series collections, enabling developers to leverage advanced IoT analytics through familiar SQL syntax. Complex sensor data aggregations, real-time alerting logic, and trend analysis queries are seamlessly translated into MongoDB's high-performance time series operations, making sophisticated IoT analytics accessible without requiring specialized time series expertise.

The combination of MongoDB's time series optimizations with SQL-familiar operations creates an ideal platform for IoT applications requiring both high-performance data processing and familiar database interaction patterns, ensuring your IoT systems can scale efficiently while maintaining data accessibility and analytical capabilities.

MongoDB ETL and Data Pipeline Processing: High-Performance Data Transformation and Stream Processing with SQL-Familiar Pipeline Architecture

Modern data-driven organizations require ETL (Extract, Transform, Load) processes that can handle diverse data sources, perform complex transformations, and deliver processed data to multiple downstream systems in real time. Traditional ETL tools often struggle with the volume, variety, and velocity of contemporary data workflows, particularly when dealing with semi-structured data, real-time streaming sources, and evolving schemas.

MongoDB's aggregation framework, combined with change streams and flexible document storage, provides a powerful foundation for building high-performance ETL pipelines that can process data at scale while maintaining the familiar SQL-style operations that development teams understand. This approach enables efficient data transformation, real-time processing capabilities, and seamless integration with existing data infrastructure.

The Traditional ETL Challenge

Conventional ETL architectures face significant limitations when dealing with modern data requirements:

-- Traditional PostgreSQL ETL limitations
-- Fixed schema constraints limit data source flexibility

CREATE TABLE raw_customer_data (
    customer_id BIGINT PRIMARY KEY,
    email VARCHAR(255),
    first_name VARCHAR(100),
    last_name VARCHAR(100),
    registration_date DATE,

    -- Static schema can't accommodate varying data structures
    profile_data JSONB  -- Limited JSON support
);

-- Complex transformation logic requires stored procedures
CREATE OR REPLACE FUNCTION transform_customer_data()
RETURNS TABLE(
    customer_key BIGINT,
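    -- TEXT matches the types returned by CONCAT, SUBSTRING, and TO_CHAR;
    -- RETURN QUERY rejects a text result for a VARCHAR output column
    full_name TEXT,
    email_domain TEXT,
    registration_month TEXT,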
    profile_score INTEGER
) AS $$
BEGIN
    RETURN QUERY
    SELECT 
        c.customer_id,
        CONCAT(c.first_name, ' ', c.last_name),
        SUBSTRING(c.email FROM POSITION('@' IN c.email) + 1),
        TO_CHAR(c.registration_date, 'YYYY-MM'),
        CASE 
            WHEN c.profile_data->>'premium' = 'true' THEN 100
            WHEN c.profile_data->>'verified' = 'true' THEN 50
            ELSE 25
        END
    FROM raw_customer_data c
    WHERE c.registration_date >= CURRENT_DATE - INTERVAL '30 days';
END;
$$ LANGUAGE plpgsql;

-- Batch processing limitations
INSERT INTO transformed_customers 
SELECT * FROM transform_customer_data();

-- Problems:
-- 1. Rigid schema requirements
-- 2. Limited real-time processing
-- 3. Complex stored procedure logic
-- 4. Poor scaling for large datasets
-- 5. Limited support for nested/complex data structures

MongoDB ETL pipelines address these limitations with flexible aggregation-based transformations:

// MongoDB flexible ETL pipeline
const customerTransformPipeline = [
  // Extract: Flexible data ingestion from multiple sources
  {
    $match: {
      registration_date: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
      status: { $ne: "deleted" }
    }
  },

  // Transform: Complex data transformation with aggregation operators
  {
    $addFields: {
      full_name: { $concat: ["$first_name", " ", "$last_name"] },
      email_domain: {
        // Take the part after "@"; this avoids the deprecated $substr and
        // mixing byte offsets ($indexOfBytes) with code-point lengths ($strLenCP)
        $arrayElemAt: [{ $split: ["$email", "@"] }, 1]
      },
      registration_month: {
        $dateToString: { format: "%Y-%m", date: "$registration_date" }
      },
      profile_score: {
        $switch: {
          branches: [
            { case: { $eq: ["$profile.premium", true] }, then: 100 },
            { case: { $eq: ["$profile.verified", true] }, then: 50 }
          ],
          default: 25
        }
      },
      // Handle complex nested transformations
      preferences: {
        $map: {
          input: "$profile.preferences",
          as: "pref",
          in: {
            category: "$$pref.type",
            enabled: "$$pref.active",
            weight: { $multiply: ["$$pref.priority", 10] }
          }
        }
      }
    }
  },

  // Load: Flexible output with computed fields
  {
    $project: {
      customer_key: "$customer_id",
      full_name: 1,
      email_domain: 1,
      registration_month: 1,
      profile_score: 1,
      preferences: 1,
      transformation_timestamp: new Date(),
      source_system: "customer_api"
    }
  }
];

// Process with high-performance aggregation
const transformedCustomers = await db.collection('customers').aggregate(customerTransformPipeline).toArray();

// Benefits:
// - Flexible schema handling
// - Complex nested data transformation
// - Server-side execution, parallelized across shards on sharded clusters
// - Near-real-time processing when combined with change streams
// - Rich transformation operators
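
To check transformation rules like these without a running cluster, the per-document logic can be mirrored in plain JavaScript and exercised against sample records. The sketch below is only an approximation of the `$addFields` and `$project` stages above: `transformCustomer` and the sample document are hypothetical, and missing fields are handled more leniently than the aggregation operators would handle them.

```javascript
// Plain-JavaScript mirror of the pipeline's computed fields.
// `transformCustomer` is a hypothetical helper, not a MongoDB API.
function transformCustomer(customer) {
  const email = customer.email ?? "";
  const atIndex = email.indexOf("@");
  const registered = new Date(customer.registration_date);

  // Same precedence as the $switch branches: premium first, then verified
  let profileScore = 25;
  if (customer.profile?.premium === true) profileScore = 100;
  else if (customer.profile?.verified === true) profileScore = 50;

  return {
    customer_key: customer.customer_id,
    full_name: `${customer.first_name} ${customer.last_name}`,
    // Everything after the first "@"
    email_domain: email.slice(atIndex + 1),
    // $dateToString formats in UTC by default, as does toISOString()
    registration_month: registered.toISOString().slice(0, 7),
    profile_score: profileScore,
    // Note: $map returns null for a missing array; this sketch returns []
    preferences: (customer.profile?.preferences ?? []).map((pref) => ({
      category: pref.type,
      enabled: pref.active,
      weight: pref.priority * 10,
    })),
  };
}

const sample = {
  customer_id: "C-1001",
  first_name: "Ada",
  last_name: "Lovelace",
  email: "ada@example.com",
  registration_date: "2024-03-15T10:30:00Z",
  profile: {
    premium: false,
    verified: true,
    preferences: [{ type: "newsletter", active: true, priority: 3 }],
  },
};

const result = transformCustomer(sample);
console.log(result.email_domain, result.registration_month, result.profile_score);
// prints: example.com 2024-03 50
```

Keeping a mirror like this next to the pipeline makes it easier to write fast unit tests for the business rules, while the aggregation pipeline remains the version that runs in production.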

ETL Pipeline Architecture

Data Extraction Patterns

Implement flexible data extraction from multiple sources:

// Comprehensive data extraction service
class DataExtractionService {
  constructor(db, config) {
    this.db = db;
    this.config = config;
    this.rawCollection = db.collection('raw_data');
    this.metadataCollection = db.collection('etl_metadata');
  }

  async extractFromAPI(sourceConfig, extractionId) {
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'api',
      source_config: sourceConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0,
      errors: []
    };

    try {
      // API data extraction with pagination
      let hasMore = true;
      let offset = 0;
      const batchSize = sourceConfig.batch_size || 1000;

      while (hasMore) {
        const response = await this.fetchAPIData(sourceConfig, offset, batchSize);

        if (response.data && response.data.length > 0) {
          // Prepare documents for insertion
          const documents = response.data.map(record => ({
            ...record,
            _extraction_metadata: {
              extraction_id: extractionId,
              source_url: sourceConfig.url,
              extracted_at: new Date(),
              batch_offset: offset,
              raw_record: true
            }
          }));

          // Bulk insert with unordered operations so one bad document
          // does not block the rest of the batch
          await this.rawCollection.insertMany(documents, {
            ordered: false,
            writeConcern: { w: 1, j: true }
          });

          extractionMetadata.records_processed += documents.length;
          offset += batchSize;
          hasMore = response.has_more;
        } else {
          hasMore = false;
        }

        // Update progress
        await this.updateExtractionProgress(extractionId, extractionMetadata);
      }

      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async extractFromDatabase(dbConfig, query, extractionId) {
    // Database extraction with change tracking
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'database',
      source_config: dbConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0
    };

    try {
      // Get last extraction timestamp for incremental updates
      const lastExtraction = await this.metadataCollection.findOne(
        {
          source_type: 'database',
          'source_config.connection_string': dbConfig.connection_string,
          status: 'completed'
        },
        { sort: { end_time: -1 } }
      );

      // Build incremental query
      let incrementalQuery = { ...query };
      if (lastExtraction && dbConfig.incremental_field) {
        incrementalQuery[dbConfig.incremental_field] = {
          $gt: lastExtraction.end_time
        };
      }

      // Extract with cursor for memory efficiency
      const cursor = this.db.collection(dbConfig.collection).find(incrementalQuery);
      const batchSize = 1000;
      let batch = [];
      let recordCount = 0;

      for await (const doc of cursor) {
        // Add extraction metadata
        const enrichedDoc = {
          ...doc,
          _extraction_metadata: {
            extraction_id: extractionId,
            source_database: dbConfig.database,
            source_collection: dbConfig.collection,
            extracted_at: new Date()
          }
        };

        batch.push(enrichedDoc);

        if (batch.length >= batchSize) {
          await this.rawCollection.insertMany(batch, { ordered: false });
          recordCount += batch.length;
          batch = [];

          // Update progress
          extractionMetadata.records_processed = recordCount;
          await this.updateExtractionProgress(extractionId, extractionMetadata);
        }
      }

      // Insert final batch
      if (batch.length > 0) {
        await this.rawCollection.insertMany(batch, { ordered: false });
        recordCount += batch.length;
      }

      extractionMetadata.records_processed = recordCount;
      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async extractFromFiles(fileConfig, extractionId) {
    // File-based extraction (CSV, JSON, XML, etc.)
    const extractionMetadata = {
      extraction_id: extractionId,
      source_type: 'file',
      source_config: fileConfig,
      start_time: new Date(),
      status: 'in_progress',
      files_processed: 0,
      records_processed: 0
    };

    try {
      const files = await this.getFilesFromSource(fileConfig);

      for (const filePath of files) {
        const fileData = await this.parseFile(filePath, fileConfig.format);

        if (fileData && fileData.length > 0) {
          const enrichedDocuments = fileData.map(record => ({
            ...record,
            _extraction_metadata: {
              extraction_id: extractionId,
              source_file: filePath,
              extracted_at: new Date(),
              file_format: fileConfig.format
            }
          }));

          // Batch insert file data
          const batchSize = 1000;
          for (let i = 0; i < enrichedDocuments.length; i += batchSize) {
            const batch = enrichedDocuments.slice(i, i + batchSize);
            await this.rawCollection.insertMany(batch, { ordered: false });
            extractionMetadata.records_processed += batch.length;
          }
        }

        extractionMetadata.files_processed++;
        await this.updateExtractionProgress(extractionId, extractionMetadata);
      }

      extractionMetadata.status = 'completed';
      extractionMetadata.end_time = new Date();

    } catch (error) {
      extractionMetadata.status = 'failed';
      extractionMetadata.error = error.message;
      extractionMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.metadataCollection.replaceOne(
        { extraction_id: extractionId },
        extractionMetadata,
        { upsert: true }
      );
    }

    return extractionMetadata;
  }

  async fetchAPIData(config, offset, limit) {
    // Implement API-specific data fetching logic
    // This would integrate with actual API clients
    const url = `${config.url}?offset=${offset}&limit=${limit}`;
    // Return { data: [...], has_more: boolean }
    return { data: [], has_more: false }; // Placeholder
  }

  async parseFile(filePath, format) {
    // Implement file parsing logic for various formats
    // CSV, JSON, XML, Parquet, etc.
    return []; // Placeholder
  }

  async getFilesFromSource(config) {
    // Get file list from various sources (S3, FTP, local filesystem)
    return []; // Placeholder
  }

  async updateExtractionProgress(extractionId, metadata) {
    await this.metadataCollection.updateOne(
      { extraction_id: extractionId },
      { $set: metadata },
      { upsert: true }
    );
  }
}
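
The `fetchAPIData` placeholder above is expected to return `{ data: [...], has_more: boolean }`. As a minimal sketch of that contract, the following uses an in-memory stand-in for a paginated API and the same loop shape as `extractFromAPI`, with the insert replaced by a callback. The names `makeFakeApi` and `extractAllPages` are hypothetical and exist only for illustration.

```javascript
// In-memory stand-in for a paginated API. It follows the contract that
// extractFromAPI expects from fetchAPIData: { data: [...], has_more: boolean }.
function makeFakeApi(records) {
  return async function fetchPage(offset, limit) {
    const data = records.slice(offset, offset + limit);
    return { data, has_more: offset + limit < records.length };
  };
}

// Same loop shape as extractFromAPI; onBatch stands in for insertMany.
async function extractAllPages(fetchPage, batchSize, onBatch) {
  let offset = 0;
  let hasMore = true;
  let processed = 0;

  while (hasMore) {
    const response = await fetchPage(offset, batchSize);

    if (response.data && response.data.length > 0) {
      await onBatch(response.data, offset);
      processed += response.data.length;
      offset += batchSize;
      hasMore = response.has_more;
    } else {
      // An empty page ends the loop even if has_more was not set correctly
      hasMore = false;
    }
  }

  return processed;
}

async function demo() {
  const records = Array.from({ length: 2500 }, (_, i) => ({ id: i }));
  const batchSizes = [];
  const total = await extractAllPages(makeFakeApi(records), 1000, (batch) => {
    batchSizes.push(batch.length);
  });
  console.log(total, batchSizes); // prints: 2500 [ 1000, 1000, 500 ]
}

demo();
```

A stand-in like this lets the pagination and progress logic be tested before a real API client is wired into `fetchAPIData`.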

Data Transformation Engine

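Build transformation pipelines from declarative configuration, combining server-side aggregation stages with per-document validation and error logging: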

// Advanced data transformation service
class DataTransformationService {
  constructor(db) {
    this.db = db;
    this.rawCollection = db.collection('raw_data');
    this.transformedCollection = db.collection('transformed_data');
    this.errorCollection = db.collection('transformation_errors');
  }

  async executeTransformationPipeline(pipelineConfig, transformationId) {
    const transformationMetadata = {
      transformation_id: transformationId,
      pipeline_name: pipelineConfig.name,
      start_time: new Date(),
      status: 'in_progress',
      records_processed: 0,
      records_transformed: 0,
      errors_encountered: 0,
      stages: []
    };

    try {
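      // Build dynamic aggregation pipeline; pass the transformation ID along so
      // the output stage can stamp it into _transformation_metadata
      const aggregationPipeline = this.buildAggregationPipeline({
        ...pipelineConfig,
        transformation_id: transformationId
      });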

      // Execute transformation with error handling
      const transformationResults = await this.executeWithErrorHandling(
        aggregationPipeline, 
        transformationId,
        transformationMetadata
      );

      transformationMetadata.records_transformed = transformationResults.successCount;
      transformationMetadata.errors_encountered = transformationResults.errorCount;
      transformationMetadata.status = 'completed';
      transformationMetadata.end_time = new Date();

      return transformationMetadata;

    } catch (error) {
      transformationMetadata.status = 'failed';
      transformationMetadata.error = error.message;
      transformationMetadata.end_time = new Date();
      throw error;
    }
  }

  buildAggregationPipeline(config) {
    const pipeline = [];

    // Stage 1: Data filtering and source selection
    if (config.source_filter) {
      pipeline.push({
        $match: {
          ...config.source_filter,
          '_extraction_metadata.extraction_id': { $exists: true }
        }
      });
    }

    // Stage 2: Data cleansing and validation
    if (config.cleansing_rules) {
      pipeline.push({
        $addFields: {
          // Clean and validate data
          cleaned_data: {
            $let: {
              vars: {
                cleaned: {
                  $switch: {
                    branches: config.cleansing_rules.map(rule => ({
                      case: rule.condition,
                      then: rule.transformation
                    })),
                    default: "$$ROOT"
                  }
                }
              },
              in: "$$cleaned"
            }
          }
        }
      });
    }

    // Stage 3: Data enrichment and computed fields
    if (config.enrichment_rules) {
      const enrichmentFields = {};

      config.enrichment_rules.forEach(rule => {
        enrichmentFields[rule.target_field] = rule.computation;
      });

      pipeline.push({
        $addFields: enrichmentFields
      });
    }

    // Stage 4: Nested document transformations
    if (config.nested_transformations) {
      config.nested_transformations.forEach(transformation => {
        if (transformation.type === 'array_processing') {
          pipeline.push({
            $addFields: {
              [transformation.target_field]: {
                $map: {
                  input: `$${transformation.source_field}`,
                  as: "item",
                  in: transformation.item_transformation
                }
              }
            }
          });
        } else if (transformation.type === 'object_flattening') {
          pipeline.push({
            $addFields: this.buildFlatteningTransformation(transformation)
          });
        }
      });
    }

    // Stage 5: Data aggregation and grouping
    if (config.aggregation_rules) {
      config.aggregation_rules.forEach(rule => {
        if (rule.type === 'group') {
          pipeline.push({
            $group: {
              _id: rule.group_by,
              ...rule.aggregations
            }
          });
        } else if (rule.type === 'bucket') {
          pipeline.push({
            $bucket: {
              groupBy: rule.group_by,
              boundaries: rule.boundaries,
              default: rule.default_bucket,
              output: rule.output
            }
          });
        }
      });
    }

    // Stage 6: Output formatting and projection
    if (config.output_format) {
      pipeline.push({
        $project: {
          ...config.output_format.fields,
          _transformation_metadata: {
            transformation_id: config.transformation_id,
            pipeline_name: config.name,
            transformed_at: new Date(),
            source_extraction_id: "$_extraction_metadata.extraction_id"
          }
        }
      });
    }

    return pipeline;
  }

  async executeWithErrorHandling(pipeline, transformationId, metadata) {
    let successCount = 0;
    let errorCount = 0;
    const batchSize = 1000;

    try {
      // Use aggregation cursor for memory-efficient processing
      const cursor = this.rawCollection.aggregate(pipeline, {
        allowDiskUse: true,
        cursor: { batchSize }
      });

      let batch = [];

      for await (const document of cursor) {
        try {
          // Additional validation can be performed here
          if (this.validateTransformedDocument(document)) {
            batch.push(document);
            successCount++;
          } else {
            await this.logTransformationError(
              transformationId,
              document,
              'validation_failed',
              'Document failed validation rules'
            );
            errorCount++;
          }

          // Process batch when full
          if (batch.length >= batchSize) {
            await this.insertTransformedBatch(batch, transformationId);
            batch = [];
          }

        } catch (docError) {
          await this.logTransformationError(
            transformationId,
            document,
            'processing_error',
            docError.message
          );
          errorCount++;
        }
      }

      // Insert remaining documents
      if (batch.length > 0) {
        await this.insertTransformedBatch(batch, transformationId);
      }

      return { successCount, errorCount };

    } catch (pipelineError) {
      throw new Error(`Pipeline execution failed: ${pipelineError.message}`);
    }
  }

  buildFlatteningTransformation(config) {
    const flattenedFields = {};

    // Flatten nested object structure
    const flattenObject = (obj, prefix = '') => {
      for (const [key, value] of Object.entries(obj)) {
        const newKey = prefix ? `${prefix}.${key}` : key;

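        // typeof null is 'object', so exclude it before recursing
        if (value !== null && typeof value === 'object' && !Array.isArray(value)) {
          flattenObject(value, newKey);
        } else {
          // Replace every dot, not just the first, so deep paths yield valid field names
          flattenedFields[newKey.replace(/\./g, '_')] = `$${newKey}`;
        }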
      }
    };

    flattenObject(config.source_structure);
    return flattenedFields;
  }

  async insertTransformedBatch(batch, transformationId) {
    try {
      await this.transformedCollection.insertMany(batch, {
        ordered: false,
        writeConcern: { w: 1, j: true }
      });
    } catch (error) {
      // Handle partial failures in batch
      if (error.writeErrors) {
        for (const writeError of error.writeErrors) {
          await this.logTransformationError(
            transformationId,
            batch[writeError.index],
            'insert_error',
            writeError.errmsg
          );
        }
      } else {
        throw error;
      }
    }
  }

  validateTransformedDocument(doc) {
    // Implement document validation logic
    // Check required fields, data types, business rules, etc.
    return true; // Placeholder
  }

  async logTransformationError(transformationId, document, errorType, errorMessage) {
    const errorDoc = {
      transformation_id: transformationId,
      error_type: errorType,
      error_message: errorMessage,
      failed_document_id: document._id,
      failed_document_sample: JSON.stringify(document).substring(0, 1000),
      timestamp: new Date()
    };

    await this.errorCollection.insertOne(errorDoc);
  }

  // Complex transformation functions
  async executeCustomTransformations(documents, transformationConfig) {
    return documents.map(doc => {
      let transformedDoc = { ...doc };

      // Text processing transformations
      if (transformationConfig.text_processing) {
        transformedDoc = this.applyTextTransformations(transformedDoc, transformationConfig.text_processing);
      }

      // Date and time transformations
      if (transformationConfig.date_processing) {
        transformedDoc = this.applyDateTransformations(transformedDoc, transformationConfig.date_processing);
      }

      // Numerical computations
      if (transformationConfig.numerical_processing) {
        transformedDoc = this.applyNumericalTransformations(transformedDoc, transformationConfig.numerical_processing);
      }

      return transformedDoc;
    });
  }

  applyTextTransformations(doc, config) {
    // Text cleaning, normalization, extraction
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (fieldValue && typeof fieldValue === 'string') {
        switch (rule.operation) {
          case 'normalize':
            this.setNestedValue(doc, rule.field, fieldValue.toLowerCase().trim());
            break;
          case 'extract_email':
            const emailMatch = fieldValue.match(/[\w\.-]+@[\w\.-]+\.\w+/);
            if (emailMatch) {
              this.setNestedValue(doc, rule.target_field, emailMatch[0]);
            }
            break;
          case 'extract_phone':
            const phoneMatch = fieldValue.match(/(\+?\d{1,3}[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}/);
            if (phoneMatch) {
              this.setNestedValue(doc, rule.target_field, phoneMatch[0]);
            }
            break;
          case 'split':
            const parts = fieldValue.split(rule.delimiter);
            this.setNestedValue(doc, rule.target_field, parts);
            break;
        }
      }
    });

    return doc;
  }

  applyDateTransformations(doc, config) {
    // Date parsing, formatting, calculations
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (fieldValue) {
        switch (rule.operation) {
          case 'parse_date':
            const parsedDate = new Date(fieldValue);
            if (!isNaN(parsedDate.getTime())) {
              this.setNestedValue(doc, rule.target_field, parsedDate);
            }
            break;
          case 'extract_components':
            const date = new Date(fieldValue);
            if (!isNaN(date.getTime())) {
              this.setNestedValue(doc, `${rule.target_field}_year`, date.getFullYear());
              this.setNestedValue(doc, `${rule.target_field}_month`, date.getMonth() + 1);
              this.setNestedValue(doc, `${rule.target_field}_day`, date.getDate());
            }
            break;
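          case 'age_calculation':
            const birthDate = new Date(fieldValue);
            if (!isNaN(birthDate.getTime())) {
              // Count completed calendar years so leap years do not skew the result
              const now = new Date();
              let age = now.getFullYear() - birthDate.getFullYear();
              const beforeBirthday =
                now.getMonth() < birthDate.getMonth() ||
                (now.getMonth() === birthDate.getMonth() && now.getDate() < birthDate.getDate());
              if (beforeBirthday) age--;
              this.setNestedValue(doc, rule.target_field, age);
            }
            break;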
        }
      }
    });

    return doc;
  }

  applyNumericalTransformations(doc, config) {
    // Numerical calculations, conversions, aggregations
    config.forEach(rule => {
      const fieldValue = this.getNestedValue(doc, rule.field);
      if (typeof fieldValue === 'number') {
        switch (rule.operation) {
          case 'currency_conversion':
            const convertedValue = fieldValue * rule.exchange_rate;
            this.setNestedValue(doc, rule.target_field, Math.round(convertedValue * 100) / 100);
            break;
          case 'percentage_calculation':
            const total = this.getNestedValue(doc, rule.total_field);
            if (total && total !== 0) {
              const percentage = (fieldValue / total) * 100;
              this.setNestedValue(doc, rule.target_field, Math.round(percentage * 100) / 100);
            }
            break;
          case 'range_classification':
            let classification = 'unknown';
            for (const range of rule.ranges) {
              if (fieldValue >= range.min && fieldValue <= range.max) {
                classification = range.label;
                break;
              }
            }
            this.setNestedValue(doc, rule.target_field, classification);
            break;
        }
      }
    });

    return doc;
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }
}
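
The `object_flattening` transformation is the least obvious of the stages above, so here is a standalone sketch of the idea: walk a template of the nested structure and emit one `"$path"` reference per leaf, keyed by an underscore-joined name. `buildFlattenedFields` is a hypothetical helper written for this illustration, and the template shape (leaf values are ignored, only the keys matter) is an assumption about how `source_structure` would be supplied.

```javascript
// Walk a nested template and produce an $addFields specification that
// copies each leaf path to a flat, underscore-joined field name.
function buildFlattenedFields(structure, prefix = "") {
  const fields = {};

  for (const [key, value] of Object.entries(structure)) {
    const path = prefix ? `${prefix}.${key}` : key;
    const isPlainObject =
      value !== null && typeof value === "object" && !Array.isArray(value);

    if (isPlainObject) {
      // Recurse into nested objects, carrying the dotted path along
      Object.assign(fields, buildFlattenedFields(value, path));
    } else {
      // Leaf: flat name on the left, field-path reference on the right
      fields[path.replace(/\./g, "_")] = `$${path}`;
    }
  }

  return fields;
}

const addFieldsStage = {
  $addFields: buildFlattenedFields({
    address: { city: true, geo: { lat: true, lng: true } },
    email: true,
  }),
};

console.log(JSON.stringify(addFieldsStage.$addFields));
// prints: {"address_city":"$address.city","address_geo_lat":"$address.geo.lat","address_geo_lng":"$address.geo.lng","email":"$email"}
```

Because the result is an ordinary `$addFields` specification, the flattening itself still runs on the server; the JavaScript only generates the stage.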

Data Loading and Output Management

Implement flexible data loading strategies:

// Data loading and output service
class DataLoadingService {
  constructor(db) {
    this.db = db;
    this.transformedCollection = db.collection('transformed_data');
    this.outputMetadataCollection = db.collection('output_metadata');
  }

  async loadToMongoDB(targetConfig, loadId) {
    const loadMetadata = {
      load_id: loadId,
      target_type: 'mongodb',
      target_config: targetConfig,
      start_time: new Date(),
      status: 'in_progress',
      records_loaded: 0,
      errors: []
    };

    try {
      const targetCollection = this.db.collection(targetConfig.collection);

      // Configure loading strategy
      const loadingStrategy = targetConfig.strategy || 'append';

      if (loadingStrategy === 'replace') {
        // Clear target collection before loading
        await targetCollection.deleteMany({});
      }

      // Load data in batches
      const batchSize = targetConfig.batch_size || 1000;
      const pipeline = this.buildLoadingPipeline(targetConfig);
      const cursor = this.transformedCollection.aggregate(pipeline, {
        allowDiskUse: true,
        cursor: { batchSize }
      });

      let batch = [];
      let recordCount = 0;

      for await (const doc of cursor) {
        // Apply final transformations for target schema
        const finalDoc = this.applyTargetTransformations(doc, targetConfig);

        batch.push(finalDoc);

        if (batch.length >= batchSize) {
          await this.insertBatch(targetCollection, batch, loadingStrategy);
          recordCount += batch.length;
          batch = [];

          // Update progress
          loadMetadata.records_loaded = recordCount;
          await this.updateLoadProgress(loadId, loadMetadata);
        }
      }

      // Insert final batch
      if (batch.length > 0) {
        await this.insertBatch(targetCollection, batch, loadingStrategy);
        recordCount += batch.length;
      }

      // Create indexes if specified
      if (targetConfig.indexes) {
        await this.createTargetIndexes(targetCollection, targetConfig.indexes);
      }

      loadMetadata.records_loaded = recordCount;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  async loadToWarehouse(warehouseConfig, loadId) {
    // Load data to external data warehouse (Snowflake, Redshift, BigQuery)
    const loadMetadata = {
      load_id: loadId,
      target_type: 'warehouse',
      target_config: warehouseConfig,
      start_time: new Date(),
      status: 'in_progress'
    };

    try {
      // Export data to format compatible with warehouse
      const exportFormat = warehouseConfig.format || 'parquet';
      const exportPath = await this.exportToFile(warehouseConfig, exportFormat);

      // Upload to warehouse staging area
      await this.uploadToWarehouse(exportPath, warehouseConfig);

      // Execute warehouse loading commands
      const loadResults = await this.executeWarehouseLoad(warehouseConfig);

      loadMetadata.records_loaded = loadResults.recordCount;
      loadMetadata.warehouse_table = loadResults.tableName;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  async loadToAPI(apiConfig, loadId) {
    // Push data to external APIs
    const loadMetadata = {
      load_id: loadId,
      target_type: 'api',
      target_config: apiConfig,
      start_time: new Date(),
      status: 'in_progress',
      api_calls_made: 0,
      records_sent: 0
    };

    try {
      const batchSize = apiConfig.batch_size || 100;
      const pipeline = this.buildLoadingPipeline(apiConfig);
      const cursor = this.transformedCollection.aggregate(pipeline);

      let batch = [];
      let apiCallCount = 0;
      let recordCount = 0;

      for await (const doc of cursor) {
        const apiDoc = this.formatForAPI(doc, apiConfig);
        batch.push(apiDoc);

        if (batch.length >= batchSize) {
          await this.sendToAPI(batch, apiConfig);
          apiCallCount++;
          recordCount += batch.length;
          batch = [];

          // Rate limiting
          if (apiConfig.rate_limit_delay) {
            await this.delay(apiConfig.rate_limit_delay);
          }

          // Update progress
          loadMetadata.api_calls_made = apiCallCount;
          loadMetadata.records_sent = recordCount;
          await this.updateLoadProgress(loadId, loadMetadata);
        }
      }

      // Send final batch
      if (batch.length > 0) {
        await this.sendToAPI(batch, apiConfig);
        apiCallCount++;
        recordCount += batch.length;
      }

      loadMetadata.api_calls_made = apiCallCount;
      loadMetadata.records_sent = recordCount;
      loadMetadata.status = 'completed';
      loadMetadata.end_time = new Date();

    } catch (error) {
      loadMetadata.status = 'failed';
      loadMetadata.error = error.message;
      loadMetadata.end_time = new Date();
      throw error;
    } finally {
      await this.outputMetadataCollection.replaceOne(
        { load_id: loadId },
        loadMetadata,
        { upsert: true }
      );
    }

    return loadMetadata;
  }

  buildLoadingPipeline(config) {
    const pipeline = [];

    // Filter data for loading
    if (config.filter) {
      pipeline.push({ $match: config.filter });
    }

    // Sort for consistent ordering
    if (config.sort) {
      pipeline.push({ $sort: config.sort });
    }

    // Limit if specified
    if (config.limit) {
      pipeline.push({ $limit: config.limit });
    }

    // Project fields for target format
    if (config.projection) {
      pipeline.push({ $project: config.projection });
    }

    return pipeline;
  }

  applyTargetTransformations(doc, config) {
    let transformedDoc = { ...doc };

    // Apply target-specific field mappings
    if (config.field_mappings) {
      const mappedDoc = {};
      for (const [sourceField, targetField] of Object.entries(config.field_mappings)) {
        const value = this.getNestedValue(transformedDoc, sourceField);
        if (value !== undefined) {
          this.setNestedValue(mappedDoc, targetField, value);
        }
      }
      transformedDoc = { ...transformedDoc, ...mappedDoc };
    }

    // Apply data type conversions
    if (config.type_conversions) {
      config.type_conversions.forEach(conversion => {
        const value = this.getNestedValue(transformedDoc, conversion.field);
        if (value !== undefined) {
          const convertedValue = this.convertDataType(value, conversion.target_type, conversion.options);
          this.setNestedValue(transformedDoc, conversion.field, convertedValue);
        }
      });
    }

    return transformedDoc;
  }

  async insertBatch(collection, batch, strategy) {
    if (strategy === 'upsert') {
      // Perform upsert operations
      const bulkOps = batch.map(doc => ({
        replaceOne: {
          filter: { [doc._id ? '_id' : 'unique_key']: doc._id || doc.unique_key },
          replacement: doc,
          upsert: true
        }
      }));
      await collection.bulkWrite(bulkOps, { ordered: false });
    } else {
      // Regular insert
      await collection.insertMany(batch, { ordered: false });
    }
  }

  async createTargetIndexes(collection, indexDefinitions) {
    for (const indexDef of indexDefinitions) {
      try {
        await collection.createIndex(indexDef.fields, indexDef.options || {});
      } catch (error) {
        console.warn(`Failed to create index: ${error.message}`);
      }
    }
  }

  convertDataType(value, targetType, options = {}) {
    switch (targetType) {
      case 'string':
        return String(value);
      case 'number':
        return Number(value);
      case 'date':
        return new Date(value);
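      case 'boolean':
        // Boolean('false') is true, so treat common string forms explicitly
        if (typeof value === 'string') {
          return ['true', '1', 'yes'].includes(value.trim().toLowerCase());
        }
        return Boolean(value);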
      case 'array':
        return Array.isArray(value) ? value : [value];
      case 'object':
        return typeof value === 'object' ? value : { value };
      default:
        return value;
    }
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }

  async updateLoadProgress(loadId, metadata) {
    await this.outputMetadataCollection.updateOne(
      { load_id: loadId },
      { $set: metadata },
      { upsert: true }
    );
  }

  async exportToFile(config, format) {
    // Implement file export logic
    return '/tmp/export.parquet'; // Placeholder
  }

  async uploadToWarehouse(filePath, config) {
    // Implement warehouse upload logic
  }

  async executeWarehouseLoad(config) {
    // Execute warehouse-specific loading commands
    return { recordCount: 0, tableName: config.table }; // Placeholder
  }

  formatForAPI(doc, config) {
    // Format document according to API requirements
    return doc; // Placeholder
  }

  async sendToAPI(batch, config) {
    // Send data to external API
  }

  delay(ms) {
    return new Promise(resolve => setTimeout(resolve, ms));
  }
}
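
The `upsert` strategy in `insertBatch` is what makes a load safe to re-run, since replaying a batch replaces documents instead of duplicating them. As a small sketch of that branch in isolation, the hypothetical helper `buildUpsertOps` below turns a batch into `bulkWrite` operations, keyed on `_id` when present and otherwise on a caller-chosen business key (the `keyField` parameter is an assumption added for illustration).

```javascript
// Build replaceOne upsert operations for collection.bulkWrite().
// Documents with an _id are matched on it; others on keyField.
function buildUpsertOps(batch, keyField = "unique_key") {
  return batch.map((doc) => {
    const filter =
      doc._id !== undefined ? { _id: doc._id } : { [keyField]: doc[keyField] };
    return { replaceOne: { filter, replacement: doc, upsert: true } };
  });
}

const ops = buildUpsertOps([
  { _id: 1, name: "a" },
  { unique_key: "cust-42", name: "b" },
]);

console.log(JSON.stringify(ops[1].replaceOne.filter));
// prints: {"unique_key":"cust-42"}

// With a connected collection, the operations would be applied with:
// await collection.bulkWrite(ops, { ordered: false });
```

When matching on a business key rather than `_id`, a unique index on that key is usually worth adding so that concurrent upserts cannot create duplicates.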

Real-Time Stream Processing

Change Stream ETL

Implement real-time ETL using MongoDB change streams:

// Real-time ETL using change streams
class StreamETLProcessor {
  constructor(db, config) {
    this.db = db;
    this.config = config;
    this.changeStreams = new Map();
    this.transformationService = new DataTransformationService(db);
    this.loadingService = new DataLoadingService(db);
  }

  async startStreamProcessing(streamConfig) {
    const sourceCollection = this.db.collection(streamConfig.source_collection);

    // Configure change stream options
    const changeStreamOptions = {
      fullDocument: 'updateLookup',
      resumeAfter: streamConfig.resume_token,
      maxAwaitTimeMS: 1000
    };

    // Create change stream with pipeline filter
    const pipeline = [];
    if (streamConfig.operation_filter) {
      pipeline.push({
        $match: {
          'operationType': { $in: streamConfig.operation_filter }
        }
      });
    }

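    if (streamConfig.document_filter) {
      // Prefix each field so the conditions apply to fields inside fullDocument.
      // Matching 'fullDocument' against the filter object itself would require
      // exact equality with the whole document. This assumes a flat filter of
      // field conditions (no top-level $and/$or).
      const documentMatch = Object.fromEntries(
        Object.entries(streamConfig.document_filter).map(
          ([field, condition]) => [`fullDocument.${field}`, condition]
        )
      );
      pipeline.push({ $match: documentMatch });
    }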

    const changeStream = sourceCollection.watch(pipeline, changeStreamOptions);
    this.changeStreams.set(streamConfig.stream_id, changeStream);

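    // Process change events. The emitter does not await async listeners, so
    // slow handlers can overlap and events may finish out of order.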
    changeStream.on('change', async (changeEvent) => {
      try {
        await this.processChangeEvent(changeEvent, streamConfig);
      } catch (error) {
        console.error('Error processing change event:', error);
        await this.handleStreamError(error, changeEvent, streamConfig);
      }
    });

    changeStream.on('error', async (error) => {
      console.error('Change stream error:', error);
      await this.handleStreamError(error, null, streamConfig);
    });

    console.log(`Started stream processing for ${streamConfig.stream_id}`);
    return changeStream;
  }

  async processChangeEvent(changeEvent, streamConfig) {
    const processingMetadata = {
      stream_id: streamConfig.stream_id,
      change_event_id: changeEvent._id,
      operation_type: changeEvent.operationType,
      processing_time: new Date(),
      status: 'processing'
    };

    try {
      let documentToProcess = null;

      // Extract document based on operation type
      switch (changeEvent.operationType) {
        case 'insert':
        case 'replace':
          documentToProcess = changeEvent.fullDocument;
          break;
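        case 'update':
          // With 'updateLookup', fullDocument can be null if the document was
          // deleted before the lookup ran; copy it instead of mutating the event.
          // The update description is included for delta processing.
          documentToProcess = changeEvent.fullDocument
            ? { ...changeEvent.fullDocument, _updateDescription: changeEvent.updateDescription }
            : null;
          break;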
        case 'delete':
          documentToProcess = changeEvent.fullDocumentBeforeChange || 
                             { _id: changeEvent.documentKey._id, _deleted: true };
          break;
      }

      if (documentToProcess) {
        // Apply real-time transformations
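        const transformedDocument = await this.applyStreamTransformations(
          documentToProcess, 
          streamConfig.transformations,
          changeEvent
        );

        // Change events carry no stream identifier, so record it from the config
        transformedDocument._stream_metadata.stream_id = streamConfig.stream_id;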

        // Apply business rules and validations
        if (streamConfig.business_rules) {
          const validationResult = await this.validateBusinessRules(
            transformedDocument,
            streamConfig.business_rules,
            changeEvent
          );

          if (!validationResult.valid) {
            await this.handleValidationFailure(validationResult, transformedDocument, streamConfig);
            return;
          }
        }

        // Load to target systems
        if (streamConfig.target_systems) {
          await this.loadToTargetSystems(
            transformedDocument,
            streamConfig.target_systems,
            changeEvent
          );
        }

        // Update processing status
        processingMetadata.status = 'completed';
        processingMetadata.records_processed = 1;

      }

      await this.logStreamProcessing(processingMetadata);

    } catch (error) {
      processingMetadata.status = 'failed';
      processingMetadata.error = error.message;
      await this.logStreamProcessing(processingMetadata);
      throw error;
    }
  }

  async applyStreamTransformations(document, transformations, changeEvent) {
    let transformedDoc = { ...document };

    // Treat a missing transformations list as empty
    for (const transformation of transformations || []) {
      switch (transformation.type) {
        case 'field_mapping':
          transformedDoc = this.applyFieldMapping(transformedDoc, transformation.mapping);
          break;

        case 'computed_fields':
          transformedDoc = await this.applyComputedFields(transformedDoc, transformation.computations);
          break;

        case 'enrichment':
          transformedDoc = await this.applyEnrichment(transformedDoc, transformation.enrichment_config);
          break;

        case 'aggregation':
          transformedDoc = await this.applyStreamAggregation(transformedDoc, transformation.aggregation_config, changeEvent);
          break;

        case 'custom_function':
          transformedDoc = await transformation.function(transformedDoc, changeEvent);
          break;
      }
    }

    // Add stream processing metadata
    transformedDoc._stream_metadata = {
      stream_id: changeEvent.streamId,
      change_event_id: changeEvent._id,
      operation_type: changeEvent.operationType,
      processed_at: new Date(),
      cluster_time: changeEvent.clusterTime
    };

    return transformedDoc;
  }

  async applyComputedFields(document, computations) {
    const enrichedDoc = { ...document };

    for (const computation of computations) {
      try {
        let computedValue = null;

        switch (computation.type) {
          case 'lookup':
            computedValue = await this.performLookup(document, computation.config);
            break;

          case 'calculation':
            computedValue = this.performCalculation(document, computation.config);
            break;

          case 'text_processing':
            computedValue = this.performTextProcessing(document, computation.config);
            break;

          case 'date_calculation':
            computedValue = this.performDateCalculation(document, computation.config);
            break;
        }

        if (computedValue !== null) {
          this.setNestedValue(enrichedDoc, computation.target_field, computedValue);
        }

      } catch (error) {
        console.warn(`Failed to compute field ${computation.target_field}:`, error);
      }
    }

    return enrichedDoc;
  }

  async applyEnrichment(document, enrichmentConfig) {
    const enrichedDoc = { ...document };

    for (const enrichment of enrichmentConfig) {
      try {
        const lookupCollection = this.db.collection(enrichment.lookup_collection);
        const lookupKey = this.getNestedValue(document, enrichment.source_field);

        if (lookupKey) {
          const lookupDoc = await lookupCollection.findOne({
            [enrichment.lookup_field]: lookupKey
          });

          if (lookupDoc) {
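            // Merge lookup data
            if (enrichment.merge_strategy === 'full') {
              // Exclude the lookup document's _id so it does not overwrite the source _id
              const { _id, ...lookupFields } = lookupDoc;
              Object.assign(enrichedDoc, lookupFields);
            } else if (enrichment.merge_strategy === 'selective') {
              // Default to no prefix so a missing target_prefix does not yield "undefined..."
              const prefix = enrichment.target_prefix || '';
              enrichment.fields_to_merge.forEach(field => {
                if (lookupDoc[field] !== undefined) {
                  this.setNestedValue(enrichedDoc, prefix + field, lookupDoc[field]);
                }
              });
            }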
          }
        }

      } catch (error) {
        console.warn(`Failed to apply enrichment ${enrichment.name}:`, error);
      }
    }

    return enrichedDoc;
  }

  async applyStreamAggregation(document, aggregationConfig, changeEvent) {
    const aggregatedDoc = { ...document };

    // Real-time aggregation using upsert operations
    for (const aggregation of aggregationConfig) {
      try {
        const aggregationCollection = this.db.collection(aggregation.target_collection);
        const groupingKey = this.buildGroupingKey(document, aggregation.group_by);

        // Build update operations for real-time aggregation
        const updateOps = {};

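        aggregation.aggregations.forEach(agg => {
          // 'count' needs no source field, so only read one when it is configured
          const sourceValue = agg.source_field
            ? this.getNestedValue(document, agg.source_field)
            : undefined;

          // Skip value-based operators when the value is missing; an undefined
          // operand may otherwise be serialized as null and stored
          const needsValue = ['min', 'max', 'addToSet'].includes(agg.operation);
          if (needsValue && sourceValue === undefined) return;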

          switch (agg.operation) {
            case 'sum':
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[agg.target_field] = sourceValue || 0;
              break;

            case 'count':
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[agg.target_field] = 1;
              break;

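            case 'avg':
              // Maintain a running sum and count; the average itself must be
              // computed at read time as <target_field>_sum / <target_field>_count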
              updateOps.$inc = updateOps.$inc || {};
              updateOps.$inc[`${agg.target_field}_sum`] = sourceValue || 0;
              updateOps.$inc[`${agg.target_field}_count`] = 1;
              break;

            case 'min':
              updateOps.$min = updateOps.$min || {};
              updateOps.$min[agg.target_field] = sourceValue;
              break;

            case 'max':
              updateOps.$max = updateOps.$max || {};
              updateOps.$max[agg.target_field] = sourceValue;
              break;

            case 'addToSet':
              updateOps.$addToSet = updateOps.$addToSet || {};
              updateOps.$addToSet[agg.target_field] = sourceValue;
              break;
          }
        });

        // Set metadata fields
        updateOps.$set = updateOps.$set || {};
        updateOps.$set.last_updated = new Date();
        updateOps.$set.last_change_event = changeEvent._id;

        // Perform upsert operation
        await aggregationCollection.updateOne(
          groupingKey,
          updateOps,
          { upsert: true }
        );

      } catch (error) {
        console.warn(`Failed to apply stream aggregation ${aggregation.name}:`, error);
      }
    }

    return aggregatedDoc;
  }

  buildGroupingKey(document, groupByConfig) {
    const groupingKey = {};

    if (Array.isArray(groupByConfig)) {
      groupByConfig.forEach(field => {
        const value = this.getNestedValue(document, field);
        // Replace every dot, not just the first, so nested paths become flat keys
        groupingKey[field.replace(/\./g, '_')] = value;
      });
    } else if (groupByConfig && typeof groupByConfig === 'object') {
      Object.entries(groupByConfig).forEach(([key, expression]) => {
        // Support for complex grouping expressions
        groupingKey[key] = this.evaluateExpression(document, expression);
      });
    }

    return groupingKey;
  }

  async validateBusinessRules(document, businessRules, changeEvent) {
    const validationResult = {
      valid: true,
      failures: [],
      warnings: []
    };

    for (const rule of businessRules) {
      try {
        const ruleResult = await this.evaluateBusinessRule(document, rule, changeEvent);

        if (!ruleResult.passed) {
          if (rule.severity === 'error') {
            validationResult.valid = false;
            validationResult.failures.push({
              rule: rule.name,
              message: ruleResult.message
            });
          } else if (rule.severity === 'warning') {
            validationResult.warnings.push({
              rule: rule.name,
              message: ruleResult.message
            });
          }
        }

      } catch (error) {
        validationResult.valid = false;
        validationResult.failures.push({
          rule: rule.name,
          message: `Rule evaluation failed: ${error.message}`
        });
      }
    }

    return validationResult;
  }

  async evaluateBusinessRule(document, rule, changeEvent) {
    switch (rule.type) {
      case 'field_validation':
        return this.validateField(document, rule.config);

      case 'cross_document_validation':
        return await this.validateCrossDocument(document, rule.config);

      case 'temporal_validation':
        return this.validateTemporal(document, changeEvent, rule.config);

      case 'custom_validation':
        return await rule.config.validator(document, changeEvent);

      default:
        return { passed: true };
    }
  }

  validateField(document, config) {
    const fieldValue = this.getNestedValue(document, config.field);

    // Check required fields
    if (config.required && (fieldValue === undefined || fieldValue === null)) {
      return {
        passed: false,
        message: `Required field '${config.field}' is missing`
      };
    }

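    // Check data type. typeof reports 'object' for both arrays and null, so
    // arrays are reported as 'array' and null values are skipped here.
    if (fieldValue !== undefined && fieldValue !== null && config.data_type) {
      const actualType = Array.isArray(fieldValue) ? 'array' : typeof fieldValue;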
      if (actualType !== config.data_type) {
        return {
          passed: false,
          message: `Field '${config.field}' expected type ${config.data_type}, got ${actualType}`
        };
      }
    }

    // Check value range
    if (fieldValue !== undefined && config.range) {
      if (fieldValue < config.range.min || fieldValue > config.range.max) {
        return {
          passed: false,
          message: `Field '${config.field}' value ${fieldValue} is outside range [${config.range.min}, ${config.range.max}]`
        };
      }
    }

    // Check allowed values
    if (fieldValue !== undefined && config.allowed_values) {
      if (!config.allowed_values.includes(fieldValue)) {
        return {
          passed: false,
          message: `Field '${config.field}' value '${fieldValue}' is not in allowed values: ${config.allowed_values.join(', ')}`
        };
      }
    }

    return { passed: true };
  }

  async validateCrossDocument(document, config) {
    const referenceCollection = this.db.collection(config.reference_collection);
    const referenceValue = this.getNestedValue(document, config.source_field);

    if (referenceValue) {
      const referenceDoc = await referenceCollection.findOne({
        [config.reference_field]: referenceValue
      });

      if (!referenceDoc && config.required) {
        return {
          passed: false,
          message: `Reference document not found for ${config.source_field}: ${referenceValue}`
        };
      }

      // Additional cross-document validations
      if (referenceDoc && config.additional_checks) {
        for (const check of config.additional_checks) {
          const refValue = this.getNestedValue(referenceDoc, check.reference_field);
          const docValue = this.getNestedValue(document, check.document_field);

          if (!this.compareValues(docValue, refValue, check.comparison)) {
            return {
              passed: false,
              message: `Cross-document validation failed: ${check.message}`
            };
          }
        }
      }
    }

    return { passed: true };
  }

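  validateTemporal(document, changeEvent, config) {
    // clusterTime is a BSON Timestamp, not a Date: its high 32 bits hold seconds
    // since the Unix epoch. Prefer wallTime (a Date, MongoDB 6.0+) when present.
    const timestamp = changeEvent.wallTime ||
      (changeEvent.clusterTime
        ? new Date(changeEvent.clusterTime.getHighBits() * 1000)
        : new Date());

    // Check business hours (getHours() uses this process's local time zone)
    if (config.business_hours) {
      const hour = timestamp.getHours();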
      if (hour < config.business_hours.start || hour > config.business_hours.end) {
        return {
          passed: false,
          message: `Operation outside business hours: ${hour}`
        };
      }
    }

    // Check rate limits
    if (config.rate_limit) {
      // This would require additional state tracking
      // Implementation depends on specific rate limiting strategy
    }

    return { passed: true };
  }

  compareValues(value1, value2, comparison) {
    switch (comparison) {
      case 'eq': return value1 === value2;
      case 'ne': return value1 !== value2;
      case 'gt': return value1 > value2;
      case 'gte': return value1 >= value2;
      case 'lt': return value1 < value2;
      case 'lte': return value1 <= value2;
      default: return false;
    }
  }

  async loadToTargetSystems(document, targetSystems, changeEvent) {
    for (const target of targetSystems) {
      try {
        switch (target.type) {
          case 'mongodb':
            await this.loadToMongoTarget(document, target.config);
            break;

          case 'elasticsearch':
            await this.loadToElasticsearch(document, target.config);
            break;

          case 'kafka':
            await this.loadToKafka(document, target.config, changeEvent);
            break;

          case 'webhook':
            await this.loadToWebhook(document, target.config);
            break;
        }

      } catch (error) {
        console.error(`Failed to load to target system ${target.name}:`, error);
        await this.handleTargetLoadError(error, document, target, changeEvent);
      }
    }
  }

  async loadToMongoTarget(document, config) {
    const targetCollection = this.db.collection(config.collection);

    if (config.strategy === 'upsert') {
      const filter = {};
      config.unique_fields.forEach(field => {
        filter[field] = this.getNestedValue(document, field);
      });

      await targetCollection.replaceOne(filter, document, { upsert: true });
    } else {
      await targetCollection.insertOne(document);
    }
  }

  async loadToKafka(document, config, changeEvent) {
    // Send to Kafka topic
    const message = {
      key: this.getNestedValue(document, config.key_field),
      value: JSON.stringify(document),
      headers: {
        operation_type: changeEvent.operationType,
        timestamp: new Date().toISOString()
      }
    };

    // This would use a Kafka producer client
    console.log(`Would send to Kafka topic ${config.topic}:`, message);
  }

  async loadToWebhook(document, config) {
    // Send HTTP request to webhook
    const payload = {
      data: document,
      timestamp: new Date().toISOString(),
      source: 'mongodb-etl'
    };

    // This would use an HTTP client
    console.log(`Would send webhook to ${config.url}:`, payload);
  }

  async handleStreamError(error, changeEvent, streamConfig) {
    // Log stream processing errors
    const errorDoc = {
      stream_id: streamConfig.stream_id,
      error_type: 'stream_processing_error',
      error_message: error.message,
      change_event: changeEvent,
      timestamp: new Date()
    };

    await this.db.collection('stream_errors').insertOne(errorDoc);
  }

  async handleValidationFailure(validationResult, document, streamConfig) {
    // Handle business rule validation failures
    const failureDoc = {
      stream_id: streamConfig.stream_id,
      failure_type: 'validation_failure',
      validation_failures: validationResult.failures,
      validation_warnings: validationResult.warnings,
      document: document,
      timestamp: new Date()
    };

    await this.db.collection('validation_failures').insertOne(failureDoc);
  }

  async handleTargetLoadError(error, document, target, changeEvent) {
    // Handle target system loading errors
    const errorDoc = {
      target_system: target.name,
      error_type: 'target_load_error',
      error_message: error.message,
      document: document,
      change_event_id: changeEvent._id,
      timestamp: new Date()
    };

    await this.db.collection('target_load_errors').insertOne(errorDoc);
  }

  performLookup(document, config) {
    // Implement lookup logic
    return null; // Placeholder
  }

  performCalculation(document, config) {
    // Implement calculation logic
    return null; // Placeholder
  }

  performTextProcessing(document, config) {
    // Implement text processing logic
    return null; // Placeholder
  }

  performDateCalculation(document, config) {
    // Implement date calculation logic
    return null; // Placeholder
  }

  evaluateExpression(document, expression) {
    // Implement expression evaluation
    return null; // Placeholder
  }

  applyFieldMapping(document, mapping) {
    // Implement field mapping logic
    return document; // Placeholder
  }

  getNestedValue(obj, path) {
    return path.split('.').reduce((current, prop) => current?.[prop], obj);
  }

  setNestedValue(obj, path, value) {
    const props = path.split('.');
    const lastProp = props.pop();
    const target = props.reduce((current, prop) => {
      if (!current[prop]) current[prop] = {};
      return current[prop];
    }, obj);
    target[lastProp] = value;
  }

  async logStreamProcessing(metadata) {
    await this.db.collection('stream_processing_log').insertOne(metadata);
  }

  async stopStreamProcessing(streamId) {
    const changeStream = this.changeStreams.get(streamId);
    if (changeStream) {
      await changeStream.close();
      this.changeStreams.delete(streamId);
      console.log(`Stopped stream processing for ${streamId}`);
    }
  }
}
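
The `on('change', ...)` listener above does not wait for one event to finish before the next is dispatched, and `resume_token` is read from the configuration but never written back. A minimal sketch of an alternative loop follows, assuming a Node.js driver version in which the change stream can be consumed as an async iterable. `consumeInOrder` and the `saveResumeToken` callback are hypothetical names; where the token is stored is left to the caller.

```javascript
// Hypothetical helper: processes change events strictly one at a time and
// checkpoints the resume token only after an event has been handled.
async function consumeInOrder(changeStream, handleEvent, saveResumeToken) {
  let processed = 0;

  for await (const changeEvent of changeStream) {
    // The next event is not pulled until this one has been fully handled
    await handleEvent(changeEvent);

    // A change event's _id is its resume token; persisting it lets a restart
    // continue from this point via the resumeAfter option.
    await saveResumeToken(changeEvent._id);
    processed++;
  }

  return processed;
}
```

With the class above, this might be called as `consumeInOrder(changeStream, (e) => processor.processChangeEvent(e, streamConfig), saveToken)` in place of the `on('change')` handler. Because the token is saved after handling, a crash between the two steps replays the last event on restart, so loads into target systems should be idempotent.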

SQL-Style ETL with QueryLeaf

QueryLeaf provides familiar SQL-style ETL operations with MongoDB's powerful aggregation capabilities:

-- QueryLeaf ETL operations with SQL-familiar syntax

-- Data extraction with SQL-style filtering and projection
WITH extracted_data AS (
  SELECT 
    customer_id,
    email,
    first_name,
    last_name,
    registration_date,
    profile_data,
    last_login_date,
    CASE 
      WHEN profile_data->>'premium' = 'true' THEN 'premium'
      WHEN profile_data->>'verified' = 'true' THEN 'verified'
      ELSE 'basic'
    END AS customer_tier
  FROM raw_customers
  WHERE registration_date >= CURRENT_DATE - INTERVAL '30 days'
    AND email IS NOT NULL
    AND email LIKE '%@%.%'
)

-- Data transformation with complex calculations
, transformed_data AS (
  SELECT 
    customer_id,
    CONCAT(first_name, ' ', last_name) AS full_name,
    LOWER(email) AS normalized_email,
    SUBSTRING(email FROM POSITION('@' IN email) + 1) AS email_domain,
    DATE_TRUNC('month', registration_date) AS registration_month,
    customer_tier,

    -- Customer lifecycle calculations
    CASE 
      WHEN last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 'active'
      WHEN last_login_date >= CURRENT_DATE - INTERVAL '30 days' THEN 'inactive'
      ELSE 'dormant'
    END AS activity_status,

    -- Age calculation
    EXTRACT(DAYS FROM (CURRENT_DATE - registration_date)) AS days_since_registration,

    -- JSON processing and nested field extraction
    JSON_EXTRACT(profile_data, '$.preferences.notifications') AS notification_preferences,
    ARRAY_LENGTH(JSON_EXTRACT(profile_data, '$.purchase_history')) AS purchase_count,

    -- Computed engagement score
    CASE 
      WHEN customer_tier = 'premium' THEN 100
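      -- Repeats the registration-age test: a column alias cannot be referenced within the same SELECT list
      WHEN registration_date > CURRENT_DATE - INTERVAL '30 days' AND last_login_date >= CURRENT_DATE - INTERVAL '7 days' THEN 75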
      WHEN customer_tier = 'verified' THEN 50
      ELSE 25
    END AS engagement_score,

    CURRENT_TIMESTAMP AS transformation_timestamp
  FROM extracted_data
)

-- Data aggregation and analytical computations
, aggregated_metrics AS (
  SELECT 
    email_domain,
    registration_month,
    customer_tier,
    activity_status,
    COUNT(*) as customer_count,
    AVG(engagement_score) as avg_engagement_score,
    COUNT(CASE WHEN activity_status = 'active' THEN 1 END) as active_customers,
    COUNT(CASE WHEN days_since_registration < 7 THEN 1 END) as new_customers,

    -- Advanced aggregations
    PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY engagement_score) as median_engagement,
    VARIANCE(engagement_score) as engagement_variance,

    -- Array aggregations
    ARRAY_AGG(customer_id ORDER BY engagement_score DESC LIMIT 10) as top_customers,

    -- JSON aggregation for complex data structures
    JSON_OBJECT_AGG(
      customer_tier, 
      JSON_OBJECT(
        'count', COUNT(*),
        'avg_score', AVG(engagement_score)
      )
    ) as tier_metrics

  FROM transformed_data
  GROUP BY email_domain, registration_month, customer_tier, activity_status
  HAVING COUNT(*) >= 5  -- Filter out small groups
)

-- Final data loading with upsert semantics
INSERT INTO customer_analytics (
  email_domain,
  registration_month, 
  customer_tier,
  activity_status,
  customer_count,
  avg_engagement_score,
  active_customers,
  new_customers,
  median_engagement,
  engagement_variance,
  top_customers,
  tier_metrics,
  last_updated,
  etl_run_id
)
SELECT 
  email_domain,
  registration_month,
  customer_tier, 
  activity_status,
  customer_count,
  ROUND(avg_engagement_score, 2),
  active_customers,
  new_customers,
  ROUND(median_engagement, 2),
  ROUND(engagement_variance, 2),
  top_customers,
  tier_metrics,
  CURRENT_TIMESTAMP,
  'etl_run_' || EXTRACT(EPOCH FROM CURRENT_TIMESTAMP)
FROM aggregated_metrics
ON CONFLICT (email_domain, registration_month, customer_tier, activity_status)
DO UPDATE SET
  customer_count = EXCLUDED.customer_count,
  avg_engagement_score = EXCLUDED.avg_engagement_score,
  active_customers = EXCLUDED.active_customers,
  new_customers = EXCLUDED.new_customers,
  median_engagement = EXCLUDED.median_engagement,
  engagement_variance = EXCLUDED.engagement_variance,
  top_customers = EXCLUDED.top_customers,
  tier_metrics = EXCLUDED.tier_metrics,
  last_updated = EXCLUDED.last_updated,
  etl_run_id = EXCLUDED.etl_run_id;

-- Real-time streaming ETL with change data capture
CREATE OR REPLACE TRIGGER customer_changes_trigger
AFTER INSERT OR UPDATE OR DELETE ON customers
FOR EACH ROW
BEGIN
  -- Capture change event
  INSERT INTO customer_change_stream (
    change_type,
    customer_id,
    old_data,
    new_data,
    change_timestamp,
    change_sequence
  ) VALUES (
    TG_OP,  -- INSERT, UPDATE, or DELETE
    COALESCE(NEW.customer_id, OLD.customer_id),
    CASE WHEN TG_OP != 'INSERT' THEN ROW_TO_JSON(OLD) ELSE NULL END,  -- keep the before-image for UPDATE as well as DELETE
    CASE WHEN TG_OP != 'DELETE' THEN ROW_TO_JSON(NEW) ELSE NULL END,
    CURRENT_TIMESTAMP,
    nextval('change_sequence')
  );

  -- Trigger downstream processing
  PERFORM pg_notify('customer_changed', 
    JSON_BUILD_OBJECT(
      'operation', TG_OP,
      'customer_id', COALESCE(NEW.customer_id, OLD.customer_id),
      'timestamp', CURRENT_TIMESTAMP
    )::TEXT
  );
END;

-- Advanced window functions for trend analysis
WITH customer_trends AS (
  SELECT 
    customer_id,
    registration_date,
    engagement_score,

    -- Running totals and moving averages
    SUM(engagement_score) OVER (
      PARTITION BY email_domain 
      ORDER BY registration_date 
      ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_engagement,

    AVG(engagement_score) OVER (
      PARTITION BY customer_tier
      ORDER BY registration_date
      ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
    ) as moving_avg_engagement,

    -- Ranking and percentile calculations
    PERCENT_RANK() OVER (
      PARTITION BY registration_month 
      ORDER BY engagement_score
    ) as engagement_percentile,

    ROW_NUMBER() OVER (
      PARTITION BY email_domain, customer_tier 
      ORDER BY engagement_score DESC
    ) as tier_rank,

    -- Lead/Lag for sequential analysis
    LAG(engagement_score, 1) OVER (
      PARTITION BY customer_id 
      ORDER BY last_login_date
    ) as previous_engagement,

    -- First/Last value analytics
    FIRST_VALUE(engagement_score) OVER (
      PARTITION BY customer_id 
      ORDER BY registration_date 
      ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    ) as initial_engagement

  FROM transformed_data
)

-- Pivot tables for cross-dimensional analysis
SELECT *
FROM (
  SELECT 
    registration_month,
    customer_tier,
    activity_status,
    customer_count
  FROM aggregated_metrics
) AS source_data
PIVOT (
  SUM(customer_count)
  FOR activity_status IN ('active', 'inactive', 'dormant')
) AS pivoted_activity;

-- Time-series analysis for trend detection
WITH monthly_trends AS (
  SELECT 
    DATE_TRUNC('month', registration_date) as month,
    customer_tier,
    COUNT(*) as registrations,
    AVG(engagement_score) as avg_engagement,

    -- Year-over-year comparisons
    LAG(COUNT(*), 12) OVER (
      PARTITION BY customer_tier 
      ORDER BY DATE_TRUNC('month', registration_date)
    ) as registrations_year_ago,

    -- Growth rate calculations
    CASE 
      WHEN LAG(COUNT(*), 1) OVER (
        PARTITION BY customer_tier 
        ORDER BY DATE_TRUNC('month', registration_date)
      ) > 0 THEN
        ROUND(
          ((COUNT(*)::FLOAT / LAG(COUNT(*), 1) OVER (
            PARTITION BY customer_tier 
            ORDER BY DATE_TRUNC('month', registration_date)
          )) - 1) * 100, 2
        )
      ELSE NULL
    END as month_over_month_growth

  FROM customers
  WHERE registration_date >= CURRENT_DATE - INTERVAL '24 months'
  GROUP BY DATE_TRUNC('month', registration_date), customer_tier
)

-- Data quality monitoring and validation
, quality_metrics AS (
  SELECT 
    'customers' as table_name,
    COUNT(*) as total_records,
    COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END) as missing_email,
    COUNT(CASE WHEN first_name IS NULL OR first_name = '' THEN 1 END) as missing_first_name,
    COUNT(CASE WHEN email !~ '^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$' THEN 1 END) as invalid_email,
    COUNT(CASE WHEN registration_date > CURRENT_DATE THEN 1 END) as future_registration,

    -- Data completeness percentage
    ROUND(
      (COUNT(*) - COUNT(CASE WHEN email IS NULL OR email = '' THEN 1 END)) * 100.0 / COUNT(*), 
      2
    ) as email_completeness_pct,

    -- Data freshness
    MAX(registration_date) as latest_registration,
    MIN(registration_date) as earliest_registration,
    EXTRACT(DAYS FROM (CURRENT_DATE - MAX(registration_date))) as days_since_last_record

  FROM customers
)

-- Output final results with data lineage tracking
SELECT 
  m.*,
  q.total_records,
  q.email_completeness_pct,
  q.days_since_last_record,
  'etl_pipeline_v2.1' as pipeline_version,
  CURRENT_TIMESTAMP as processed_at
FROM monthly_trends m
CROSS JOIN quality_metrics q
ORDER BY m.month DESC, m.customer_tier;

-- QueryLeaf automatically handles:
-- 1. MongoDB aggregation pipeline generation for complex SQL operations
-- 2. Nested document processing and JSON operations
-- 3. Change stream integration for real-time ETL
-- 4. Parallel processing and optimization for large datasets
-- 5. Error handling and data validation
-- 6. Integration with external systems and data formats
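
To make the mapping from SQL to aggregation stages more concrete, here is a hand-written sketch of a pipeline roughly equivalent to the `extracted_data` CTE above. It is not QueryLeaf's actual generated output. It assumes `premium` and `verified` are stored as booleans inside `profile_data`, and it measures the 30-day window from the current instant rather than from midnight. The function name `buildExtractionPipeline` is hypothetical.

```javascript
// Hypothetical builder: returns aggregation stages that approximate the
// extracted_data CTE (filtering, projection, and the customer_tier CASE).
function buildExtractionPipeline(now = new Date()) {
  const thirtyDaysAgo = new Date(now.getTime() - 30 * 24 * 60 * 60 * 1000);

  return [
    {
      // WHERE registration_date >= ... AND email IS NOT NULL AND email LIKE '%@%.%'
      $match: {
        registration_date: { $gte: thirtyDaysAgo },
        email: { $ne: null, $regex: /@.*\./ }
      }
    },
    {
      // SELECT list, with CASE ... END expressed as $switch
      $project: {
        _id: 0,
        customer_id: 1,
        email: 1,
        first_name: 1,
        last_name: 1,
        registration_date: 1,
        profile_data: 1,
        last_login_date: 1,
        customer_tier: {
          $switch: {
            branches: [
              { case: { $eq: ['$profile_data.premium', true] }, then: 'premium' },
              { case: { $eq: ['$profile_data.verified', true] }, then: 'verified' }
            ],
            default: 'basic'
          }
        }
      }
    }
  ];
}
```

The result could be passed to `db.collection('raw_customers').aggregate(buildExtractionPipeline())`. Taking `now` as a parameter keeps the date window deterministic in tests.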

ETL Monitoring and Management

Performance Monitoring

Track each pipeline execution's duration, data volumes, resource usage, and errors, then aggregate them into daily reports:

// ETL monitoring and performance tracking service
class ETLMonitoringService {
  constructor(db) {
    this.db = db;
    this.metricsCollection = db.collection('etl_metrics');
    this.alertsCollection = db.collection('etl_alerts');
    this.performanceCollection = db.collection('etl_performance');
  }

  async trackPipelineExecution(pipelineId, executionMetadata) {
    const metrics = {
      pipeline_id: pipelineId,
      execution_id: executionMetadata.execution_id,
      start_time: executionMetadata.start_time,
      end_time: executionMetadata.end_time,
      duration_ms: executionMetadata.end_time - executionMetadata.start_time,
      status: executionMetadata.status,

      // Data volume metrics
      records_extracted: executionMetadata.records_extracted || 0,
      records_transformed: executionMetadata.records_transformed || 0,
      records_loaded: executionMetadata.records_loaded || 0,
      records_failed: executionMetadata.records_failed || 0,

      // Performance metrics
      extraction_duration_ms: executionMetadata.extraction_duration || 0,
      transformation_duration_ms: executionMetadata.transformation_duration || 0,
      loading_duration_ms: executionMetadata.loading_duration || 0,

      // Resource utilization
      memory_peak_mb: executionMetadata.memory_peak || 0,
      cpu_usage_percent: executionMetadata.cpu_usage || 0,
      disk_io_mb: executionMetadata.disk_io || 0,

      // Error information
      error_count: executionMetadata.errors?.length || 0,
      error_details: executionMetadata.errors || [],

      timestamp: new Date()
    };

    await this.metricsCollection.insertOne(metrics);

    // Check for performance anomalies
    await this.checkPerformanceAlerts(pipelineId, metrics);

    return metrics;
  }

  async generatePipelineReport(pipelineId, timeRange = 7) {
    const sinceDate = new Date(Date.now() - timeRange * 24 * 60 * 60 * 1000);

    const reportPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: {
            date: { $dateToString: { format: "%Y-%m-%d", date: "$timestamp" } }
          },

          // Execution metrics
          total_executions: { $sum: 1 },
          successful_executions: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failed_executions: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },

          // Duration statistics
          avg_duration_ms: { $avg: "$duration_ms" },
          max_duration_ms: { $max: "$duration_ms" },
          min_duration_ms: { $min: "$duration_ms" },

          // Data volume statistics
          total_records_processed: { $sum: "$records_transformed" },
          avg_records_per_execution: { $avg: "$records_transformed" },

          // Performance metrics
          avg_memory_usage_mb: { $avg: "$memory_peak_mb" },
          avg_cpu_usage: { $avg: "$cpu_usage_percent" },

          // Error analysis
          total_errors: { $sum: "$error_count" },
          unique_error_types: { $addToSet: "$error_details.error_type" }
        }
      },
      {
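        $addFields: {
          // $addToSet collects one array of error types per execution; flatten them into a single set
          unique_error_types: {
            $reduce: {
              input: "$unique_error_types",
              initialValue: [],
              in: { $setUnion: ["$$value", "$$this"] }
            }
          },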
          success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$successful_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          },
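          throughput_records_per_minute: {
            // Guard against a zero or missing average duration, which would raise a divide-by-zero error
            $cond: [
              { $gt: ["$avg_duration_ms", 0] },
              {
                $round: [
                  { $divide: [
                    "$avg_records_per_execution",
                    { $divide: ["$avg_duration_ms", 60000] }
                  ]},
                  0
                ]
              },
              null
            ]
          }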
        }
      },
      {
        $sort: { "_id.date": -1 }
      }
    ];

    const dailyMetrics = await this.metricsCollection.aggregate(reportPipeline).toArray();

    // Generate overall statistics
    const overallStats = await this.generateOverallStatistics(pipelineId, sinceDate);

    // Generate trend analysis
    const trendAnalysis = await this.generateTrendAnalysis(pipelineId, sinceDate);

    return {
      pipeline_id: pipelineId,
      report_period_days: timeRange,
      generated_at: new Date(),
      daily_metrics: dailyMetrics,
      overall_statistics: overallStats,
      trend_analysis: trendAnalysis,
      recommendations: await this.generateRecommendations(pipelineId, dailyMetrics, overallStats)
    };
  }

  async generateOverallStatistics(pipelineId, sinceDate) {
    const statsPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: null,
          total_executions: { $sum: 1 },
          successful_executions: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failed_executions: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },
          avg_duration_minutes: {
            $avg: { $divide: ["$duration_ms", 60000] }
          },
          total_records_processed: { $sum: "$records_transformed" },
          total_errors: { $sum: "$error_count" },

          // Performance percentiles ($percentile requires MongoDB 7.0+; each result
          // is an array with one entry per value in p)
          duration_p50: { $percentile: { input: "$duration_ms", p: [0.5], method: 'approximate' } },
          duration_p95: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
          duration_p99: { $percentile: { input: "$duration_ms", p: [0.99], method: 'approximate' } },

          memory_p95: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },

          // First and last execution
          first_execution: { $min: "$timestamp" },
          last_execution: { $max: "$timestamp" }
        }
      },
      {
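        $addFields: {
          // Unwrap the single-element arrays produced by $percentile into scalars
          duration_p50: { $arrayElemAt: ["$duration_p50", 0] },
          duration_p95: { $arrayElemAt: ["$duration_p95", 0] },
          duration_p99: { $arrayElemAt: ["$duration_p99", 0] },
          memory_p95: { $arrayElemAt: ["$memory_p95", 0] },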
          success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$successful_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          },
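          avg_throughput_records_per_hour: {
            // A single execution yields a zero-length window; return null instead of dividing by zero
            $cond: [
              { $gt: [{ $subtract: ["$last_execution", "$first_execution"] }, 0] },
              {
                $round: [
                  { $divide: [
                    "$total_records_processed",
                    { $divide: [
                      { $subtract: ["$last_execution", "$first_execution"] },
                      3600000  // Convert ms to hours
                    ]}
                  ]},
                  0
                ]
              },
              null
            ]
          },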
          error_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$failed_executions", "$total_executions"] },
                100
              ]},
              2
            ]
          }
        }
      }
    ];

    const stats = await this.metricsCollection.aggregate(statsPipeline).toArray();
    return stats[0] || {};
  }

  async generateTrendAnalysis(pipelineId, sinceDate) {
    const trendPipeline = [
      {
        $match: {
          pipeline_id: pipelineId,
          timestamp: { $gte: sinceDate }
        }
      },
      {
        $group: {
          _id: {
            week: { $week: "$timestamp" },
            year: { $year: "$timestamp" }
          },
          avg_duration: { $avg: "$duration_ms" },
          avg_records_processed: { $avg: "$records_transformed" },
          success_rate: {
            $avg: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          execution_count: { $sum: 1 }
        }
      },
      {
        $sort: { "_id.year": 1, "_id.week": 1 }
      },
      {
        $group: {
          _id: null,
          weekly_data: {
            $push: {
              week: "$_id.week",
              year: "$_id.year",
              avg_duration: "$avg_duration",
              avg_records: "$avg_records_processed",
              success_rate: "$success_rate",
              execution_count: "$execution_count"
            }
          }
        }
      },
      {
        $addFields: {
          // Approximate each trend as the last weekly average minus the first
          // (a simple delta, not a true linear regression)
          duration_trend: {
            $let: {
              vars: {
                n: { $size: "$weekly_data" },
                data: "$weekly_data"
              },
              in: {
                $cond: {
                  if: { $gte: ["$$n", 3] },
                  then: {
                    // Simple trend calculation (last value vs first value)
                    $subtract: [
                      { $arrayElemAt: ["$$data.avg_duration", -1] },
                      { $arrayElemAt: ["$$data.avg_duration", 0] }
                    ]
                  },
                  else: null
                }
              }
            }
          },
          performance_trend: {
            $let: {
              vars: {
                n: { $size: "$weekly_data" },
                data: "$weekly_data"
              },
              in: {
                $cond: {
                  if: { $gte: ["$$n", 3] },
                  then: {
                    $subtract: [
                      { $arrayElemAt: ["$$data.avg_records", -1] },
                      { $arrayElemAt: ["$$data.avg_records", 0] }
                    ]
                  },
                  else: null
                }
              }
            }
          }
        }
      }
    ];

    const trendData = await this.metricsCollection.aggregate(trendPipeline).toArray();
    return trendData[0] || { weekly_data: [], duration_trend: null, performance_trend: null };
  }

  async generateRecommendations(pipelineId, dailyMetrics, overallStats) {
    const recommendations = [];

    // Performance recommendations
    if (overallStats.success_rate < 95) {
      recommendations.push({
        type: 'reliability',
        priority: 'high',
        message: `Pipeline success rate is ${overallStats.success_rate}%. Consider implementing error handling and retry logic.`,
        suggested_actions: [
          'Add retry mechanisms for transient failures',
          'Implement circuit breakers for external dependencies',
          'Add validation checks for input data quality'
        ]
      });
    }

    if (overallStats.avg_duration_minutes > 60) {
      recommendations.push({
        type: 'performance',
        priority: 'medium',
        message: `Average execution time is ${Math.round(overallStats.avg_duration_minutes)} minutes. Consider optimization.`,
        suggested_actions: [
          'Implement parallel processing for transformation steps',
          'Optimize database queries and indexing',
          'Consider breaking pipeline into smaller, concurrent jobs'
        ]
      });
    }

    // Resource utilization recommendations
    if (overallStats.memory_p95 > 8000) {  // 8GB
      recommendations.push({
        type: 'resource',
        priority: 'medium',
        message: `Memory usage is high (95th percentile: ${Math.round(overallStats.memory_p95)}MB).`,
        suggested_actions: [
          'Implement streaming processing for large datasets',
          'Optimize data structures and reduce memory footprint',
          'Consider increasing available memory resources'
        ]
      });
    }

    // Data quality recommendations
    if (overallStats.error_rate > 5) {
      recommendations.push({
        type: 'data_quality',
        priority: 'high',
        message: `High error rate detected (${overallStats.error_rate}%).`,
        suggested_actions: [
          'Implement comprehensive data validation',
          'Add data profiling to identify quality issues',
          'Set up data quality monitoring dashboards'
        ]
      });
    }

    // Operational recommendations
    const recentFailures = dailyMetrics.filter(day => day.failed_executions > 0).length;
    if (recentFailures > dailyMetrics.length * 0.3) {
      recommendations.push({
        type: 'operational',
        priority: 'high',
        message: `Frequent failures detected in ${recentFailures} of the last ${dailyMetrics.length} days.`,
        suggested_actions: [
          'Set up real-time alerting for pipeline failures',
          'Implement automated recovery procedures',
          'Schedule regular pipeline health checks'
        ]
      });
    }

    return recommendations;
  }

  async checkPerformanceAlerts(pipelineId, currentMetrics) {
    // Define alert thresholds
    const alertThresholds = await this.getAlertThresholds(pipelineId);

    const alerts = [];

    // Duration alert
    if (currentMetrics.duration_ms > alertThresholds.max_duration_ms) {
      alerts.push({
        type: 'performance_degradation',
        severity: 'warning',
        message: `Pipeline execution took ${currentMetrics.duration_ms}ms, exceeding threshold of ${alertThresholds.max_duration_ms}ms`,
        metric_value: currentMetrics.duration_ms,
        threshold: alertThresholds.max_duration_ms
      });
    }

    // Failure rate alert
    if (currentMetrics.status === 'failed') {
      const recentFailures = await this.getRecentFailureCount(pipelineId, 24); // Last 24 hours
      if (recentFailures >= alertThresholds.max_failures_per_day) {
        alerts.push({
          type: 'high_failure_rate',
          severity: 'critical',
          message: `Pipeline has failed ${recentFailures} times in the last 24 hours`,
          metric_value: recentFailures,
          threshold: alertThresholds.max_failures_per_day
        });
      }
    }

    // Memory usage alert
    if (currentMetrics.memory_peak_mb > alertThresholds.max_memory_mb) {
      alerts.push({
        type: 'high_memory_usage',
        severity: 'warning',
        message: `Memory usage peaked at ${currentMetrics.memory_peak_mb}MB, exceeding threshold`,
        metric_value: currentMetrics.memory_peak_mb,
        threshold: alertThresholds.max_memory_mb
      });
    }

    // Data volume anomaly
    if (currentMetrics.records_transformed < alertThresholds.min_records_expected * 0.5) {
      alerts.push({
        type: 'low_data_volume',
        severity: 'warning',
        message: `Processed only ${currentMetrics.records_transformed} records, significantly below expected ${alertThresholds.min_records_expected}`,
        metric_value: currentMetrics.records_transformed,
        threshold: alertThresholds.min_records_expected
      });
    }

    // Store alerts
    for (const alert of alerts) {
      const alertDoc = {
        ...alert,
        pipeline_id: pipelineId,
        execution_id: currentMetrics.execution_id,
        timestamp: new Date(),
        status: 'active'
      };

      await this.alertsCollection.insertOne(alertDoc);

      // Send notifications
      await this.sendAlertNotification(alertDoc);
    }

    return alerts;
  }

  async getAlertThresholds(pipelineId) {
    // Get baseline performance metrics for threshold calculation
    const baseline = await this.metricsCollection.aggregate([
      {
        $match: {
          pipeline_id: pipelineId,
          status: 'completed',
          timestamp: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) }
        }
      },
      {
        $group: {
          _id: null,
          avg_duration: { $avg: "$duration_ms" },
          p95_duration: { $percentile: { input: "$duration_ms", p: [0.95], method: 'approximate' } },
          avg_memory: { $avg: "$memory_peak_mb" },
          p95_memory: { $percentile: { input: "$memory_peak_mb", p: [0.95], method: 'approximate' } },
          avg_records: { $avg: "$records_transformed" }
        }
      }
    ]).toArray();

    if (baseline.length === 0) {
      // Default thresholds for new pipelines
      return {
        max_duration_ms: 3600000,  // 1 hour
        max_memory_mb: 4000,       // 4GB
        max_failures_per_day: 3,
        min_records_expected: 1000
      };
    }

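    const stats = baseline[0];
    // $percentile returns an array with one entry per requested percentile
    const p95Duration = stats.p95_duration?.[0] ?? 0;
    const p95Memory = stats.p95_memory?.[0] ?? 0;
    return {
      max_duration_ms: Math.max(p95Duration * 1.5, stats.avg_duration * 2),
      max_memory_mb: Math.max(p95Memory * 1.5, stats.avg_memory * 2),
      max_failures_per_day: 3,
      min_records_expected: Math.max(stats.avg_records * 0.1, 100)
    };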
  }

  async getRecentFailureCount(pipelineId, hours) {
    const since = new Date(Date.now() - hours * 60 * 60 * 1000);
    const result = await this.metricsCollection.countDocuments({
      pipeline_id: pipelineId,
      status: 'failed',
      timestamp: { $gte: since }
    });

    return result;
  }

  async sendAlertNotification(alert) {
    // Implement notification logic (email, Slack, PagerDuty, etc.)
    console.log('ALERT:', alert.message);

    // This would integrate with actual notification services
    // Example: await this.slackService.sendAlert(alert);
    // Example: await this.emailService.sendAlert(alert);
  }

  async getDashboardMetrics(pipelineIds = [], timeRange = 24) {
    const sinceDate = new Date(Date.now() - timeRange * 60 * 60 * 1000);

    const matchStage = {
      timestamp: { $gte: sinceDate }
    };

    if (pipelineIds.length > 0) {
      matchStage.pipeline_id = { $in: pipelineIds };
    }

    const dashboardPipeline = [
      { $match: matchStage },
      {
        $group: {
          _id: {
            pipeline_id: "$pipeline_id",
            hour: { $dateToString: { format: "%Y-%m-%d %H:00", date: "$timestamp" } }
          },
          executions: { $sum: 1 },
          successes: {
            $sum: { $cond: [{ $eq: ["$status", "completed"] }, 1, 0] }
          },
          failures: {
            $sum: { $cond: [{ $eq: ["$status", "failed"] }, 1, 0] }
          },
          avg_duration: { $avg: "$duration_ms" },
          total_records: { $sum: "$records_transformed" },
          total_errors: { $sum: "$error_count" }
        }
      },
      // Sort so that hourly_metrics is pushed in chronological order
      { $sort: { "_id.hour": 1 } },
      {
        $group: {
          _id: "$_id.pipeline_id",
          hourly_metrics: {
            $push: {
              hour: "$_id.hour",
              executions: "$executions",
              success_rate: {
                $round: [
                  { $multiply: [{ $divide: ["$successes", "$executions"] }, 100] },
                  1
                ]
              },
              avg_duration_minutes: { $round: [{ $divide: ["$avg_duration", 60000] }, 1] },
              total_records: "$total_records",
              total_errors: "$total_errors"
            }
          },
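          total_executions: { $sum: "$executions" },
          // Every $group field must be an accumulator, so sum successes here
          // and derive the rate in the $addFields stage that follows
          total_successes: { $sum: "$successes" },
          total_records_processed: { $sum: "$total_records" },
          total_errors: { $sum: "$total_errors" }
        }
      },
      {
        $addFields: {
          overall_success_rate: {
            $round: [
              { $multiply: [
                { $divide: ["$total_successes", "$total_executions"] },
                100
              ]},
              1
            ]
          }
        }
      },
      { $sort: { "_id": 1 } }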
    ];

    const dashboardData = await this.metricsCollection.aggregate(dashboardPipeline).toArray();

    return {
      time_range_hours: timeRange,
      generated_at: new Date(),
      pipeline_metrics: dashboardData
    };
  }
}
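
The report, dashboard, and alert queries above all filter on pipeline_id plus a timestamp range, and some add an equality match on status. A compound index that follows those query shapes is a reasonable starting point. The sketch below assumes `metricsCollection` is a Node.js driver Collection; the helper name `ensureMetricsIndexes` and both index names are illustrative and not part of the class above.

```javascript
// Hypothetical helper: create indexes matching the query shapes used by the
// reporting and alerting methods (pipeline_id equality + timestamp range).
async function ensureMetricsIndexes(metricsCollection) {
  const created = [];

  // Supports the $match stages in the report, statistics, and trend pipelines
  created.push(await metricsCollection.createIndex(
    { pipeline_id: 1, timestamp: -1 },
    { name: 'pipeline_timestamp' }
  ));

  // Supports getRecentFailureCount and the baseline query in getAlertThresholds,
  // which also match on status
  created.push(await metricsCollection.createIndex(
    { pipeline_id: 1, status: 1, timestamp: -1 },
    { name: 'pipeline_status_timestamp' }
  ));

  return created;
}
```

Calling the helper once at startup is enough in most setups, since creating an index that already exists with the same definition is a no-op.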

Best Practices for ETL Implementation

ETL Design Principles

Essential guidelines for building robust ETL pipelines:

  1. Idempotency: Design pipelines that can be safely re-run without side effects
  2. Error Handling: Implement comprehensive error handling and recovery mechanisms
  3. Data Validation: Validate data quality at every stage of the pipeline
  4. Monitoring: Track performance, data quality, and operational metrics
  5. Scalability: Design for horizontal scaling and parallel processing
  6. Documentation: Maintain clear documentation of data transformations and business logic
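
One common way to get the idempotency described in the first principle is to load with upserts keyed on a natural identifier, so that re-running a batch rewrites the same documents instead of inserting duplicates. The sketch below is only an illustration: the function name `buildIdempotentUpserts` and the fields `last_batch_id` and `first_loaded_at` are assumptions, not part of any pipeline shown earlier.

```javascript
// Build bulkWrite operations that upsert each record by a natural key.
// Re-running the same batch updates the same documents rather than
// creating duplicates. Records are assumed not to carry a
// `first_loaded_at` field of their own.
function buildIdempotentUpserts(records, keyField, batchId) {
  return records.map((record) => ({
    updateOne: {
      filter: { [keyField]: record[keyField] },
      update: {
        // Overwrite the payload on every run
        $set: { ...record, last_batch_id: batchId },
        // Written only when the document is first created
        $setOnInsert: { first_loaded_at: new Date() }
      },
      upsert: true
    }
  }));
}

// Usage, assuming `targetCollection` is a Node.js driver Collection:
// await targetCollection.bulkWrite(
//   buildIdempotentUpserts(rows, 'order_id', 'batch-42'),
//   { ordered: false }
// );
```

A unique index on the key field is usually paired with this pattern so that concurrent runs cannot create two documents for the same key.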

Performance Optimization

Optimize ETL pipeline performance:

  1. Parallel Processing: Run independent extraction and transformation steps concurrently, and push heavy transformations into MongoDB's aggregation framework
  2. Incremental Loading: Process only changed data to reduce processing time
  3. Index Optimization: Create appropriate indexes for extraction and lookup operations
  4. Batch Size Tuning: Optimize batch sizes for memory and throughput balance
  5. Resource Management: Monitor and optimize CPU, memory, and I/O utilization
  6. Caching: Cache frequently accessed reference data and transformation results
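
Incremental loading and batch size tuning from the list above can be sketched with three small helpers: a watermark filter that selects only documents changed since the last run, a batching function that bounds memory per write, and a function that advances the watermark. The field name `updated_at` and all function names here are assumptions made for illustration.

```javascript
// Filter that selects only documents changed since the last successful run.
// `lastWatermark` is a Date, or null on the very first run (full load).
function buildIncrementalFilter(lastWatermark) {
  return lastWatermark ? { updated_at: { $gt: lastWatermark } } : {};
}

// Split an array into fixed-size batches to bound memory per write.
function toBatches(items, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += batchSize) {
    batches.push(items.slice(i, i + batchSize));
  }
  return batches;
}

// The next watermark is the newest updated_at seen in this run,
// or the previous watermark if no documents were processed.
function nextWatermark(docs, previous) {
  return docs.reduce(
    (max, doc) => (max === null || doc.updated_at > max ? doc.updated_at : max),
    previous
  );
}
```

Persisting the new watermark only after every batch has loaded keeps a failed run safe to repeat, at the cost of reprocessing that run's documents.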

QueryLeaf ETL Integration

QueryLeaf enables familiar SQL-style ETL development while leveraging MongoDB's powerful aggregation capabilities. This integration provides teams with the flexibility to implement complex data transformations using familiar SQL patterns while benefiting from MongoDB's document-oriented storage and processing advantages.

Key QueryLeaf ETL benefits include:

  • SQL Familiarity: Write ETL logic using familiar SQL syntax and patterns
  • MongoDB Performance: Leverage MongoDB's high-performance aggregation pipeline
  • Flexible Schema: Handle semi-structured and evolving data schemas effortlessly
  • Real-Time Processing: Integrate change streams for real-time ETL processing
  • Scalable Architecture: Build ETL pipelines that scale horizontally with data growth

Whether you're migrating from traditional ETL tools or building new data processing workflows, MongoDB with QueryLeaf's SQL interface provides a powerful foundation for modern ETL architectures that can handle the complexity and scale requirements of contemporary data environments.

Conclusion

MongoDB's ETL and data pipeline capabilities provide data transformation and processing infrastructure suited to modern data workflows. Combined with QueryLeaf's familiar SQL interface, MongoDB enables teams to build sophisticated ETL pipelines while preserving development patterns and query approaches they already understand.

The combination of MongoDB's flexible document model, powerful aggregation framework, and real-time change streams creates an ideal platform for handling diverse data sources, complex transformations, and scalable processing requirements. QueryLeaf's SQL-style syntax makes these capabilities accessible to broader development teams while maintaining the performance and flexibility advantages of MongoDB's native architecture.

Whether you're building batch ETL processes, real-time streaming pipelines, or hybrid architectures, MongoDB with QueryLeaf provides the tools and patterns necessary to implement robust, scalable, and maintainable data processing solutions that can adapt and evolve with your organization's growing data requirements.

MongoDB Change Data Capture and Event-Driven Architecture: Real-Time Data Processing and System Integration

Modern distributed systems require real-time data synchronization and event-driven communication to maintain consistency across microservices, trigger automated workflows, and enable responsive user experiences. Traditional databases provide limited change capture capabilities that require complex polling mechanisms, trigger-based solutions, or external tools that add significant operational overhead and latency to data processing pipelines.

MongoDB Change Data Capture through Change Streams provides native, real-time monitoring of database changes that enables building sophisticated event-driven architectures without external dependencies. Unlike traditional databases that require complex trigger setups or third-party CDC tools, MongoDB's Change Streams deliver ordered, resumable streams of data changes that can power real-time analytics, data synchronization, and reactive application architectures.
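
To make "resumable" concrete: every change event carries a resume token in its `_id` field, and passing a stored token back through the `resumeAfter` option lets a restarted consumer continue from that point. The sketch below assumes a Node.js driver version whose ChangeStream supports `for await` iteration; `loadToken`, `saveToken`, and `handleEvent` are hypothetical callbacks supplied by the application.

```javascript
// Consume a change stream and persist the resume token after each event so
// that a restarted process continues where it left off.
async function consumeChanges(collection, { loadToken, saveToken, handleEvent }) {
  const resumeToken = await loadToken();

  // Ask for the full current document on updates, not only the delta
  const options = { fullDocument: 'updateLookup' };
  if (resumeToken) {
    options.resumeAfter = resumeToken;
  }

  // Only react to document-level writes
  const pipeline = [
    { $match: { operationType: { $in: ['insert', 'update', 'replace', 'delete'] } } }
  ];

  const changeStream = collection.watch(pipeline, options);
  try {
    for await (const change of changeStream) {
      await handleEvent(change);
      // change._id is the resume token for this event
      await saveToken(change._id);
    }
  } finally {
    await changeStream.close();
  }
}
```

Saving the token only after `handleEvent` succeeds gives at-least-once delivery: a crash between the two steps replays the last event, so handlers should tolerate duplicates.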

The Traditional Change Detection Challenge

Implementing change detection and event-driven patterns in traditional databases requires complex infrastructure:

-- Traditional PostgreSQL change detection - complex trigger-based approach

-- Change tracking table for audit and CDC
CREATE TABLE data_change_log (
    change_id SERIAL PRIMARY KEY,
    table_name VARCHAR(100) NOT NULL,
    operation_type VARCHAR(10) NOT NULL, -- INSERT, UPDATE, DELETE
    record_id UUID NOT NULL,
    old_values JSONB,
    new_values JSONB,
    changed_columns TEXT[],
    change_timestamp TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
    user_context JSONB,
    transaction_id BIGINT,

    -- CDC processing metadata
    processed BOOLEAN DEFAULT FALSE,
    processed_at TIMESTAMP,
    processing_errors TEXT[],
    retry_count INTEGER DEFAULT 0,

    -- Event routing information
    event_type VARCHAR(100),
    event_source VARCHAR(100),
    correlation_id UUID
);

-- Complex trigger function for change capture
CREATE OR REPLACE FUNCTION capture_table_changes() 
RETURNS TRIGGER AS $$
DECLARE
    old_record JSONB := '{}';
    new_record JSONB := '{}';
    changed_cols TEXT[] := '{}';
    col_name TEXT;
    event_type_val VARCHAR(100);
    correlation_id_val UUID;
BEGIN
    -- Determine operation type and build change record
    IF TG_OP = 'DELETE' THEN
        old_record := row_to_json(OLD)::JSONB;
        event_type_val := TG_TABLE_NAME || '_deleted';

        -- Extract correlation ID from old record if available
        correlation_id_val := (old_record->>'correlation_id')::UUID;

        INSERT INTO data_change_log (
            table_name, operation_type, record_id, old_values, 
            event_type, event_source, correlation_id, transaction_id
        ) VALUES (
            TG_TABLE_NAME, 'DELETE', (old_record->>'id')::UUID, old_record,
            event_type_val, 'database_trigger', correlation_id_val, txid_current()
        );

        RETURN OLD;

    ELSIF TG_OP = 'UPDATE' THEN
        old_record := row_to_json(OLD)::JSONB;
        new_record := row_to_json(NEW)::JSONB;
        event_type_val := TG_TABLE_NAME || '_updated';

        -- Identify changed columns
        FOR col_name IN 
            SELECT column_name 
            FROM information_schema.columns 
            WHERE table_name = TG_TABLE_NAME 
                AND table_schema = TG_TABLE_SCHEMA
        LOOP
            IF (old_record->>col_name) IS DISTINCT FROM (new_record->>col_name) THEN
                changed_cols := array_append(changed_cols, col_name);
            END IF;
        END LOOP;

        -- Only log if there are actual changes
        IF array_length(changed_cols, 1) > 0 THEN
            correlation_id_val := COALESCE(
                (new_record->>'correlation_id')::UUID,
                (old_record->>'correlation_id')::UUID
            );

            INSERT INTO data_change_log (
                table_name, operation_type, record_id, old_values, new_values,
                changed_columns, event_type, event_source, correlation_id, transaction_id
            ) VALUES (
                TG_TABLE_NAME, 'UPDATE', (new_record->>'id')::UUID, old_record, new_record,
                changed_cols, event_type_val, 'database_trigger', correlation_id_val, txid_current()
            );
        END IF;

        RETURN NEW;

    ELSIF TG_OP = 'INSERT' THEN
        new_record := row_to_json(NEW)::JSONB;
        event_type_val := TG_TABLE_NAME || '_created';
        correlation_id_val := (new_record->>'correlation_id')::UUID;

        INSERT INTO data_change_log (
            table_name, operation_type, record_id, new_values,
            event_type, event_source, correlation_id, transaction_id
        ) VALUES (
            TG_TABLE_NAME, 'INSERT', (new_record->>'id')::UUID, new_record,
            event_type_val, 'database_trigger', correlation_id_val, txid_current()
        );

        RETURN NEW;
    END IF;

    RETURN NULL;
END;
$$ LANGUAGE plpgsql;

-- Apply triggers to tables that need change tracking
CREATE TRIGGER users_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

CREATE TRIGGER orders_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON orders
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

CREATE TRIGGER products_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON products
    FOR EACH ROW EXECUTE FUNCTION capture_table_changes();

-- Complex change processing and event dispatch
CREATE OR REPLACE FUNCTION process_pending_changes()
RETURNS INTEGER AS $$
DECLARE
    change_record RECORD;
    processed_count INTEGER := 0;
    event_payload JSONB;
    webhook_url TEXT;
    http_response INTEGER;
    max_retries INTEGER := 3;
BEGIN
    -- Process unprocessed changes in chronological order
    FOR change_record IN 
        SELECT * FROM data_change_log 
        WHERE processed = FALSE 
            AND retry_count < max_retries
        ORDER BY change_timestamp ASC
        LIMIT 1000 -- Process in batches
    LOOP
        BEGIN
            -- Build event payload for external systems
            event_payload := jsonb_build_object(
                'eventId', change_record.change_id,
                'eventType', change_record.event_type,
                'eventSource', change_record.event_source,
                'eventTime', change_record.change_timestamp,
                'correlationId', change_record.correlation_id,
                'data', jsonb_build_object(
                    'tableName', change_record.table_name,
                    'operationType', change_record.operation_type,
                    'recordId', change_record.record_id,
                    'oldValues', change_record.old_values,
                    'newValues', change_record.new_values,
                    'changedColumns', change_record.changed_columns
                ),
                'metadata', jsonb_build_object(
                    'transactionId', change_record.transaction_id,
                    'processingAttempt', change_record.retry_count + 1,
                    'processingTime', CURRENT_TIMESTAMP
                )
            );

            -- Route events by source table (simplified webhook example)
            webhook_url := CASE change_record.table_name
                WHEN 'users' THEN 'http://user-service/api/events'
                WHEN 'orders' THEN 'http://order-service/api/events'
                WHEN 'products' THEN 'http://catalog-service/api/events'
                ELSE 'http://default-event-handler/api/events'
            END;

            -- Simulate HTTP webhook call (would use actual HTTP extension in practice)
            -- SELECT http_post(webhook_url, event_payload::TEXT, 'application/json') INTO http_response;
            http_response := 200; -- Simulated success

            IF http_response BETWEEN 200 AND 299 THEN
                -- Mark as successfully processed
                UPDATE data_change_log 
                SET processed = TRUE,
                    processed_at = CURRENT_TIMESTAMP,
                    processing_errors = NULL
                WHERE change_id = change_record.change_id;

                processed_count := processed_count + 1;
            ELSE
                -- Record processing failure
                UPDATE data_change_log 
                SET retry_count = retry_count + 1,
                    processing_errors = array_append(
                        COALESCE(processing_errors, '{}'), 
                        'HTTP ' || http_response || ' at ' || CURRENT_TIMESTAMP
                    )
                WHERE change_id = change_record.change_id;
            END IF;

        EXCEPTION WHEN OTHERS THEN
            -- Record processing exception
            UPDATE data_change_log 
            SET retry_count = retry_count + 1,
                processing_errors = array_append(
                    COALESCE(processing_errors, '{}'), 
                    'Exception: ' || SQLERRM || ' at ' || CURRENT_TIMESTAMP
                )
            WHERE change_id = change_record.change_id;
        END;
    END LOOP;

    RETURN processed_count;
END;
$$ LANGUAGE plpgsql;

-- Scheduled job to process changes (requires external cron setup)
-- */5 * * * * psql -d production -c "SELECT process_pending_changes();"

-- Complex monitoring for change processing pipeline
SELECT 
    table_name,
    operation_type,
    event_type,

    -- Processing statistics
    COUNT(*) as total_changes,
    COUNT(*) FILTER (WHERE processed = TRUE) as processed_changes,
    COUNT(*) FILTER (WHERE processed = FALSE) as pending_changes,
    COUNT(*) FILTER (WHERE retry_count >= 3) as failed_changes,

    -- Performance metrics
    AVG(EXTRACT(EPOCH FROM (processed_at - change_timestamp))) as avg_processing_latency_seconds,
    MAX(EXTRACT(EPOCH FROM (processed_at - change_timestamp))) as max_processing_latency_seconds,

    -- Error analysis
    COUNT(*) FILTER (WHERE processing_errors IS NOT NULL) as changes_with_errors,
    AVG(retry_count) as avg_retry_count,

    -- Time-based analysis
    MIN(change_timestamp) as oldest_change,
    MAX(change_timestamp) as newest_change,

    -- Health indicators
    ROUND(
        COUNT(*) FILTER (WHERE processed = TRUE)::DECIMAL / COUNT(*) * 100, 
        2
    ) as success_rate_percent

FROM data_change_log
WHERE change_timestamp >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
GROUP BY table_name, operation_type, event_type
ORDER BY total_changes DESC;

-- Problems with traditional change data capture:
-- 1. Complex trigger infrastructure requiring careful maintenance and testing
-- 2. Performance overhead from trigger execution on every database operation  
-- 3. Manual event routing and delivery logic with limited reliability guarantees
-- 4. Difficulty handling high-throughput scenarios without impacting database performance
-- 5. Complex error handling and retry logic for failed event deliveries
-- 6. Limited ordering guarantees for related changes across multiple tables
-- 7. Challenges with transaction boundaries and event atomicity
-- 8. Manual setup and maintenance of change processing infrastructure
-- 9. Limited scalability for high-volume change streams
-- 10. Complex monitoring and alerting for change processing pipeline health

MongoDB provides native Change Data Capture through Change Streams with real-time event processing:

// MongoDB Change Data Capture - native real-time event-driven architecture
const { MongoClient } = require('mongodb');

// Advanced MongoDB Change Data Capture Manager
class MongoChangeDataCaptureManager {
  constructor() {
    this.client = null;
    this.db = null;
    this.changeStreams = new Map();
    this.eventHandlers = new Map();
    this.processingMetrics = new Map();
    this.eventQueue = [];
    this.isProcessing = false;
  }

  async initialize() {
    console.log('Initializing MongoDB Change Data Capture Manager...');

    // Connect with optimized settings for change streams
    this.client = new MongoClient(process.env.MONGODB_URI || 'mongodb://localhost:27017', {
      // Replica set required for change streams
      replicaSet: process.env.MONGODB_REPLICA_SET || 'rs0',

      // Connection pool settings for change streams
      maxPoolSize: 20,
      minPoolSize: 5,
      maxIdleTimeMS: 60000,

      // Read preferences for change streams
      readPreference: 'primary',
      readConcern: { level: 'majority' },

      // Write concern for reliable change stream processing
      writeConcern: { w: 'majority', j: true },

      // Compression for change stream data
      compressors: ['zlib'],

      appName: 'ChangeDataCaptureManager'
    });

    await this.client.connect();
    this.db = this.client.db('ecommerce');

    // Initialize event handlers and change stream configurations
    await this.setupEventHandlers();
    await this.initializeChangeStreams();

    console.log('✅ MongoDB Change Data Capture Manager initialized');
  }

  async setupEventHandlers() {
    console.log('Setting up event handlers for different change types...');

    // Event type -> handler method name. Several of these methods are not shown in this
    // example; unimplemented ones fall back to handleGenericEvent instead of throwing.
    const handlerMethods = {
      // User-related event handlers
      user_created: 'handleUserCreated',
      user_updated: 'handleUserUpdated',
      user_deleted: 'handleUserDeleted',

      // Order-related event handlers
      order_created: 'handleOrderCreated',
      order_status_updated: 'handleOrderStatusUpdated',
      order_cancelled: 'handleOrderCancelled',

      // Product catalog event handlers
      product_created: 'handleProductCreated',
      product_updated: 'handleProductUpdated',
      inventory_updated: 'handleInventoryUpdated'
    };

    for (const [eventType, methodName] of Object.entries(handlerMethods)) {
      const method = typeof this[methodName] === 'function'
        ? this[methodName]
        : this.handleGenericEvent;
      this.eventHandlers.set(eventType, (changeEvent) => method.call(this, changeEvent));
    }

    console.log('✅ Event handlers configured');
  }

  async initializeChangeStreams() {
    console.log('Initializing MongoDB change streams...');

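    // Note: fullDocumentBeforeChange is only populated on MongoDB 6.0+ when
    // changeStreamPreAndPostImages is enabled on the collection; otherwise it is null.

    // Watch users collection for account-related events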
    await this.createChangeStream('users', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processUserChanges.bind(this));

    // Watch orders collection for order lifecycle events
    await this.createChangeStream('orders', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processOrderChanges.bind(this));

    // Watch products collection for catalog changes
    await this.createChangeStream('products', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processProductChanges.bind(this));

    // Watch inventory collection for stock changes
    await this.createChangeStream('inventory', {
      fullDocument: 'updateLookup',
      fullDocumentBeforeChange: 'whenAvailable'
    }, this.processInventoryChanges.bind(this));

    console.log('✅ Change streams initialized and watching for changes');
  }

  async createChangeStream(collectionName, options, changeHandler) {
    try {
      const collection = this.db.collection(collectionName);
      const changeStream = collection.watch([], options);

      // Store change stream for management
      this.changeStreams.set(collectionName, {
        stream: changeStream,
        collection: collectionName,
        options: options,
        handler: changeHandler,
        createdAt: new Date(),
        isActive: true,
        errorCount: 0,
        lastError: null,
        processedEvents: 0
      });

      // Set up change event processing
      changeStream.on('change', async (changeDoc) => {
        try {
          await changeHandler(changeDoc);

          // Update metrics
          const streamInfo = this.changeStreams.get(collectionName);
          streamInfo.processedEvents++;
          streamInfo.lastProcessedAt = new Date();

        } catch (error) {
          console.error(`Error processing change for ${collectionName}:`, error);
          this.recordStreamError(collectionName, error);
        }
      });

      // Handle stream errors
      changeStream.on('error', (error) => {
        console.error(`Change stream error for ${collectionName}:`, error);
        this.recordStreamError(collectionName, error);
        this.handleStreamError(collectionName, error);
      });

      // Handle stream close
      changeStream.on('close', () => {
        console.warn(`Change stream closed for ${collectionName}`);
        const streamInfo = this.changeStreams.get(collectionName);
        if (streamInfo) {
          streamInfo.isActive = false;
          streamInfo.closedAt = new Date();
        }
      });

      console.log(`✅ Change stream created for collection: ${collectionName}`);

    } catch (error) {
      console.error(`Error creating change stream for ${collectionName}:`, error);
      throw error;
    }
  }

  async processUserChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

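    // Map driver operation types to the event names registered in setupEventHandlers()
    const eventSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] ?? operationType;

    // Build standardized event object
    const event = {
      eventId: changeDoc._id,
      eventType: `user_${eventSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      // Document information (documentKey is absent for drop/invalidate events)
      documentId: documentKey?._id,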
      operationType: operationType,

      // Document data
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      // Change metadata
      namespace: changeDoc.ns,
      transactionId: changeDoc.txnNumber,
      sessionId: changeDoc.lsid,

      // Processing metadata
      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Add operation-specific data
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};
      event.removedFields = changeDoc.updateDescription?.removedFields || [];

      // Detect specific user events
      if (event.updatedFields.status) {
        event.eventType = `user_status_changed`;
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status
        };
      }

      if (event.updatedFields.email) {
        event.eventType = `user_email_changed`;
        event.emailChange = {
          from: fullDocumentBeforeChange?.email,
          to: fullDocument?.email
        };
      }
    }

    // Route to appropriate event handler
    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      console.warn(`No handler found for event type: ${event.eventType}`);
      await this.handleGenericEvent(event);
    }
  }

  async processOrderChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

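    // Map driver operation types to the event names registered in setupEventHandlers()
    const eventSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] ?? operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `order_${eventSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      // documentKey is absent for drop/invalidate events
      documentId: documentKey?._id,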
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      namespace: changeDoc.ns,
      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Detect order-specific events
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

      // Order status changes
      if (event.updatedFields.status) {
        event.eventType = 'order_status_updated';
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status,
          orderId: fullDocument?.orderNumber,
          customerId: fullDocument?.customerId
        };

        // Specific status-based events
        if (fullDocument?.status === 'cancelled') {
          event.eventType = 'order_cancelled';
        } else if (fullDocument?.status === 'shipped') {
          event.eventType = 'order_shipped';
        } else if (fullDocument?.status === 'delivered') {
          event.eventType = 'order_delivered';
        }
      }

      // Payment status changes
      if (event.updatedFields['payment.status']) {
        event.eventType = 'order_payment_updated';
        event.paymentChange = {
          from: fullDocumentBeforeChange?.payment?.status,
          to: fullDocument?.payment?.status
        };
      }
    }

    // Route to handler
    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  async processProductChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

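    // Map driver operation types to the event names registered in setupEventHandlers()
    const eventSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] ?? operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `product_${eventSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      // documentKey is absent for drop/invalidate events
      documentId: documentKey?._id,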
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Detect product-specific events
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

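      // Price changes (compare against undefined so a new price of 0 is still detected)
      if (event.updatedFields.price !== undefined) {
        const previousPrice = fullDocumentBeforeChange?.price;
        const currentPrice = fullDocument?.price;
        event.eventType = 'product_price_changed';
        event.priceChange = {
          from: previousPrice,
          to: currentPrice,
          sku: fullDocument?.sku,
          // Percent change is only defined for a known, non-zero previous price
          changePercent: previousPrice && currentPrice !== undefined
            ? ((currentPrice - previousPrice) / previousPrice * 100)
            : null
        };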
      }

      // Status changes (active/inactive)
      if (event.updatedFields.status) {
        event.eventType = 'product_status_changed';
        event.statusChange = {
          from: fullDocumentBeforeChange?.status,
          to: fullDocument?.status,
          sku: fullDocument?.sku
        };
      }
    }

    const handler = this.eventHandlers.get(event.eventType);
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  async processInventoryChanges(changeDoc) {
    const { operationType, fullDocument, fullDocumentBeforeChange, documentKey } = changeDoc;

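    // Map driver operation types to consistent event names
    const eventSuffix = { insert: 'created', update: 'updated', delete: 'deleted' }[operationType] ?? operationType;

    const event = {
      eventId: changeDoc._id,
      eventType: `inventory_${eventSuffix}`,
      eventTime: changeDoc.clusterTime,
      source: 'mongodb_change_stream',

      // documentKey is absent for drop/invalidate events
      documentId: documentKey?._id,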
      operationType: operationType,
      currentDocument: fullDocument,
      previousDocument: fullDocumentBeforeChange,

      receivedAt: new Date(),
      processingStatus: 'pending'
    };

    // Inventory-specific event detection
    if (operationType === 'update') {
      event.updatedFields = changeDoc.updateDescription?.updatedFields || {};

      // Stock level changes
      if (event.updatedFields.stockQuantity !== undefined) {
        event.eventType = 'inventory_updated';
        event.stockChange = {
          from: fullDocumentBeforeChange?.stockQuantity || 0,
          to: fullDocument?.stockQuantity || 0,
          productId: fullDocument?.productId,
          sku: fullDocument?.sku,
          change: (fullDocument?.stockQuantity || 0) - (fullDocumentBeforeChange?.stockQuantity || 0)
        };

        // Low stock alerts (a threshold of 10 is assumed when the document has none)
        const lowStockThreshold = fullDocument?.lowStockThreshold ?? 10;
        if (fullDocument?.stockQuantity <= lowStockThreshold) {
          event.eventType = 'inventory_low_stock';
          event.lowStockAlert = {
            currentStock: fullDocument?.stockQuantity,
            threshold: lowStockThreshold,
            productId: fullDocument?.productId
          };
        }

        // Out of stock alerts  
        if (fullDocument?.stockQuantity <= 0 && (fullDocumentBeforeChange?.stockQuantity || 0) > 0) {
          event.eventType = 'inventory_out_of_stock';
        }
      }
    }

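    // Low-stock and out-of-stock events share the inventory_updated handler unless a
    // dedicated handler has been registered for them
    const handler = this.eventHandlers.get(event.eventType)
      ?? (event.stockChange ? this.eventHandlers.get('inventory_updated') : undefined);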
    if (handler) {
      await handler(event);
    } else {
      await this.handleGenericEvent(event);
    }
  }

  // Event handler implementations
  async handleUserCreated(event) {
    console.log(`Processing user created event: ${event.currentDocument.email}`);

    try {
      // Send welcome email
      await this.sendWelcomeEmail(event.currentDocument);

      // Create user profile in analytics system
      await this.createAnalyticsProfile(event.currentDocument);

      // Add to mailing list
      await this.addToMailingList(event.currentDocument);

      // Log event processing
      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling user created event:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleOrderStatusUpdated(event) {
    console.log(`Processing order status update: ${event.statusChange.from} -> ${event.statusChange.to}`);

    try {
      // Send status update notification
      await this.sendOrderStatusNotification(event);

      // Update order analytics
      await this.updateOrderAnalytics(event);

      // Trigger fulfillment workflows
      if (event.statusChange.to === 'confirmed') {
        await this.triggerFulfillmentWorkflow(event.currentDocument);
      }

      // Update inventory reservations
      if (event.statusChange.to === 'cancelled') {
        await this.releaseInventoryReservation(event.currentDocument);
      }

      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling order status update:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleInventoryUpdated(event) {
    console.log(`Processing inventory update: ${event.stockChange.sku} stock changed by ${event.stockChange.change}`);

    try {
      // Update search index with new stock levels
      await this.updateSearchIndex(event.currentDocument);

      // Notify interested customers about restocking
      if (event.stockChange.change > 0 && event.stockChange.from === 0) {
        await this.notifyRestocking(event.currentDocument);
      }

      // Update real-time inventory dashboard
      await this.updateInventoryDashboard(event);

      // Trigger reorder notifications for low stock
      if (event.eventType === 'inventory_low_stock') {
        await this.triggerReorderAlert(event.lowStockAlert);
      }

      await this.logEventProcessed(event, 'success');

    } catch (error) {
      console.error('Error handling inventory update:', error);
      await this.logEventProcessed(event, 'error', error.message);
      throw error;
    }
  }

  async handleGenericEvent(event) {
    console.log(`Processing generic event: ${event.eventType}`);

    // Store event for audit purposes
    await this.db.collection('event_audit_log').insertOne({
      eventId: event.eventId,
      eventType: event.eventType,
      eventTime: event.eventTime,
      documentId: event.documentId,
      operationType: event.operationType,
      processedAt: new Date(),
      handlerType: 'generic'
    });
  }

  // Helper methods for event processing
  async sendWelcomeEmail(user) {
    // Integration with email service
    console.log(`Sending welcome email to ${user.email}`);
    // await emailService.sendWelcomeEmail(user);
  }

  async sendOrderStatusNotification(event) {
    // Integration with notification service
    console.log(`Sending order notification for order ${event.currentDocument.orderNumber}`);
    // await notificationService.sendOrderUpdate(event);
  }

  async updateSearchIndex(inventoryDoc) {
    // Integration with search service
    console.log(`Updating search index for product ${inventoryDoc.sku}`);
    // await searchService.updateProductInventory(inventoryDoc);
  }

  async logEventProcessed(event, status, errorMessage = null) {
    await this.db.collection('event_processing_log').insertOne({
      eventId: event.eventId,
      eventType: event.eventType,
      documentId: event.documentId,
      processingStatus: status,
      processedAt: new Date(),
      receivedAt: event.receivedAt,
      processingDuration: Date.now() - event.receivedAt.getTime(),
      errorMessage: errorMessage
    });
  }

  recordStreamError(collectionName, error) {
    const streamInfo = this.changeStreams.get(collectionName);
    if (streamInfo) {
      streamInfo.errorCount++;
      streamInfo.lastError = {
        message: error.message,
        timestamp: new Date(),
        stack: error.stack
      };
    }
  }

  async handleStreamError(collectionName, error) {
    console.error(`Handling stream error for ${collectionName}:`, error);

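    // Attempt to restart the change stream, resuming from the last seen token
    // (when one is available) so events emitted during the outage are not skipped
    setTimeout(async () => {
      try {
        const streamInfo = this.changeStreams.get(collectionName);
        if (streamInfo && !streamInfo.isActive) {
          console.log(`Attempting to restart change stream for ${collectionName}`);
          const resumeToken = streamInfo.stream?.resumeToken;
          const options = resumeToken
            ? { ...streamInfo.options, resumeAfter: resumeToken }
            : streamInfo.options;
          await this.createChangeStream(collectionName, options, streamInfo.handler);
        }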
      } catch (restartError) {
        console.error(`Failed to restart change stream for ${collectionName}:`, restartError);
      }
    }, 5000); // Wait 5 seconds before restart attempt
  }

  async getChangeStreamMetrics() {
    const metrics = {
      timestamp: new Date(),
      streams: {},
      systemHealth: 'unknown',
      totalEventsProcessed: 0,
      activeStreams: 0
    };

    for (const [collectionName, streamInfo] of this.changeStreams) {
      metrics.streams[collectionName] = {
        collection: collectionName,
        isActive: streamInfo.isActive,
        createdAt: streamInfo.createdAt,
        processedEvents: streamInfo.processedEvents,
        errorCount: streamInfo.errorCount,
        lastError: streamInfo.lastError,
        lastProcessedAt: streamInfo.lastProcessedAt,

        healthStatus: streamInfo.isActive ? 
          (streamInfo.errorCount < 5 ? 'healthy' : 'warning') : 'inactive'
      };

      metrics.totalEventsProcessed += streamInfo.processedEvents;
      if (streamInfo.isActive) metrics.activeStreams++;
    }

    // Determine system health
    const totalStreams = this.changeStreams.size;
    if (metrics.activeStreams === totalStreams) {
      metrics.systemHealth = 'healthy';
    } else if (metrics.activeStreams > totalStreams / 2) {
      metrics.systemHealth = 'degraded';
    } else {
      metrics.systemHealth = 'critical';
    }

    return metrics;
  }

  async shutdown() {
    console.log('Shutting down MongoDB Change Data Capture Manager...');

    // Close all change streams
    for (const [collectionName, streamInfo] of this.changeStreams) {
      try {
        if (streamInfo.stream && streamInfo.isActive) {
          await streamInfo.stream.close();
          console.log(`✅ Closed change stream for ${collectionName}`);
        }
      } catch (error) {
        console.error(`Error closing change stream for ${collectionName}:`, error);
      }
    }

    // Close MongoDB connection
    if (this.client) {
      await this.client.close();
      console.log('✅ MongoDB connection closed');
    }

    this.changeStreams.clear();
    this.eventHandlers.clear();
    this.processingMetrics.clear();
  }
}

// Export the change data capture manager
module.exports = { MongoChangeDataCaptureManager };

// Benefits of MongoDB Change Data Capture:
// - Native real-time change streams eliminate polling and trigger complexity
// - Ordered, resumable event streams ensure reliable event processing
// - Full document context provides complete change information
// - Drivers resume a stream automatically after transient network errors
// - Changes made inside multi-document transactions are emitted only after commit
// - Events are read from the oplog, so no triggers or polling queries run against source collections
// - Flexible event routing and transformation capabilities
// - Production-ready monitoring and metrics for change stream health
// - Zero external dependencies for change data capture functionality
// - SQL-compatible event processing patterns through QueryLeaf integration
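
The manager above restarts a failed stream, but it does not persist its position across application restarts. A minimal sketch of resume-token checkpointing follows. It assumes a hypothetical token store with load() and save() methods (the in-memory class is for demonstration only); resumeAfter is the driver's watch() option, and each change event's _id is its resume token.

```javascript
// Sketch: resume-token checkpointing for a change stream.
// `tokenStore` is a hypothetical object with async load(name) / save(name, token).

// Merge a saved resume token (if any) into the options passed to collection.watch().
function buildResumableOptions(baseOptions, savedToken) {
  return savedToken ? { ...baseOptions, resumeAfter: savedToken } : { ...baseOptions };
}

// Run the handler first, then checkpoint. A crash mid-handler replays the event on
// restart (at-least-once delivery), so handlers should be idempotent.
async function processWithCheckpoint(streamName, changeDoc, handler, tokenStore) {
  await handler(changeDoc);
  await tokenStore.save(streamName, changeDoc._id); // _id is the resume token
}

// In-memory store for demonstration only; a real deployment would persist tokens,
// for example in a small MongoDB collection.
class InMemoryTokenStore {
  constructor() { this.tokens = new Map(); }
  async load(name) { return this.tokens.get(name) ?? null; }
  async save(name, token) { this.tokens.set(name, token); }
}

// Usage (assumes a connected `db` and an `options` object as in the manager above):
// const token = await tokenStore.load('users');
// const stream = db.collection('users').watch([], buildResumableOptions(options, token));
// stream.on('change', (doc) => processWithCheckpoint('users', doc, handler, tokenStore));
```

Checkpointing after the handler rather than before trades possible duplicate deliveries for no lost events, which is usually the safer default for event-driven workflows.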

SQL-Style Change Data Capture with QueryLeaf

QueryLeaf provides familiar SQL syntax for MongoDB change data capture and event processing:

-- QueryLeaf Change Data Capture with SQL-familiar syntax

-- Create change stream monitors
CREATE CHANGE STREAM user_changes 
ON COLLECTION users
WITH OPTIONS (
  full_document = 'updateLookup',
  full_document_before_change = 'whenAvailable',
  resume_token_collection = 'change_stream_tokens'
)
AS SELECT 
  change_id,
  operation_type,
  document_id,
  cluster_time as event_time,

  -- Document data
  full_document as current_document,
  full_document_before_change as previous_document,

  -- Change details
  updated_fields,
  removed_fields,

  -- Event classification
  CASE operation_type
    WHEN 'insert' THEN 'user_created'
    WHEN 'update' THEN 
      CASE 
        WHEN updated_fields ? 'status' THEN 'user_status_changed'
        WHEN updated_fields ? 'email' THEN 'user_email_changed'
        ELSE 'user_updated'
      END
    WHEN 'delete' THEN 'user_deleted'
  END as event_type,

  -- Processing metadata
  CURRENT_TIMESTAMP as received_at,
  'pending' as processing_status

FROM mongodb_change_stream;

-- Query change stream events
SELECT 
  event_type,
  event_time,
  document_id,
  operation_type,

  -- Extract specific field changes
  current_document->>'email' as current_email,
  previous_document->>'email' as previous_email,
  current_document->>'status' as current_status,
  previous_document->>'status' as previous_status,

  -- Change analysis
  CASE 
    WHEN operation_type = 'update' AND updated_fields ? 'status' THEN
      JSON_OBJECT(
        'field', 'status',
        'from', previous_document->>'status',
        'to', current_document->>'status',
        'change_type', 'status_transition'
      )
  END as change_details,

  processing_status,
  received_at

FROM user_changes
WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '1 hour'
ORDER BY event_time DESC;

-- Event processing pipeline with SQL
WITH processed_events AS (
  SELECT 
    change_id,
    event_type,
    event_time,
    document_id,

    -- Route events to handlers
    CASE event_type
      WHEN 'user_created' THEN 'user_management_service'
      WHEN 'user_status_changed' THEN 'notification_service'
      WHEN 'order_status_updated' THEN 'order_fulfillment_service'
      WHEN 'inventory_updated' THEN 'inventory_management_service'
      ELSE 'default_event_handler'
    END as target_service,

    -- Event priority
    CASE event_type
      WHEN 'order_cancelled' THEN 'high'
      WHEN 'inventory_out_of_stock' THEN 'high'
      WHEN 'user_created' THEN 'medium'
      ELSE 'low'
    END as priority,

    -- Event payload
    JSON_OBJECT(
      'eventId', change_id,
      'eventType', event_type,
      'documentId', document_id,
      'currentDocument', current_document,
      'previousDocument', previous_document,
      'updatedFields', updated_fields,
      'eventTime', event_time,
      'receivedAt', received_at
    ) as event_payload

  FROM user_changes
  WHERE processing_status = 'pending'
),

event_routing AS (
  SELECT 
    *,
    -- Generate webhook URLs for event delivery
    CONCAT('https://api.example.com/services/', target_service, '/events') as webhook_url,

    -- Retry configuration
    CASE priority
      WHEN 'high' THEN 5
      WHEN 'medium' THEN 3
      ELSE 1
    END as max_retries

  FROM processed_events
)

-- Process events (would integrate with actual webhook delivery)
SELECT 
  change_id,
  event_type,
  target_service,
  priority,
  webhook_url,
  event_payload,
  max_retries,

  -- Processing recommendations
  CASE priority
    WHEN 'high' THEN 'Process immediately with dedicated queue'
    WHEN 'medium' THEN 'Process within 30 seconds'
    ELSE 'Process in batch queue'
  END as processing_strategy

FROM event_routing
ORDER BY 
  CASE priority
    WHEN 'high' THEN 1
    WHEN 'medium' THEN 2
    ELSE 3
  END,
  event_time;

-- Change stream performance monitoring
SELECT 
  stream_name,
  collection_name,

  -- Activity metrics
  total_events_processed,
  events_per_hour,

  -- Event type distribution
  (events_by_type->>'insert')::INTEGER as insert_events,
  (events_by_type->>'update')::INTEGER as update_events,
  (events_by_type->>'delete')::INTEGER as delete_events,

  -- Performance metrics
  ROUND(avg_processing_latency_ms::NUMERIC, 2) as avg_latency_ms,
  ROUND(p95_processing_latency_ms::NUMERIC, 2) as p95_latency_ms,

  -- Error handling
  error_count,
  ROUND(error_rate::NUMERIC * 100, 2) as error_rate_percent,
  last_error_time,

  -- Stream health
  is_active,
  last_heartbeat,

  CASE 
    WHEN NOT is_active THEN 'critical'
    WHEN error_rate > 0.05 THEN 'warning'
    WHEN avg_processing_latency_ms > 1000 THEN 'slow'
    ELSE 'healthy'
  END as health_status

FROM change_stream_metrics
WHERE last_updated >= CURRENT_TIMESTAMP - INTERVAL '5 minutes'
ORDER BY events_per_hour DESC;

-- Event-driven architecture analytics
CREATE VIEW event_driven_analytics AS
WITH event_patterns AS (
  SELECT 
    event_type,
    target_service,
    DATE_TRUNC('hour', event_time) as hour_bucket,

    -- Volume metrics
    COUNT(*) as event_count,
    COUNT(DISTINCT document_id) as unique_documents,

    -- Processing metrics
    AVG(EXTRACT(EPOCH FROM (processed_at - received_at))) as avg_processing_time_seconds,
    COUNT(*) FILTER (WHERE processing_status = 'success') as successful_events,
    COUNT(*) FILTER (WHERE processing_status = 'error') as failed_events,

    -- Event characteristics
    AVG(LENGTH(event_payload::TEXT)) as avg_payload_size,
    COUNT(*) FILTER (WHERE priority = 'high') as high_priority_events

  FROM change_stream_events
  WHERE event_time >= CURRENT_TIMESTAMP - INTERVAL '24 hours'
  GROUP BY event_type, target_service, DATE_TRUNC('hour', event_time)
)

SELECT 
  event_type,
  target_service,
  TO_CHAR(hour_bucket, 'YYYY-MM-DD HH24:00') as analysis_hour,

  -- Volume analysis
  event_count,
  unique_documents,
  high_priority_events,

  -- Performance analysis
  ROUND(avg_processing_time_seconds::NUMERIC, 3) as avg_processing_seconds,
  ROUND((successful_events::DECIMAL / event_count * 100)::NUMERIC, 2) as success_rate_percent,

  -- System load indicators
  CASE 
    WHEN event_count > 10000 THEN 'very_high'
    WHEN event_count > 1000 THEN 'high'
    WHEN event_count > 100 THEN 'medium'
    ELSE 'low'
  END as event_volume_category,

  -- Performance assessment
  CASE 
    WHEN avg_processing_time_seconds > 5 THEN 'processing_slow'
    WHEN successful_events::DECIMAL / event_count < 0.95 THEN 'high_error_rate'
    WHEN event_count > 5000 AND avg_processing_time_seconds > 1 THEN 'capacity_strain'
    ELSE 'performing_well'
  END as performance_indicator,

  -- Optimization recommendations
  CASE 
    WHEN high_priority_events > event_count * 0.3 THEN 'Consider dedicated high-priority queue'
    WHEN failed_events > 10 THEN 'Review error handling and retry logic'
    WHEN avg_processing_time_seconds > 2 THEN 'Optimize event processing pipeline'
    WHEN event_count > 1000 AND unique_documents < event_count * 0.1 THEN 'Consider event deduplication'
    ELSE 'Event processing optimized for current load'
  END as optimization_recommendation

FROM event_patterns
ORDER BY event_count DESC, hour_bucket DESC;

-- QueryLeaf provides comprehensive Change Data Capture capabilities:
-- 1. SQL-familiar syntax for creating and managing change streams
-- 2. Real-time event processing with automatic routing and prioritization
-- 3. Comprehensive monitoring and analytics for event-driven architectures
-- 4. Error handling and retry logic integrated into SQL workflows
-- 5. Performance optimization recommendations based on event patterns
-- 6. Integration with MongoDB's native change stream capabilities
-- 7. Enterprise-grade event processing accessible through familiar SQL constructs
-- 8. Scalable event-driven architecture patterns with SQL-style management
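
The priority-to-retry mapping in the routing query above could be enforced in application code roughly as follows. This is a sketch under stated assumptions: `deliver` and `deadLetter` are hypothetical callbacks (for example a webhook call and an insert into a dead-letter collection), the limit is treated as the maximum number of delivery attempts, and the backoff values are illustrative.

```javascript
// Attempt limits mirroring the SQL CASE expression: high = 5, medium = 3, otherwise 1.
const MAX_ATTEMPTS = new Map([['high', 5], ['medium', 3]]);

function maxRetriesFor(priority) {
  return MAX_ATTEMPTS.get(priority) ?? 1;
}

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Try delivery up to the priority's limit, backing off exponentially between attempts.
// After the final failure the event is handed to deadLetter() instead of being dropped.
async function deliverWithRetry(event, deliver, deadLetter, baseDelayMs = 100) {
  const limit = maxRetriesFor(event.priority);
  let lastError;
  for (let attempt = 1; attempt <= limit; attempt++) {
    try {
      await deliver(event);
      return { delivered: true, attempts: attempt };
    } catch (error) {
      lastError = error;
      // Wait 1x, 2x, 4x... the base delay before the next attempt
      if (attempt < limit) await sleep(baseDelayMs * 2 ** (attempt - 1));
    }
  }
  await deadLetter(event, lastError);
  return { delivered: false, attempts: limit };
}
```

Keeping failed events in a dead-letter store lets them be inspected and replayed later without blocking the rest of the stream.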

Best Practices for MongoDB Change Data Capture

Change Stream Design Patterns

Essential practices for implementing change data capture:

  1. Event Classification: Design clear event taxonomies that map business operations to technical changes
  2. Error Handling Strategy: Implement comprehensive retry logic and dead letter queues for failed events
  3. Performance Monitoring: Establish metrics and alerting for change stream health and processing latency
  4. Resumability: Persist the resume token after each successfully processed event so processing can continue from the same point after an application restart
  5. Filtering Strategy: Apply appropriate filters to change streams to process only relevant events
  6. Scalability Planning: Design event processing pipelines that can handle high-throughput scenarios
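
The filtering practice can be sketched as a small pipeline builder. The function name and its parameters are illustrative rather than part of the driver; the output is an ordinary aggregation pipeline that can be passed as the first argument to collection.watch(). The field filter only covers top-level fields, since nested updates appear in updatedFields under dotted keys.

```javascript
// Build a pipeline for collection.watch() that keeps only relevant events.
// operationTypes: e.g. ['insert', 'update']
// watchedFields: top-level fields of interest on update events (empty = all updates)
function buildChangeFilter({ operationTypes = [], watchedFields = [] } = {}) {
  const conditions = [];

  // Non-update operations are matched by type alone
  const nonUpdateOps = operationTypes.filter((op) => op !== 'update');
  if (nonUpdateOps.length > 0) {
    conditions.push({ operationType: { $in: nonUpdateOps } });
  }

  // Updates are optionally narrowed to those touching a watched field
  if (operationTypes.includes('update')) {
    const updateMatch = { operationType: 'update' };
    if (watchedFields.length > 0) {
      updateMatch.$or = watchedFields.map((field) => ({
        [`updateDescription.updatedFields.${field}`]: { $exists: true }
      }));
    }
    conditions.push(updateMatch);
  }

  // An empty pipeline means "no filtering"
  return conditions.length > 0 ? [{ $match: { $or: conditions } }] : [];
}

// Usage (assumes a connected `db`):
// const pipeline = buildChangeFilter({ operationTypes: ['insert', 'update'], watchedFields: ['status'] });
// const stream = db.collection('orders').watch(pipeline, { fullDocument: 'updateLookup' });
```

Filtering on the server keeps irrelevant events off the network, which matters most on collections with frequent updates to fields no consumer cares about.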

Production Deployment Considerations

Key factors for enterprise change data capture deployments:

  1. Replica Set Requirements: Change streams are available only on replica sets and sharded clusters, not on standalone servers, so confirm the deployment topology before relying on them
  2. Resource Planning: Account for change stream resource consumption and event processing overhead
  3. Event Ordering: Understand and leverage MongoDB's event ordering guarantees for related changes
  4. Disaster Recovery: Plan for change stream recovery and event replay scenarios
  5. Security Configuration: Implement proper authentication and authorization for change stream access
  6. Monitoring Integration: Integrate change stream metrics with existing monitoring and alerting systems
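
One deployment step the manager shown earlier depends on: the fullDocumentBeforeChange: 'whenAvailable' option only returns data when pre-images are enabled on the watched collection, which requires MongoDB 6.0 or later. A minimal sketch of enabling them follows; the function name and result shape are illustrative, while collMod with changeStreamPreAndPostImages is the server command.

```javascript
// Enable pre-images on each collection so 'fullDocumentBeforeChange' can be populated.
// `db` is a connected driver Db instance; requires MongoDB 6.0 or later.
async function enablePreImages(db, collectionNames) {
  const results = [];
  for (const name of collectionNames) {
    try {
      await db.command({
        collMod: name,
        changeStreamPreAndPostImages: { enabled: true }
      });
      results.push({ collection: name, enabled: true });
    } catch (error) {
      // For example: the collection does not exist or the server version is too old
      results.push({ collection: name, enabled: false, error: error.message });
    }
  }
  return results;
}

// Usage (assumes a connected `db`):
// const report = await enablePreImages(db, ['users', 'orders', 'products', 'inventory']);
```

Pre-images consume additional storage, so it is worth enabling them only on collections whose consumers actually compare before and after states.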

Conclusion

MongoDB Change Streams provide native change data capture for real-time event processing, enabling event-driven architectures without a separate CDC pipeline. Native change detection, ordered event delivery, and resumable streams let applications build reactive systems that respond to data changes with low latency.

Key MongoDB Change Data Capture benefits include:

  • Real-Time Processing: Native change streams provide immediate notification of data changes with minimal latency
  • Event Ordering: Guaranteed ordering of related events ensures consistent event processing across services
  • Resumable Streams: Built-in resume token support enables reliable event processing across application restarts
  • Full Context: Complete document information, including before and after states when pre-images are enabled on the collection (MongoDB 6.0+), for comprehensive change analysis
  • Production Ready: Enterprise-grade error handling, monitoring, and scalability capabilities
  • SQL Compatibility: Familiar change processing patterns accessible through SQL-style operations

Whether you're building microservices architectures, real-time analytics pipelines, or reactive user interfaces, MongoDB Change Data Capture with QueryLeaf's SQL-familiar interface provides the foundation for scalable event-driven systems that maintain consistency and responsiveness while simplifying operational complexity.

QueryLeaf Integration: QueryLeaf automatically optimizes MongoDB Change Data Capture while providing SQL-familiar syntax for creating, monitoring, and processing change streams. Advanced event routing, error handling, and performance analytics are seamlessly accessible through familiar SQL constructs, making sophisticated event-driven architecture both powerful and approachable for SQL-oriented teams.

Combining MongoDB's native change streams with SQL-style management suits applications that need real-time data processing without added operational complexity, so an event-driven architecture can scale while teams keep familiar development and operational patterns.