JdbcMetadataStore does not handle concurrent transactions correctly

Open JulienCharon opened this issue 4 years ago • 6 comments

Versions

  • Spring Boot 2.5.2
  • Spring Integration 5.5.1 and 5.5.2
  • PostgreSQL 11.2 and 13

Describe the bug

I have the following file integration flow set up in a Spring Boot app:

  @Bean
  public IntegrationFlow fileFlow(
      Consumer consumer,
      ConcurrentMetadataStore metadataStore,
      Executor filePollerExecutor) {
    var fileFilter =
        new ChainFileListFilter<>(
            List.of(
                new LastModifiedFileListFilter(10L),
                new FileSystemPersistentAcceptOnceFileListFilter(
                    metadataStore, "myPrefix")));
    return IntegrationFlows.from(
            Files.inboundAdapter(sourceDirectory)
                .filter(fileFilter),
            poller ->
                poller.poller(
                    pm ->
                        pm.fixedRate(100L)
                            .taskExecutor(filePollerExecutor)
                            .transactional()))
        .transform(Files.toStringTransformer(StandardCharsets.UTF_8.toString(), true))
        .handle(
            String.class,
            (p, h) -> {
              consumer.consumer(p);
              return null;
            })
        .get();
  }

  @Bean
  Executor filePollerExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(10);
    executor.setMaxPoolSize(100);
    executor.initialize();
    return executor;
  }

  @Bean
  ConcurrentMetadataStore metadataStore(DataSource dataSource) {
    return new JdbcMetadataStore(dataSource);
  }

When I copy 500 files into the directory that is being polled, most of them are processed, but a couple are not. I can see a few exceptions like:

2021-08-02 14:34:17.319 ERROR 25316 --- [llerExecutor-10] o.s.integration.handler.LoggingHandler :

org.springframework.messaging.MessagingException: nested exception is org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:427)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
    at java.base/java.lang.Thread.run(Thread.java:831)
Caused by: org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?]; SQL state [25P02]; error code [0]; ERROR: current transaction is aborted, commands ignored until end of transaction block; nested exception is org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.springframework.jdbc.core.JdbcTemplate.translateException(JdbcTemplate.java:1542)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:667)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:713)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:744)
    at org.springframework.jdbc.core.JdbcTemplate.query(JdbcTemplate.java:757)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:879)
    at org.springframework.jdbc.core.JdbcTemplate.queryForObject(JdbcTemplate.java:906)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:156)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$FastClassBySpringCGLIB$$1bf04d6a.invoke()
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:692)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore$$EnhancerBySpringCGLIB$$32961232.putIfAbsent()
    at org.springframework.integration.file.filters.AbstractPersistentAcceptOnceFileListFilter.accept(AbstractPersistentAcceptOnceFileListFilter.java:83)
    at org.springframework.integration.file.filters.AbstractFileListFilter.filterFiles(AbstractFileListFilter.java:38)
    at org.springframework.integration.file.filters.ChainFileListFilter.filterFiles(ChainFileListFilter.java:59)
    at org.springframework.integration.file.filters.CompositeFileListFilter.filterFiles(CompositeFileListFilter.java:144)
    at org.springframework.integration.file.DefaultDirectoryScanner.listFiles(DefaultDirectoryScanner.java:95)
    at org.springframework.integration.file.FileReadingMessageSource.scanInputDirectory(FileReadingMessageSource.java:375)
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:349)
    at org.springframework.integration.file.FileReadingMessageSource.doReceive(FileReadingMessageSource.java:94)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:142)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:212)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:444)
    at java.base/jdk.internal.reflect.GeneratedMethodAccessor270.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:567)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
    at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
    at jdk.proxy3/jdk.proxy3.$Proxy219.call(Unknown Source)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    ... 5 more
Caused by: org.postgresql.util.PSQLException: ERROR: current transaction is aborted, commands ignored until end of transaction block
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
    at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:114)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeQuery(ProxyPreparedStatement.java:52)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeQuery(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate$1.doInPreparedStatement(JdbcTemplate.java:722)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
    ... 47 more
Caused by: org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "int_metadata_store_pk"
  Detail: Key (metadata_key, region)=(business-case_C:\myDir\myFile-xyz.xml, DEFAULT) already exists.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2552)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2284)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:322)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:481)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:401)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
    at org.postgresql.jdbc.PgPreparedStatement.executeUpdate(PgPreparedStatement.java:130)
    at com.zaxxer.hikari.pool.ProxyPreparedStatement.executeUpdate(ProxyPreparedStatement.java:61)
    at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.executeUpdate(HikariProxyPreparedStatement.java)
    at org.springframework.jdbc.core.JdbcTemplate.lambda$update$2(JdbcTemplate.java:965)
    at org.springframework.jdbc.core.JdbcTemplate.execute(JdbcTemplate.java:651)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:960)
    at org.springframework.jdbc.core.JdbcTemplate.update(JdbcTemplate.java:1015)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.tryToPutIfAbsent(JdbcMetadataStore.java:167)
    at org.springframework.integration.jdbc.metadata.JdbcMetadataStore.putIfAbsent(JdbcMetadataStore.java:148)
    ... 41 more

Although it looks like all files were processed correctly, when I query the int_metadata_store table, I can see that a couple of them have the value 0 in the metadata_value column. If I copy those files to the directory being polled again, they are processed again.

Expected behavior

All files should be processed, no exceptions should be thrown, and there should be no entries with metadata_value "0" in the int_metadata_store table.

Looks similar to the behavior described in #3282

JulienCharon avatar Aug 02 '21 13:08 JulienCharon

This looks more like this issue: https://github.com/spring-projects/spring-integration/issues/3576.

Please see the solution at the end. You probably have the same problem.

artembilan avatar Aug 03 '21 14:08 artembilan

@artembilan: thank you very much for your input. I'm afraid that's not the reason. I debugged through org.springframework.jdbc.support.SQLErrorCodeSQLExceptionTranslator and I can see that there is an exception that is correctly translated to a DuplicateKeyException. However, there is another exception with error code 25P02 coming from PostgreSQL saying that the current transaction is already aborted. That one occurs when executing a select statement (SELECT METADATA_VALUE FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=?). After analyzing JdbcMetadataStore#putIfAbsent, it seems clear to me what's happening:

  1. tryToPutIfAbsent(...) is called
  2. The insert statement causes an exception in the database which is translated to a DuplicateKeyException. When the exception is thrown from the database (error code 23505), the transaction is aborted
  3. Since tryToPutIfAbsent(...) returns zero, the query on line 156, which is SELECT METADATA_VALUE FROM %sMETADATA_STORE WHERE METADATA_KEY=? AND REGION=?, is executed, causing the database to throw another exception (error code 25P02) because the transaction is already aborted
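The three steps above can be reproduced without a database. The following is only a toy sketch: `FakeTransaction` and this `putIfAbsent` are hypothetical stand-ins written for illustration, not the PostgreSQL driver or the real `JdbcMetadataStore` code. It assumes the row was already committed by a concurrent poller thread and models the one PostgreSQL rule that matters here: once a statement fails inside a transaction, every later statement fails with SQL state 25P02.

```java
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;

/** Toy model of the failure sequence; not the real driver or JdbcMetadataStore. */
public class AbortedTransactionSketch {

    /** Hypothetical stand-in for a connection with one open transaction. */
    static class FakeTransaction {
        private final Map<String, String> table;
        private boolean aborted;

        FakeTransaction(Map<String, String> table) {
            this.table = table;
        }

        int insert(String key, String value) throws SQLException {
            failIfAborted();
            if (table.containsKey(key)) {
                aborted = true; // a failed statement poisons the whole transaction
                throw new SQLException("duplicate key value violates unique constraint", "23505");
            }
            table.put(key, value);
            return 1;
        }

        String select(String key) throws SQLException {
            failIfAborted();
            return table.get(key);
        }

        private void failIfAborted() throws SQLException {
            if (aborted) {
                throw new SQLException(
                        "current transaction is aborted, commands ignored until end of transaction block",
                        "25P02");
            }
        }
    }

    /** Same insert-then-select shape as described in the steps above (simplified). */
    static String putIfAbsent(FakeTransaction tx, String key, String value) throws SQLException {
        int inserted;
        try {
            inserted = tx.insert(key, value);
        } catch (SQLException e) {
            if (!"23505".equals(e.getSQLState())) {
                throw e;
            }
            inserted = 0; // duplicate key is swallowed, as with DuplicateKeyException
        }
        // The follow-up SELECT runs in the same, already aborted transaction.
        return inserted > 0 ? null : tx.select(key);
    }

    /** Returns the SQL state that reaches the caller, or "no error". */
    static String runScenario() {
        Map<String, String> table = new HashMap<>();
        table.put("myFile.xml", "123"); // assumed: committed earlier by another thread
        FakeTransaction tx = new FakeTransaction(table);
        try {
            putIfAbsent(tx, "myFile.xml", "456");
            return "no error";
        } catch (SQLException e) {
            return e.getSQLState();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario()); // prints 25P02
    }
}
```

In this model the duplicate-key error itself is handled, yet the caller still sees 25P02 from the SELECT, which matches the order of the "Caused by" entries in the stack trace.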

JulienCharon avatar Aug 03 '21 15:08 JulienCharon

What is your TransactionManager? JdbcMetadataStore.putIfAbsent() is marked with @Transactional. So, if the store is declared as a bean and there is an appropriate TransactionManager (typically a DataSourceTransactionManager), an outer, application-level transaction is started around the whole method, not one local to the target DB. I mean that a DuplicateKeyException must not mark this application-level transaction as aborted: the DB did not start it, so the DB must not abort or commit it.

Am I missing anything related to the application-level transaction management?

artembilan avatar Aug 10 '21 15:08 artembilan

I'm using Spring Boot and didn't configure a TransactionManager manually. org.springframework.orm.jpa.JpaTransactionManager is configured automatically. I'm far from being an expert in transaction management, so I guess you're right. But how can that behavior be explained then?

JulienCharon avatar Aug 13 '21 07:08 JulienCharon

OK. It would be great if you could share a simple Spring Boot project that lets us reproduce the issue and debug the situation.

artembilan avatar Aug 17 '21 15:08 artembilan

Perhaps we need to revisit this issue in light of a similar fix we made in the DefaultLockRepository: https://github.com/spring-projects/spring-integration/issues/3733.

So, this JdbcMetadataStore will be free from @Transactional and you will be able to inject an appropriate TransactionManager that works properly with JDBC.

artembilan avatar Apr 19 '22 14:04 artembilan