
Remove CQL ExecutorService configuration option

Open porunov opened this issue 3 years ago • 14 comments

DO NOT MERGE

In conclusion, we decided not to merge this PR but instead to work directly on removing the parallel backend ops executor service in favor of asynchronous query execution. I'm not closing this PR for now so that the discussion below stays visible.


Initial PR comment is below:

Instead of allowing users to use an executor service for CQL parallelism, we should encourage users to configure the internal CQL driver throughput correctly. Moreover, keeping this configuration option just makes JanusGraph harder for new learners to understand, even though we discourage users from using it in production.

Related to #2741

As for the next steps, I think we should remove storage.parallel-backend-executor-service and instead rely on async execution of storage drivers. Those storage drivers which don't support it can simply use their own executor service instead of forcing all storage backends to use this executor service. If so, we can go ahead and reuse DataStax CQL async execution instead of limiting parallel ops execution via a global (per JanusGraph instance) executor service, which by default drastically limits parallelism for queries that use parallel ops.

Signed-off-by: Oleksandr Porunov [email protected]


Thank you for contributing to JanusGraph!

In order to streamline the review of the contribution we ask you to ensure the following steps have been taken:

For all changes:

  • [ ] Is there an issue associated with this PR? Is it referenced in the commit message?
  • [ ] Does your PR body contain #xyz where xyz is the issue number you are trying to resolve?
  • [x] Has your PR been rebased against the latest commit within the target branch (typically master)?
  • [x] Is your initial contribution a single, squashed commit?

For code changes:

  • [ ] Have you written and/or updated unit tests to verify your changes?
  • [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under ASF 2.0?
  • [ ] If applicable, have you updated the LICENSE.txt file, including the main LICENSE.txt file in the root of this repository?
  • [ ] If applicable, have you updated the NOTICE.txt file, including the main NOTICE.txt file found in the root of this repository?

For documentation related changes:

  • [x] Have you ensured that format looks appropriate for the output in which it is rendered?

porunov avatar Aug 15 '22 13:08 porunov

As for the next steps I think we should remove storage.parallel-backend-executor-service and instead rely on async execution of storage drivers.

That is a good point, and @cdegroc has done lots of experiments with that. Removing storage.parallel-backend-executor-service in favor of async CQL execution is certainly beneficial for throughput, but there is a subtle point: if we use async CQL execution, we need to retain the CQL executor service. The reason is stated below.

According to https://docs.datastax.com/en/developer/java-driver/4.14/manual/core/async/,

You should also be careful about expensive computations: if your callbacks hold I/O threads for too long, they will negatively impact the driver’s throughput. Consider providing your own executor:

// Create this as a global resource in your application
Executor computeExecutor = ...

CompletionStage<Integer> resultStage =
    responseStage.thenApplyAsync( // note: thenApplyAsync instead of thenApply
        resultSet -> someVeryExpensiveComputation(resultSet),
        computeExecutor);

Since JanusGraph needs to deserialize data which is a CPU-intensive step, the CQL thread pool (with a size close to the number of CPU cores) is still beneficial.

In all, experiments seem to suggest that we need to keep either the backend thread pool or we need to keep the CQL thread pool for the Cassandra use case.
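To make the point about keeping deserialization off the driver's I/O threads concrete, here is a minimal sketch in plain Java. It assumes nothing about JanusGraph or the DataStax driver: the class name, `decodeOn`, and `deserialize` are hypothetical stand-ins, and a manually completed `CompletableFuture` plays the role of the driver response.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration; none of these names come from JanusGraph or the driver.
class DeserializationOffloadSketch {

    // Stand-in for the CPU-heavy deserialization of a raw response.
    static int deserialize(String rawRow) {
        return rawRow.length();
    }

    // Chains the CPU-heavy step onto computePool, so the thread that completes
    // rawResponse (a driver I/O thread in the real setting) is released right away.
    static CompletableFuture<Integer> decodeOn(CompletableFuture<String> rawResponse,
                                               ExecutorService computePool) {
        return rawResponse.thenApplyAsync(DeserializationOffloadSketch::deserialize, computePool);
    }

    public static void main(String[] args) {
        // Pool sized to the number of CPU cores, as suggested in the comment above.
        int cores = Runtime.getRuntime().availableProcessors();
        ExecutorService computePool = Executors.newFixedThreadPool(cores);
        try {
            CompletableFuture<String> rawResponse = new CompletableFuture<>();
            CompletableFuture<Integer> decoded = decodeOn(rawResponse, computePool);
            rawResponse.complete("row-bytes"); // simulates the driver delivering a response
            System.out.println(decoded.join());
        } finally {
            computePool.shutdown();
        }
    }
}
```

The design choice being illustrated is only where the callback runs: `thenApplyAsync` with an explicit executor moves the work, while `thenApply` could run it on the completing thread.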

li-boxuan avatar Aug 15 '22 13:08 li-boxuan

Since JanusGraph needs to deserialize data which is a CPU-intensive step, the CQL thread pool (with a size close to the number of CPU cores) is still beneficial.

That's a good point. I guess we could leave one of the pools for that but I was thinking what if we execute that data deserialization in the calling thread? I.e. right now we need 1 thread to execute 1 JanusGraph query. So, potentially we can just use that thread for any heavy computation I guess. That said, if Gremlin is switched to Async and JanusGraph starts supporting it we may still need some pool for that computation (currently discussed here: https://lists.apache.org/thread/563oxfgddynj0d7sqsnj5vfv1gk8lvmk).

porunov avatar Aug 15 '22 14:08 porunov

I guess we could leave one of the pools for that but I was thinking what if we execute that data deserialization in the calling thread? I.e. right now we need 1 thread to execute 1 JanusGraph query. So, potentially we can just use that thread for any heavy computation I guess.

Let's say you have query.batch = True enabled and you have a query like g.V().hasId(within(1,2,3,4,...100k)).values(), there will be 100k queries to the storage backend. In this case, using the caller thread for deserialization would be much slower compared to using a CQL thread pool.

li-boxuan avatar Aug 15 '22 14:08 li-boxuan

Let's say you have query.batch = True enabled and you have a query like g.V().hasId(within(1,2,3,4,...100k)).values(), there will be 100k queries to the storage backend. In this case, using the caller thread for deserialization would be much slower compared to using a CQL thread pool.

That's true. Using executor pool in this case would be up to N times faster (where N is the pool size). That said, let's take a look at the next example:

ExecutorService executorService = Executors.newFixedThreadPool(1000);
for(int i = 0; i<1000; i++){
    executorService.submit(() -> g.V().hasId(123).values());
}

In the above example, let's say we have a CQL executor service of size 8. It means all 1000 requests will be stuck waiting for the data to be computed. In fact, we will waste 1000 threads just waiting for the result while only 8 threads work on real data computation. So, there are 1008 threads and only 8 of them are useful. The other 1000 threads just take some CPU time without contributing much value.

My assumption is that having a CQL thread pool is beneficial only when the number of parallel JanusGraph queries is <= the CQL thread pool size. As soon as the number of parallel JanusGraph queries exceeds the CQL thread pool size, that pool becomes a bottleneck.

That's just my assumption; I didn't really test that.
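The untested assumption above can be illustrated with a small, self-contained sketch. It is not JanusGraph code: `peakConcurrency` is a hypothetical helper that submits many short tasks to a fixed pool and records how many were ever running at once, which can never exceed the pool size regardless of how many callers are waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a shared fixed pool capping concurrency.
class SharedPoolBottleneckSketch {

    // Submits `callers` tasks to a fixed pool of `poolSize` threads and returns
    // the highest number of tasks observed running at the same time.
    static int peakConcurrency(int callers, int poolSize) {
        ExecutorService sharedPool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                pending.add(CompletableFuture.runAsync(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(5); // stand-in for waiting on I/O or deserializing
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                }, sharedPool));
            }
            for (CompletableFuture<Void> task : pending) {
                task.join(); // each caller would block here until its result is ready
            }
        } finally {
            sharedPool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrent tasks: " + peakConcurrency(100, 8));
    }
}
```

The sketch only shows the cap on concurrency; whether that cap hurts throughput in practice depends on whether the pooled threads spend their time on CPU work or on blocking waits, which is the question debated in the rest of this thread.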

My idea was to somehow rewrite this method: https://github.com/JanusGraph/janusgraph/blob/3fa6ce7993ce02cbfe8d08f3446a43e3a2f7425b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/CQLSimpleSliceFunction.java#L38

I was thinking maybe to return a Supplier function which computes the result when called. Didn't think much about it yet but maybe even something like:

protected Supplier<EntryList> getSlice(CompletableFuture<AsyncResultSet> completableFutureSlice) {
    return () -> {
        AsyncResultSet asyncResultSet = interruptibleWait(completableFutureSlice);
        return fromResultSet(asyncResultSet, this.getter);
    };
}

In that case, we don't block current thread waiting for the result to be completed but instead the upper calling code can push several CQL requests and then when they need EntryList it just starts calling all of them one by one and the final execution is processed in the current thread instead of a separate thread pool which may be blocked quite quickly.
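The Supplier idea above can be tried out in isolation with the following sketch. It is only an approximation under stated assumptions: `lazySlice` and `decode` are hypothetical names, raw responses are plain strings, and the futures are completed by hand instead of by a CQL driver.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical illustration of deferring the wait and the decoding to the caller.
class LazySliceSketch {

    // Stand-in for deserializing a raw response into entries.
    static List<String> decode(String raw) {
        return Arrays.asList(raw.split(","));
    }

    // Returns immediately; waiting and decoding happen on whichever thread calls get().
    static Supplier<List<String>> lazySlice(CompletableFuture<String> rawResponse) {
        return () -> decode(rawResponse.join());
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> inFlight = new ArrayList<>();
        List<Supplier<List<String>>> slices = new ArrayList<>();
        // Issue several "requests" up front without blocking on any of them.
        for (int i = 0; i < 3; i++) {
            CompletableFuture<String> response = new CompletableFuture<>();
            inFlight.add(response);
            slices.add(lazySlice(response));
        }
        // Simulate the responses arriving.
        for (int i = 0; i < inFlight.size(); i++) {
            inFlight.get(i).complete("k" + i + ",v" + i);
        }
        // The calling thread resolves and decodes each result one by one.
        for (Supplier<List<String>> slice : slices) {
            System.out.println(slice.get());
        }
    }
}
```

Note the trade-off raised earlier in the thread: with this shape all decoding runs on the single calling thread, so a traversal with many keys gets no CPU parallelism for deserialization.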

Probably I'm just missing something as I didn't look at it enough

porunov avatar Aug 15 '22 15:08 porunov

Using executor pool in this case would be up to N times faster (where N is the pool size).

That is not 100% accurate; it should be up to N times faster where N = min(pool size, number of CPU cores)

In the above example let's say we have CQL executor service size of 8. It means all 1000 requests will be stuck waiting for the data to be computed. In fact, we will waste 1000 threads

I am not sure if I get it. If you have an executor service size of 8, how come you could waste 1000 threads? Do you mean we have a CPU core size of 8 and an executor service size of 1000? Anyways, assuming we abandon the backend thread pool, my experiment shows that the CQL thread pool is beneficial iff its size is not significantly larger than the number of CPU cores.
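The bound stated above, N = min(pool size, number of CPU cores), can be written as a one-line helper. This is a sketch of the formula only, under the assumption that the pooled work is CPU-bound; `maxSpeedup` is a hypothetical name.

```java
// Hypothetical helper expressing the bound N = min(pool size, number of CPU cores).
class SpeedupBoundSketch {

    // Upper bound on the speedup for CPU-bound work: threads beyond the
    // core count add no extra parallelism.
    static int maxSpeedup(int poolSize, int cpuCores) {
        return Math.min(poolSize, cpuCores);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("pool of 8:  up to " + maxSpeedup(8, cores) + "x");
        System.out.println("pool of 80: up to " + maxSpeedup(80, cores) + "x");
    }
}
```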

li-boxuan avatar Aug 15 '22 15:08 li-boxuan

That is not 100% accurate; it should be up to N times faster where N = min(pool size, number of CPU cores)

I assumed that the pool size is equal to the number of cores. Of course, if someone uses a pool size of 80 with 8 cores, it won't speed up performance; on the contrary, it will be slower than a pool size of 8 because the CPU will spend time switching thread contexts. Assuming the pool size is 8 and there are 8 cores, this should be the best performance we can achieve (on paper).

I am not sure if I get it. If you have an executor service size of 8, how come you could waste 1000 threads? Do you mean we have a CPU core size of 8 and an executor service size of 1000? Anyways, assuming we abandon the backend thread pool, my experiment shows that the CQL thread pool is beneficial iff its size is not significantly larger than the number of CPU cores.

I think you are right here. We should probably leave one of the pools for deserialization computation. Now the only question is which pool we want to keep and which one should be removed.

porunov avatar Aug 15 '22 15:08 porunov

@li-boxuan I guess it makes sense to leave this CQL execution service but remove the parallel backend executor service. That said, the purpose of this CQL execution service will change because it will be used for deserialization computation only and not for queries execution. If you are OK with that, I will close this PR without merging it. If you think we need to remove the CQL execution service, I'm OK with merging this PR.

porunov avatar Aug 15 '22 15:08 porunov

That said, the purpose of this CQL execution service will change because it will be used for deserialization computation only and not for queries execution.

I think even now it is serving the same purpose (deserialization) only. Even now, I doubt it makes any sense to use a CQL thread pool with size > number of CPU cores. Thus, IMO we should keep the CQL ExecutorService but make it a fixed thread pool.

li-boxuan avatar Aug 16 '22 02:08 li-boxuan

That said, the purpose of this CQL execution service will change because it will be used for deserialization computation only and not for queries execution.

I think even now it is serving the same purpose (deserialization) only. Even now, I doubt it makes any sense to use a CQL thread pool with size > number of CPU cores. Thus, IMO we should keep the CQL ExecutorService but make it a fixed thread pool.

As far as I understand right now it actually uses a thread to wait for CQL IO to be finished. It means that the next CQL query won't be sent to the storage unless there is a free thread in the pool. So I suspect using executor pool with the size of CPU cores will limit the amount of parallel CQL queries running to N (N the amount of CPU cores / thread pool size). Let me know if you think I'm wrong here.

porunov avatar Aug 16 '22 08:08 porunov

As far as I understand right now it actually uses a thread to wait for CQL IO to be finished. It means that the next CQL query won't be sent to the storage unless there is a free thread in the pool.

This was my initial understanding as well but now I think that is wrong. When you call getSlice function,

https://github.com/JanusGraph/janusgraph/blob/3fa6ce7993ce02cbfe8d08f3446a43e3a2f7425b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/function/slice/AbstractCQLSliceFunction.java#L46-L57

this.session.executeAsync will submit the statement to the driver (which in turn sends the request to the Cassandra cluster). ~~So the size of the thread pool won't limit the number of CQL queries sent to Cassandra.~~ UPDATE: this is wrong under certain scenarios. See discussions below.

li-boxuan avatar Aug 16 '22 13:08 li-boxuan

this.session.executeAsync will submit the statement to the driver (which in turn sends the request to the Cassandra cluster). So the size of the thread pool won't limit the number of CQL queries sent to Cassandra.

You may be right about it. I wasn't sure because I didn't investigate it much, but I see several CQL iterators which might potentially block a thread waiting for IO. First iterator: https://github.com/JanusGraph/janusgraph/blob/3fa6ce7993ce02cbfe8d08f3446a43e3a2f7425b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java#L367

Second iterator: https://github.com/JanusGraph/janusgraph/blob/3fa6ce7993ce02cbfe8d08f3446a43e3a2f7425b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java#L492

Third iterator: https://github.com/JanusGraph/janusgraph/blob/3fa6ce7993ce02cbfe8d08f3446a43e3a2f7425b/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLResultSetKeyIterator.java#L38

I'm especially concerned about CQLPagingIterator, which uses ResultSet currentResultSet = session.execute(boundStatement);. I imagined that if the getSlice function indirectly calls CQLPagingIterator, it could block a thread for a CQL query, which limits the number of paged queries executing to the number of available threads in the pool. That said, it was my assumption and I didn't investigate it much. Maybe I'm wrong.

porunov avatar Aug 16 '22 14:08 porunov

@porunov I think you are right. At least for CQLResultSetIterator, I do see that it could cause a thread in the CQL thread pool to block waiting for I/O. So yeah, in that case, the number of threads in the CQL thread pool could impact the throughput. This is also consistent with my experiments.

I think the takeaway message is we should still keep the CQL thread pool. We could start to consider removing the backend thread pool in favor of async queries as suggested by @porunov. In fact, @cdegroc already has a private implementation for that, but I'm not sure if it is ready to submit to the JanusGraph repo.

li-boxuan avatar Aug 17 '22 03:08 li-boxuan

👋 Indeed, we've done some experiments internally using asynchronous requests to C* (i.e. removing storage.parallel-backend-executor-service). We have observed improvements on tail latencies and believe one of the reasons is that with the backend executor service, some of our traversals enqueued many C* requests in the executor service queue, delaying other traversals execution.

We have also tried to disable the storage.cql.executor-service (in addition to using async requests to C*). First, as @li-boxuan said, delegating the deserialization to the CQL Driver threads is discouraged, as this is a CPU intensive task. Then, JanusGraph also needs to be updated to support asynchronous paging, as C* Driver's non-blocking threads won't support synchronous paging operations (doc).

IMHO removing the backend executor service (which can be a bottleneck) and moving to async is a step in the right direction. Keeping the CQL executor service also makes sense based on DataStax recommendations (though its size could be automatically adapted based on number of available CPUs - @li-boxuan might have guidelines for this).

We would be happy to contribute. However, note that the KeyColumnValueStore interface implies synchronous calls at the moment, so it would need to be updated as well. And, more importantly, our change set is limited to the CQL backend and only some request types (e.g. getSlice). So quite some work remains to be done before it can be reviewed or merged, even more so if we want to update interfaces and offer that option for all backends.

cdegroc avatar Aug 17 '22 10:08 cdegroc

Thank you @li-boxuan and @cdegroc for your comments! I believe we are on the same page with these modifications.

I believe KeyColumnValueStore interface should be changed to asynchronous calls instead of synchronous. That of course requires all backend implementations to implement asynchronous interface which is a breaking change but I believe we will need to do that anyways when TinkerPop starts to support asynchronous queries executions (a side topic about it is here: https://lists.apache.org/thread/563oxfgddynj0d7sqsnj5vfv1gk8lvmk). So, I would be more than happy to see this breaking change in 1.0.0 version of JanusGraph. Of course not every storage backend may support real async capabilities, but for those storage backends which don't support it we can introduce a thread pool (i.e. "fake" async API as @joshsh and I call it in the above topic).

The interesting task is about asynchronous paging. It isn't hard to switch back to previous CQL paging logic where we were simply creating a new CQL statement with necessary page state as in v0.4 of JanusGraph: https://github.com/JanusGraph/janusgraph/blob/9739be22e3c2e226d15e8d9cde96d6c009bae65e/janusgraph-cql/src/main/java/org/janusgraph/diskstorage/cql/CQLKeyColumnValueStore.java#L422

In that case paging is possible with asynchronous execution because we don't need current statement to fetch the next page. All we need is to get current page state and provide it to the next statement. That said, if we do that then we will have asynchronous execution of queries which retrieve the first page and then all next pages will be retrieved synchronously. Each such execution will block a single thread. In that case we will end up with all CQL ExecutorService threads waiting for CQL IO instead of working on deserialization of data.

I didn't think much about how to solve asynchronous paging problem. I guess we could think about somehow cleverly use CompletionStage<AsyncResultSet>. Something like pageToRetrieve.thenApplyAsync( page -> { page.fetchNextPage().thenApplyAsync( nextPage -> {...}, executorService) }, executorService) but really didn't think about it yet.

I guess the main problem is that we don't know how many pages user wants to retrieve. It feels like we need some kind of configuration parameter (probably per transaction) which can be used as a hint for the amount of pages to pre-fetch. Like: pages-amount-to-pre-fetch: 10. In this case we execute 10 CQL statements sequentially but asynchronously one by one after the next page is retrieved (i.e. jumping back and forth from our CQL ExecutorService during deserialization processing to CQL Driver IO during waiting for query execution).
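A rough sketch of the prefetch idea above, written against a made-up `Page` interface instead of the real driver types so it can run on its own. Everything here is hypothetical: `Page`, `collect`, `inMemoryPage`, `prefetch`, and the `maxPages` parameter (which plays the role of the suggested pages-amount-to-pre-fetch hint). Each page is processed on `computePool`, and the next fetch is only issued after the previous page has been handled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;

// Hypothetical illustration of sequential but non-blocking page retrieval.
class AsyncPagingSketch {

    // Made-up page abstraction; it does not mirror any specific driver API.
    interface Page {
        List<String> rows();
        boolean hasMore();
        CompletableFuture<Page> fetchNext();
    }

    // Processes the current page on computePool, then chains the next fetch
    // until pagesLeft is exhausted or no pages remain. At least one page is read.
    static CompletableFuture<List<String>> collect(CompletableFuture<Page> pageFuture,
                                                   int pagesLeft,
                                                   List<String> acc,
                                                   Executor computePool) {
        return pageFuture.thenComposeAsync(page -> {
            acc.addAll(page.rows()); // stand-in for deserialization work
            if (pagesLeft <= 1 || !page.hasMore()) {
                return CompletableFuture.completedFuture(acc);
            }
            return collect(page.fetchNext(), pagesLeft - 1, acc, computePool);
        }, computePool);
    }

    // In-memory page source used only to make the sketch runnable.
    static Page inMemoryPage(List<List<String>> pages, int index) {
        return new Page() {
            public List<String> rows() { return pages.get(index); }
            public boolean hasMore() { return index + 1 < pages.size(); }
            public CompletableFuture<Page> fetchNext() {
                // Simulates an asynchronous fetch of the following page.
                return CompletableFuture.supplyAsync(() -> inMemoryPage(pages, index + 1));
            }
        };
    }

    // Convenience wrapper: reads up to maxPages pages from a non-empty page list.
    static List<String> prefetch(List<List<String>> pages, int maxPages, Executor computePool) {
        CompletableFuture<Page> first = CompletableFuture.completedFuture(inMemoryPage(pages, 0));
        return collect(first, maxPages, new ArrayList<>(), computePool).join();
    }

    public static void main(String[] args) {
        List<List<String>> pages = Arrays.asList(
                Arrays.asList("a", "b"), Arrays.asList("c"), Arrays.asList("d", "e"));
        System.out.println(prefetch(pages, 2, ForkJoinPool.commonPool()));
    }
}
```

No thread blocks between pages in this shape; the only blocking call is the final `join()` in the convenience wrapper, which a caller could replace with further chaining.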

Again, didn't think about it much, so any ideas are helpful.

porunov avatar Aug 17 '22 12:08 porunov

@cdegroc @li-boxuan I think it makes sense to remove backend-ops executor pool in 1.0.0 release and leave only CQL executor pool as we discussed earlier. In case you don't have enough time contributing this feature back to JanusGraph then I can start working on this solution (i.e. to change getSlice query to return Future<EntryList> instead of returning EntryList). This is going to be a breaking change for custom JanusGraph backend adapters, but I think it's not a big breaking change because we can simply introduce I/O thread pools for each storage which doesn't support async currently. I.e. the same I/O thread pool which is currently called storage.parallel-backend-ops will be copied into the backend implementations which don't support async execution. I assume I will face the same issues you faced earlier, so if there is anything you would like to share (code samples / issues you faced / thoughts you have) then it would be great if you could share it. It will save more time for developing async CQL queries execution and I will have more time to develop more multi-query optimizations for different steps in such case. In case you think you could contribute this feature back then I would be more than happy to review it. Otherwise I will try to tackle this issue and will see what challenges I face.
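The proposal above, giving each backend without native async support its own I/O thread pool, could look roughly like the following adapter. This is a sketch under assumptions, not the JanusGraph interfaces: `SyncStore`, `AsyncStore`, and `wrap` are hypothetical, and keys and entries are plain strings.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration of a "fake" async API backed by a per-backend pool.
class FakeAsyncStoreSketch {

    // Stand-in for a backend that only offers blocking reads.
    interface SyncStore {
        List<String> getSlice(String key);
    }

    // Stand-in for the asynchronous shape discussed in the thread.
    interface AsyncStore {
        CompletableFuture<List<String>> getSliceAsync(String key);
    }

    // Adapts a blocking store by running each call on the backend's own I/O pool.
    static AsyncStore wrap(SyncStore store, Executor ioPool) {
        return key -> CompletableFuture.supplyAsync(() -> store.getSlice(key), ioPool);
    }

    public static void main(String[] args) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4);
        try {
            SyncStore blocking = key -> Arrays.asList(key + "-col1", key + "-col2");
            AsyncStore async = wrap(blocking, ioPool);
            System.out.println(async.getSliceAsync("vertex42").join());
        } finally {
            ioPool.shutdown();
        }
    }
}
```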

porunov avatar May 02 '23 12:05 porunov

I started to take a look at usage of this backend-ops pool and it seems to be used for multiQueries only when the underlying storage backend doesn't support multi-key queries (Map<StaticBuffer,CompletableFuture<EntryList>> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh)). In such case, I'm thinking that we could simply add multi-key support for CQL backend and the usage of backend-ops won't be used for CQL IO operations anymore. Thus, this breaking change isn't really necessary as far as I understand unless I miss something. Let me know if you have any concerns or thoughts about it.

porunov avatar May 03 '23 01:05 porunov

I started to take a look at usage of this backend-ops pool and it seems to be used for multiQueries only when the underlying storage backend doesn't support multi-key queries (Map<StaticBuffer,CompletableFuture<EntryList>> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh)). In such case, I'm thinking that we could simply add multi-key support for CQL backend and the usage of backend-ops won't be used for CQL IO operations anymore. Thus, this breaking change isn't really necessary as far as I understand unless I miss something. Let me know if you have any concerns or thoughts about it.

Opened a separate issue to track this feature: #3759 Started working on it here: #3760 (a draft PR. Didn't test it yet). Nevertheless, in case anyone has any comments / thoughts about it, please, comment. I didn't find any benefit changing getSlice query interface to return Future<EntryList> because the backend-ops thread pool is used for multiQuery evaluation only and can be replaced by getSlice queries which accept multiple keys (multi-key queries). That said, if you know any other case where changing the interface could be beneficial, please, let me know and we can change it sooner than later.

porunov avatar May 03 '23 03:05 porunov

In case you think you could contribute this feature back then I would be more than happy to review it

I left DataDog and I don't have access to that code anymore. Thanks for taking this initiative. I would be happy to review too even though I hate multi-threading programming :)

li-boxuan avatar May 03 '23 07:05 li-boxuan

I started to take a look at usage of this backend-ops pool and it seems to be used for multiQueries only when the underlying storage backend doesn't support multi-key queries (Map<StaticBuffer,CompletableFuture<EntryList>> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh)). In such case, I'm thinking that we could simply add multi-key support for CQL backend and the usage of backend-ops won't be used for CQL IO operations anymore. Thus, this breaking change isn't really necessary as far as I understand unless I miss something. Let me know if you have any concerns or thoughts about it.

FWIW, we've considered enabling multi-get support for CQL as an optimisation, but found a couple JanusGraph issues recommending against it. See this comment and linked issue #35. In addition, DataStax documentation and this blog post also mention that using IN to query multiple partitions is an anti-pattern. I don't know how much of this still applies to recent C* versions though.

cdegroc avatar May 03 '23 12:05 cdegroc

@cdegroc @li-boxuan I think it makes sense to remove backend-ops executor pool in 1.0.0 release and leave only CQL executor pool as we discussed earlier. In case you don't have enough time contributing this feature back to JanusGraph then I can start working on this solution (i.e. to change getSlice query to return Future<EntryList> instead of returning EntryList). This is going to be a breaking change for custom JanusGraph backend adapters, but I think it's not a big breaking change because we can simply introduce I/O thread pools for each storage which doesn't support async currently. I.e. the same I/O thread pool which is currently called storage.parallel-backend-ops will be copied into the backend implementations which don't support async execution. I assume I will face the same issues you faced earlier, so if there is anything you would like to share (code samples / issues you faced / thoughts you have) then it would be great if you could share it. It will save more time for developing async CQL queries execution and I will have more time to develop more multi-query optimizations for different steps in such case. In case you think you could contribute this feature back then I would be more than happy to review it. Otherwise I will try to tackle this issue and will see what challenges I face.

I'll try to rebase our changes against 1.0.0 and open a PR, at least to bootstrap the discussion as, IMHO, adding an Async interface to the Backend makes sense, in the long term.

cdegroc avatar May 03 '23 12:05 cdegroc

I started to take a look at usage of this backend-ops pool and it seems to be used for multiQueries only when the underlying storage backend doesn't support multi-key queries (Map<StaticBuffer,CompletableFuture<EntryList>> getSlice(List<StaticBuffer> keys, SliceQuery query, StoreTransaction txh)). In such case, I'm thinking that we could simply add multi-key support for CQL backend and the usage of backend-ops won't be used for CQL IO operations anymore. Thus, this breaking change isn't really necessary as far as I understand unless I miss something. Let me know if you have any concerns or thoughts about it.

FWIW, we've considered enabling multi-get support for CQL as an optimisation, but found a couple JanusGraph issues recommending against it. See this comment and linked issue #35. In addition, DataStax documentation and this blog post also mention that using IN to query multiple partitions is an anti-pattern. I don't know how much of this still applies to recent C* versions though.

Thanks for these points! I will investigate them, but to be clear, in #3760 I'm not actually changing the CQL queries and I'm not adding IN support. Instead, I'm simply changing the way queries are sent. Instead of using the parallel-backend-ops thread pool for IO operations, I'm changing the queries to reuse the async functionality of the DataStax Cassandra driver.

I.e. the implementation is as simple as that:

public Map<StaticBuffer, EntryList> getSlice(final List<StaticBuffer> keys, final SliceQuery query, final StoreTransaction txh) throws BackendException {

    Map<StaticBuffer, CompletableFuture<EntryList>> futureResult = new HashMap<>(keys.size());

    for(StaticBuffer key : keys){
        futureResult.put(key, cqlSliceFunction.getSlice(new KeySliceQuery(key, query), txh));
    }

    Map<StaticBuffer, EntryList> result = new HashMap<>(keys.size());

    for(Map.Entry<StaticBuffer, CompletableFuture<EntryList>> entry : futureResult.entrySet()){
        result.put(entry.getKey(), getCompleted(entry.getValue()));
    }

    return result;
}

I didn't test it yet and didn't run benchmarks for that (will do when all current tests pass), but I assume it should improve parallelism because no more additional threads will be used for CQL IO operations.
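For readers who want to run the pattern outside JanusGraph, here is a generic version of the same two-phase approach: issue every request first, then wait for the results. It is a sketch with hypothetical names (`getAll`, `asyncLookup`) and uses `join()` where the code above uses its own `getCompleted` helper.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical, backend-agnostic version of the issue-all-then-collect pattern.
class ScatterGatherSketch {

    static <K, V> Map<K, V> getAll(List<K> keys, Function<K, CompletableFuture<V>> asyncLookup) {
        // Phase 1: start one asynchronous lookup per key without waiting.
        Map<K, CompletableFuture<V>> inFlight = new LinkedHashMap<>();
        for (K key : keys) {
            inFlight.put(key, asyncLookup.apply(key));
        }
        // Phase 2: all lookups are already in flight, so collect the results in order.
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, CompletableFuture<V>> entry : inFlight.entrySet()) {
            result.put(entry.getKey(), entry.getValue().join());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "bb", "ccc");
        Map<String, Integer> lengths =
                getAll(keys, key -> CompletableFuture.supplyAsync(() -> key.length()));
        System.out.println(lengths);
    }
}
```

Only the calling thread waits here; no extra pool threads are parked on I/O, which is the improvement the comment above expects.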

I'm also removing CQLResultSetIterator because pages are going to be retrieved in async fashion (jumping from custom provided ExecutorService for deserialization work to CQL IO internal pool for IO work and so on back and forth until all pages are retrieved).

porunov avatar May 03 '23 12:05 porunov

Fixed by #3760

porunov avatar May 29 '23 10:05 porunov