
Long running queries / connection leak

Open jimmyff opened this issue 6 years ago • 14 comments

I'm running into a strange issue using this package with GCP Cloud SQL.

If I generate a lot of traffic I start to see timeout errors in places I really wouldn't expect them (eg: a simple insert statement). After running under high load for a while I then start getting this error: Caught connect error, PostgreSQLSeverity.fatal 53300: remaining connection slots are reserved for non-replication superuser connections, #0 PostgreSQLConnection.open (package:postgres/src/connection.dart:133:7)

When I look at pg_stat_activity I see many queries doing the insert with wait_event = "ClientRead" and state = "idle in transaction". Not sure how I could fail to close a transaction?

I added a timeoutInSeconds to both the queries in my file, however now the connection pool is getting filled up with the delete queries; they show wait_event = "tuple" and state = "active".

It sounds like I have a connection leak, but I'm not sure how. I've included my code below.


/// Request should provide the AccountReplicateMessage object as its body
Future<shelf.Response> accountReplicateService(shelf.Request request) async {
  final requestString = await request.readAsString();
  if (requestString.isEmpty) {
    return shelf.Response(400,
        body: '400: Bad Request', headers: {'Cache-Control': 'no-store'});
  }
  print('accountReplicateService request: $requestString');
  final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64
      .decode(json.decode(requestString)['message']['data'].toString()))));

  final db = GetIt.instance<DatabaseService>().db;
  try {
    await db.open();
  } catch (e, s) {
    print(['Caught connect error', e, s]);
    return shelf.Response.internalServerError(
        body: 'Failed to connect to Database.',
        headers: {'Cache-Control': 'no-store'});
  }

  try {
    final account = message.account;
    // The transaction callback returns nothing, so no result is captured.
    await db.transaction((PostgreSQLExecutionContext ctx) async {

      await ctx.query("DELETE FROM public.profile where id = @idParam",
          substitutionValues: {"idParam": account.uid}, timeoutInSeconds: 10);

      await ctx.query("""
    INSERT INTO public.profile (
      id, dob, visible
    ) VALUES (
      @idParam, @dobParam, @visibleParam
    )
  """, substitutionValues: {
        "idParam": account.uid,
        "dobParam": account.dob,
        "visibleParam": account.profile.visible,
      }, timeoutInSeconds: 30);

      // Calculate the bloom filter bits
      final List<int> bitIndexes = BloomFilter.hashIndexesWithSize<String>(
          ServiceConfig.bloomFilterBitCount,
          ServiceConfig.bloomFilterExpectedItems,
          account.uid);

      final List<String> profileIds = await executeSearch(
          ctx, bitIndexes.first, account.profile, account.uid);

      final replicatedMessage = AccountReplicatedMessage((b) => b
        ..uid = account.uid
        ..bloomFilterBits = BuiltList<int>.from(bitIndexes).toBuilder()
        ..bloomFilterBitLastSearched = bitIndexes.first
        ..profiles = BuiltList<String>.from(profileIds).toBuilder());

      // publish account_replicated message
      final gapi = await GetIt.instance<GapiService>().gapiClient;

      final PubsubApi pubsubApi =
          GetIt.instance<PubSubService>().pubsubApi(gapi);
      final response = await pubsubApi.projects.topics.publish(
          PublishRequest.fromJson({
            "messages": [
              {
                "attributes": <String, String>{
                  "uid": account.uid,
                },
                "data": base64Encode(
                    utf8.encode(json.encode(replicatedMessage.toJsonMap()))),
                "publishTime": DateTime.now().toUtc().toIso8601String()
              }
            ]
          }),
          'projects/onesceneapp/topics/account_replicated');

      print('Inserted: ${account.uid}.  x${profileIds.length} search results.');

    });

    return shelf.Response.ok('', headers: {'Cache-Control': 'no-store'});
  } catch (e, s) {
    print(e);
    print(s);
    return shelf.Response.internalServerError(
        headers: {'Cache-Control': 'no-store'});
  } finally {
    // Always release the connection, on success and on error alike.
    await db.close();
  }
}



jimmyff avatar Oct 02 '19 15:10 jimmyff

I'm not 100% sure of what the exact cause of your issue is, but you need a query queue if you are running into the connection limit of your database server - regardless of whether or not you are leaking connections. You can accomplish this easily by just reusing the database connection - there is a queue built into it. Creating a new database connection and closing it for every HTTP request is more expensive than it needs to be.
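For illustration, here is a minimal sketch of that idea: one driver connection reused per isolate instead of one per request. The class name and credentials are hypothetical, and the `PostgreSQLConnection` API is assumed from the postgres package:

```dart
import 'package:postgres/postgres.dart';

/// Minimal shared-connection holder (a sketch, not production code).
/// The driver queues queries issued on a single PostgreSQLConnection,
/// so reusing one connection avoids an open/close per HTTP request.
class SharedDb {
  PostgreSQLConnection? _conn;

  Future<PostgreSQLConnection> connection() async {
    // Lazily (re)create the connection if missing or already closed.
    if (_conn == null || _conn!.isClosed) {
      _conn = PostgreSQLConnection('localhost', 5432, 'mydb',
          username: 'postgres', password: 'secret');
      await _conn!.open();
    }
    return _conn!;
  }
}
```

Registered once with GetIt, a request handler would then await `connection()` instead of calling `db.open()` and `db.close()` on every request.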

You can see how Aqueduct does this here, or use some other pooling package available on pub. FWIW, you could also use Aqueduct, cut all of this code in half, push the test surface to the framework instead of your code, and avoid difficult-to-debug statements like this one: final message = AccountNewMessage.fromJsonMap(json.decode(utf8.decode(base64.decode(json.decode(requestString)['message']['data'].toString()))));

itsjoeconway avatar Oct 02 '19 17:10 itsjoeconway

Okay, I'll have a go at switching over to Aqueduct. I initially thought it was maybe a little overkill for what I needed, but as I'm already running into issues using shelf maybe I should just make the switch. Is it relatively straightforward to switch from shelf to Aqueduct?

How would it avoid those difficult to debug statements?

Thanks

jimmyff avatar Oct 03 '19 09:10 jimmyff

Not too sure what it takes to switch - it can't be too bad: the method you've shown here would be pretty close to valid just by swapping the shelf. prefixes for their Aqueduct equivalents. The routing and application structure are different, but that's the easy part anyway.

  • See this section on body decoding to clean up that statement I pulled out.
  • See this section on exception handling behavior to reduce your need to try-catch and send responses.

itsjoeconway avatar Oct 03 '19 12:10 itsjoeconway

It seems I've run into a very similar issue!

At some point requests start to be processed extremely slowly, and the speed won't recover until I kill the connection

https://stackoverflow.com/questions/68375709/why-can-a-query-periodically-take-10-times-longer-to-execute

bubnenkoff avatar Jul 15 '21 08:07 bubnenkoff

@bubnenkoff: I've created package:postgres_pool for this exact reason: it lets you reconnect periodically based on configured settings like query count, connection age, total session duration...

isoos avatar Jul 15 '21 09:07 isoos

@isoos oh cool! Thanks for the fast answer. I was just thinking about how to contact you; I even wrote to someone on Telegram (not sure if that was you). Do I understand correctly that postgres_pool is essentially a workaround to prevent such a leak?

bubnenkoff avatar Jul 15 '21 09:07 bubnenkoff

I have no Telegram, so that's not me :)

Do I understand correctly that postgres_pool is essentially a workaround to prevent such a leak?

It also has retry logic for transactions, and concurrency control.
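As a rough sketch of what that looks like in use (the `runTx` method and `PgEndpoint` fields are assumed from the postgres_pool docs; endpoint values are placeholders):

```dart
import 'package:postgres_pool/postgres_pool.dart';

Future<void> main() async {
  final pg = PgPool(
    PgEndpoint(
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        username: 'postgres',
        password: 'secret'),
  );

  // runTx wraps the callback in a transaction and retries it on
  // transient failures; the pool also caps concurrent connections.
  await pg.runTx((ctx) async {
    await ctx.query('SELECT 1');
  });

  await pg.close();
}
```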

isoos avatar Jul 15 '21 09:07 isoos

@isoos but is there any chance the driver will get fixed in the near future? I can try postgres_pool, but I could also wait for a driver fix.

bubnenkoff avatar Jul 15 '21 09:07 bubnenkoff

@bubnenkoff stablekernel's repo is no longer active; I maintain a fork and have uploader rights to the package. If somebody wants to work on this, I'll be happy to accept PRs, but for me reconnecting periodically solves the pain, so I won't put much effort into it myself... I'm not even sure whether it is something the Dart package should fix, or just some kind of resource aggregation on the connection that would happen anyway...

isoos avatar Jul 15 '21 09:07 isoos

@isoos thanks! Is it possible to do something like:

main() {
  // ...
  Timer.periodic(Duration(hours: 1), (timer) async {
    await connection.close();
    await connection.open();
  });
}
I don't want to rewrite the code for now and just need a simple hack. Is it a good idea to do so?
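One caveat with that hack: in the postgres driver a closed PostgreSQLConnection cannot be reopened, so the timer would need to replace the connection object rather than reuse it. A hedged sketch (variable and helper names are illustrative, credentials are placeholders):

```dart
import 'dart:async';
import 'package:postgres/postgres.dart';

PostgreSQLConnection connection = _newConnection();

PostgreSQLConnection _newConnection() => PostgreSQLConnection(
    'localhost', 5432, 'mydb',
    username: 'postgres', password: 'secret');

void main() async {
  await connection.open();
  Timer.periodic(Duration(hours: 1), (timer) async {
    // A closed PostgreSQLConnection cannot be re-opened; swap it out.
    await connection.close();
    connection = _newConnection();
    await connection.open();
  });
}
```

Note that in-flight queries on the old connection would fail at the moment of the swap, which is one reason a pool that drains connections gracefully is the safer option.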

bubnenkoff avatar Jul 15 '21 09:07 bubnenkoff

I think postgres_pool will serve you well:

  • PgPool implements PostgreSQLExecutionContext: https://pub.dev/documentation/postgres_pool/latest/postgres_pool/PgPool-class.html, so you can use it as a drop-in replacement (for most practical purposes).
  • You can set this field to one hour to achieve the same as you wanted: https://pub.dev/documentation/postgres_pool/latest/postgres_pool/PgPoolSettings/maxConnectionAge.html
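Putting the two bullets together, a minimal sketch (endpoint values are placeholders; the settings field name is from the linked docs):

```dart
import 'package:postgres_pool/postgres_pool.dart';

final pool = PgPool(
  PgEndpoint(
      host: 'localhost',
      port: 5432,
      database: 'mydb',
      username: 'postgres',
      password: 'secret'),
  settings: PgPoolSettings()
    // recycle every connection after at most one hour of use
    ..maxConnectionAge = Duration(hours: 1),
);
```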

isoos avatar Jul 15 '21 10:07 isoos

@isoos Could you help me? I don't write Dart very often. How do I pass these connection settings?

PgPoolSettings? settings;
PgPool? pg;

void main() async {
  pg = PgPool(
    PgEndpoint(
        host: 'localhost',
        port: 5432,
        database: 'mydb',
        username: 'postgres',
        password: '123'),
    settings: settings, // what should be here?
  );
}
bubnenkoff avatar Jul 23 '21 13:07 bubnenkoff

@bubnenkoff: I've updated the example here: https://github.com/agilord/postgres_pool/blob/master/example/example.dart#L4-L15

isoos avatar Jul 23 '21 17:07 isoos

I found an interesting setting in the new PG. Is it related to this issue? https://www.postgresql.org/docs/14/runtime-config-client.html#GUC-IDLE-SESSION-TIMEOUT
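For reference: idle_session_timeout (new in PostgreSQL 14) ends sessions that sit idle outside a transaction, while the "idle in transaction" backends from the original report are covered by the older idle_in_transaction_session_timeout (available since 9.6). Both are server-side safety nets rather than a driver fix; the durations below are illustrative:

```
# postgresql.conf (or via ALTER SYSTEM); durations are illustrative
idle_in_transaction_session_timeout = '60s'   # ends idle-in-transaction backends (9.6+)
idle_session_timeout = '10min'                # ends fully idle sessions (14+)
```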


bubnenkoff avatar Aug 07 '21 13:08 bubnenkoff