Only one task executes per subprocess
Hi there,
I'm trying to use this package, but for some weird reason each subprocess only executes once and then hangs the rest of the code.
I'm using Python 3.9; here is a snippet of the code:
```python
# create page data for parallel execution
pages_data = []
for current_page in page_indices:
    pages_data.append((
        reader,
        vertical,
        title,
        teacher,
        current_page,
        pages_total
    ))

# Create the final page and append it to the final PDF
with Pool() as executor:
    rendered_pages = executor.starmap(generators.CreatePageWithWatermark, pages_data)
    for page in rendered_pages:
        result_pdf_final.add_page(page)
```
Each `generators.CreatePageWithWatermark` execution prints the current page, and only one page is processed by each subprocess. If I have fewer pages than the total number of cores, the code runs just fine.
Just for fun I set 1000 subprocesses and my PC froze, but all pages were rendered.
The same behavior happens on AWS Lambda.
Thanks! This is a great little package.
edit: now (out of the blue) it's working on my PC but not on AWS Lambda, where only 6 executions happen. Very weird.
I managed to get this working.
For some reason, when using fewer than 12 subprocesses it simply gets stuck until the Lambda execution times out.
I forced 24 and it worked, but it consumed way too much RAM. Not sure if this happens because of my workload or because of how the child processes are being distributed.
Something I noticed is that, running locally, my CPU usage graph looks like a square wave.
I'm not sure why; I'll take a look this weekend.
What does generators.CreatePageWithWatermark do?
Please provide a minimal working example so that I can reproduce the behavior.
The queuing is currently a bit dumb. It assigns all tasks up front, assuming that each task takes approximately the same amount of time. Does that assumption hold for your use case?
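The up-front assignment described above can be illustrated with a small sketch (hypothetical, not this library's actual code): tasks are dealt round-robin to workers before any work starts, so uneven task durations can leave some workers idle.

```python
# Hypothetical sketch of up-front (static) task assignment, as opposed to
# a dynamic queue: all tasks are dealt to workers before any work begins.
tasks = list(range(10))      # ten dummy tasks
num_workers = 3

assignments = {w: [] for w in range(num_workers)}
for i, task in enumerate(tasks):
    assignments[i % num_workers].append(task)

print(assignments)
# {0: [0, 3, 6, 9], 1: [1, 4, 7], 2: [2, 5, 8]}
# If the tasks dealt to worker 0 happen to be the slow ones, workers 1
# and 2 finish early and sit idle -- the "equal time" assumption above.
```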
I'm working with a lot of buffers to avoid saving the PDF pages to disk (though that may be the solution for this scenario). The `CreatePageWithWatermark()` function mostly reads a page from the provided buffer, generates a page, and merges both into a new buffer.
It took a while, but I managed to recreate the issue with the code below:
```python
from lambda_multiprocessing import Pool as LambdaPool
import timeit
import time
import requests

def CreatePageWithWatermark(title, author, page, buffer):
    time.sleep(5)
    print(len(buffer))
    return buffer

def PDFMergeWorker(event, context):
    start_time = timeit.default_timer()
    url = 'https://pdfs.semanticscholar.org/c029/baf196f33050ceea9ecbf90f054fd5654277.pdf'
    r = requests.get(url, stream=True)
    pages_total = 100
    pages_data = [(
        'test',
        'test',
        current_page,
        r.content,
    ) for current_page in range(0, 100)]
    with LambdaPool() as executor:
        rendered_pages = executor.starmap(CreatePageWithWatermark, pages_data)

    # Speedometer
    stop_time = timeit.default_timer()
    delta_time = stop_time - start_time
    print('Time: ', delta_time)
    print('Time per Page: ', delta_time / pages_total)
    return 'Done'
```
The behavior is the same. On my machine there are only 16 `print()` calls (I have 16 threads), and on AWS Lambda only 6 `print()` calls are made (I set 10 GB for this Lambda).
If you remove the `sleep()` call, the code runs fine. If you keep the `sleep()` and remove the PDF stream argument, the code also runs fine.
Only with both does the code crash.
Hmm, interesting.
I have seen an issue previously where print statements within a for loop containing a sleep, in one process, get buffered. So if you're printing thousands of lines, the stdout buffer fills and gets flushed. But with low-volume output, lines stay in the buffer longer than you'd expect.
I would expect that the process termination would result in the stdout buffer being flushed. But I don't know what special stuff Lambda does with stdout.
If you swap print for sys.stdout.write, do you get the same behaviour?
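For reference, a worker that bypasses print buffering entirely might look like this (a sketch; `worker` is a hypothetical stand-in for `CreatePageWithWatermark`):

```python
import sys
import time

def worker(page):
    # Write and flush explicitly, so the line is visible immediately,
    # even if the process sleeps or is killed before a normal flush.
    sys.stdout.write(f"page {page}\n")
    sys.stdout.flush()
    time.sleep(1)
```

On Python 3, `print(f"page {page}", flush=True)` is equivalent.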
Also, in your MWE you're doing only one requests.get() call, and then attempting to operate on the .content response 100 times concurrently. Are you sure that whatever .requests returns is itself concurrency safe? I don't know what len() does to a buffer object. Does it consume it?
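As far as I know, once `.content` has been accessed, it's the fully-read body as a plain immutable `bytes` object (not a live stream), so `len()` shouldn't consume anything. A quick check, using a hard-coded stand-in for the response body:

```python
# Stand-in for r.content: requests returns the fully-read body as `bytes`.
body = b"%PDF-1.4 fake document bytes"

assert isinstance(body, bytes)
assert len(body) == 28          # len() just reports the size...
assert body == b"%PDF-1.4 fake document bytes"  # ...and consumes nothing
```

The open stream itself (`r.raw`) would be a different story; a file-like object generally can't be shared safely between processes.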
(I will play around to debug this weekend.)
What is this script supposed to do? That URL ends with .pdf, but it's not a PDF.
This is what it redirects to: [screenshot]
Are you trying to download a PDF, and process each page in the PDF in parallel? Are you trying to stream the download of the PDF, and process each page while downloading the next page?
Have you looked at this again?
I'm not able to reproduce the bug because the example is not a minimal working example.
Most importantly:
- You described the failure mode as "hanging" and "crashes". Those are two different things. (Opposites, really.) Hanging is when the script appears to be doing nothing (it has not exited, but it is not printing anything either). Whereas I think of a "crash" as an uncaught exception being thrown (in which case you should paste the exception in), or a lower-level fault (like a seg fault), in which case you should also paste in the text output. So I don't know what outcome I'm trying to reproduce. In particular, for "hanging", how long did you wait, relative to how long it takes `multiprocessing.Pool` to run the script? It could be just that `stdout` is buffered until the end of the script, but the code is otherwise running as expected, so you just need to wait longer. Try calling `sys.stdout.flush()` immediately after the print, before the sleep, and after `.map()`. (You'll need `import sys`.)
- `pages_data` is a list of tuples of ints, strings and a byte array. It contains nothing exotic, or specific to PDFs or the requests library, really. Can you just hard-code `pages_data` and reproduce the issue?
- Can you confirm that you ran this code on your laptop, using the standard `multiprocessing.Pool()` function instead of this library, and you got a different result?
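Those two checks could be combined into a single script along these lines (a sketch under my assumptions: `pages_data` hard-coded, no requests, and the stdlib `multiprocessing.Pool` swapped in for comparison):

```python
import time
from multiprocessing import Pool

def CreatePageWithWatermark(title, author, page, buffer):
    time.sleep(1)
    print(len(buffer), flush=True)
    return buffer

def main():
    fake_body = b"\x00" * 1024   # hard-coded stand-in for r.content
    pages_data = [("test", "test", page, fake_body) for page in range(4)]
    with Pool() as executor:     # stdlib Pool, for comparison
        rendered_pages = executor.starmap(CreatePageWithWatermark, pages_data)
    assert rendered_pages == [fake_body] * 4

if __name__ == "__main__":
    main()
```

If this completes but the same script with `lambda_multiprocessing.Pool` hangs, that narrows things down considerably.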
But also:
- You mention "remove the pdf stream argument", but it's not clear which argument that is. Do you mean `stream=True` for the `requests.get()` call, which is not related to the PDF format or any PDF library?
- Your functions and variables are named after PDFs, but you're not doing anything PDF-specific. If the issue is related to streaming content with the `requests` library, can you download non-PDF content? (Yes, I suspect whatever issue you're seeing is related to the concurrency safety of the return value of `requests.get()` with `stream=True`.)
- For a machine with N CPUs, this example will take 100*5/N seconds. If you drop the number of 'pages' to 10, or 2, do you still get the same issue? If you drop the sleep time to 1s, do you still get the same issue? If you can do both, then we can reproduce the issue in 2 seconds instead of 80, which makes it easier to resolve.
- You define `pages_total`, but then on the next line you use `100` as a literal. So why define `pages_total`? If you don't need it to reproduce the issue, then delete it, check you still get the issue, and post the script without it.
- Why do you specify `stream=True`? The very first thing you do after calling `.get(stream=True)` is to access `.content`, which will call `.read()` on the stream. So why bother passing `stream=True`? Since you're accessing `.content` repeatedly, you're reading from the stream for the first entry of `pages_data`, and then accessing a cache of it for the rest.
- Why did you name your argument `buffer`? It's a bytes array, not a file-like object. (Note that in general I would expect that buffers can't be safely passed between threads unless the relevant documentation explicitly says they can.)
- Why does `CreatePageWithWatermark` return something? Why is the result from `.map()` saved to a variable? Yes, for your real usage that is probably required. But for providing a minimal reproducible example you should delete everything that isn't required to reproduce the bug.
- Since you are using `stream=True`, have you tried using `with requests.get(stream=True) as r` (see the docs)?
- You imported the `timeit` module, but you just use it to return the current time at two points. Why not just use `time.time()`? You're printing out time values. What times are you expecting? What times are you observing?
- That specific URL returns a body of length 0, and an HTTP 301 redirect status. The `requests` library follows the redirect (I think). Can you reproduce this issue with a URL that isn't redirected? In fact that URL redirects in my browser to this URL, which actually returns an HTTP 202 status. That's very strange. Can you reproduce this error with a normal URL? (e.g. a URL to a JPEG of a cat?)
Here's a MWE (except it doesn't reproduce the issue). (It could be reduced even further, depending on what the issue is.)
```python
import multiprocessing
import time
import sys

from lambda_multiprocessing import Pool as LambdaPool
import requests

def CreatePageWithWatermark(body: bytes, url):
    time.sleep(1)
    ret = (len(body), url)
    print(f"Returning ({type(ret[0])}, {type(ret[1])})")
    return ret

def PDFMergeWorker(event={}, context=None):
    url = 'https://example.com/'
    r = requests.get(url)
    pages_data = [(
        r.content,
        url
    ) for _ in range(0, 2)]
    with LambdaPool() as executor:
        rendered_pages = executor.starmap(CreatePageWithWatermark, pages_data)
    expected_length = len(requests.get(url).content)
    expected = [(expected_length, url) for _ in pages_data]
    assert rendered_pages == expected

if __name__ == '__main__':
    PDFMergeWorker()
```