Add ability to disable RxJavaAssemblyTracking in specific observable
To make RxJavaAssemblyTracking work everywhere except high-load places.
Smth like
Observable.
...
.compose( removeHookHere() )
I can't think of any good way of doing this. If you can make sure you create chains on a single thread, you can turn the tracking on and off.
Correct me if I am wrong. Some rough approach.
RxJavaAssemblyTracking.enable() works via setOnObservableAssembly.
We can create custom rx operator that adds current Observable's hashCode to designated list. If setOnObservableAssembly listener encounters Observable that is in that list, it unwraps ObservableOnAssembly and replaces hashCode with new one, so that next time it encounters this object, it does not wrap it into ObservableOnAssembly.
How would you remove from that list?
You could experiment with it by taking the hooks from RxJavaPlugins after enabling the tracking and replacing them with whatever scheme you like:
RxJavaAssemblyTracking.enable();
var set = new ConcurrentHashMap<Integer, Object>();
var observableHook = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(o -> {
if (set.containsKey(o.hashCode()) {
return observableHook.apply(o);
}
return o;
});
Thanks fot your snippet! I will depart from it.
How would you remove from that list?
When onSubscribe called, one can remove hashCode from list.
This is what I came up with. I rely on observable's source.
Current blockers in RxJava3 ver 3.0.10:
- compose does not report source, so chain is broken.
- scalar flatMap does not store source (non-scalar does). See Observable::9123
I am glad for suggestions on where it may fail as well as general comments.
Working snippet:
Observable
.just(1)
.compose(AssemblyStop.stopAssemblyTrackingDownstream())
// .flatMap (__ -> Observable.just(2) ) //<--- Breaks chain
// .compose (__ -> Observable.just(1) ) //<--- Breaks chain
.subscribeOn(Schedulers.io())
.map (it -> it + 1)
.observeOn(Schedulers.computation())
.subscribe();
Implementation:
import java.util.HashSet;
import java.util.Set;
import hu.akarnokd.rxjava3.debug.RxJavaAssemblyTracking;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableTransformer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.fuseable.HasUpstreamObservableSource;
import io.reactivex.rxjava3.internal.observers.LambdaObserver;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
// Java 8
// Uses two Rx hooks: setOnObservableAssembly and setOnObservableSubscribe
// Note: switching to different observable (compose, flatMap) will yield IllegalStateException in subscribe
public class AssemblyStop {
private static final Set<String> registry = new HashSet<>();
static <T> ObservableTransformer<T, T> stopAssemblyTrackingDownstream() {
return observable -> {
registry.add(observable.toString());
return observable;
};
}
@SuppressWarnings("rawtypes")
static void init() {
RxJavaAssemblyTracking.enable();
Function observableHook = RxJavaPlugins.getOnObservableAssembly();
RxJavaPlugins.setOnObservableAssembly(o -> {
if(registry.contains(o.toString())) return o; // This is result of our compose operator
if(!(o instanceof HasUpstreamObservableSource)) {
// Operator w/o source like Just or Zip with multiple sources
return (Observable) observableHook.apply(o);
}
Object source = ((HasUpstreamObservableSource) o).source();
String sourceHash = source.toString();
if(registry.contains(sourceHash)) {
// Request to stop assembly tracking
// We continue to track this object
registry.remove(sourceHash);
registry.add(o.toString());
return o;
} else return (Observable) observableHook.apply(o);
});
RxJavaPlugins.setOnObservableSubscribe( (observable, observer) -> {
if(observer instanceof LambdaObserver) {
// Check if our chain was broken by some compose
String hash = observable.toString();
if(!registry.contains(hash)) throw new IllegalStateException("AssemblyStop chain was broken. Did you switch observable (compose, flatMap) ?");
registry.remove(observable.toString());
}
return observer;
});
}
}
compose
You have disconnected output, there is no way to link it to the original inside the operator.
scalar
flatMap
You may have to prevent optimizations by using hide() on sources.
In general, what you try to achieve is highly contextual and as such would need a different architecture for RxJava.
Alternative would be an RxJava-backed custom factory for operators so you control the application of operators and thus can attach extra context to flow. Basically as if you'd write a MyObservable with mirrored operators.
Thanks for thorough comment!
You have disconnected output, there is no way to link it to the original inside the operator.
Yeah, switching observables! What do you think about adding extra hook inside compose? Does not seem intrusive, looks like RxJavaPlugins.onAssembly.
You may have to prevent optimizations by using
hide()on sources.
Thanks, I will research on hide.
As of scalar flat map, if you apply it from ScalarSupplier, it returns scalarXMap w/o source. Otherwise it uses ObservableFlatMap with source (Observable::9117-9125).
It can be fixed by adding extra source field to scalarXMap, also not something intrusive.
if (this instanceof ScalarSupplier) {
@SuppressWarnings("unchecked")
T v = ((ScalarSupplier<T>)this).get();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<>(this, mapper, delayErrors, maxConcurrency, bufferSize));
In general, what you try to achieve is highly contextual and as such would need a different architecture for RxJava.
I was inspired by following comment in RxJava issues. Quote: "RxJavaAssemblyTracking ... comes with performance drawbacks. It's a global on/off switch. ..." I also faced need to stop RxJavaAssemblyTracking at too long or highly repetitive chains.
Could you give concrete examples when different architecture will be needed (i.e. relying on HasUpstreamObservableSource interface fails, except for switching observables of course) ?
My goal is to create approach that fits into current RxJava architecture or requires very minor additions (or achievable with simple bytecode manipulation on complile time via Gradle plugin).
Basically as if you'd write a MyObservable with mirrored operators.
I hope I will not need this :) Ok, it will be last resort. Gradle plugin that will replace rxjava imports with mine.
It can be fixed by adding extra source field
I'm not fond of keeping a reference to something otherwise not used. You could use some ThreadLocal trickery to remember what the hook saw previously and match up scalars with scalarXMaps, assuming you don't use multiple threads to assemble a flow in the first place.
Could you give concrete examples
Anything that would otherwise cause multi-wrapping. Also anything relying on internal components.
I decided to go with this approach:
Alternative would be an RxJava-backed custom factory for operators so you control the application of operators and thus can attach extra context to flow. Basically as if you'd write a MyObservable with mirrored operators.
Basically my operator will wrap into my proxy observable, that wraps all observable methods.