
Using the streaming bulk option stops the script on error; can the script continue despite errors?

Open • keyur1sst opened this issue 4 years ago • 2 comments

PGSync version: 2.1.9

Postgres version: 12.7

Elasticsearch version: OpenSearch 1.0, hosted on AWS

Redis version: 5.0.7

Python version: 3.8.10

Problem Description: I am running PGSync in daemon mode (pgsync --config /optional/path/to/schema.json --daemon) with ELASTICSEARCH_STREAMING_BULK=true. The script crashes after a 429 error. Can it keep running past failures, as it does without streaming mode? I have 300 million records to sync to OpenSearch, so running without streaming is not an option. Could the script be changed so that it does not stop when it gets a 429 Too Many Requests or any other error?
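One way to get the behaviour asked for here is to wrap each bulk request so that a 429 is retried with capped exponential backoff, and any other failure is logged and set aside instead of ending the process. This is a minimal, library-agnostic sketch, not PGSync's actual code: `sync_chunks`, `send_chunk` and `TooManyRequests` are hypothetical names, and any chunk that ends up skipped would still need to be re-synced later.

```python
import logging
import time
from typing import Callable, Iterable, List, Sequence

log = logging.getLogger("bulk-sketch")


class TooManyRequests(Exception):
    """Hypothetical stand-in for an HTTP 429 raised by the bulk client."""


def sync_chunks(
    chunks: Iterable[Sequence[dict]],
    send_chunk: Callable[[Sequence[dict]], None],
    max_retries: int = 5,
    initial_backoff: float = 2.0,
    max_backoff: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> List[Sequence[dict]]:
    """Send every chunk and return the chunks that could not be indexed.

    A 429 is retried with capped exponential backoff. Any other exception,
    or a 429 that outlasts the retry budget, is logged and the chunk is
    set aside so the loop keeps going instead of crashing.
    """
    failed: List[Sequence[dict]] = []
    for chunk in chunks:
        for attempt in range(max_retries + 1):
            if attempt:
                # Wait initial_backoff, then 2x, 4x, ... capped at max_backoff.
                sleep(min(max_backoff, initial_backoff * 2 ** (attempt - 1)))
            try:
                send_chunk(chunk)
                break  # chunk indexed, move on to the next one
            except TooManyRequests:
                if attempt == max_retries:
                    log.warning("429 persisted after %d retries; skipping chunk", max_retries)
                    failed.append(chunk)
            except Exception:
                # Non-429 failure: record it and keep the sync running.
                log.exception("bulk request failed; skipping chunk")
                failed.append(chunk)
                break
    return failed
```

Passing `sleep` in as a parameter keeps the function easy to test; in real use the default `time.sleep` applies. Returning the failed chunks, rather than silently dropping them, leaves a record of what still has to be indexed.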

Error Message (if any):

pgsync --config setting.json --daemon
2021-12-24 18:09:57.731:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
2021-12-24 18:09:57.736:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
2021-12-24 18:09:58.034:WARNING:pgsync.utils: ModuleNotFoundError: No module named 'plugins'
 - contacts
  [--------------------------------------------------]  196001/311609941    0%  1d 20:37:35
2021-12-24 18:13:29.642:WARNING:elasticsearch: POST https://search-whocalledbaharin-hteym33qwdkixddps47vmj6uta.me-south-1.es.amazonaws.com:443/whocalled/_bulk?refresh=false [status:429 request:0.055s]
2021-12-24 18:13:29.643:WARNING:elasticsearch: Undecodable raw error response from server: Extra data: line 1 column 5 - line 1 column 39 (char 4 - 38)
2021-12-24 18:13:29.643:ERROR:pgsync.sync: Exception TransportError(429, '429 Too Many Requests /whocalled/_bulk')
Traceback (most recent call last):
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/elastichelper.py", line 107, in bulk
    for _ in helpers.streaming_bulk(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 329, in streaming_bulk
    for data, (ok, info) in zip(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 256, in _process_bulk_chunk
    for item in gen:
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 195, in _process_bulk_chunk_error
    raise error
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 240, in _process_bulk_chunk
    resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 463, in bulk
    return self.transport.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 415, in perform_request
    raise e
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 381, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.TransportError: TransportError(429, '429 Too Many Requests /whocalled/_bulk')
 0:03:31.913506 (211.91 sec)
Traceback (most recent call last):
  File "/home/sst/.local/bin/pgsync", line 7, in <module>
    sync.main()
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/home/sst/.local/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 1139, in main
    sync.pull()
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 1006, in pull
    self.sync(self._sync(txmin=txmin, txmax=txmax))
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/sync.py", line 875, in sync
    self.es.bulk(self.index, docs)
  File "/home/sst/.local/lib/python3.8/site-packages/pgsync/elastichelper.py", line 107, in bulk
    for _ in helpers.streaming_bulk(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 329, in streaming_bulk
    for data, (ok, info) in zip(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 256, in _process_bulk_chunk
    for item in gen:
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 195, in _process_bulk_chunk_error
    raise error
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/helpers/actions.py", line 240, in _process_bulk_chunk
    resp = client.bulk("\n".join(bulk_actions) + "\n", *args, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/utils.py", line 168, in _wrapped
    return func(*args, params=params, headers=headers, **kwargs)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/client/__init__.py", line 463, in bulk
    return self.transport.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 415, in perform_request
    raise e
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/transport.py", line 381, in perform_request
    status, headers_response, data = connection.perform_request(
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/http_urllib3.py", line 277, in perform_request
    self._raise_error(response.status, raw_data)
  File "/home/sst/.local/lib/python3.8/site-packages/elasticsearch/connection/base.py", line 330, in _raise_error
    raise HTTP_EXCEPTIONS.get(status_code, TransportError)(
elasticsearch.exceptions.TransportError: TransportError(429, '429 Too Many Requests /whocalled/_bulk')

keyur1sst • Dec 24 '21 13:12

If you are getting 429 errors, you can adjust these parameters:

  • ELASTICSEARCH_MAX_RETRIES
  • ELASTICSEARCH_INITIAL_BACKOFF
  • ELASTICSEARCH_MAX_BACKOFF
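To see what these three settings do together: assuming PGSync forwards them to elasticsearch-py's `helpers.streaming_bulk` (which the traceback shows it calling) as `max_retries`, `initial_backoff` and `max_backoff`, my reading of the 7.x helper is that before retry number n it sleeps `min(max_backoff, initial_backoff * 2 ** (n - 1))` seconds and re-raises the 429 once the retries run out. The hypothetical `backoff_schedule` function below just computes that schedule; the numbers are illustrative, not PGSync's defaults.

```python
from typing import List


def backoff_schedule(max_retries: int, initial_backoff: float, max_backoff: float) -> List[float]:
    """Seconds slept before each retry, assuming the capped doubling rule
    min(max_backoff, initial_backoff * 2 ** (attempt - 1)) for attempt >= 1."""
    return [
        min(max_backoff, initial_backoff * 2 ** (attempt - 1))
        for attempt in range(1, max_retries + 1)
    ]


if __name__ == "__main__":
    # Illustrative values only, e.g. ELASTICSEARCH_MAX_RETRIES=6,
    # ELASTICSEARCH_INITIAL_BACKOFF=2, ELASTICSEARCH_MAX_BACKOFF=30.
    delays = backoff_schedule(6, 2, 30)
    print(delays)            # [2, 4, 8, 16, 30, 30]
    print(sum(delays), "s")  # 90 s worst-case wait for one chunk
```

With `max_retries` set to 0 the schedule is empty, so under this assumption the very first 429 would propagate, which would match the crash in the log above. If 429s keep coming even with generous retries, the cluster is likely too small for the write rate.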

The defaults are listed here

toluaina • Jan 04 '22 09:01

@keyur1sst, what OpenSearch instance type are you using? Perhaps see my answer here.

voyc-jean • Jan 04 '22 10:01