JdbcMetadataStore does not handle concurrent transactions correctly
Versions
- Spring Boot 2.5.2
- Spring Integration 5.5.1 and 5.5.2
- PostgreSQL 11.2 and 13
Describe the bug
I have the following file integration flow set up in a Spring Boot app:
@Bean
public IntegrationFlow fileFlow(
    Consumer consumer,
    ConcurrentMetadataStore metadataStore,
    Executor filePollerExecutor) {
  var fileFilter =
      new ChainFileListFilter<>(
          List.of(
              new LastModifiedFileListFilter(10L),
              new FileSystemPersistentAcceptOnceFileListFilter(
                  metadataStore, "myPrefix")));
  return IntegrationFlows.from(
          Files.inboundAdapter(sourceDirectory)
              .filter(fileFilter),
          poller ->
              poller.poller(
                  pm ->
                      pm.fixedRate(100L)
                          .taskExecutor(filePollerExecutor)
                          .transactional()))
      .transform(Files.toStringTransformer(StandardCharsets.UTF_8.toString(), true))
      .handle(
          String.class,
          (p, h) -> {
            consumer.consumer(p);
            return null;
          })
      .get();
}

@Bean
Executor filePollerExecutor() {
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
  executor.setCorePoolSize(10);
  executor.setMaxPoolSize(100);
  executor.initialize();
  return executor;
}

@Bean
ConcurrentMetadataStore metadataStore(DataSource dataSource) {
  return new JdbcMetadataStore(dataSource);
}
When I copy 500 files into the directory that is being polled, most of them are processed, but a couple of them are not. I can see a couple of exceptions like:
2021-08-02 14:34:17.319 ERROR 25316 --- [llerExecutor-10] o.s.integration.handler.LoggingHandler :
org.springframework.messaging.MessagingException: nested exception is org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:427)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1542)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:713)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:744)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:757)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:879)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:906)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:156)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$FastClassBySpringCGLIB$$1bf04d6a.invoke( )
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$EnhancerBySpringCGLIB$$32961232.putIfAbsent( )
    at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:83)
    at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:38)
    at org.springframework.integration.file.filters.ChainFileListFilter.filterFiles(ChainFileListFilter.java:59)
    at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:144)
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:95)
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375)
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349)
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:94)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor270.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
    at jdk.proxy3/jdk.proxy3.$Proxy219.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:114)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:722)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
    ... 47 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "int_metadata_store_pk"
  Detail: Key (metadata_key, region)=(business-case_C:\myDir\myFile-xyz.xml, DEFAULT) already exists.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:130)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.tryToPutIfAbsent(JdbcMetadataStore.java:167)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:148)
    ... 41 more
Although it looks like all files were processed correctly, when I query the int_metadata_store table, I can see that a couple of them have the value 0 in the metadata_value column. If I copy those files to the directory being polled again, they are processed again.
Expected behavior
All files should be processed, no exceptions should be thrown, and there should be no entries with metadata_value "0" in the int_metadata_store table.
Looks similar to the behavior described in #3282
This looks more like this issue: https://github.com/spring-projects/spring-integration/issues/3576.
Please see the solution at the end. You probably have the same problem.
@artembilan: thank you very much for your input.
I'm afraid that's not the reason. I debugged through org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator and I can see that there is an exception that is correctly translated to a DuplicateKeyException.
However, there is another Exception with error code 25P02 coming from PostgreSQL saying that the current transaction is already aborted. But that one occurs when executing a select statement (SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?).
After analyzing JdbcMetadataStore#putIfAbsent it seems clear to me what's happening:
- tryToPutIfAbsent(...) is called
- The insert statement causes an exception in the database which is translated to a DuplicateKeyException. When the exception is thrown from the database (error code 23505), the transaction is aborted
- Since tryToPutIfAbsent(...) returns zero, the query on line 156, which is SELECT METADATA_VALUE FROM %sMETADATA_STORE WHERE METADATA_KEY=? AND REGION=?, is executed, causing the database to throw another exception (error code 25P02) because the transaction is already aborted
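The steps above can be sketched as a small in-memory simulation. This is not JDBC or Spring code: `FakePgTransaction` and `insertThenSelect` are hypothetical names made up for illustration, and the class only mimics the PostgreSQL rule that, once a statement fails inside a transaction block, every later statement in that block fails with SQLSTATE 25P02 until the transaction ends.

```java
import java.util.HashMap;
import java.util.Map;

public class AbortedTransactionSketch {

    /** Hypothetical stand-in for one PostgreSQL transaction; not a JDBC or Spring class. */
    static final class FakePgTransaction {
        private final Map<String, String> rows;
        private boolean aborted;

        FakePgTransaction(Map<String, String> rows) {
            this.rows = rows;
        }

        /** Mimics INSERT: returns 1 on success, throws with SQLSTATE 23505 on a duplicate key. */
        int insert(String key, String value) {
            failIfAborted();
            if (rows.containsKey(key)) {
                aborted = true; // a failed statement leaves the whole transaction in the failed state
                throw new IllegalStateException("23505");
            }
            rows.put(key, value);
            return 1;
        }

        /** Mimics SELECT: returns the stored value, or null if the key is absent. */
        String select(String key) {
            failIfAborted();
            return rows.get(key);
        }

        private void failIfAborted() {
            if (aborted) {
                throw new IllegalStateException("25P02");
            }
        }
    }

    /** Runs the insert-then-select sequence and returns the SQLSTATEs raised, in order. */
    static String insertThenSelect(FakePgTransaction tx, String key, String value) {
        StringBuilder states = new StringBuilder();
        int inserted = 0;
        try {
            inserted = tx.insert(key, value);
        } catch (IllegalStateException duplicateKey) {
            // Swallowed, as with the translated DuplicateKeyException: the insert counts as 0 rows.
            states.append(duplicateKey.getMessage());
        }
        if (inserted == 0) {
            try {
                tx.select(key); // follow-up read in the same, already failed, transaction
            } catch (IllegalStateException abortedTx) {
                states.append(',').append(abortedTx.getMessage());
            }
        }
        return states.toString();
    }

    public static void main(String[] args) {
        Map<String, String> rows = new HashMap<>();
        rows.put("myFile-xyz.xml", "1"); // row already written by a concurrent poller thread
        System.out.println(insertThenSelect(new FakePgTransaction(rows), "myFile-xyz.xml", "2"));
    }
}
```

With a pre-existing row, the sketch reports 23505 followed by 25P02, matching the two nested PSQLExceptions in the stack trace. With an empty table the insert succeeds and no error is raised.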
What is your TransactionManager?
JdbcMetadataStore.putIfAbsent() is marked with @Transactional. So, if it is declared as a bean and there is an appropriate TransactionManager (typically a DataSourceTransactionManager), an outer, application-level transaction is started fully around this method; it is not local within the target DB.
I mean that the DuplicateKeyException must not mark this application-level transaction as aborted: the DB did not start it, so the DB must not abort or commit it.
Am I missing anything related to the application-level transaction management?
I'm using Spring Boot and didn't configure a TransactionManager manually. org.springframework.orm.jpa.JpaTransactionManager is configured automatically.
I'm far from being an expert in transaction management, so I guess you're right. But how can that behavior be explained then?
OK. It would be great if you could share a simple Spring Boot project with us that lets us reproduce the issue and debug the situation.
Perhaps we need to revise this issue for a similar fix we did in the DefaultLockRepository: https://github.com/spring-projects/spring-integration/issues/3733.
So, this JdbcMetadataStore will be free from @Transactional and you will be able to inject an appropriate TransactionManager which works well exactly with JDBC.