PARQUET-2126: Make cached (de)compressors thread-safe
CodecFactory cached instances of compressors and decompressors and handed them out across threads, which was not thread-safe. This change makes the caches per-thread.
Seems good to me (non-binding!). Revisiting whether or not the caching strategy makes sense might be worthwhile, but that shouldn't stop this fix.
Small comment: I would remove most of the references to the JIRA ticket as well as descriptions of the old behavior. I think the comment that describes the new behavior and why it might be unintuitive, with a reference to the JIRA, makes sense though. I'll defer to others in the project (again, I'm not a committer) if there are existing standards for this.
Alright. You have a point. If the maintainers want me to delete that stuff, they can let me know, and I'll go ahead and do it.
If we change it to be per-thread, would it be a problem in the scenario where short-lived threads come and go? When a thread stops, we might not know, and we could leak here.
And please add tests.
This is why I use a ConcurrentHashMap indexed by the thread. Short-lived threads are not a problem in that case.
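For illustration, a minimal sketch of that scheme (the class and method names here are hypothetical, not the actual CodecFactory internals):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch of a per-thread codec cache; not the real
// CodecFactory fields or methods.
class PerThreadCache<C> {
  // Keyed by thread, so each thread only ever sees its own instance.
  private final ConcurrentHashMap<Thread, C> cache = new ConcurrentHashMap<>();

  C getOrCreate(Supplier<C> factory) {
    return cache.computeIfAbsent(Thread.currentThread(), t -> factory.get());
  }

  // Callable from any thread: releases every thread's cached instance,
  // which is why a factory-wide release() doesn't leak.
  void releaseAll(Consumer<C> releaser) {
    cache.values().forEach(releaser);
    cache.clear();
  }
}
```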
I can't think of how I would go about testing this. Do you have any ideas? I'll check whether any tests already exist and see if I can add something.
My question is that when a thread exits, we don't have a corresponding evict operation on the map. Using a thread pool might be OK if the thread objects don't change, but I'm not sure whether there is a scenario where threads are created and exit quickly, and we leak in that case.
No matter which thread release() is called from, it will clean up all (de)compressors from all threads. I designed it specifically this way so that a leak won't happen, as long as close/release is called when it should be.
Note that it's not appropriate to call close or release while (de)compression is still going on. If someone does that, it might still work, but it would be a protocol violation. The usage pattern should be:
- Create Codec factory
- Create worker threads
- Threads create codecs
- Threads finish using codecs
- Threads terminate
- The thread that created the worker threads waits until those threads are done
- close/release is called.
Someone might do something different, but that would be a bug no different from someone closing a file in one thread while it's being written to in another.
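To make that concrete, here's a hedged sketch of the intended ordering, assuming parquet-hadoop's CodecFactory(Configuration, pageSize) constructor and its getCompressor/release methods:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.hadoop.CodecFactory;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

class CodecFactoryUsage {
  static void run(int pageSize) throws InterruptedException {
    CodecFactory factory = new CodecFactory(new Configuration(), pageSize); // create factory
    ExecutorService workers = Executors.newFixedThreadPool(4);              // create worker threads

    for (int i = 0; i < 4; i++) {
      workers.submit(() -> {
        // Each thread creates (and the factory caches) its own compressor.
        CodecFactory.BytesCompressor compressor =
            factory.getCompressor(CompressionCodecName.SNAPPY);
        // ... threads use their codecs here ...
      });
    }

    workers.shutdown();
    workers.awaitTermination(1, TimeUnit.MINUTES); // wait until workers are done
    factory.release();                             // only now call close/release
  }
}
```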
I added a test.
Thanks for addressing the feedback! What I meant was that ideally, when 'Threads terminate' happens, it should clean up the compressor/decompressor immediately. I understand we won't leak by the time 'close/release is called', though. Do you know how large the compressor/decompressor is in memory?
@dossett, we don't have a standard like that. It seems OK to have. What do you think?
@shangxinli I do not feel strongly about it. I think historical context is better kept in JIRAs and PR discussion than in code comments, but that is just a style choice if there's no standard. (I appreciate you following up, btw!)
@theosib-amazon Do you still have time for addressing the feedback? I think we are very close to merge.
I'm not really sure which feedback to address. Are you concerned about leaking if release/close isn't called? I'm pretty sure that would result in leaks. I suppose that might be solvable if we added a finalize() method that called release(). Should we do that?
My 2c: finalize() is problematic and deprecated in Java so I don't recommend adding it. The requirement here that the caller must close after they're finished is totally reasonable and to be found in APIs everywhere.
@theosib-amazon, I am not concerned about release/close not being called, and I agree the caller must call release/close after finishing. My question is that before release/close is called, there could be short-lived threads that are used to create the compressor/decompressor in the cache. Those short-lived threads exit, the cache is not aware of that, and the cache grows with a lot of dead compressors/decompressors. In a scenario where short-lived threads come and go as normal business, this could be a problem. I know it normally isn't a problem, because in most cases we use a thread pool, but I am just not sure there isn't a corner case like that. Parquet is a low-level library and is used in so many cases.
I am sorry if I didn't make my previous comment more obvious.
One option is to provide another API call that releases the cached instance for only the current thread.
Maybe what we should do is have a release on the factory that releases everything and a release on the codec itself that releases only itself. The codec would have to maintain a reference to the factory so it can tell the factory to delete the instance from the container.
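A rough sketch of the first option, a factory call that releases only the current thread's cached instance; releaseCurrentThread is an invented name, not an existing CodecFactory method:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical: per-thread release on the factory side.
class PerThreadReleaseSketch<C> {
  private final ConcurrentHashMap<Thread, C> cache = new ConcurrentHashMap<>();

  // Releases and evicts only the calling thread's cached instance;
  // other threads' entries stay until the factory-wide release().
  void releaseCurrentThread(Consumer<C> releaser) {
    C codec = cache.remove(Thread.currentThread());
    if (codec != null) {
      releaser.accept(codec);
    }
  }
}
```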
I did some poking around. It looks like if you call release() on a codec, it (a) resets the codec (freeing resources, I think) and (b) returns it to a pool of codecs without actually destroying the codec.
Later, when release() is called on the factory, it just calls release() again on each of the codecs, returning them to the pool. The only other effect is that references are removed from a container in the factory.
The only question, then, is what happens if release is called twice on a codec. It looks like nothing happens because CodecPool.payback() will return false when the codec is already in the pool. Moreover, I'm pretty sure the original implementation already did this.
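For reference, a minimal illustration of that double-release behavior using Hadoop's CodecPool directly (DefaultCodec stands in for whatever codec Parquet resolves; this is not new Parquet code):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;

class DoubleReleaseDemo {
  public static void main(String[] args) {
    DefaultCodec codec = ReflectionUtils.newInstance(DefaultCodec.class, new Configuration());
    Compressor compressor = CodecPool.getCompressor(codec);

    CodecPool.returnCompressor(compressor); // resets the compressor and pools it
    CodecPool.returnCompressor(compressor); // second return is a no-op: already pooled
  }
}
```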
So I think the solution is to literally do nothing. The new usage pattern is now:
- Create Codec factory
- Create worker threads
- Threads create codecs
- Threads finish using codecs
- Threads optionally call release on their codecs if they want to free resources right away.
- Threads terminate
- The thread that created the worker threads waits until those threads are done
- release is called on the factory, cleaning up any codecs that were not released already
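Extending the worker loop from the earlier sketch, the only new step is the optional per-thread release (again assuming the BytesCompressor.release() method):

```java
workers.submit(() -> {
  CodecFactory.BytesCompressor compressor =
      factory.getCompressor(CompressionCodecName.SNAPPY);
  // ... compress pages ...
  compressor.release(); // optional: frees resources right away;
                        // the later factory.release() is then a harmless no-op
});
```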
I just thought of something about this PR that makes me nervous and requires further investigation. Consider the following scenario:
- Thread A allocates a codec
- Thread A releases the codec, which puts it into a global pool of codecs
- Thread B allocates the same kind of codec, which comes from that same pool
- Thread A allocates that same kind of codec again, but it gets it from the factory's map instead of the pool
I'm concerned that this could result in the same codec being given to both threads at the same time. The solution would be to remove the codec from the factory's map when release() is called on the codec itself.
Note that this problem is not introduced by this PR, since the double pooling existed before. The irony is that the pool is thread-safe, while the factory was not.
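A hedged sketch of that fix, with invented names: the codec keeps a back-reference to its factory and evicts its own map entry before going back to the pool, so the factory can never hand the same instance out twice:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shapes; not the actual CodecFactory or codec classes.
class SketchFactory {
  final ConcurrentHashMap<Thread, SketchCodec> cache = new ConcurrentHashMap<>();
}

class SketchCodec {
  private final SketchFactory owner;

  SketchCodec(SketchFactory owner) {
    this.owner = owner;
  }

  void release() {
    // Evict from the factory map first, so a later request on this thread
    // can't get an instance that is also sitting in the global pool.
    owner.cache.remove(Thread.currentThread(), this);
    // ... then return the underlying (de)compressor to CodecPool ...
  }
}
```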
You might want to look at WeakReferences... we've been using them recently to implement ThreadLocal-like storage where GCs will trigger cleanup of instances which aren't being used any more:
https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakReferenceThreadMap.java
https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/WeakReferenceMap.java
The evolution of that code would be to implement the callback the JVM can issue on reference expiry, and do extra cleanup there.
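For comparison, a minimal sketch of the weak-key idea (not the actual Hadoop WeakReferenceThreadMap): once an exited thread is garbage collected, its map entry becomes collectable too, so dead threads don't pile up.

```java
import java.util.Collections;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.function.Function;

// Weakly-keyed per-thread cache. WeakHashMap isn't thread-safe on its
// own, hence the synchronized wrapper; the ReferenceQueue-callback
// refinement mentioned above would go beyond this sketch.
class WeakThreadCache<V> {
  private final Map<Thread, V> map =
      Collections.synchronizedMap(new WeakHashMap<>());

  V getOrCreate(Function<Thread, V> factory) {
    return map.computeIfAbsent(Thread.currentThread(), factory);
  }
}
```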