java-client-api
java-client-api copied to clipboard
onJobCompletion's listener does not get called intermittently.
Version of MarkLogic Java Client API : develop (Post 4.0.4 release) Version of MarkLogic Server : 9.0 nightly (20180605 build on 3 node Linux cluster) Java version : 1.8 OS and version : Linux
Input: Some code to illustrate the problem, preferably in a state that can be independently reproduced on our end:
@Test(timeout = 450000)
public void testListenerCloseables() throws Exception {
Assume.assumeTrue(hostNames.length > 1);
System.out.println(Thread.currentThread().getStackTrace()[1].getMethodName());
AtomicInteger success = new AtomicInteger(0);
// There two variables are to track close method on Listeners.
AtomicBoolean testCloseOnBatchListenerUriReady = new AtomicBoolean(false);
AtomicBoolean testCloseOnFailureListenerQueryFailure = new AtomicBoolean(false);
// This variable tracks the OnJobCompleteion status
AtomicBoolean getOnePrimaryDBClient = new AtomicBoolean(false);
// Track primary database client on all listeners and job completion
StringBuilder sb_strBatchListenerUriReady = new StringBuilder();
StringBuilder sb_strJobCompletionListener = new StringBuilder();
class TestCloseOnBatchListenerUriReady implements QueryBatchListener, AutoCloseable {
@Override
public void close() throws Exception {
System.out.println(
"Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class");
testCloseOnBatchListenerUriReady.set(true);
}
@Override
public void processEvent(QueryBatch batch) {
System.out.println(
"processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class");
// Verify the Primary DatabaseClient instance
if (!getOnePrimaryDBClient.get()) {
getOnePrimaryDBClient.set(true);
sb_strBatchListenerUriReady.append(batch.getBatcher().getPrimaryClient().getHost());
sb_strBatchListenerUriReady.append("|");
sb_strBatchListenerUriReady.append(batch.getBatcher().getPrimaryClient().getPort());
}
}
}
class TestCloseOnBatchListenerQueryFailure implements QueryFailureListener, AutoCloseable {
@Override
public void close() throws Exception {
System.out.println(
"Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class");
testCloseOnFailureListenerQueryFailure.set(true);
}
@Override
public void processFailure(QueryBatchException failure) {
System.out.println(
"processFailure called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class");
}
}
// Listener to be called when QueryBatcher has completed reading all URIs
class TestQBJobCompleteionListener implements QueryBatcherListener {
@Override
public void processEvent(QueryBatcher batcher) {
System.out.println(
"processEvent called from testMinNodesWithCloseable in TestQBJobCompleteionListener class");
// Verify a detail - ticket Id at end of completion
sb_strJobCompletionListener.append(batcher.getBatchSize());
}
}
try {
TestCloseOnBatchListenerUriReady closeBatchURIs = new TestCloseOnBatchListenerUriReady();
TestCloseOnBatchListenerQueryFailure closeQueryFailure = new TestCloseOnBatchListenerQueryFailure();
TestQBJobCompleteionListener jobCompleteListener = new TestQBJobCompleteionListener();
QueryBatcher batcher = dmManager.newQueryBatcher(new StructuredQueryBuilder().collection("XmlTransform"))
.withBatchSize(4000).withThreadCount(5);
// Add the new Listeners to the batcher.
batcher.onUrisReady((batch) -> {
success.addAndGet(batch.getItems().length);
}).onQueryFailure(queryException -> {
queryException.printStackTrace();
}).onUrisReady(closeBatchURIs).onQueryFailure(closeQueryFailure).onJobCompletion(jobCompleteListener);
ticket = dmManager.startJob(batcher);
batcher.awaitCompletion();
dmManager.stopJob(ticket);
} catch (Exception e) {
e.printStackTrace();
}
// Verify the DatabaseClient instances.
System.out.println("Primary database instance is " + sb_strBatchListenerUriReady.toString());
// Verify the close status
assertTrue("Close is not called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class",
testCloseOnBatchListenerUriReady.get());
assertTrue("Close is not called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class",
testCloseOnFailureListenerQueryFailure.get());
// Verify the batch size on job completion
assertTrue("Job Completion details not equal", sb_strJobCompletionListener.toString().equalsIgnoreCase("4000"));
// Verify the primary database client
assertTrue("Primary database details not correct",
sb_strBatchListenerUriReady.toString().contains(String.valueOf(port)));
}
Actual output: Console output from Jenkins job is below.
testListenerCloseables
11:00:08.642 [Time-limited test] INFO c.m.c.d.impl.QueryBatcherImpl - (withForestConfig) Using [java-client-api-2-1.marklogic.com, java-client-api-2-2.marklogic.com, java-client-api-2-3.marklogic.com] hosts with forests for "QBFailover"
11:00:08.643 [Time-limited test] INFO c.m.c.d.impl.QueryBatcherImpl - Starting job batchSize=4000, threadCount=5, onUrisReady listeners=3, failure listeners=5
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
processEvent called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
11:00:09.257 [Time-limited test] INFO c.m.c.d.impl.QueryBatcherImpl - Job complete, jobBatchNumber=6, jobResultsSoFar=20000
11:00:09.257 [Thread-39] WARN c.m.c.d.impl.QueryBatcherImpl - Cancelling task to run the job completion listeners since the batcher is stopped
Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerUriReady class
Close called from testMinNodesWithCloseable in TestCloseOnBatchListenerQueryFailure class
Primary database instance is java-client-api-2-2.marklogic.com|8011
Expected output: Was expecting this assert to be called.
assertTrue("Job Completion details not equal", sb_strJobCompletionListener.toString().equalsIgnoreCase("4000"));
Alternatives: None
Closeable are getting called. Tested with ML Server build 10.0_1 dated 03/03/2022.