opensearch-py

[BUG] - parallel_bulk does not work in AWS lambda

Open • Aarif1430 opened this issue 4 years ago • 12 comments

OSError: [Errno 38] Function not implemented. I started seeing this error after upgrading to Python 3.9. The cause is that the opensearch-py parallel_bulk helper uses the multiprocessing module internally, and Python's multiprocessing.pool.ThreadPool now fails to initialize in Lambda.

Traceback (most recent call last):
  ...
  File "/var/lang/lib/python3.9/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.9/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(
OSError: [Errno 38] Function not implemented



It looks like:

  • multiprocessing's synchronize.Lock doesn't work in Lambda on any Python version: Lambda has no /dev/shm and no write access to /dev (see https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda).

  • ThreadPool now creates a synchronize.Lock as soon as it is constructed (through a SimpleQueue). This is not limited to 3.9: the Python 3.8 traceback further down goes through the same path.
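A quick way to check whether a given environment is affected is to create a multiprocessing lock directly, since that goes through the same SemLock call that raises Errno 38 in Lambda. This is a sketch, not part of opensearch-py; the function name semaphores_available is made up for illustration:

```python
import multiprocessing


def semaphores_available() -> bool:
    """Return True if multiprocessing semaphores can be created here.

    multiprocessing.Lock() goes through multiprocessing.synchronize.SemLock,
    the same path that raises OSError [Errno 38] on AWS Lambda.
    """
    try:
        multiprocessing.Lock()
    except (ImportError, OSError):
        # ImportError: the platform has no sem_open support at all.
        # OSError: sem_open exists but fails (e.g. no /dev/shm, as in Lambda).
        return False
    return True


if __name__ == "__main__":
    print("multiprocessing semaphores available:", semaphores_available())
```

Calling it at the top of a Lambda handler should print False on the affected runtimes and True on a typical Linux machine.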

To Reproduce

Steps to reproduce the behavior:

  1. Deploy an application using opensearch-py==1.0.0 to AWS Lambda (Python 3.9 runtime)
  2. Call the parallel_bulk helper from opensearchpy.helpers
  3. See error

Expected behavior

The opensearch-py client should keep working as it did on Python 3.6.

Plugins

opensearch-py==1.0.0


Host/Environment

  • OS: AWS Lambda


Aarif1430 • Nov 25 '21

I'm also seeing this error with Python 3.8

[ERROR] OSError: [Errno 38] Function not implemented
Traceback (most recent call last):
....
  File "/var/task/opensearchpy/helpers/actions.py", line 469, in parallel_bulk
    pool = BlockingPool(thread_count)
  File "/var/lang/lib/python3.8/multiprocessing/pool.py", line 925, in __init__
    Pool.__init__(self, processes, initializer, initargs)
  File "/var/lang/lib/python3.8/multiprocessing/pool.py", line 196, in __init__
    self._change_notifier = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 113, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.8/multiprocessing/queues.py", line 336, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.8/multiprocessing/context.py", line 68, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.8/multiprocessing/synchronize.py", line 57, in __init__
    sl = self._semlock = _multiprocessing.SemLock(

jasongilman • Dec 02 '21

@jasongilman Did you get this error in a lambda or elsewhere?

wbeckler • Aug 05 '22

@wbeckler It was in a lambda.

jasongilman • Aug 05 '22

@jasongilman Yes, it was in AWS Lambda.

Aarif1430 • Aug 05 '22

Is anyone up for contributing a patch that handles environments where /dev/shm isn't available? There's a potential drop-in replacement for the multiprocessing library: https://pypi.org/project/lambda-multiprocessing/

wbeckler • Oct 19 '22

At a high level, is this issue about adding Python 3.9 support (starting with CI)?

dblock • Jul 26 '23

@Aarif1430 @jasongilman Does the bug still persist?

saimedhi • Sep 08 '23

CI with Python 3.9 was added in https://github.com/opensearch-project/opensearch-py/pull/336 and it currently passes. We need a test that reproduces this problem.
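CI runners normally support POSIX semaphores, so the failure won't show up there on its own. One possible way to reproduce it is to patch the private CPython module attribute _multiprocessing.SemLock (which, per the tracebacks in this thread, multiprocessing.synchronize calls) so that it raises the same OSError Lambda does. This is a sketch, not an existing opensearch-py test; the class and test names are hypothetical:

```python
import errno
import unittest
from multiprocessing.pool import ThreadPool
from unittest import mock


class ThreadPoolWithoutSemaphoresTest(unittest.TestCase):
    """Hypothetical regression test simulating Lambda's missing semaphores."""

    def test_thread_pool_raises_enosys(self):
        # Make every SemLock creation fail the way it does on Lambda:
        # OSError with errno ENOSYS (38 on Linux), "Function not implemented".
        no_semaphores = mock.patch(
            "_multiprocessing.SemLock",
            side_effect=OSError(errno.ENOSYS, "Function not implemented"),
        )
        with no_semaphores:
            # On Python 3.8+ ThreadPool creates a multiprocessing SimpleQueue,
            # whose Lock needs a SemLock, so construction itself fails.
            with self.assertRaises(OSError) as caught:
                ThreadPool(2)
        self.assertEqual(caught.exception.errno, errno.ENOSYS)
```

Run it with python -m unittest. A regression test for opensearch-py itself would presumably wrap a parallel_bulk call in the same patch and expect it to succeed once parallel_bulk can run without multiprocessing semaphores.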

dblock • Nov 10 '23

I'm able to reproduce the issue:

Create a Lambda function with the python3.9 runtime:

import json
from multiprocessing.pool import ThreadPool

def lambda_handler(event, context):
    print("Hello")
    pool = ThreadPool()  # raises OSError [Errno 38] on AWS Lambda
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

It gives this error:

{
  "errorMessage": "[Errno 38] Function not implemented",
  "errorType": "OSError",
  "stackTrace": [
    "  File \"/var/task/lambda_function.py\", line 6, in lambda_handler\n    pool = ThreadPool()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/pool.py\", line 927, in __init__\n    Pool.__init__(self, processes, initializer, initargs)\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/pool.py\", line 196, in __init__\n    self._change_notifier = self._ctx.SimpleQueue()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/context.py\", line 113, in SimpleQueue\n    return SimpleQueue(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/queues.py\", line 341, in __init__\n    self._rlock = ctx.Lock()\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/context.py\", line 68, in Lock\n    return Lock(ctx=self.get_context())\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 162, in __init__\n    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)\n",
    "  File \"/var/lang/lib/python3.9/multiprocessing/synchronize.py\", line 57, in __init__\n    sl = self._semlock = _multiprocessing.SemLock(\n"
  ]
}
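For comparison, here is a sketch of the same handler built on concurrent.futures.ThreadPoolExecutor. This is a swapped-in alternative, not something opensearch-py uses today. It relies on threading primitives rather than multiprocessing semaphores, so it should not hit Errno 38, assuming missing SemLock support is the only obstacle. The work it does (squaring a few numbers) is a placeholder:

```python
import json
from concurrent.futures import ThreadPoolExecutor


def lambda_handler(event, context):
    # ThreadPoolExecutor is built on threading locks and queue.SimpleQueue,
    # not on multiprocessing semaphores, so it does not need /dev/shm.
    with ThreadPoolExecutor(max_workers=4) as executor:
        squares = list(executor.map(lambda n: n * n, range(5)))
    return {
        "statusCode": 200,
        "body": json.dumps({"squares": squares}),
    }
```

Invoked with any event, this returns a body of {"squares": [0, 1, 4, 9, 16]}.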

samuelc-tm • Nov 13 '23

Looking at https://pypi.org/project/lambda-thread-pool/

You cannot use "multiprocessing.Queue" or "multiprocessing.Pool" within a Python Lambda environment because the Python Lambda execution environment does not support shared memory for processes.

This means we need to either remove ThreadPool or make it possible to swap it for LambdaThreadPool in https://github.com/opensearch-project/opensearch-py/blob/da436cbbe8dda34abd607f527d4f0bdacb9b30d8/opensearchpy/helpers/actions.py#L470.

As an immediate workaround, you can copy the parallel_bulk implementation, replace BlockingPool with LambdaThreadPool, and see if that works. For something maintainable, I would extract BlockingPool from this implementation behind an abstract thread pool interface, add a second implementation backed by LambdaThreadPool, and add a configuration parameter that selects which thread pool to use. Does anyone want to give either approach a try?
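The maintainable option could look roughly like the sketch below. Everything in it is hypothetical: ChunkPool, the two implementations, parallel_chunks, and its pool_factory parameter are illustrative names, not opensearch-py APIs. concurrent.futures.ThreadPoolExecutor stands in for LambdaThreadPool here because its API is part of the standard library:

```python
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from multiprocessing.pool import ThreadPool
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


class ChunkPool(ABC):
    """Hypothetical pool interface; not part of opensearch-py."""

    @abstractmethod
    def imap(self, func: Callable[[T], R], items: Iterable[T]) -> Iterator[R]:
        """Apply func to every item and yield the results in input order."""

    @abstractmethod
    def close(self) -> None:
        """Wait for outstanding work and release the worker threads."""


class MultiprocessingChunkPool(ChunkPool):
    """Today's approach: multiprocessing.pool.ThreadPool (needs SemLock)."""

    def __init__(self, workers: int) -> None:
        self._pool = ThreadPool(workers)

    def imap(self, func, items):
        return self._pool.imap(func, items)

    def close(self) -> None:
        self._pool.close()
        self._pool.join()


class ExecutorChunkPool(ChunkPool):
    """Lambda-friendly alternative built on concurrent.futures threads."""

    def __init__(self, workers: int) -> None:
        self._executor = ThreadPoolExecutor(max_workers=workers)

    def imap(self, func, items):
        # Executor.map submits every item up front, so unlike a bounded
        # queue it does not limit how many chunks are held in memory.
        return self._executor.map(func, items)

    def close(self) -> None:
        self._executor.shutdown(wait=True)


def parallel_chunks(
    func: Callable[[T], R],
    chunks: Iterable[T],
    thread_count: int = 4,
    pool_factory: Callable[[int], ChunkPool] = MultiprocessingChunkPool,
) -> Iterator[R]:
    """Hypothetical stand-in for parallel_bulk's pool loop, with a pluggable pool."""
    pool = pool_factory(thread_count)
    try:
        yield from pool.imap(func, chunks)
    finally:
        pool.close()
```

For example, list(parallel_chunks(len, [["a"], ["b", "c"]], pool_factory=ExecutorChunkPool)) returns [1, 2]. Keeping the ThreadPool-backed class as the default would leave behaviour unchanged outside Lambda. One difference to watch: ThreadPoolExecutor.map submits every chunk immediately, so a real port would also need to cap in-flight chunks, which BlockingPool appears to do with a bounded queue (parallel_bulk takes a queue_size argument).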

dblock • Nov 13 '23

I renamed this to "parallel_bulk doesn't work in AWS lambda". Is there anything else that doesn't work?

dblock • Nov 13 '23

Thank you. In my case, ThreadPool is used inside an SDK we depend on, and changing that SDK wouldn't be ideal. We started getting the issue when upgrading from Python 3.7 to 3.9. We might just find an alternative to using the SDK instead.

samuelc-tm • Nov 15 '23