diff --git a/packages/contentstack-import/src/commands/cm/stacks/import-recovery.ts b/packages/contentstack-import/src/commands/cm/stacks/import-recovery.ts new file mode 100644 index 000000000..8056389db --- /dev/null +++ b/packages/contentstack-import/src/commands/cm/stacks/import-recovery.ts @@ -0,0 +1,162 @@ +import { Command } from '@contentstack/cli-command'; +import { flags, FlagInput, cliux, log } from '@contentstack/cli-utilities'; +import { ImportRecoveryManager } from '../../../utils'; + +export default class ImportRecoveryCommand extends Command { + static description = 'Analyze and recover from failed import operations'; + + static examples: string[] = [ + 'csdx cm:stacks:import-recovery --data-dir ', + 'csdx cm:stacks:import-recovery --data-dir --clean', + 'csdx cm:stacks:import-recovery --data-dir --report' + ]; + + static flags: FlagInput = { + 'data-dir': flags.string({ + char: 'd', + description: 'The path to the directory containing the import data and state files', + required: true, + }), + clean: flags.boolean({ + description: 'Clean the import state to start fresh (creates backup)', + default: false, + }), + report: flags.boolean({ + description: 'Generate a detailed recovery report', + default: false, + }), + 'output-file': flags.string({ + description: 'Save the recovery report to a file', + }), + }; + + static usage: string = 'cm:stacks:import-recovery --data-dir [--clean] [--report] [--output-file ]'; + + async run(): Promise { + try { + const { flags } = await this.parse(ImportRecoveryCommand); + + const recoveryManager = ImportRecoveryManager.create(flags['data-dir']); + + if (flags.clean) { + await this.cleanImportState(recoveryManager); + return; + } + + if (flags.report) { + await this.generateReport(recoveryManager, flags['output-file']); + return; + } + + // Default: analyze and provide recommendations + await this.analyzeAndRecommend(recoveryManager); + + } catch (error) { + log.error(`Recovery command failed: ${error}`); + cliux.print(`Error: ${error}`, { color: 'red' }); + } + } + + private async analyzeAndRecommend(recoveryManager: ImportRecoveryManager): Promise { + cliux.print('\n๐Ÿ” Analyzing import state...', { color: 'blue' }); + + const info = recoveryManager.analyzeImportState(); + const recommendation = recoveryManager.getRecoveryRecommendation(info); + + // Display state information + cliux.print('\n๐Ÿ“Š Import State Summary:', { color: 'cyan' }); + cliux.print(` State File: ${info.stateFileExists ? 'โœ… Found' : 'โŒ Not Found'}`); + cliux.print(` Assets Processed: ${info.mappingCounts.assets}`); + cliux.print(` Folders Processed: ${info.mappingCounts.folders}`); + cliux.print(` URL Mappings: ${info.mappingCounts.urls}`); + + if (info.lastUpdated) { + const lastUpdated = new Date(info.lastUpdated); + cliux.print(` Last Updated: ${lastUpdated.toLocaleString()}`); + } + + if (info.estimatedProgress) { + cliux.print(` Estimated Progress: ~${info.estimatedProgress}%`); + } + + // Display recommendation + cliux.print(`\n๐Ÿ’ก Recommendation: ${recommendation.action.toUpperCase()}`, { + color: recommendation.action === 'resume' ? 'green' : + recommendation.action === 'restart' ? 'yellow' : 'red' + }); + cliux.print(` ${recommendation.reason}`); + + if (recommendation.commands && recommendation.commands.length > 0) { + cliux.print('\n๐Ÿ“‹ Commands:', { color: 'cyan' }); + recommendation.commands.forEach(cmd => { + if (cmd.startsWith('#')) { + cliux.print(` ${cmd}`, { color: 'gray' }); + } else if (cmd.trim() === '') { + cliux.print(''); + } else { + cliux.print(` ${cmd}`, { color: 'white' }); + } + }); + } + + if (recommendation.warnings && recommendation.warnings.length > 0) { + cliux.print('\nโš ๏ธ Warnings:', { color: 'yellow' }); + recommendation.warnings.forEach(warning => { + cliux.print(` ${warning}`); + }); + } + + cliux.print('\n๐Ÿ’ก Tip: Use --report flag for a detailed analysis or --clean to start fresh\n'); + } + + private async cleanImportState(recoveryManager: ImportRecoveryManager): Promise { + cliux.print('\n๐Ÿงน Cleaning import state...', { color: 'yellow' }); + + const info = recoveryManager.analyzeImportState(); + + if (!info.stateFileExists) { + cliux.print('โœ… No import state found. Nothing to clean.', { color: 'green' }); + return; + } + + if (info.mappingCounts.assets > 0 || info.mappingCounts.folders > 0) { + cliux.print(`โš ๏ธ Warning: This will remove progress for ${info.mappingCounts.assets} assets and ${info.mappingCounts.folders} folders.`, { color: 'yellow' }); + + const confirm = await cliux.confirm('Are you sure you want to clean the import state? (y/N)'); + if (!confirm) { + cliux.print('โŒ Operation cancelled.', { color: 'red' }); + return; + } + } + + const success = recoveryManager.cleanImportState(); + + if (success) { + cliux.print('โœ… Import state cleaned successfully. You can now start a fresh import.', { color: 'green' }); + cliux.print('๐Ÿ’ก A backup of the previous state has been created.', { color: 'blue' }); + } else { + cliux.print('โŒ Failed to clean import state. Check the logs for details.', { color: 'red' }); + } + } + + private async generateReport(recoveryManager: ImportRecoveryManager, outputFile?: string): Promise { + cliux.print('\n๐Ÿ“„ Generating recovery report...', { color: 'blue' }); + + const report = recoveryManager.generateRecoveryReport(); + + if (outputFile) { + try { + const fs = require('fs'); + fs.writeFileSync(outputFile, report); + cliux.print(`โœ… Recovery report saved to: ${outputFile}`, { color: 'green' }); + } catch (error) { + cliux.print(`โŒ Failed to save report to file: ${error}`, { color: 'red' }); + cliux.print('\n๐Ÿ“„ Recovery Report:', { color: 'cyan' }); + cliux.print(report); + } + } else { + cliux.print('\n๐Ÿ“„ Recovery Report:', { color: 'cyan' }); + cliux.print(report); + } + } +} \ No newline at end of file diff --git a/packages/contentstack-import/src/commands/cm/stacks/import.ts b/packages/contentstack-import/src/commands/cm/stacks/import.ts index 9a086e780..56cda7a3c 100644 --- a/packages/contentstack-import/src/commands/cm/stacks/import.ts +++ b/packages/contentstack-import/src/commands/cm/stacks/import.ts @@ -108,6 +108,20 @@ export default class ImportCommand extends Command { description: 'Skips entry publishing during the import process', default: false, }), + 'force-backup': flags.boolean({ + description: 'Forces backup creation even for large datasets that would normally skip backup for memory optimization.', + default: false, + }), + 'disable-memory-optimization': flags.boolean({ + description: 'Disables memory optimization features and uses legacy processing (not recommended for large datasets).', + default: false, + }), + 'memory-threshold': flags.integer({ + description: 'Memory threshold in MB for triggering garbage collection (default: 768MB for large datasets).', + }), + 'asset-concurrency': flags.integer({ + description: 'Number of concurrent asset uploads (default: 10).', + }), }; static usage: string = diff --git a/packages/contentstack-import/src/config/index.ts b/packages/contentstack-import/src/config/index.ts index a18d7d077..4b63517a4 100644 --- a/packages/contentstack-import/src/config/index.ts +++ b/packages/contentstack-import/src/config/index.ts @@ -93,13 +93,23 @@ const config: DefaultConfig = { assetBatchLimit: 1, fileName: 'assets.json', importSameStructure: true, - uploadAssetsConcurrency: 2, + uploadAssetsConcurrency: 10, // Increased from 2 to 10 based on customer success displayExecutionTime: false, - importFoldersConcurrency: 1, + importFoldersConcurrency: 5, // Increased from 1 to 5 for better performance includeVersionedAssets: false, host: 'https://api.contentstack.io', folderValidKeys: ['name', 'parent_uid'], validKeys: ['title', 'parent_uid', 'description', 'tags'], + // New memory management configuration + enableMemoryMonitoring: true, // Enable memory monitoring by default + memoryThresholdMB: 768, // Memory pressure threshold for large datasets + enableIncrementalPersistence: true, // Enable incremental state saving + maxRetries: 5, // Retry logic for failed uploads + retryDelay: 2000, // Delay between retries (ms) + enableRateLimiting: true, // Enable rate limiting + rateLimitDelay: 200, // Delay between API calls (ms) + backupSkipThresholdGB: 1, // Skip backup for datasets larger than 1GB + queueClearInterval: 100, // Clear completed queue items every N items }, 'assets-old': { dirName: 'assets', diff --git a/packages/contentstack-import/src/import/modules/assets.ts b/packages/contentstack-import/src/import/modules/assets.ts index 007f21875..4c15ca98e 100644 --- a/packages/contentstack-import/src/import/modules/assets.ts +++ b/packages/contentstack-import/src/import/modules/assets.ts @@ -18,10 +18,20 @@ import { PATH_CONSTANTS } from '../../constants'; import config from '../../config'; import { ModuleClassParams } from '../../types'; -import { formatDate, PROCESS_NAMES, MODULE_CONTEXTS, MODULE_NAMES, PROCESS_STATUS } from '../../utils'; +import { + formatDate, + PROCESS_NAMES, + MODULE_CONTEXTS, + MODULE_NAMES, + PROCESS_STATUS, + MemoryMonitor, + IncrementalStateManager, + AssetQueue, + AssetProcessor +} from '../../utils'; import BaseClass, { ApiOptions } from './base-class'; -export default class ImportAssets extends BaseClass { +export default class ImportAssets extends BaseClass implements AssetProcessor { private fs: FsUtility; private assetsPath: string; private mapperDirPath: string; @@ -35,6 +45,12 @@ export default class ImportAssets extends BaseClass { private assetsUrlMap: Record = {}; private assetsFolderMap: Record = {}; private rootFolder: { uid: string; name: string; parent_uid: string; created_at: string }; + + // New memory management utilities + private memoryMonitor: MemoryMonitor; + private stateManager: IncrementalStateManager; + private assetQueue: AssetQueue; + private enableMemoryOptimizations: boolean; constructor({ importConfig, stackAPIClient }: ModuleClassParams) { super({ importConfig, stackAPIClient }); @@ -60,6 +76,40 @@ export default class ImportAssets extends BaseClass { ), true, ) as Record; + + // Initialize memory management utilities + this.enableMemoryOptimizations = this.assetConfig.enableMemoryMonitoring !== false; + + if (this.enableMemoryOptimizations) { + // Create memory monitor for large datasets + this.memoryMonitor = MemoryMonitor.createForLargeDataset(this.importConfig.context); + + // Create incremental state manager + this.stateManager = IncrementalStateManager.createForLargeDataset( + this.importConfig.backupDir, + this.importConfig.context + ); + + // Create asset queue with memory management + this.assetQueue = AssetQueue.createForLargeDataset( + this.memoryMonitor, + this.stateManager, + this.importConfig.context + ); + + // Set this class as the asset processor + this.assetQueue.setProcessor(this); + + // Check for existing state and log resume capability + const existingMappings = this.stateManager.getMappingCount(); + if (existingMappings.assets > 0 || existingMappings.folders > 0) { + log.info(`Found existing import state: ${existingMappings.assets} assets, ${existingMappings.folders} folders. Import will resume from where it left off.`, this.importConfig.context); + } + + log.debug('Memory optimization utilities initialized', this.importConfig.context); + } else { + log.debug('Memory optimizations disabled', this.importConfig.context); + } } /** @@ -235,6 +285,153 @@ export default class ImportAssets extends BaseClass { log.debug(`Found ${indexerCount} asset chunks to process`, this.importConfig.context); + // Use memory-efficient processing if enabled + if (this.enableMemoryOptimizations && !isVersion) { + await this.importAssetsMemoryEfficient(fs, indexer, indexerCount, progressProcessName); + } else { + // Fallback to legacy processing for versioned assets or when optimizations are disabled + await this.importAssetsLegacy(fs, indexer, indexerCount, processName, progressProcessName, isVersion); + } + + // Handle state persistence + if (!isVersion) { + if (this.enableMemoryOptimizations) { + // Flush incremental state + await this.stateManager.flushState(); + + // Write legacy mapping files for compatibility + const assetMappings = this.stateManager.getAllMappings('asset'); + const urlMappings = this.stateManager.getUrlMappings(); + + if (!isEmpty(assetMappings)) { + log.debug(`Writing ${Object.keys(assetMappings).length} UID mappings`, this.importConfig.context); + this.fs.writeFile(this.assetUidMapperPath, assetMappings); + } + if (!isEmpty(urlMappings)) { + log.debug(`Writing ${Object.keys(urlMappings).length} URL mappings`, this.importConfig.context); + this.fs.writeFile(this.assetUrlMapperPath, urlMappings); + } + } else { + // Legacy state persistence + if (!isEmpty(this.assetsUidMap)) { + const uidMappingCount = Object.keys(this.assetsUidMap || {}).length; + log.debug(`Writing ${uidMappingCount} UID mappings`, this.importConfig.context); + this.fs.writeFile(this.assetUidMapperPath, this.assetsUidMap); + } + if (!isEmpty(this.assetsUrlMap)) { + const urlMappingCount = Object.keys(this.assetsUrlMap || {}).length; + log.debug(`Writing ${urlMappingCount} URL mappings`, this.importConfig.context); + this.fs.writeFile(this.assetUrlMapperPath, this.assetsUrlMap); + } + } + } + } + + /** + * Memory-efficient asset import using queue-based processing + */ + private async importAssetsMemoryEfficient( + fs: FsUtility, + indexer: Record, + indexerCount: number, + progressProcessName: string + ): Promise { + log.debug('Using memory-efficient asset processing', this.importConfig.context); + + // Set up queue event handlers for progress tracking + this.assetQueue.on('itemCompleted', (item, result) => { + this.progressManager?.tick(true, `asset: ${item.asset.title || item.asset.uid}`, null, progressProcessName); + log.success(`Created asset: '${item.asset.title}'`, this.importConfig.context); + }); + + this.assetQueue.on('itemFailed', (item, error) => { + this.progressManager?.tick( + false, + `asset: ${item.asset.title || item.asset.uid}`, + error?.message || PROCESS_STATUS[PROCESS_NAMES.ASSET_UPLOAD].FAILED, + progressProcessName, + ); + log.error(`${item.asset.title} asset upload failed: ${error}`, this.importConfig.context); + }); + + let processedChunks = 0; + + /* eslint-disable @typescript-eslint/no-unused-vars, guard-for-in */ + for (const index in indexer) { + // Memory check before processing each chunk + if (this.memoryMonitor.checkMemoryPressure()) { + log.debug(`Memory pressure detected at chunk ${index}, triggering GC`, this.importConfig.context); + await this.memoryMonitor.forceGarbageCollection(); + + // Brief pause after GC + await this.sleep(500); + } + + // Log memory stats periodically + if (processedChunks % 10 === 0) { + this.memoryMonitor.logMemoryStats(`Chunk ${index}/${indexerCount}`); + } + + log.debug(`Processing chunk ${index} of ${indexerCount}`, this.importConfig.context); + + const chunk = await fs.readChunkFiles.next().catch((error) => { + handleAndLogError(error, { ...this.importConfig.context }); + }); + + if (chunk) { + const apiContent = orderBy(values(chunk as Record[]), '_version'); + log.debug(`Queueing ${apiContent.length} assets from chunk`, this.importConfig.context); + + // Queue assets individually, skipping already processed ones + let skippedCount = 0; + for (const asset of apiContent) { + // Check if asset was already processed (resume functionality) + if (this.stateManager.hasMapping(asset.uid, 'asset')) { + skippedCount++; + // Still update progress for skipped items + this.progressManager?.tick(true, `asset: ${asset.title || asset.uid} (resumed)`, null, progressProcessName); + continue; + } + + this.assetQueue.enqueue(asset); + } + + if (skippedCount > 0) { + log.debug(`Skipped ${skippedCount} already processed assets from chunk`, this.importConfig.context); + } + + // Clear chunk from memory immediately + (chunk as any) = null; + + // Process queued assets with controlled concurrency + await this.assetQueue.waitForCompletion(); + + // Clear completed items from queue to free memory + const clearedCount = this.assetQueue.clearCompleted(); + if (clearedCount > 0) { + log.debug(`Cleared ${clearedCount} completed items from queue`, this.importConfig.context); + } + + processedChunks++; + } + } + + log.debug(`Memory-efficient processing completed for ${processedChunks} chunks`, this.importConfig.context); + } + + /** + * Legacy asset import processing (fallback) + */ + private async importAssetsLegacy( + fs: FsUtility, + indexer: Record, + indexerCount: number, + processName: string, + progressProcessName: string, + isVersion: boolean + ): Promise { + log.debug('Using legacy asset processing', this.importConfig.context); + const onSuccess = ({ response = {}, apiData: { uid, url, title } = undefined }: any) => { this.assetsUidMap[uid] = response.uid; this.assetsUrlMap[url] = response.url; @@ -309,19 +506,13 @@ export default class ImportAssets extends BaseClass { ); } } + } - if (!isVersion) { - if (!isEmpty(this.assetsUidMap)) { - const uidMappingCount = Object.keys(this.assetsUidMap || {}).length; - log.debug(`Writing ${uidMappingCount} UID mappings`, this.importConfig.context); - this.fs.writeFile(this.assetUidMapperPath, this.assetsUidMap); - } - if (!isEmpty(this.assetsUrlMap)) { - const urlMappingCount = Object.keys(this.assetsUrlMap || {}).length; - log.debug(`Writing ${urlMappingCount} URL mappings`, this.importConfig.context); - this.fs.writeFile(this.assetUrlMapperPath, this.assetsUrlMap); - } - } + /** + * Sleep utility for memory management + */ + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); } /** @@ -605,4 +796,72 @@ export default class ImportAssets extends BaseClass { await action(); progress.completeProcess(name, true); } + + /** + * AssetProcessor interface implementation + * Process a single asset through the API + */ + async processAsset(asset: any): Promise<{ uid: string; url?: string }> { + // Check if asset was already processed (additional safety check) + if (this.enableMemoryOptimizations && this.stateManager.hasMapping(asset.uid, 'asset')) { + const existingUid = this.stateManager.getMapping(asset.uid, 'asset'); + log.debug(`Asset ${asset.uid} already processed, returning existing mapping: ${existingUid}`, this.importConfig.context); + return { uid: existingUid as string, url: asset.url }; + } + + const serializedAsset = this.serializeAssets({ + apiData: asset, + entity: 'create-assets', + resolve: () => {}, + reject: () => {} + }); + + if (!serializedAsset.entity) { + // Asset was skipped during serialization + throw new Error(`Asset ${asset.uid} was skipped during serialization`); + } + + // Use the existing makeAPICall method but wrap it in a promise + return new Promise((resolve, reject) => { + const apiOptions: ApiOptions = { + ...serializedAsset, + resolve: ({ response }) => { + // Update legacy mappings for compatibility + if (this.enableMemoryOptimizations) { + // State manager handles persistence automatically + } else { + // Legacy behavior + this.assetsUidMap[asset.uid] = response.uid; + if (asset.url && response.url) { + this.assetsUrlMap[asset.url] = response.url; + } + } + + resolve({ + uid: response.uid, + url: response.url + }); + }, + reject: ({ error }) => { + // Enhanced error handling with context + const enhancedError = new Error(`Failed to process asset ${asset.uid} (${asset.title}): ${error.message}`); + enhancedError.stack = error.stack; + + // Add asset context to error for better debugging + (enhancedError as any).assetContext = { + uid: asset.uid, + title: asset.title, + filename: asset.filename, + fileSize: asset.file_size, + contentType: asset.content_type + }; + + reject(enhancedError); + }, + includeParamOnCompletion: true + }; + + this.makeAPICall(apiOptions, false); + }); + } } diff --git a/packages/contentstack-import/src/types/default-config.ts b/packages/contentstack-import/src/types/default-config.ts index 2b7c3bd95..f21855cae 100644 --- a/packages/contentstack-import/src/types/default-config.ts +++ b/packages/contentstack-import/src/types/default-config.ts @@ -61,6 +61,16 @@ export default interface DefaultConfig { host: string; folderValidKeys: string[]; validKeys: string[]; + // New memory management options + enableMemoryMonitoring?: boolean; + memoryThresholdMB?: number; + enableIncrementalPersistence?: boolean; + maxRetries?: number; + retryDelay?: number; + enableRateLimiting?: boolean; + rateLimitDelay?: number; + backupSkipThresholdGB?: number; + queueClearInterval?: number; }; 'assets-old': { dirName: string; diff --git a/packages/contentstack-import/src/types/import-config.ts b/packages/contentstack-import/src/types/import-config.ts index 86db5668d..3a74baeab 100644 --- a/packages/contentstack-import/src/types/import-config.ts +++ b/packages/contentstack-import/src/types/import-config.ts @@ -57,6 +57,8 @@ export default interface ImportConfig extends DefaultConfig, ExternalConfig { region: Region; personalizeProjectName?: string; 'exclude-global-modules': false; + // New memory optimization properties + forceBackup?: boolean; context: Context; } diff --git a/packages/contentstack-import/src/utils/asset-queue.ts b/packages/contentstack-import/src/utils/asset-queue.ts new file mode 100644 index 000000000..9bdd2aef4 --- /dev/null +++ b/packages/contentstack-import/src/utils/asset-queue.ts @@ -0,0 +1,377 @@ +import { EventEmitter } from 'events'; +import { log } from '@contentstack/cli-utilities'; +import { MemoryMonitor } from './memory-monitor'; +import { IncrementalStateManager } from './incremental-state'; + +export interface AssetQueueItem { + id: string; + asset: any; + retryCount: number; + status: 'pending' | 'processing' | 'completed' | 'failed'; + createdAt: number; + processedAt?: number; + error?: Error; +} + +export interface AssetQueueConfig { + maxConcurrency: number; + maxRetries: number; + retryDelay: number; + enableRateLimiting: boolean; + rateLimitDelay: number; + memoryMonitor?: MemoryMonitor; + stateManager?: IncrementalStateManager; + context: Record; +} + +export interface AssetProcessor { + processAsset(asset: any): Promise<{ uid: string; url?: string }>; +} + +export class AssetQueue extends EventEmitter { + private queue: AssetQueueItem[] = []; + private activeWorkers: number = 0; + private maxConcurrency: number; + private maxRetries: number; + private retryDelay: number; + private enableRateLimiting: boolean; + private rateLimitDelay: number; + private memoryMonitor?: MemoryMonitor; + private stateManager?: IncrementalStateManager; + private context: Record; + private processor?: AssetProcessor; + private isProcessing: boolean = false; + private completedCount: number = 0; + private failedCount: number = 0; + private lastProcessTime: number = 0; + + constructor(config: AssetQueueConfig) { + super(); + this.maxConcurrency = config.maxConcurrency; + this.maxRetries = config.maxRetries; + this.retryDelay = config.retryDelay; + this.enableRateLimiting = config.enableRateLimiting; + this.rateLimitDelay = config.rateLimitDelay; + this.memoryMonitor = config.memoryMonitor; + this.stateManager = config.stateManager; + this.context = config.context; + + log.debug(`Asset queue initialized with concurrency: ${this.maxConcurrency}, retries: ${this.maxRetries}`, this.context); + } + + /** + * Set the asset processor + */ + setProcessor(processor: AssetProcessor): void { + this.processor = processor; + } + + /** + * Add an asset to the queue + */ + enqueue(asset: any): string { + const id = this.generateId(); + const item: AssetQueueItem = { + id, + asset, + retryCount: 0, + status: 'pending', + createdAt: Date.now() + }; + + this.queue.push(item); + this.emit('enqueued', item); + + log.debug(`Enqueued asset: ${asset.uid || asset.title || id}`, this.context); + + // Start processing if not already running + if (!this.isProcessing) { + this.startProcessing(); + } + + return id; + } + + /** + * Add multiple assets to the queue + */ + enqueueBatch(assets: any[]): string[] { + const ids: string[] = []; + + for (const asset of assets) { + ids.push(this.enqueue(asset)); + } + + log.debug(`Enqueued batch of ${assets.length} assets`, this.context); + return ids; + } + + /** + * Start processing the queue + */ + async startProcessing(): Promise { + if (this.isProcessing) { + return; + } + + if (!this.processor) { + throw new Error('Asset processor not set. Call setProcessor() first.'); + } + + this.isProcessing = true; + this.emit('processingStarted'); + + log.debug('Started asset queue processing', this.context); + + while (this.queue.length > 0 || this.activeWorkers > 0) { + // Check memory pressure before starting new workers + if (this.memoryMonitor?.checkMemoryPressure()) { + log.debug('Memory pressure detected, pausing new workers', this.context); + await this.memoryMonitor.forceGarbageCollection(); + await this.sleep(1000); // Brief pause after GC + } + + // Start new workers if we have capacity and pending items + while (this.activeWorkers < this.maxConcurrency && this.queue.length > 0) { + const item = this.getNextPendingItem(); + if (item) { + this.processItem(item); + } else { + break; // No pending items available + } + } + + // Wait a bit before checking again + await this.sleep(100); + } + + this.isProcessing = false; + this.emit('processingCompleted', { + completed: this.completedCount, + failed: this.failedCount, + total: this.completedCount + this.failedCount + }); + + log.debug(`Asset queue processing completed. Completed: ${this.completedCount}, Failed: ${this.failedCount}`, this.context); + } + + /** + * Process a single queue item + */ + private async processItem(item: AssetQueueItem): Promise { + this.activeWorkers++; + item.status = 'processing'; + item.processedAt = Date.now(); + + this.emit('itemStarted', item); + + try { + // Rate limiting + if (this.enableRateLimiting) { + const timeSinceLastProcess = Date.now() - this.lastProcessTime; + if (timeSinceLastProcess < this.rateLimitDelay) { + await this.sleep(this.rateLimitDelay - timeSinceLastProcess); + } + } + + // Process the asset + const result = await this.processor!.processAsset(item.asset); + + // Update state manager if available + if (this.stateManager) { + this.stateManager.addMapping( + item.asset.uid, + result.uid, + 'asset', + result.url + ); + } + + item.status = 'completed'; + this.completedCount++; + this.lastProcessTime = Date.now(); + + this.emit('itemCompleted', item, result); + log.debug(`Completed asset: ${item.asset.uid || item.asset.title} -> ${result.uid}`, this.context); + + } catch (error) { + item.error = error as Error; + + if (item.retryCount < this.maxRetries) { + // Retry the item + item.retryCount++; + item.status = 'pending'; + + log.debug(`Retrying asset ${item.asset.uid || item.asset.title} (attempt ${item.retryCount}/${this.maxRetries})`, this.context); + + // Add delay before retry + setTimeout(() => { + // Item will be picked up in the next processing cycle + }, this.retryDelay * item.retryCount); + + this.emit('itemRetry', item, error); + } else { + // Max retries reached + item.status = 'failed'; + this.failedCount++; + + this.emit('itemFailed', item, error); + log.error(`Failed to process asset ${item.asset.uid || item.asset.title} after ${this.maxRetries} retries: ${error}`, this.context); + } + } finally { + this.activeWorkers--; + } + } + + /** + * Get the next pending item from the queue + */ + private getNextPendingItem(): AssetQueueItem | null { + const index = this.queue.findIndex(item => item.status === 'pending'); + if (index === -1) { + return null; + } + + return this.queue[index]; + } + + /** + * Get queue statistics + */ + getStats(): { + total: number; + pending: number; + processing: number; + completed: number; + failed: number; + activeWorkers: number; + } { + const stats = { + total: this.queue.length, + pending: 0, + processing: 0, + completed: this.completedCount, + failed: this.failedCount, + activeWorkers: this.activeWorkers + }; + + for (const item of this.queue) { + if (item.status === 'pending') stats.pending++; + else if (item.status === 'processing') stats.processing++; + } + + return stats; + } + + /** + * Clear completed items from the queue to free memory + */ + clearCompleted(): number { + const beforeLength = this.queue.length; + this.queue = this.queue.filter(item => + item.status !== 'completed' && item.status !== 'failed' + ); + const cleared = beforeLength - this.queue.length; + + if (cleared > 0) { + log.debug(`Cleared ${cleared} completed/failed items from queue`, this.context); + } + + return cleared; + } + + /** + * Wait for all items in the queue to complete + */ + async waitForCompletion(): Promise { + return new Promise((resolve) => { + if (!this.isProcessing && this.queue.length === 0) { + resolve(); + return; + } + + this.once('processingCompleted', () => { + resolve(); + }); + }); + } + + /** + * Pause processing + */ + pause(): void { + this.isProcessing = false; + this.emit('paused'); + log.debug('Asset queue processing paused', this.context); + } + + /** + * Resume processing + */ + resume(): void { + if (!this.isProcessing && this.queue.length > 0) { + this.startProcessing(); + } + } + + /** + * Clear the entire queue + */ + clear(): void { + const cleared = this.queue.length; + this.queue = []; + this.completedCount = 0; + this.failedCount = 0; + + log.debug(`Cleared entire queue (${cleared} items)`, this.context); + this.emit('cleared'); + } + + /** + * Generate a unique ID for queue items + */ + private generateId(): string { + return `asset_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } + + /** + * Sleep for specified milliseconds + */ + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + /** + * Create an asset queue with default configuration + */ + static createDefault(context: Record = {}): AssetQueue { + return new AssetQueue({ + maxConcurrency: 10, + maxRetries: 3, + retryDelay: 1000, + enableRateLimiting: true, + rateLimitDelay: 100, + context + }); + } + + /** + * Create an asset queue optimized for large datasets + */ + static createForLargeDataset( + memoryMonitor: MemoryMonitor, + stateManager: IncrementalStateManager, + context: Record = {} + ): AssetQueue { + return new AssetQueue({ + maxConcurrency: 8, // Slightly lower concurrency for memory management + maxRetries: 5, // More retries for reliability + retryDelay: 2000, // Longer retry delay + enableRateLimiting: true, + rateLimitDelay: 200, // Slightly longer rate limit delay + memoryMonitor, + stateManager, + context + }); + } +} \ No newline at end of file diff --git a/packages/contentstack-import/src/utils/backup-handler.ts b/packages/contentstack-import/src/utils/backup-handler.ts index 825c792d1..600a59372 100755 --- a/packages/contentstack-import/src/utils/backup-handler.ts +++ b/packages/contentstack-import/src/utils/backup-handler.ts @@ -1,29 +1,125 @@ import * as path from 'path'; import { copy } from 'fs-extra'; +import { statSync, readdirSync, createReadStream, createWriteStream, mkdirSync } from 'node:fs'; +import { pipeline } from 'node:stream/promises'; import { cliux, sanitizePath, log } from '@contentstack/cli-utilities'; import { fileHelper } from './index'; import { ImportConfig } from '../types'; +/** + * Calculate directory size in bytes + */ +async function getDirectorySize(dirPath: string): Promise { + let totalSize = 0; + + try { + const items = readdirSync(dirPath, { withFileTypes: true }); + + for (const item of items) { + const fullPath = path.join(dirPath, item.name); + + if (item.isDirectory()) { + totalSize += await getDirectorySize(fullPath); + } else if (item.isFile()) { + const stats = statSync(fullPath); + totalSize += stats.size; + } + } + } catch (error) { + log.warn(`Error calculating directory size for ${dirPath}: ${error}`, {}); + } + + return totalSize; +} + +/** + * Stream-based file copy for large files + */ +async function streamCopyFile(src: string, dest: string): Promise { + const readStream = createReadStream(src); + const writeStream = createWriteStream(dest); + + await pipeline(readStream, writeStream); +} + +/** + * Memory-efficient recursive directory copy + */ +async function streamCopyDirectory(src: string, dest: string, context: Record = {}): Promise { + try { + // Create destination directory + mkdirSync(dest, { recursive: true }); + + const items = readdirSync(src, { withFileTypes: true }); + + for (const item of items) { + const srcPath = path.join(src, item.name); + const destPath = path.join(dest, item.name); + + if (item.isDirectory()) { + await streamCopyDirectory(srcPath, destPath, context); + } else if (item.isFile()) { + const stats = statSync(srcPath); + + // Use streaming for files larger than 10MB + if (stats.size > 10 * 1024 * 1024) { + log.debug(`Streaming large file: ${item.name} (${Math.round(stats.size / 1024 / 1024)}MB)`, context); + await streamCopyFile(srcPath, destPath); + } else { + // Use regular copy for smaller files + await copy(srcPath, destPath); + } + } + } + } catch (error) { + log.error(`Error during stream copy: ${error}`, context); + throw error; + } +} + export default async function backupHandler(importConfig: ImportConfig): Promise { - log.debug('Starting backup handler process'); + log.debug('Starting backup handler process', importConfig.context); if (importConfig.hasOwnProperty('useBackedupDir')) { - log.debug(`Using existing backup directory: ${importConfig.useBackedupDir}`); + log.debug(`Using existing backup directory: ${importConfig.useBackedupDir}`, importConfig.context); return importConfig.useBackedupDir; } const sourceDir = importConfig.branchDir || importConfig.contentDir; log.debug( `Using source directory for backup: ${sourceDir} (branchDir: ${importConfig.branchDir}, contentDir: ${importConfig.contentDir})`, + importConfig.context ); + // Check if backup should be skipped for large datasets + const skipBackupThresholdGB = importConfig.modules?.assets?.backupSkipThresholdGB || 1; + const skipBackupThresholdBytes = skipBackupThresholdGB * 1024 * 1024 * 1024; + + // Calculate source directory size + log.debug('Calculating source directory size...', importConfig.context); + const sourceSize = await getDirectorySize(sourceDir); + const sourceSizeGB = sourceSize / (1024 * 1024 * 1024); + + log.debug(`Source directory size: ${sourceSizeGB.toFixed(2)}GB (${sourceSize} bytes)`, importConfig.context); + + // Check if we should skip backup for large datasets + if (sourceSize > skipBackupThresholdBytes && !importConfig.forceBackup) { + const skipBackupMessage = `Large dataset detected (${sourceSizeGB.toFixed(2)}GB > ${skipBackupThresholdGB}GB threshold). Skipping backup creation for memory optimization. Use --force-backup to override.`; + + log.warn(skipBackupMessage, importConfig.context); + cliux.print(skipBackupMessage, { color: 'yellow' }); + + // Return the source directory as the "backup" directory + return sourceDir; + } + let backupDirPath: string; const subDir = isSubDirectory(importConfig, sourceDir); if (subDir) { backupDirPath = path.resolve(sanitizePath(sourceDir), '..', '_backup_' + Math.floor(Math.random() * 1000)); - log.debug(`Detected subdirectory configuration, creating backup at: ${backupDirPath}`); + log.debug(`Detected subdirectory configuration, creating backup at: ${backupDirPath}`, importConfig.context); if (importConfig.createBackupDir) { cliux.print( @@ -36,37 +132,57 @@ export default async function backupHandler(importConfig: ImportConfig): Promise } else { // NOTE: If the backup folder's directory is provided, create it at that location; otherwise, the default path (working directory). backupDirPath = path.join(process.cwd(), '_backup_' + Math.floor(Math.random() * 1000)); - log.debug(`Using default backup directory: ${backupDirPath}`); + log.debug(`Using default backup directory: ${backupDirPath}`, importConfig.context); if (importConfig.createBackupDir) { - log.debug(`Custom backup directory specified: ${importConfig.createBackupDir}`); + log.debug(`Custom backup directory specified: ${importConfig.createBackupDir}`, importConfig.context); if (fileHelper.fileExistsSync(importConfig.createBackupDir)) { - log.debug(`Removing existing backup directory: ${importConfig.createBackupDir}`); + log.debug(`Removing existing backup directory: ${importConfig.createBackupDir}`, importConfig.context); fileHelper.removeDirSync(importConfig.createBackupDir); } - log.debug(`Creating backup directory: ${importConfig.createBackupDir}`); + log.debug(`Creating backup directory: ${importConfig.createBackupDir}`, importConfig.context); fileHelper.makeDirectory(importConfig.createBackupDir); backupDirPath = importConfig.createBackupDir; } } if (backupDirPath) { - log.debug(`Starting content copy to backup directory: ${backupDirPath}`); + log.debug(`Starting content copy to backup directory: ${backupDirPath}`, importConfig.context); log.info('Copying content to the backup directory...', importConfig.context); + // Use streaming copy for large datasets + const useStreamingCopy = sourceSizeGB > 0.5; // Use streaming for datasets > 500MB + + if (useStreamingCopy) { + log.debug(`Using streaming copy for large dataset (${sourceSizeGB.toFixed(2)}GB)`, importConfig.context); + + try { + await streamCopyDirectory(sourceDir, backupDirPath, importConfig.context); + log.debug(`Successfully created backup at: ${backupDirPath}`, importConfig.context); + return backupDirPath; + } catch (error) { + log.error(`Streaming copy failed, falling back to regular copy: ${error}`, importConfig.context); + // Fall through to regular copy + } + } + + // Regular copy (fallback or for smaller datasets) return new Promise((resolve, reject) => { return copy(sourceDir, backupDirPath, (error: any) => { if (error) { return reject(error); } - log.debug(`Successfully created backup at: ${backupDirPath}`); + log.debug(`Successfully created backup at: ${backupDirPath}`, importConfig.context); resolve(backupDirPath); }); }); } + + // Should not reach here, but return sourceDir as fallback + return sourceDir; } /** diff --git a/packages/contentstack-import/src/utils/import-config-handler.ts b/packages/contentstack-import/src/utils/import-config-handler.ts index 0eb0ee297..c0697deba 100644 --- a/packages/contentstack-import/src/utils/import-config-handler.ts +++ b/packages/contentstack-import/src/utils/import-config-handler.ts @@ -126,6 +126,27 @@ const setupConfig = async (importCmdFlags: any): Promise => { config['exclude-global-modules'] = importCmdFlags['exclude-global-modules']; } + // Handle new memory optimization flags + if (importCmdFlags['force-backup']) { + config.forceBackup = importCmdFlags['force-backup']; + } + + if (importCmdFlags['disable-memory-optimization']) { + config.modules.assets.enableMemoryMonitoring = false; + config.modules.assets.enableIncrementalPersistence = false; + log.debug('Memory optimization disabled via command line flag', { ...config }); + } + + if (importCmdFlags['memory-threshold']) { + config.modules.assets.memoryThresholdMB = importCmdFlags['memory-threshold']; + log.debug(`Memory threshold set to ${importCmdFlags['memory-threshold']}MB`, { ...config }); + } + + if (importCmdFlags['asset-concurrency']) { + config.modules.assets.uploadAssetsConcurrency = importCmdFlags['asset-concurrency']; + log.debug(`Asset concurrency set to ${importCmdFlags['asset-concurrency']}`, { ...config }); + } + // Add authentication details to config for context tracking config.authenticationMethod = authenticationMethod; log.debug('Import configuration setup completed.', { ...config }); diff --git a/packages/contentstack-import/src/utils/import-recovery.ts b/packages/contentstack-import/src/utils/import-recovery.ts new file mode 100644 index 000000000..084e0bd89 --- /dev/null +++ b/packages/contentstack-import/src/utils/import-recovery.ts @@ -0,0 +1,219 @@ +import { existsSync, readFileSync } from 'node:fs'; +import { join } from 'node:path'; +import { log } from '@contentstack/cli-utilities'; + +export interface ImportRecoveryInfo { + stateFileExists: boolean; + stateFilePath: string; + mappingCounts: { + assets: number; + folders: number; + urls: number; + }; + lastUpdated?: number; + canResume: boolean; + estimatedProgress?: number; +} + +export interface RecoveryRecommendation { + action: 'resume' | 'restart' | 'investigate'; + reason: string; + commands?: string[]; + warnings?: string[]; +} + +export class ImportRecoveryManager { + private backupDir: string; + private context: Record; + + constructor(backupDir: string, context: Record = {}) { + this.backupDir = backupDir; + this.context = context; + } + + /** + * Analyze the current import state and provide recovery information + */ + analyzeImportState(): ImportRecoveryInfo { + const stateFilePath = join(this.backupDir, '.import-state.json'); + const stateFileExists = existsSync(stateFilePath); + + const info: ImportRecoveryInfo = { + stateFileExists, + stateFilePath, + mappingCounts: { assets: 0, folders: 0, urls: 0 }, + canResume: false + }; + + if (stateFileExists) { + try { + const stateContent = readFileSync(stateFilePath, 'utf8'); + const state = JSON.parse(stateContent); + + info.mappingCounts = { + assets: Object.keys(state.assets || {}).length, + folders: Object.keys(state.folders || {}).length, + urls: Object.keys(state.urls || {}).length + }; + + info.lastUpdated = state.lastUpdated; + info.canResume = info.mappingCounts.assets > 0 || info.mappingCounts.folders > 0; + + // Estimate progress if we have asset count + if (info.mappingCounts.assets > 0) { + // This is a rough estimate - in reality we'd need to know total asset count + info.estimatedProgress = Math.min(95, (info.mappingCounts.assets / 100) * 10); // Very rough estimate + } + + log.debug(`Import state analysis: ${info.mappingCounts.assets} assets, ${info.mappingCounts.folders} folders processed`, this.context); + } catch (error) { + log.warn(`Failed to parse import state file: ${error}`, this.context); + info.canResume = false; + } + } + + return info; + } + + /** + * Provide recovery recommendations based on the current state + */ + getRecoveryRecommendation(info?: ImportRecoveryInfo): RecoveryRecommendation { + if (!info) { + info = this.analyzeImportState(); + } + + // No state file - fresh start + if (!info.stateFileExists) { + return { + action: 'restart', + reason: 'No previous import state found. Starting fresh import.', + commands: ['csdx cm:stacks:import --data-dir --stack-api-key '] + }; + } + + // State file exists but no mappings - likely failed early + if (info.mappingCounts.assets === 0 && info.mappingCounts.folders === 0) { + return { + action: 'restart', + reason: 'Import state file exists but no assets or folders were processed. Likely failed during initialization.', + commands: ['csdx cm:stacks:import --data-dir --stack-api-key '], + warnings: ['Previous state file will be overwritten'] + }; + } + + // Significant progress made - recommend resume + if (info.mappingCounts.assets > 10 || info.mappingCounts.folders > 5) { + return { + action: 'resume', + reason: `Significant progress detected (${info.mappingCounts.assets} assets, ${info.mappingCounts.folders} folders processed). Resuming will skip already imported items.`, + commands: [ + 'csdx cm:stacks:import --data-dir --stack-api-key ', + '# The import will automatically detect and resume from existing state' + ] + }; + } + + // Some progress but not much - could go either way + if (info.mappingCounts.assets > 0 || info.mappingCounts.folders > 0) { + return { + action: 'resume', + reason: `Some progress detected (${info.mappingCounts.assets} assets, ${info.mappingCounts.folders} folders processed). You can resume or restart.`, + commands: [ + '# To resume:', + 'csdx cm:stacks:import --data-dir --stack-api-key ', + '', + '# To restart (will overwrite existing state):', + 'rm .import-state.json', + 'csdx cm:stacks:import --data-dir --stack-api-key ' + ], + warnings: ['If restarting, previously imported assets may be duplicated unless using --replace-existing'] + }; + } + + // Fallback + return { + action: 'investigate', + reason: 'Import state is unclear. Manual investigation recommended.', + commands: [ + 'cat .import-state.json | jq .', + 'ls -la /', + 'csdx cm:stacks:import --help' + ] + }; + } + + /** + * Clean up import state (for fresh restart) + */ + cleanImportState(): boolean { + const stateFilePath = join(this.backupDir, '.import-state.json'); + const backupPath = `${stateFilePath}.backup`; + + try { + if (existsSync(stateFilePath)) { + // Create backup before removing + const stateContent = readFileSync(stateFilePath, 'utf8'); + require('fs').writeFileSync(backupPath, stateContent); + + require('fs').unlinkSync(stateFilePath); + log.info(`Import state cleared. Backup saved to: ${backupPath}`, this.context); + return true; + } + } catch (error) { + log.error(`Failed to clean import state: ${error}`, this.context); + return false; + } + + return true; + } + + /** + * Generate a recovery report for debugging + */ + generateRecoveryReport(): string { + const info = this.analyzeImportState(); + const recommendation = this.getRecoveryRecommendation(info); + + const report = [ + '=== Contentstack Import Recovery Report ===', + '', + `Backup Directory: ${this.backupDir}`, + `State File: ${info.stateFilePath}`, + `State File Exists: ${info.stateFileExists}`, + '', + 'Progress Summary:', + ` Assets Processed: ${info.mappingCounts.assets}`, + ` Folders Processed: ${info.mappingCounts.folders}`, + ` URL Mappings: ${info.mappingCounts.urls}`, + '', + info.lastUpdated ? `Last Updated: ${new Date(info.lastUpdated).toISOString()}` : 'Last Updated: Unknown', + info.estimatedProgress ? `Estimated Progress: ${info.estimatedProgress}%` : '', + '', + `Can Resume: ${info.canResume}`, + '', + 'Recommendation:', + ` Action: ${recommendation.action.toUpperCase()}`, + ` Reason: ${recommendation.reason}`, + '', + 'Commands:', + ...(recommendation.commands || []).map(cmd => ` ${cmd}`), + '', + ...(recommendation.warnings ? [ + 'Warnings:', + ...recommendation.warnings.map(warning => ` โš ๏ธ ${warning}`), + '' + ] : []), + '=== End Report ===', + ]; + + return report.join('\n'); + } + + /** + * Create a recovery manager for a given backup directory + */ + static create(backupDir: string, context: Record = {}): ImportRecoveryManager { + return new ImportRecoveryManager(backupDir, context); + } +} \ No newline at end of file diff --git a/packages/contentstack-import/src/utils/incremental-state.ts b/packages/contentstack-import/src/utils/incremental-state.ts new file mode 100644 index 000000000..8d26972b9 --- /dev/null +++ b/packages/contentstack-import/src/utils/incremental-state.ts @@ -0,0 +1,276 @@ +import { join } from 'node:path'; +import { existsSync, writeFileSync, readFileSync, appendFileSync } from 'node:fs'; +import { log } from '@contentstack/cli-utilities'; + +export interface StateEntry { + oldUid: string; + newUid: string; + type: 'asset' | 'folder'; + timestamp: number; +} + +export interface AssetMappings { + assets: Record; + folders: Record; + urls: Record; +} + +export interface StateManagerConfig { + stateFilePath: string; + batchSize: number; + enableBackup: boolean; + context: Record; +} + +export class IncrementalStateManager { + private stateFilePath: string; + private batchSize: number; + private enableBackup: boolean; + private context: Record; + private pendingWrites: StateEntry[] = []; + private inMemoryMappings: AssetMappings = { + assets: {}, + folders: {}, + urls: {} + }; + private lastPersistTime: number = 0; + private persistPromise: Promise = Promise.resolve(); + + constructor(config: StateManagerConfig) { + this.stateFilePath = config.stateFilePath; + this.batchSize = config.batchSize; + this.enableBackup = config.enableBackup; + this.context = config.context; + + // Load existing state on initialization + this.loadExistingState(); + + log.debug(`Incremental state manager initialized with file: ${this.stateFilePath}`, this.context); + } + + /** + * Add a new mapping to be persisted + */ + addMapping(oldUid: string, newUid: string, type: 'asset' | 'folder', url?: string): void { + const entry: StateEntry = { + oldUid, + newUid, + type, + timestamp: Date.now() + }; + + // Update in-memory mappings immediately + if (type === 'asset') { + this.inMemoryMappings.assets[oldUid] = newUid; + if (url) { + this.inMemoryMappings.urls[url] = newUid; // This should be the new URL, but we'll store the mapping + } + } else if (type === 'folder') { + this.inMemoryMappings.folders[oldUid] = newUid; + } + + // Add to pending writes + this.pendingWrites.push(entry); + + // Check if we should persist + if (this.pendingWrites.length >= this.batchSize) { + this.persistState(); + } + + log.debug(`Added mapping: ${oldUid} -> ${newUid} (${type})`, this.context); + } + + /** + * Get a mapping by old UID + */ + getMapping(oldUid: string, type: 'asset' | 'folder'): string | undefined { + if (type === 'asset') { + return this.inMemoryMappings.assets[oldUid]; + } else if (type === 'folder') { + return this.inMemoryMappings.folders[oldUid]; + } + return undefined; + } + + /** + * Get all mappings of a specific type + */ + getAllMappings(type: 'asset' | 'folder'): Record { + if (type === 'asset') { + return { ...this.inMemoryMappings.assets }; + } else if (type === 'folder') { + return { ...this.inMemoryMappings.folders }; + } + return {}; + } + + /** + * Get URL mappings + */ + getUrlMappings(): Record { + return { ...this.inMemoryMappings.urls }; + } + + /** + * Check if a mapping exists + */ + hasMapping(oldUid: string, type: 'asset' | 'folder'): boolean { + return this.getMapping(oldUid, type) !== undefined; + } + + /** + * Get the count of mappings + */ + getMappingCount(): { assets: number; folders: number; urls: number } { + return { + assets: Object.keys(this.inMemoryMappings.assets).length, + folders: Object.keys(this.inMemoryMappings.folders).length, + urls: Object.keys(this.inMemoryMappings.urls).length + }; + } + + /** + * Persist pending state changes to disk + */ + persistState(): void { + if (this.pendingWrites.length === 0) { + return; + } + + // Chain persistence to avoid concurrent writes + this.persistPromise = this.persistPromise.then(async () => { + try { + const entriesToWrite = [...this.pendingWrites]; + this.pendingWrites = []; // Clear pending writes immediately + + if (entriesToWrite.length === 0) { + return; + } + + // Create backup if enabled + if (this.enableBackup && existsSync(this.stateFilePath)) { + const backupPath = `${this.stateFilePath}.backup`; + const currentContent = readFileSync(this.stateFilePath, 'utf8'); + writeFileSync(backupPath, currentContent); + } + + // Write current complete state + const completeState = { + assets: this.inMemoryMappings.assets, + folders: this.inMemoryMappings.folders, + urls: this.inMemoryMappings.urls, + lastUpdated: Date.now(), + totalEntries: entriesToWrite.length + }; + + writeFileSync(this.stateFilePath, JSON.stringify(completeState, null, 2)); + this.lastPersistTime = Date.now(); + + log.debug(`Persisted ${entriesToWrite.length} state entries to ${this.stateFilePath}`, this.context); + } catch (error) { + log.error(`Failed to persist state: ${error}`, this.context); + // Re-add entries to pending writes for retry + this.pendingWrites.unshift(...this.pendingWrites); + } + }); + } + + /** + * Force immediate persistence of all pending changes + */ + async flushState(): Promise { + this.persistState(); + await this.persistPromise; + log.debug('State flushed to disk', this.context); + } + + /** + * Load existing state from disk + */ + private loadExistingState(): void { + if (!existsSync(this.stateFilePath)) { + log.debug('No existing state file found, starting fresh', this.context); + return; + } + + try { + const content = readFileSync(this.stateFilePath, 'utf8'); + const state = JSON.parse(content); + + if (state.assets) { + this.inMemoryMappings.assets = state.assets; + } + if (state.folders) { + this.inMemoryMappings.folders = state.folders; + } + if (state.urls) { + this.inMemoryMappings.urls = state.urls; + } + + const counts = this.getMappingCount(); + log.debug(`Loaded existing state: ${counts.assets} assets, ${counts.folders} folders, ${counts.urls} URLs`, this.context); + } catch (error) { + log.warn(`Failed to load existing state file: ${error}. Starting fresh.`, this.context); + this.inMemoryMappings = { assets: {}, folders: {}, urls: {} }; + } + } + + /** + * Clear all in-memory mappings (useful for memory management) + */ + clearInMemoryMappings(): void { + const beforeCounts = this.getMappingCount(); + + // Only clear if we have persisted recently + const timeSinceLastPersist = Date.now() - this.lastPersistTime; + if (timeSinceLastPersist > 60000) { // 1 minute + log.warn('Attempting to clear mappings but no recent persist detected', this.context); + return; + } + + this.inMemoryMappings = { assets: {}, folders: {}, urls: {} }; + + log.debug(`Cleared in-memory mappings: ${beforeCounts.assets} assets, ${beforeCounts.folders} folders, ${beforeCounts.urls} URLs`, this.context); + } + + /** + * Get statistics about the state manager + */ + getStats(): { + mappingCounts: { assets: number; folders: number; urls: number }; + pendingWrites: number; + lastPersistTime: number; + stateFileExists: boolean; + } { + return { + mappingCounts: this.getMappingCount(), + pendingWrites: this.pendingWrites.length, + lastPersistTime: this.lastPersistTime, + stateFileExists: existsSync(this.stateFilePath) + }; + } + + /** + * Create a state manager with default configuration + */ + static createDefault(baseDir: string, context: Record = {}): IncrementalStateManager { + return new IncrementalStateManager({ + stateFilePath: join(baseDir, '.import-state.json'), + batchSize: 100, // Persist every 100 mappings + enableBackup: true, + context + }); + } + + /** + * Create a state manager optimized for large datasets + */ + static createForLargeDataset(baseDir: string, context: Record = {}): IncrementalStateManager { + return new IncrementalStateManager({ + stateFilePath: join(baseDir, '.import-state.json'), + batchSize: 50, // More frequent persistence for large datasets + enableBackup: true, + context + }); + } +} \ No newline at end of file diff --git a/packages/contentstack-import/src/utils/index.ts b/packages/contentstack-import/src/utils/index.ts index 49ab1146b..bc08d4f04 100644 --- a/packages/contentstack-import/src/utils/index.ts +++ b/packages/contentstack-import/src/utils/index.ts @@ -32,4 +32,8 @@ export { } from './entries-helper'; export * from './common-helper'; export { lookUpTaxonomy, lookUpTerms } from './taxonomies-helper'; +export { MemoryMonitor, MemoryStats, MemoryMonitorConfig } from './memory-monitor'; +export { IncrementalStateManager, StateEntry, AssetMappings, StateManagerConfig } from './incremental-state'; +export { AssetQueue, AssetQueueItem, AssetQueueConfig, AssetProcessor } from './asset-queue'; +export { ImportRecoveryManager, ImportRecoveryInfo, RecoveryRecommendation } from './import-recovery'; export { MODULE_CONTEXTS, MODULE_NAMES, PROCESS_NAMES, PROCESS_STATUS } from './constants'; diff --git a/packages/contentstack-import/src/utils/memory-monitor.ts b/packages/contentstack-import/src/utils/memory-monitor.ts new file mode 100644 index 000000000..d24b382a5 --- /dev/null +++ b/packages/contentstack-import/src/utils/memory-monitor.ts @@ -0,0 +1,175 @@ +import { log } from '@contentstack/cli-utilities'; + +export interface MemoryStats { + rss: number; + heapTotal: number; + heapUsed: number; + external: number; + arrayBuffers: number; + heapUsedMB: number; + heapTotalMB: number; + rssMB: number; +} + +export interface MemoryMonitorConfig { + thresholdMB: number; + gcCooldownMs: number; + enableLogging: boolean; + logInterval: number; +} + +export class MemoryMonitor { + private threshold: number; + private lastGC: number = 0; + private gcCooldownMs: number; + private enableLogging: boolean; + private logInterval: number; + private lastLogTime: number = 0; + private context: Record; + + constructor(config: MemoryMonitorConfig, context: Record = {}) { + this.threshold = config.thresholdMB * 1024 * 1024; // Convert MB to bytes + this.gcCooldownMs = config.gcCooldownMs; + this.enableLogging = config.enableLogging; + this.logInterval = config.logInterval; + this.context = context; + + log.debug(`Memory monitor initialized with threshold: ${config.thresholdMB}MB`, this.context); + } + + /** + * Check if memory usage exceeds the configured threshold + */ + checkMemoryPressure(): boolean { + const stats = this.getMemoryStats(); + const isOverThreshold = stats.heapUsed > this.threshold; + + if (this.enableLogging && this.shouldLog()) { + log.debug(`Memory check: ${stats.heapUsedMB}MB used, threshold: ${this.threshold / 1024 / 1024}MB, pressure: ${isOverThreshold}`, this.context); + } + + return isOverThreshold; + } + + /** + * Force garbage collection if available and cooldown period has passed + */ + async forceGarbageCollection(): Promise { + const now = Date.now(); + + if (now - this.lastGC < this.gcCooldownMs) { + log.debug(`GC skipped - cooldown period not elapsed (${now - this.lastGC}ms < ${this.gcCooldownMs}ms)`, this.context); + return; + } + + const beforeStats = this.getMemoryStats(); + + if (global.gc) { + log.debug(`Forcing garbage collection - heap before: ${beforeStats.heapUsedMB}MB`, this.context); + global.gc(); + + // Small delay to allow GC to complete + await this.sleep(100); + + const afterStats = this.getMemoryStats(); + const freedMB = beforeStats.heapUsedMB - afterStats.heapUsedMB; + + log.debug(`GC completed - heap after: ${afterStats.heapUsedMB}MB, freed: ${freedMB.toFixed(2)}MB`, this.context); + + this.lastGC = now; + } else { + log.warn('Garbage collection not available. Run with --expose-gc flag for better memory management.', this.context); + } + } + + /** + * Get current memory usage statistics + */ + getMemoryStats(): MemoryStats { + const usage = process.memoryUsage(); + + return { + rss: usage.rss, + heapTotal: usage.heapTotal, + heapUsed: usage.heapUsed, + external: usage.external, + arrayBuffers: usage.arrayBuffers, + heapUsedMB: Math.round(usage.heapUsed / 1024 / 1024 * 100) / 100, + heapTotalMB: Math.round(usage.heapTotal / 1024 / 1024 * 100) / 100, + rssMB: Math.round(usage.rss / 1024 / 1024 * 100) / 100, + }; + } + + /** + * Log memory statistics if logging is enabled and interval has passed + */ + logMemoryStats(label?: string): void { + if (!this.enableLogging) return; + + const stats = this.getMemoryStats(); + const prefix = label ? `${label} - ` : ''; + + log.debug(`${prefix}Memory: ${stats.heapUsedMB}MB used / ${stats.heapTotalMB}MB total (RSS: ${stats.rssMB}MB)`, this.context); + } + + /** + * Check if memory usage is approaching critical levels + */ + isCriticalMemoryPressure(): boolean { + const stats = this.getMemoryStats(); + const criticalThreshold = this.threshold * 1.5; // 50% above normal threshold + + return stats.heapUsed > criticalThreshold; + } + + /** + * Get memory pressure level as a percentage + */ + getMemoryPressureLevel(): number { + const stats = this.getMemoryStats(); + return Math.round((stats.heapUsed / this.threshold) * 100); + } + + /** + * Sleep for specified milliseconds + */ + private sleep(ms: number): Promise { + return new Promise(resolve => setTimeout(resolve, ms)); + } + + /** + * Check if enough time has passed since last log + */ + private shouldLog(): boolean { + const now = Date.now(); + if (now - this.lastLogTime > this.logInterval) { + this.lastLogTime = now; + return true; + } + return false; + } + + /** + * Create a memory monitor with default configuration + */ + static createDefault(context: Record = {}): MemoryMonitor { + return new MemoryMonitor({ + thresholdMB: 1024, // 1GB threshold + gcCooldownMs: 5000, // 5 second cooldown between GC calls + enableLogging: true, + logInterval: 30000, // Log every 30 seconds + }, context); + } + + /** + * Create a memory monitor for large dataset processing + */ + static createForLargeDataset(context: Record = {}): MemoryMonitor { + return new MemoryMonitor({ + thresholdMB: 768, // Lower threshold for large datasets (768MB) + gcCooldownMs: 3000, // More frequent GC (3 seconds) + enableLogging: true, + logInterval: 15000, // More frequent logging (15 seconds) + }, context); + } +} \ No newline at end of file diff --git a/packages/contentstack-import/test/integration/memory-optimized-import.test.ts b/packages/contentstack-import/test/integration/memory-optimized-import.test.ts new file mode 100644 index 000000000..2de13a7bc --- /dev/null +++ b/packages/contentstack-import/test/integration/memory-optimized-import.test.ts @@ -0,0 +1,357 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { join } from 'node:path'; +import { mkdirSync, writeFileSync, existsSync, rmSync } from 'node:fs'; +import ImportAssets from '../../src/import/modules/assets'; +import { MemoryMonitor, IncrementalStateManager, AssetQueue } from '../../src/utils'; + +describe('Memory Optimized Asset Import Integration', () => { + let importAssets: ImportAssets; + let testDir: string; + let mockStackAPIClient: any; + let mockImportConfig: any; + + beforeEach(() => { + // Create test directory + testDir = join(__dirname, 'test-import-data'); + if (existsSync(testDir)) { + rmSync(testDir, { recursive: true, force: true }); + } + mkdirSync(testDir, { recursive: true }); + + // Mock stack API client + mockStackAPIClient = { + asset: sinon.stub().returns({ + create: sinon.stub().resolves({ uid: 'new-asset-uid', url: 'new-asset-url' }), + folder: sinon.stub().returns({ + create: sinon.stub().resolves({ uid: 'new-folder-uid' }) + }) + }) + }; + + // Mock import config + mockImportConfig = { + backupDir: testDir, + context: { test: true }, + modules: { + assets: { + enableMemoryMonitoring: true, + uploadAssetsConcurrency: 2, + importFoldersConcurrency: 1, + memoryThresholdMB: 100, + maxRetries: 3, + retryDelay: 100 + } + } + }; + + // Create test data structure + setupTestData(); + + importAssets = new ImportAssets({ + importConfig: mockImportConfig, + stackAPIClient: mockStackAPIClient + }); + }); + + afterEach(() => { + if (existsSync(testDir)) { + rmSync(testDir, { recursive: true, force: true }); + } + sinon.restore(); + }); + + function setupTestData() { + // Create assets directory structure + const assetsDir = join(testDir, 'assets'); + const mapperDir = join(testDir, 'mapper', 'assets'); + const environmentsDir = join(testDir, 'environments'); + + mkdirSync(assetsDir, { recursive: true }); + mkdirSync(mapperDir, { recursive: true }); + mkdirSync(environmentsDir, { recursive: true }); + + // Create environments.json + writeFileSync(join(environmentsDir, 'environments.json'), JSON.stringify({ + 'env1': { name: 'development' }, + 'env2': { name: 'production' } + })); + + // Create assets.json index file + writeFileSync(join(assetsDir, 'assets.json'), JSON.stringify({ + '1': 'chunk1.json', + '2': 'chunk2.json' + })); + + // Create asset chunk files + const chunk1Assets = { + 'asset1': { + uid: 'asset1', + title: 'Test Asset 1', + filename: 'test1.jpg', + url: 'https://example.com/asset1.jpg', + content_type: 'image/jpeg', + file_size: 1024, + _version: 1, + parent_uid: null, + tags: ['test'] + }, + 'asset2': { + uid: 'asset2', + title: 'Test Asset 2', + filename: 'test2.png', + url: 'https://example.com/asset2.png', + content_type: 'image/png', + file_size: 2048, + _version: 1, + parent_uid: null, + tags: ['test'] + } + }; + + const chunk2Assets = { + 'asset3': { + uid: 'asset3', + title: 'Test Asset 3', + filename: 'test3.pdf', + url: 'https://example.com/asset3.pdf', + content_type: 'application/pdf', + file_size: 4096, + _version: 1, + parent_uid: null, + tags: ['document'] + } + }; + + writeFileSync(join(assetsDir, 'chunk1.json'), JSON.stringify(chunk1Assets)); + writeFileSync(join(assetsDir, 'chunk2.json'), JSON.stringify(chunk2Assets)); + + // Create asset files directory structure + const filesDir = join(assetsDir, 'files'); + mkdirSync(join(filesDir, 'asset1'), { recursive: true }); + mkdirSync(join(filesDir, 'asset2'), { recursive: true }); + mkdirSync(join(filesDir, 'asset3'), { recursive: true }); + + // Create dummy asset files + writeFileSync(join(filesDir, 'asset1', 'test1.jpg'), 'dummy image content'); + writeFileSync(join(filesDir, 'asset2', 'test2.png'), 'dummy image content'); + writeFileSync(join(filesDir, 'asset3', 'test3.pdf'), 'dummy pdf content'); + } + + describe('Memory Optimization Features', () => { + it('should initialize memory management utilities', () => { + expect((importAssets as any).memoryMonitor).to.be.instanceOf(MemoryMonitor); + expect((importAssets as any).stateManager).to.be.instanceOf(IncrementalStateManager); + expect((importAssets as any).assetQueue).to.be.instanceOf(AssetQueue); + }); + + it('should use memory-efficient processing for assets', async () => { + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + // Spy on memory-efficient method + const memoryEfficientSpy = sinon.spy(importAssets as any, 'importAssetsMemoryEfficient'); + + await importAssets.importAssets(); + + expect(memoryEfficientSpy.calledOnce).to.be.true; + }); + + it('should process assets individually through queue', async () => { + const assetQueue = (importAssets as any).assetQueue; + const enqueueSpy = sinon.spy(assetQueue, 'enqueue'); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Should have enqueued 3 assets (from both chunks) + expect(enqueueSpy.callCount).to.equal(3); + }); + + it('should persist state incrementally', async () => { + const stateManager = (importAssets as any).stateManager; + const addMappingSpy = sinon.spy(stateManager, 'addMapping'); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Should have added mappings for processed assets + expect(addMappingSpy.callCount).to.equal(3); + }); + + it('should check memory pressure during processing', async () => { + const memoryMonitor = (importAssets as any).memoryMonitor; + const checkMemoryPressureSpy = sinon.spy(memoryMonitor, 'checkMemoryPressure'); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + expect(checkMemoryPressureSpy.called).to.be.true; + }); + + it('should clear completed queue items to free memory', async () => { + const assetQueue = (importAssets as any).assetQueue; + const clearCompletedSpy = sinon.spy(assetQueue, 'clearCompleted'); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + expect(clearCompletedSpy.called).to.be.true; + }); + }); + + describe('Resume Functionality', () => { + it('should detect existing state and resume', async () => { + // Create existing state file + const stateFilePath = join(testDir, '.import-state.json'); + const existingState = { + assets: { 'asset1': 'existing-asset1' }, + folders: {}, + urls: { 'https://example.com/asset1.jpg': 'new-url1' }, + lastUpdated: Date.now() + }; + writeFileSync(stateFilePath, JSON.stringify(existingState)); + + // Create new import instance to load existing state + const resumeImportAssets = new ImportAssets({ + importConfig: mockImportConfig, + stackAPIClient: mockStackAPIClient + }); + + const stateManager = (resumeImportAssets as any).stateManager; + + // Should have loaded existing state + expect(stateManager.hasMapping('asset1', 'asset')).to.be.true; + expect(stateManager.getMapping('asset1', 'asset')).to.equal('existing-asset1'); + }); + + it('should skip already processed assets', async () => { + // Pre-populate state with one processed asset + const stateManager = (importAssets as any).stateManager; + stateManager.addMapping('asset1', 'existing-asset1', 'asset'); + + const assetQueue = (importAssets as any).assetQueue; + const enqueueSpy = sinon.spy(assetQueue, 'enqueue'); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Should only enqueue 2 assets (asset2 and asset3), skipping asset1 + expect(enqueueSpy.callCount).to.equal(2); + }); + }); + + describe('Error Handling', () => { + it('should handle API errors with retries', async () => { + // Mock API to fail first few times + let callCount = 0; + mockStackAPIClient.asset().create = sinon.stub().callsFake(() => { + callCount++; + if (callCount <= 2) { + return Promise.reject(new Error('API Error')); + } + return Promise.resolve({ uid: 'new-asset-uid', url: 'new-asset-url' }); + }); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Should have retried and eventually succeeded + expect(callCount).to.be.greaterThan(2); + }); + + it('should provide detailed error context', async () => { + // Mock API to always fail + mockStackAPIClient.asset().create = sinon.stub().rejects(new Error('Persistent API Error')); + + const assetQueue = (importAssets as any).assetQueue; + let errorContext: any; + + assetQueue.on('itemFailed', (item: any, error: any) => { + errorContext = error.assetContext; + }); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + expect(errorContext).to.exist; + expect(errorContext.uid).to.exist; + expect(errorContext.title).to.exist; + }); + }); + + describe('Performance Characteristics', () => { + it('should maintain controlled concurrency', async () => { + let maxConcurrent = 0; + let currentConcurrent = 0; + + // Mock API with concurrency tracking + mockStackAPIClient.asset().create = sinon.stub().callsFake(async () => { + currentConcurrent++; + maxConcurrent = Math.max(maxConcurrent, currentConcurrent); + + // Simulate processing time + await new Promise(resolve => setTimeout(resolve, 50)); + + currentConcurrent--; + return { uid: 'new-asset-uid', url: 'new-asset-url' }; + }); + + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Should not exceed configured concurrency (2) + expect(maxConcurrent).to.be.at.most(2); + }); + + it('should write final mapping files for compatibility', async () => { + // Mock progress manager + (importAssets as any).progressManager = { + tick: sinon.stub() + }; + + await importAssets.importAssets(); + + // Check that state was flushed and mapping files were written + const stateManager = (importAssets as any).stateManager; + const flushStateSpy = sinon.spy(stateManager, 'flushState'); + + // The flushState should have been called + expect(stateManager.getMappingCount().assets).to.equal(3); + }); + }); +}); \ No newline at end of file diff --git a/packages/contentstack-import/test/performance/memory-benchmark.test.ts b/packages/contentstack-import/test/performance/memory-benchmark.test.ts new file mode 100644 index 000000000..0eae4b03a --- /dev/null +++ b/packages/contentstack-import/test/performance/memory-benchmark.test.ts @@ -0,0 +1,251 @@ +import { expect } from 'chai'; +import { performance } from 'perf_hooks'; +import { MemoryMonitor, IncrementalStateManager, AssetQueue } from '../../src/utils'; + +describe('Memory Optimization Performance Benchmarks', () => { + let memoryMonitor: MemoryMonitor; + let stateManager: IncrementalStateManager; + let assetQueue: AssetQueue; + + beforeEach(() => { + memoryMonitor = MemoryMonitor.createForLargeDataset(); + stateManager = IncrementalStateManager.createForLargeDataset('/tmp/test'); + assetQueue = AssetQueue.createForLargeDataset(memoryMonitor, stateManager); + }); + + describe('Memory Monitor Performance', () => { + it('should check memory pressure efficiently', () => { + const iterations = 10000; + const start = performance.now(); + + for (let i = 0; i < iterations; i++) { + memoryMonitor.checkMemoryPressure(); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / iterations; + + // Should complete memory checks in reasonable time + expect(avgDuration).to.be.below(0.1); // Less than 0.1ms per check + console.log(`Memory pressure check: ${avgDuration.toFixed(4)}ms average over ${iterations} iterations`); + }); + + it('should get memory stats efficiently', () => { + const iterations = 1000; + const start = performance.now(); + + for (let i = 0; i < iterations; i++) { + memoryMonitor.getMemoryStats(); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / iterations; + + expect(avgDuration).to.be.below(1); // Less than 1ms per stats call + console.log(`Memory stats retrieval: ${avgDuration.toFixed(4)}ms average over ${iterations} iterations`); + }); + }); + + describe('State Manager Performance', () => { + it('should handle large numbers of mappings efficiently', () => { + const mappingCount = 10000; + const start = performance.now(); + + for (let i = 0; i < mappingCount; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / mappingCount; + + expect(avgDuration).to.be.below(0.1); // Less than 0.1ms per mapping + console.log(`State mapping addition: ${avgDuration.toFixed(4)}ms average over ${mappingCount} mappings`); + + // Verify all mappings were stored + expect(stateManager.getMappingCount().assets).to.equal(mappingCount); + }); + + it('should lookup mappings efficiently', () => { + // Pre-populate with mappings + const mappingCount = 1000; + for (let i = 0; i < mappingCount; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + } + + const lookupIterations = 10000; + const start = performance.now(); + + for (let i = 0; i < lookupIterations; i++) { + const assetId = `asset${i % mappingCount}`; + stateManager.getMapping(assetId, 'asset'); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / lookupIterations; + + expect(avgDuration).to.be.below(0.01); // Less than 0.01ms per lookup + console.log(`State mapping lookup: ${avgDuration.toFixed(4)}ms average over ${lookupIterations} lookups`); + }); + }); + + describe('Asset Queue Performance', () => { + it('should enqueue assets efficiently', () => { + const mockProcessor = { + processAsset: async () => ({ uid: 'new-uid', url: 'new-url' }) + }; + assetQueue.setProcessor(mockProcessor); + + const assetCount = 1000; + const assets = Array.from({ length: assetCount }, (_, i) => ({ + uid: `asset${i}`, + title: `Asset ${i}`, + filename: `file${i}.jpg` + })); + + const start = performance.now(); + + for (const asset of assets) { + assetQueue.enqueue(asset); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / assetCount; + + expect(avgDuration).to.be.below(0.1); // Less than 0.1ms per enqueue + console.log(`Asset queue enqueue: ${avgDuration.toFixed(4)}ms average over ${assetCount} assets`); + + const stats = assetQueue.getStats(); + expect(stats.total).to.equal(assetCount); + }); + + it('should handle queue statistics efficiently', () => { + // Pre-populate queue + const assetCount = 1000; + for (let i = 0; i < assetCount; i++) { + assetQueue.enqueue({ uid: `asset${i}`, title: `Asset ${i}` }); + } + + const iterations = 1000; + const start = performance.now(); + + for (let i = 0; i < iterations; i++) { + assetQueue.getStats(); + } + + const end = performance.now(); + const duration = end - start; + const avgDuration = duration / iterations; + + expect(avgDuration).to.be.below(1); // Less than 1ms per stats call + console.log(`Queue stats retrieval: ${avgDuration.toFixed(4)}ms average over ${iterations} calls`); + }); + }); + + describe('Memory Usage Patterns', () => { + it('should maintain reasonable memory usage during large operations', () => { + const initialStats = memoryMonitor.getMemoryStats(); + const initialHeapUsed = initialStats.heapUsed; + + // Simulate processing many assets + const assetCount = 5000; + for (let i = 0; i < assetCount; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + + // Trigger GC periodically + if (i % 1000 === 0 && global.gc) { + global.gc(); + } + } + + const finalStats = memoryMonitor.getMemoryStats(); + const heapGrowth = finalStats.heapUsed - initialHeapUsed; + const growthPerAsset = heapGrowth / assetCount; + + console.log(`Memory growth: ${(heapGrowth / 1024 / 1024).toFixed(2)}MB for ${assetCount} assets`); + console.log(`Growth per asset: ${growthPerAsset.toFixed(2)} bytes`); + + // Memory growth should be reasonable (less than 1KB per asset) + expect(growthPerAsset).to.be.below(1024); + }); + + it('should demonstrate memory efficiency vs legacy approach', () => { + const assetCount = 1000; + + // Simulate legacy approach (accumulating in memory) + const legacyStart = performance.now(); + const legacyMap: Record = {}; + + for (let i = 0; i < assetCount; i++) { + legacyMap[`asset${i}`] = `new-asset${i}`; + } + + const legacyEnd = performance.now(); + const legacyDuration = legacyEnd - legacyStart; + + // Simulate optimized approach (incremental persistence) + const optimizedStart = performance.now(); + + for (let i = 0; i < assetCount; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + } + + const optimizedEnd = performance.now(); + const optimizedDuration = optimizedEnd - optimizedStart; + + console.log(`Legacy approach: ${legacyDuration.toFixed(2)}ms`); + console.log(`Optimized approach: ${optimizedDuration.toFixed(2)}ms`); + + // Optimized approach might be slightly slower due to persistence, + // but should still be reasonable + expect(optimizedDuration).to.be.below(legacyDuration * 10); // Allow 10x overhead for persistence + }); + }); + + describe('Scalability Tests', () => { + it('should handle increasing dataset sizes efficiently', () => { + const testSizes = [100, 500, 1000, 5000]; + const results: Array<{ size: number; duration: number; memoryUsed: number }> = []; + + for (const size of testSizes) { + const startStats = memoryMonitor.getMemoryStats(); + const start = performance.now(); + + // Process assets of this size + for (let i = 0; i < size; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + } + + const end = performance.now(); + const endStats = memoryMonitor.getMemoryStats(); + + const duration = end - start; + const memoryUsed = endStats.heapUsed - startStats.heapUsed; + + results.push({ size, duration, memoryUsed }); + + console.log(`Size: ${size}, Duration: ${duration.toFixed(2)}ms, Memory: ${(memoryUsed / 1024 / 1024).toFixed(2)}MB`); + + // Clear state for next test + stateManager.clearInMemoryMappings(); + if (global.gc) global.gc(); + } + + // Verify that performance scales reasonably (not exponentially) + for (let i = 1; i < results.length; i++) { + const prev = results[i - 1]; + const curr = results[i]; + + const sizeRatio = curr.size / prev.size; + const durationRatio = curr.duration / prev.duration; + + // Duration should scale roughly linearly (allow 2x overhead) + expect(durationRatio).to.be.below(sizeRatio * 2); + } + }); + }); +}); \ No newline at end of file diff --git a/packages/contentstack-import/test/unit/utils/asset-queue.test.ts b/packages/contentstack-import/test/unit/utils/asset-queue.test.ts new file mode 100644 index 000000000..4417474d3 --- /dev/null +++ b/packages/contentstack-import/test/unit/utils/asset-queue.test.ts @@ -0,0 +1,251 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { AssetQueue, AssetProcessor } from '../../../src/utils/asset-queue'; +import { MemoryMonitor } from '../../../src/utils/memory-monitor'; +import { IncrementalStateManager } from '../../../src/utils/incremental-state'; + +describe('AssetQueue', () => { + let assetQueue: AssetQueue; + let mockProcessor: AssetProcessor; + let memoryMonitor: MemoryMonitor; + let stateManager: IncrementalStateManager; + + beforeEach(() => { + // Mock processor + mockProcessor = { + processAsset: sinon.stub().resolves({ uid: 'new-uid', url: 'new-url' }) + }; + + // Mock memory monitor + memoryMonitor = { + checkMemoryPressure: sinon.stub().returns(false), + forceGarbageCollection: sinon.stub().resolves() + } as any; + + // Mock state manager + stateManager = { + addMapping: sinon.stub() + } as any; + + assetQueue = new AssetQueue({ + maxConcurrency: 2, + maxRetries: 3, + retryDelay: 100, + enableRateLimiting: false, + rateLimitDelay: 0, + memoryMonitor, + stateManager, + context: { test: true } + }); + + assetQueue.setProcessor(mockProcessor); + }); + + describe('constructor', () => { + it('should initialize with correct configuration', () => { + expect(assetQueue).to.be.instanceOf(AssetQueue); + }); + + it('should create default queue', () => { + const defaultQueue = AssetQueue.createDefault(); + expect(defaultQueue).to.be.instanceOf(AssetQueue); + }); + + it('should create queue for large datasets', () => { + const largeDatasetQueue = AssetQueue.createForLargeDataset(memoryMonitor, stateManager); + expect(largeDatasetQueue).to.be.instanceOf(AssetQueue); + }); + }); + + describe('enqueue', () => { + it('should add asset to queue', () => { + const asset = { uid: 'asset1', title: 'Test Asset' }; + const id = assetQueue.enqueue(asset); + + expect(id).to.be.a('string'); + + const stats = assetQueue.getStats(); + expect(stats.pending).to.equal(1); + expect(stats.total).to.equal(1); + }); + + it('should start processing automatically', (done) => { + const asset = { uid: 'asset1', title: 'Test Asset' }; + + assetQueue.on('itemCompleted', (item, result) => { + expect(result.uid).to.equal('new-uid'); + done(); + }); + + assetQueue.enqueue(asset); + }); + }); + + describe('enqueueBatch', () => { + it('should add multiple assets to queue', () => { + const assets = [ + { uid: 'asset1', title: 'Asset 1' }, + { uid: 'asset2', title: 'Asset 2' }, + { uid: 'asset3', title: 'Asset 3' } + ]; + + const ids = assetQueue.enqueueBatch(assets); + + expect(ids).to.have.length(3); + + const stats = assetQueue.getStats(); + expect(stats.pending).to.equal(3); + }); + }); + + describe('processing', () => { + it('should process assets with controlled concurrency', async () => { + const assets = [ + { uid: 'asset1', title: 'Asset 1' }, + { uid: 'asset2', title: 'Asset 2' }, + { uid: 'asset3', title: 'Asset 3' } + ]; + + let processingCount = 0; + let maxConcurrent = 0; + + (mockProcessor.processAsset as sinon.SinonStub).callsFake(async () => { + processingCount++; + maxConcurrent = Math.max(maxConcurrent, processingCount); + + // Simulate processing time + await new Promise(resolve => setTimeout(resolve, 50)); + + processingCount--; + return { uid: 'new-uid', url: 'new-url' }; + }); + + assetQueue.enqueueBatch(assets); + await assetQueue.waitForCompletion(); + + // Should not exceed max concurrency (2) + expect(maxConcurrent).to.be.at.most(2); + }); + + it('should handle processing errors with retries', async () => { + const asset = { uid: 'asset1', title: 'Test Asset' }; + let attemptCount = 0; + + (mockProcessor.processAsset as sinon.SinonStub).callsFake(async () => { + attemptCount++; + if (attemptCount < 3) { + throw new Error('Processing failed'); + } + return { uid: 'new-uid', url: 'new-url' }; + }); + + let completedCalled = false; + assetQueue.on('itemCompleted', () => { + completedCalled = true; + }); + + assetQueue.enqueue(asset); + await assetQueue.waitForCompletion(); + + expect(attemptCount).to.equal(3); + expect(completedCalled).to.be.true; + }); + + it('should fail after max retries', async () => { + const asset = { uid: 'asset1', title: 'Test Asset' }; + + (mockProcessor.processAsset as sinon.SinonStub).rejects(new Error('Always fails')); + + let failedCalled = false; + assetQueue.on('itemFailed', () => { + failedCalled = true; + }); + + assetQueue.enqueue(asset); + await assetQueue.waitForCompletion(); + + expect(failedCalled).to.be.true; + }); + + it('should check memory pressure during processing', async () => { + const assets = [ + { uid: 'asset1', title: 'Asset 1' }, + { uid: 'asset2', title: 'Asset 2' } + ]; + + (memoryMonitor.checkMemoryPressure as sinon.SinonStub).returns(true); + + assetQueue.enqueueBatch(assets); + await assetQueue.waitForCompletion(); + + expect(memoryMonitor.checkMemoryPressure).to.have.been.called; + expect(memoryMonitor.forceGarbageCollection).to.have.been.called; + }); + }); + + describe('getStats', () => { + it('should return correct statistics', () => { + const assets = [ + { uid: 'asset1', title: 'Asset 1' }, + { uid: 'asset2', title: 'Asset 2' } + ]; + + assetQueue.enqueueBatch(assets); + + const stats = assetQueue.getStats(); + expect(stats.total).to.equal(2); + expect(stats.pending).to.equal(2); + expect(stats.processing).to.equal(0); + expect(stats.completed).to.equal(0); + expect(stats.failed).to.equal(0); + }); + }); + + describe('clearCompleted', () => { + it('should clear completed items from queue', async () => { + const asset = { uid: 'asset1', title: 'Test Asset' }; + + assetQueue.enqueue(asset); + await assetQueue.waitForCompletion(); + + const clearedCount = assetQueue.clearCompleted(); + expect(clearedCount).to.equal(1); + + const stats = assetQueue.getStats(); + expect(stats.total).to.equal(0); + }); + }); + + describe('pause and resume', () => { + it('should pause and resume processing', () => { + assetQueue.pause(); + + const asset = { uid: 'asset1', title: 'Test Asset' }; + assetQueue.enqueue(asset); + + // Should not process while paused + const stats = assetQueue.getStats(); + expect(stats.pending).to.equal(1); + + assetQueue.resume(); + // Processing should resume + }); + }); + + describe('clear', () => { + it('should clear entire queue', () => { + const assets = [ + { uid: 'asset1', title: 'Asset 1' }, + { uid: 'asset2', title: 'Asset 2' } + ]; + + assetQueue.enqueueBatch(assets); + assetQueue.clear(); + + const stats = assetQueue.getStats(); + expect(stats.total).to.equal(0); + expect(stats.completed).to.equal(0); + expect(stats.failed).to.equal(0); + }); + }); +}); \ No newline at end of file diff --git a/packages/contentstack-import/test/unit/utils/incremental-state.test.ts b/packages/contentstack-import/test/unit/utils/incremental-state.test.ts new file mode 100644 index 000000000..6a77a0d9a --- /dev/null +++ b/packages/contentstack-import/test/unit/utils/incremental-state.test.ts @@ -0,0 +1,177 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { existsSync, readFileSync, writeFileSync } from 'node:fs'; +import { IncrementalStateManager } from '../../../src/utils/incremental-state'; + +describe('IncrementalStateManager', () => { + let stateManager: IncrementalStateManager; + let existsSyncStub: sinon.SinonStub; + let readFileSyncStub: sinon.SinonStub; + let writeFileSyncStub: sinon.SinonStub; + + beforeEach(() => { + existsSyncStub = sinon.stub().returns(false); + readFileSyncStub = sinon.stub(); + writeFileSyncStub = sinon.stub(); + + // Mock fs functions + sinon.stub(require('node:fs'), 'existsSync').callsFake(existsSyncStub); + sinon.stub(require('node:fs'), 'readFileSync').callsFake(readFileSyncStub); + sinon.stub(require('node:fs'), 'writeFileSync').callsFake(writeFileSyncStub); + + stateManager = new IncrementalStateManager({ + stateFilePath: '/test/path/.import-state.json', + batchSize: 5, + enableBackup: true, + context: { test: true } + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe('constructor', () => { + it('should initialize with empty state when no file exists', () => { + const counts = stateManager.getMappingCount(); + expect(counts.assets).to.equal(0); + expect(counts.folders).to.equal(0); + expect(counts.urls).to.equal(0); + }); + + it('should load existing state when file exists', () => { + existsSyncStub.returns(true); + readFileSyncStub.returns(JSON.stringify({ + assets: { 'asset1': 'new-asset1' }, + folders: { 'folder1': 'new-folder1' }, + urls: { 'url1': 'new-url1' } + })); + + const newStateManager = new IncrementalStateManager({ + stateFilePath: '/test/path/.import-state.json', + batchSize: 5, + enableBackup: true, + context: { test: true } + }); + + const counts = newStateManager.getMappingCount(); + expect(counts.assets).to.equal(1); + expect(counts.folders).to.equal(1); + expect(counts.urls).to.equal(1); + }); + + it('should create default state manager', () => { + const defaultManager = IncrementalStateManager.createDefault('/test/dir'); + expect(defaultManager).to.be.instanceOf(IncrementalStateManager); + }); + + it('should create state manager for large datasets', () => { + const largeDatasetManager = IncrementalStateManager.createForLargeDataset('/test/dir'); + expect(largeDatasetManager).to.be.instanceOf(IncrementalStateManager); + }); + }); + + describe('addMapping', () => { + it('should add asset mapping', () => { + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + + expect(stateManager.getMapping('asset1', 'asset')).to.equal('new-asset1'); + expect(stateManager.hasMapping('asset1', 'asset')).to.be.true; + }); + + it('should add folder mapping', () => { + stateManager.addMapping('folder1', 'new-folder1', 'folder'); + + expect(stateManager.getMapping('folder1', 'folder')).to.equal('new-folder1'); + expect(stateManager.hasMapping('folder1', 'folder')).to.be.true; + }); + + it('should trigger persistence when batch size is reached', () => { + // Add mappings up to batch size (5) + for (let i = 1; i <= 5; i++) { + stateManager.addMapping(`asset${i}`, `new-asset${i}`, 'asset'); + } + + // Should have triggered persistence + expect(writeFileSyncStub.called).to.be.true; + }); + }); + + describe('getAllMappings', () => { + beforeEach(() => { + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + stateManager.addMapping('asset2', 'new-asset2', 'asset'); + stateManager.addMapping('folder1', 'new-folder1', 'folder'); + }); + + it('should return all asset mappings', () => { + const assetMappings = stateManager.getAllMappings('asset'); + expect(assetMappings).to.deep.equal({ + 'asset1': 'new-asset1', + 'asset2': 'new-asset2' + }); + }); + + it('should return all folder mappings', () => { + const folderMappings = stateManager.getAllMappings('folder'); + expect(folderMappings).to.deep.equal({ + 'folder1': 'new-folder1' + }); + }); + }); + + describe('persistState', () => { + it('should write state to file', () => { + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + stateManager.persistState(); + + expect(writeFileSyncStub.called).to.be.true; + + const writeCall = writeFileSyncStub.getCall(0); + expect(writeCall.args[0]).to.equal('/test/path/.import-state.json'); + + const writtenData = JSON.parse(writeCall.args[1]); + expect(writtenData.assets).to.deep.equal({ 'asset1': 'new-asset1' }); + }); + + it('should create backup when enabled and file exists', () => { + existsSyncStub.returns(true); + readFileSyncStub.returns('existing content'); + + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + stateManager.persistState(); + + // Should have written backup file + expect(writeFileSyncStub.calledTwice).to.be.true; + const backupCall = writeFileSyncStub.getCall(0); + expect(backupCall.args[0]).to.equal('/test/path/.import-state.json.backup'); + expect(backupCall.args[1]).to.equal('existing content'); + }); + }); + + describe('flushState', () => { + it('should force immediate persistence', async () => { + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + + await stateManager.flushState(); + + expect(writeFileSyncStub.called).to.be.true; + }); + }); + + describe('getStats', () => { + beforeEach(() => { + stateManager.addMapping('asset1', 'new-asset1', 'asset'); + stateManager.addMapping('folder1', 'new-folder1', 'folder'); + }); + + it('should return correct statistics', () => { + const stats = stateManager.getStats(); + + expect(stats.mappingCounts.assets).to.equal(1); + expect(stats.mappingCounts.folders).to.equal(1); + expect(stats.pendingWrites).to.equal(2); + expect(stats.stateFileExists).to.be.false; + }); + }); +}); \ No newline at end of file diff --git a/packages/contentstack-import/test/unit/utils/memory-monitor.test.ts b/packages/contentstack-import/test/unit/utils/memory-monitor.test.ts new file mode 100644 index 000000000..f54c3737b --- /dev/null +++ b/packages/contentstack-import/test/unit/utils/memory-monitor.test.ts @@ -0,0 +1,148 @@ +import { expect } from 'chai'; +import sinon from 'sinon'; +import { MemoryMonitor } from '../../../src/utils/memory-monitor'; + +describe('MemoryMonitor', () => { + let memoryMonitor: MemoryMonitor; + let processMemoryUsageStub: sinon.SinonStub; + let globalGcStub: sinon.SinonStub; + let setTimeoutStub: sinon.SinonStub; + + beforeEach(() => { + // Mock process.memoryUsage + processMemoryUsageStub = sinon.stub(process, 'memoryUsage').returns({ + rss: 100 * 1024 * 1024, // 100MB + heapTotal: 80 * 1024 * 1024, // 80MB + heapUsed: 60 * 1024 * 1024, // 60MB + external: 10 * 1024 * 1024, // 10MB + arrayBuffers: 5 * 1024 * 1024, // 5MB + }); + + // Mock global.gc + globalGcStub = sinon.stub(); + (global as any).gc = globalGcStub; + + // Mock setTimeout + setTimeoutStub = sinon.stub(global, 'setTimeout').callsFake((fn: Function) => { + fn(); + return {} as any; + }); + + memoryMonitor = new MemoryMonitor({ + thresholdMB: 50, // 50MB threshold + gcCooldownMs: 1000, + enableLogging: false, + logInterval: 5000, + }); + }); + + afterEach(() => { + processMemoryUsageStub.restore(); + setTimeoutStub.restore(); + delete (global as any).gc; + }); + + describe('constructor', () => { + it('should initialize with correct configuration', () => { + expect(memoryMonitor).to.be.instanceOf(MemoryMonitor); + }); + + it('should create default monitor', () => { + const defaultMonitor = MemoryMonitor.createDefault(); + expect(defaultMonitor).to.be.instanceOf(MemoryMonitor); + }); + + it('should create monitor for large datasets', () => { + const largeDatasetMonitor = MemoryMonitor.createForLargeDataset(); + expect(largeDatasetMonitor).to.be.instanceOf(MemoryMonitor); + }); + }); + + describe('checkMemoryPressure', () => { + it('should return true when memory usage exceeds threshold', () => { + // 60MB used > 50MB threshold + const result = memoryMonitor.checkMemoryPressure(); + expect(result).to.be.true; + }); + + it('should return false when memory usage is below threshold', () => { + // Mock lower memory usage + processMemoryUsageStub.returns({ + rss: 40 * 1024 * 1024, + heapTotal: 35 * 1024 * 1024, + heapUsed: 30 * 1024 * 1024, // 30MB < 50MB threshold + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, + }); + + const result = memoryMonitor.checkMemoryPressure(); + expect(result).to.be.false; + }); + }); + + describe('getMemoryStats', () => { + it('should return correct memory statistics', () => { + const stats = memoryMonitor.getMemoryStats(); + + expect(stats).to.have.property('heapUsed', 60 * 1024 * 1024); + expect(stats).to.have.property('heapTotal', 80 * 1024 * 1024); + expect(stats).to.have.property('rss', 100 * 1024 * 1024); + expect(stats).to.have.property('heapUsedMB', 60); + expect(stats).to.have.property('heapTotalMB', 80); + expect(stats).to.have.property('rssMB', 100); + }); + }); + + describe('forceGarbageCollection', () => { + it('should call global.gc when available', async () => { + await memoryMonitor.forceGarbageCollection(); + expect(globalGcStub.calledOnce).to.be.true; + }); + + it('should respect cooldown period', async () => { + // First call should work + await memoryMonitor.forceGarbageCollection(); + expect(globalGcStub.calledOnce).to.be.true; + + // Second call immediately should be skipped + globalGcStub.resetHistory(); + await memoryMonitor.forceGarbageCollection(); + expect(globalGcStub.called).to.be.false; + }); + + it('should handle missing global.gc gracefully', async () => { + delete (global as any).gc; + + // Should not throw + await memoryMonitor.forceGarbageCollection(); + }); + }); + + describe('isCriticalMemoryPressure', () => { + it('should return true when memory usage exceeds critical threshold', () => { + // Mock very high memory usage (90MB > 75MB critical threshold) + processMemoryUsageStub.returns({ + rss: 100 * 1024 * 1024, + heapTotal: 95 * 1024 * 1024, + heapUsed: 90 * 1024 * 1024, // 90MB > 75MB (50MB * 1.5) + external: 5 * 1024 * 1024, + arrayBuffers: 2 * 1024 * 1024, + }); + + const result = memoryMonitor.isCriticalMemoryPressure(); + expect(result).to.be.true; + }); + + it('should return false when memory usage is below critical threshold', () => { + const result = memoryMonitor.isCriticalMemoryPressure(); + expect(result).to.be.false; // 60MB < 75MB critical threshold + }); + }); + + describe('getMemoryPressureLevel', () => { + it('should return correct pressure level percentage', () => { + const level = memoryMonitor.getMemoryPressureLevel(); + expect(level).to.equal(120); // 60MB / 50MB * 100 = 120% + }); + }); +}); \ No newline at end of file