[BEAM-896] FileBasedSink removes temp directory #1278
jkff wants to merge 4 commits into apache:master
Also fixes RAT plugin in pom.xml to ignore .idea (found while running integration tests locally).

R: @peihe

Pei is churning on IOCF API so he should look at this.

Sounds good. Note, though, that this CL doesn't change IOChannelFactory APIs - it touches FileIOChannelFactory, but only makes one error message more detailed. All the changes are in FileBasedSink per se and its private classes.

     * @param <T> the type of values written to the sink.
     */
    public abstract class FileBasedSink<T> extends Sink<T> {
      private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriteOperation.class);

        LOG.debug("Directory {} already doesn't exist", directory);
        return;
      }
      Collection<String> matches = factory.match(new File(directory, "*").getAbsolutePath());

The method name suggests that it also deletes subdirectories.
That would require each subdirectory to be empty before it is removed.
I think FileIOChannelFactory.match() could guarantee that order by replacing preOrderTraversal with postOrderTraversal. (Long term, we probably want to specify the order with a MatchOptions.)
For this PR, I suggest choosing a better method name. (We can enrich the functionality later when we need it.)
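
To illustrate why the traversal order matters, here is a minimal sketch of a post-order delete on a local file system. It uses only `java.nio.file` and is not Beam code: the class name `PostOrderDelete` and the method `deleteRecursively` are made up for this example. A directory is deleted only in `postVisitDirectory`, that is, after all of its entries have been visited and removed.

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

/** Hypothetical helper, not part of Beam: deletes a local directory tree in post-order. */
final class PostOrderDelete {
  /** Deletes root and everything under it; children are always removed before their parent. */
  static void deleteRecursively(Path root) throws IOException {
    if (!Files.exists(root)) {
      return; // Nothing to do if the directory is already gone.
    }
    Files.walkFileTree(root, new SimpleFileVisitor<Path>() {
      @Override
      public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
        Files.delete(file);
        return FileVisitResult.CONTINUE;
      }

      @Override
      public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
        if (exc != null) {
          throw exc; // Listing the directory failed; do not try to delete it.
        }
        Files.delete(dir); // All entries of dir were visited (and deleted) before this call.
        return FileVisitResult.CONTINUE;
      }
    });
  }

  public static void main(String[] args) throws IOException {
    // Build a small nested tree in a temp directory, then remove it.
    Path root = Files.createTempDirectory("post-order-demo");
    Files.createFile(Files.createDirectories(root.resolve("a/b")).resolve("part-0"));
    deleteRecursively(root);
    System.out.println("still exists: " + Files.exists(root));
  }
}
```

With a pre-order listing, the parent directory would come first and `Files.delete` would fail on it with `DirectoryNotEmptyException`, which is the ordering problem described above.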

Once my other comment is addressed, then LGTM.

R: @dhalperi for committer. Also, TODO for myself: backport into https://github.com/GoogleCloudPlatform/DataflowJavaSDK

    - public GcsOperations(PipelineOptions options) {
    + public GcsOperations(IOChannelFactory factory, PipelineOptions options) {
        gcsUtil = new GcsUtilFactory().create(options);

what's with the factory constructor? It seems unused.

     */
    void remove(Collection<String> filenames) throws IOException;
    /** Removes a directory and the files in it (but not subdirectories). */
    void removeDirectoryAndFiles(String directory) throws IOException;

    LOG.debug("Removing {} temporary files found under {}", matches.size(), directory);
    gcsUtil.remove(matches);
    // No need to remove the directory itself: GCS doesn't have directories, so if the directory
    // is empty, then it already doesn't exist.

why match instead of deleting the specific files whose names we have?
Explained in a comment under FileBasedWriteOperation.finalize()
Shouldn't we match-and-union rather than just match? S3 eventual consistency...
Fair. Added that. Still not perfect, but at least it will be perfect in pipelines without bundle failures, which is, hopefully, most pipelines.
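
The match-and-union idea from this thread can be sketched roughly as follows. This is an illustration, not the code from this PR: `TempFileCleanup` and `filesToRemove` are hypothetical names, and the two input collections stand in for the temp file names the writer already knows and the result of a glob match on the temp directory.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical helper, not Beam's code: decides which temporary files to delete. */
final class TempFileCleanup {
  /**
   * Returns the union of the known temp file names and the names found by matching.
   * Known names cover files that a stale (eventually consistent) listing may miss;
   * matched names cover leftovers, for example from failed bundles, that the writer
   * never recorded.
   */
  static Set<String> filesToRemove(Collection<String> knownFiles, Collection<String> matchedFiles) {
    Set<String> all = new LinkedHashSet<>(knownFiles); // Set semantics drop duplicates.
    all.addAll(matchedFiles);
    return all;
  }

  public static void main(String[] args) {
    // Example inputs are made up for the demo.
    Set<String> toRemove = filesToRemove(
        Arrays.asList("tmp/part-0", "tmp/part-1"),
        Arrays.asList("tmp/part-1", "tmp/leftover"));
    System.out.println(toRemove);
  }
}
```

Under these assumptions, a pipeline with no bundle failures is cleaned up fully by the known names alone, and the match only adds whatever extra files the listing happens to show.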
This is a weaker but backward-compatible version of respective Beam changes: apache/beam#1050 apache/beam#1278
Also fixes some documentation in FileBasedSink.
Also fixes RAT plugin in pom.xml to ignore .idea
(found while running integration tests locally).
R: @dhalperi