diff --git a/src/common/storage/backends/StorageClient.js b/src/common/storage/backends/StorageClient.js index c24c3c9bd..f1178d5d1 100644 --- a/src/common/storage/backends/StorageClient.js +++ b/src/common/storage/backends/StorageClient.js @@ -1,7 +1,7 @@ /* globals define*/ define([ 'client/logger', - 'deepforge/gmeConfig' + 'deepforge/gmeConfig', ], function( Logger, gmeConfig @@ -9,6 +9,7 @@ define([ const fetch = require.isBrowser ? window.fetch : require.nodeRequire('node-fetch'); const Headers = require.isBrowser ? window.Headers : fetch.Headers; + const stream = require.isBrowser ? null : require.nodeRequire('stream'); const StorageClient = function(id, name, logger) { this.id = id; this.name = name; @@ -48,10 +49,18 @@ define([ throw new Error(`File download not implemented for ${this.name}`); }; + StorageClient.prototype.getFileStream = async function(/*dataInfo*/) { + throw new Error(`Stream download not implemented for ${this.name}`); + }; + StorageClient.prototype.putFile = async function(/*filename, content*/) { throw new Error(`File upload not supported by ${this.name}`); }; + StorageClient.prototype.putFileStream = async function(/*filename, stream*/) { + throw new Error(`Stream upload not supported by ${this.name}`); + }; + StorageClient.prototype.deleteFile = async function(/*dataInfo*/) { throw new Error(`File deletion not supported by ${this.name}`); }; @@ -87,5 +96,17 @@ define([ throw new Error(`stat not implemented for ${this.name}`); }; + StorageClient.prototype.ensureStreamSupport = function() { + if(require.isBrowser) { + throw new Error('Streams are not supported in browser'); + } + }; + + StorageClient.prototype.ensureReadableStream = function (obj) { + if(stream && !(obj instanceof stream.Readable)) { + throw new Error(`${obj} should be an instance of a readable stream`); + } + }; + return StorageClient; }); diff --git a/src/common/storage/backends/gme/Client.js b/src/common/storage/backends/gme/Client.js index 2eac706fe..04414f893 
100644 --- a/src/common/storage/backends/gme/Client.js +++ b/src/common/storage/backends/gme/Client.js @@ -35,11 +35,24 @@ define([ return await this.blobClient.getObject(data); }; + GMEStorage.prototype.getFileStream = async function(dataInfo) { + const url = await this.getDownloadURL(dataInfo); + const response = await this.fetch(url, {method: 'GET'}); + return response.body; + }; + GMEStorage.prototype.putFile = async function(filename, content) { const hash = await this.blobClient.putFile(filename, content); return this.createDataInfo(hash); }; + GMEStorage.prototype.putFileStream = async function(filename, stream) { + this.ensureStreamSupport(); + this.ensureReadableStream(stream); + const hash = await this.blobClient.putFile(filename, stream); + return this.createDataInfo(hash); + }; + GMEStorage.prototype.deleteDir = GMEStorage.prototype.deleteFile = async function() {}; diff --git a/src/common/storage/backends/s3/Client.js b/src/common/storage/backends/s3/Client.js index 498931961..1b5840b25 100644 --- a/src/common/storage/backends/s3/Client.js +++ b/src/common/storage/backends/s3/Client.js @@ -63,7 +63,6 @@ define([ this.logger.error(`Failed to create bucket ${this.bucketName} in S3 server.`); throw err; } - } }; @@ -78,6 +77,16 @@ define([ return data.Body; }; + S3Storage.prototype.getFileStream = async function(dataInfo) { + const {endpoint, bucketName, filename} = dataInfo.data; + const {accessKeyId, secretAccessKey} = this.config; + const s3Client = await this.getS3Client({endpoint, accessKeyId, secretAccessKey}); + return s3Client.getObject({ + Bucket: bucketName, + Key: filename + }).createReadStream(); + }; + S3Storage.prototype.putFile = async function (filename, content) { const s3Client = await this.getS3Client(); await this.createBucketIfNeeded(s3Client); @@ -98,6 +107,31 @@ define([ return dataInfo; }; + S3Storage.prototype.putFileStream = async function(filename, stream) { + this.ensureStreamSupport(); + this.ensureReadableStream(stream); + 
const s3Client = await this.getS3Client(); + const params = await this.getUploadParams(s3Client, filename, stream); + try { + await s3Client.upload(params).promise(); + } catch (err) { + throw new Error(`Unable to upload ${stream}: ${err.message}`); + } + const dataInfo = await this.stat(filename); + this.logger.debug(`Successfully uploaded file ${filename} to the S3 server using stream`); + return dataInfo; + }; + + S3Storage.prototype.getUploadParams = async function(s3Client, filename, body) { + await this.createBucketIfNeeded(s3Client); + this.logger.debug(`Created bucket ${this.bucketName}`); + return { + Body: require.isBrowser ? new Blob([body]) : body, + Bucket: this.bucketName, + Key: filename, + }; + }; + S3Storage.prototype.deleteDir = async function (dirname) { const s3Client = await this.getS3Client(); const {Contents} = await s3Client.listObjectsV2({ diff --git a/src/common/storage/backends/sciserver-files/Client.js b/src/common/storage/backends/sciserver-files/Client.js index 1f0871bbc..3a89fb572 100644 --- a/src/common/storage/backends/sciserver-files/Client.js +++ b/src/common/storage/backends/sciserver-files/Client.js @@ -18,9 +18,7 @@ define([ SciServerFiles.prototype = Object.create(StorageClient.prototype); SciServerFiles.prototype.getFile = async function (dataInfo) { - let {volume, filename, volumePool='Storage'} = dataInfo.data; - const url = `file/${volumePool}/${volume}/${filename}`; - const response = await this.fetch('download', url); + const response = await this.getDownloadResponse(dataInfo); if (require.isBrowser) { return await response.arrayBuffer(); } else { @@ -28,6 +26,11 @@ define([ } }; + SciServerFiles.prototype.getFileStream = async function(dataInfo) { + const response = await this.getDownloadResponse(dataInfo); + return response.body; + }; + SciServerFiles.prototype.putFile = async function (filename, content) { if (!this.volume) { throw new Error('Cannot upload file to SciServer.
No volume specified.'); @@ -49,6 +52,14 @@ define([ return this.createDataInfo(metadata); }; + SciServerFiles.prototype.putFileStream = async function(filename, stream) { + this.ensureStreamSupport(); + this.ensureReadableStream(stream); + await this.putFile(filename, stream); + // stat necessary because of byteLength + return await this.stat(filename); + }; + SciServerFiles.prototype.deleteDir = async function (dirname) { const url = `data/${this.volumePool}/${this.volume}/${dirname}`; const opts = {method: 'DELETE'}; @@ -120,5 +131,11 @@ define([ return this.createDataInfo(metadata); }; + SciServerFiles.prototype.getDownloadResponse = async function (dataInfo) { + let {volume, filename, volumePool='Storage'} = dataInfo.data; + const url = `file/${volumePool}/${volume}/${filename}`; + return await this.fetch('download', url); + }; + return SciServerFiles; }); diff --git a/src/common/storage/index.js b/src/common/storage/index.js index a00d86199..21d42c9e9 100644 --- a/src/common/storage/index.js +++ b/src/common/storage/index.js @@ -72,6 +72,11 @@ define([ return client.getFile(dataInfo); }; + Storage.getFileStream = async function(dataInfo, logger, configs) { + const client = await this.getClientForDataInfo(dataInfo, logger, configs); + return client.getFileStream(dataInfo); + }; + Storage.deleteFile = async function(dataInfo, logger, configs) { const client = await this.getClientForDataInfo(dataInfo, logger, configs); return client.deleteFile(dataInfo); diff --git a/test/integration/StorageBackends.spec.js b/test/integration/StorageBackends.spec.js index 1b7e05ed8..979108690 100644 --- a/test/integration/StorageBackends.spec.js +++ b/test/integration/StorageBackends.spec.js @@ -2,27 +2,35 @@ describe('Storage Features Test', function () { this.timeout(5000); const assert = require('assert'); + const fs = require('fs'); const testFixture = require('../globals'); const {promisify} = require('util'); + let {Writable, pipeline} = require('stream'); + pipeline = 
promisify(pipeline); const {requirejs} = testFixture; const TEST_STORAGE = 'storageFeaturesSpec'; const TEST_PATH = `${TEST_STORAGE}/dummyFile`; + const TEST_FILE_NAME = 'TestFile'; + const CONTENT = 'A Quick Brown Fox Jumped over a lazy Dog'; const logger = testFixture.logger.fork('StorageTests'); const Storage = requirejs('deepforge/storage/index'); const gmeConfig = testFixture.getGmeConfig(); const server = new testFixture.WebGME.standaloneServer(gmeConfig); server.start = promisify(server.start); server.stop = promisify(server.stop); + const { StringDecoder } = require('string_decoder'); const storageBackends = Storage.getAvailableBackends(); let StorageConfigs, client, clients = {}, - dataInfo; + dataInfoBuffer, + dataInfoStream; before(async function () { await server.start(); + fs.writeFileSync(TEST_FILE_NAME, CONTENT); StorageConfigs = await testFixture.getStorageConfigs(); for (const backend of storageBackends) { client = await Storage.getClient(backend, logger, StorageConfigs[backend]); @@ -35,37 +43,52 @@ describe('Storage Features Test', function () { for (const backend of storageBackends) { it(`should putFile using ${backend}`, async function() { this.retries(maxRetries(backend)); - dataInfo = await clients[backend].putFile(TEST_PATH, - Buffer.from('A Quick Brown Fox Jumped over a lazy Dog.')); + dataInfoBuffer = await clients[backend].putFile(TEST_PATH, + Buffer.from(CONTENT)); }); it(`should getFile using ${backend}`, async function() { this.retries(maxRetries(backend)); - await clients[backend].getFile(dataInfo); + await clients[backend].getFile(dataInfoBuffer); }); it(`should getCachePath using ${backend}`, async () => { - await clients[backend].getCachePath(dataInfo); + await clients[backend].getCachePath(dataInfoBuffer); }); it(`should stat file using ${backend}`, async () => { - if(backend !== 'gme'){ - await clients[backend].stat(TEST_PATH); - } else { - assert.rejects(clients[backend].stat(TEST_PATH), { - name: 'Error', - message: 'stat not 
implemented for WebGME Blob Storage' - }); - } + if(backend !== 'gme'){ + await clients[backend].stat(TEST_PATH); + } else { + assert.rejects(clients[backend].stat(TEST_PATH), { + name: 'Error', + message: 'stat not implemented for WebGME Blob Storage' + }); + } + }); + + it(`should putFileStream using ${backend}`, async function () { + this.retries(maxRetries(backend)); + const stream = fs.createReadStream(TEST_FILE_NAME); + const pathInStorageBackend = `${TEST_STORAGE}/${TEST_FILE_NAME}`; + dataInfoStream = await clients[backend].putFileStream(pathInStorageBackend, stream); + }); + + it(`should getFileStream using ${backend}`, async function () { + this.retries(maxRetries(backend)); + const inputStream = await clients[backend].getFileStream(dataInfoStream); + await verifyStreamContent(inputStream); }); it(`should deleteFile using ${backend}`, async function() { this.retries(maxRetries(backend)); - await clients[backend].deleteFile(dataInfo); + await clients[backend].deleteFile(dataInfoBuffer); + await clients[backend].deleteFile(dataInfoStream); }); } after(async function () { + fs.unlinkSync(TEST_FILE_NAME); await server.stop(); }); @@ -75,4 +98,29 @@ describe('Storage Features Test', function () { } return 1; } + + async function verifyStreamContent(inputStream) { + const outputStream = new StringWritable(); + await pipeline(inputStream, outputStream); + assert(outputStream.data === CONTENT); + } + + class StringWritable extends Writable { + constructor(options) { + super(options); + this._decoder = new StringDecoder(options && options.defaultEncoding); + this.data = ''; + } + _write(chunk, encoding, callback) { + if (encoding === 'buffer') { + chunk = this._decoder.write(chunk); + } + this.data += chunk; + callback(); + } + _final(callback) { + this.data += this._decoder.end(); + callback(); + } + } });