Intermittent Issue with Starting of Celery Task

Status: Open. sindhujit1 opened this issue 1 year ago • 3 comments

Sometimes my Celery task does not start and fails with the error:

.<task_name>

[Screenshot: failed task showing the error above]

At other times the task works just fine:

[Screenshot: successful task run]

I have three Celery instances running, and all of them point to Redis on port 6379. I made sure each Celery worker has a unique name by specifying the -n parameter.

Is there a setting I can tweak to avoid this?
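One way to see the conflict is to reduce each app's broker URL to the Redis (host, port, database) it actually targets. The sketch below is a minimal stdlib-only check, assuming plain redis:// URLs; the helper names redis_target and find_shared_brokers and the app names are hypothetical. A redis:// URL without a path selects database 0, so two apps both configured with redis://localhost:6379 share one database and, unless their queue names differ, the same queues.

```python
from collections import defaultdict
from urllib.parse import urlparse


def redis_target(url: str) -> tuple[str, int, int]:
    """Hypothetical helper: return (host, port, db) for a redis:// URL; db defaults to 0."""
    parsed = urlparse(url)
    host = parsed.hostname or "localhost"
    port = parsed.port or 6379
    path = parsed.path.lstrip("/")
    db = int(path) if path else 0  # no path in the URL means database 0
    return host, port, db


def find_shared_brokers(app_urls: dict[str, str]) -> dict[tuple[str, int, int], list[str]]:
    """Hypothetical helper: group apps by Redis target and keep only targets used by 2+ apps."""
    groups: dict[tuple[str, int, int], list[str]] = defaultdict(list)
    for app, url in app_urls.items():
        groups[redis_target(url)].append(app)
    return {target: apps for target, apps in groups.items() if len(apps) > 1}


if __name__ == "__main__":
    # Hypothetical app names: both URLs resolve to localhost:6379, database 0.
    print(find_shared_brokers({
        "app_a": "redis://localhost:6379",
        "app_b": "redis://localhost:6379/0",
    }))
    # {('localhost', 6379, 0): ['app_a', 'app_b']}
```

Giving each app its own database number makes the returned dictionary empty.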

sindhujit1, Jun 25 '24 20:06

Hello. In this case it looks like the task itself is failing, and celery-progress is just reporting that back to you. I think you will have to dig into your own infrastructure to figure out what's causing the task to sometimes fail.

czue, Jun 26 '24 09:06

I am following exactly the structure of the example you provided, and we are using Redis as the message broker. Could the @shared_task(bind=True) decorator be causing the issue?

sindhujit1, Jun 27 '24 23:06

That error message is not coming from calling the task; it is an error raised inside the task, so I would look at the code inside the task. Could it be related to a network error?

czue, Jun 28 '24 07:06

This is the view that calls the task:

```python
import json
import linecache
import sys

from django.http import HttpResponse

# Project-specific imports (CampusSiteForm, IPBlockDNSMapping, getCreds,
# getCreds_AppViewX, oobsubnet_task) are not shown here.


def create_oobsubnet(request):
    # Initialise before the try block so the except handler can always append to it.
    finalerrorMessage = []
    try:
        print(request.POST, "request.POST")
        print(dir(request))
        asnnumber = ''
        value_uplink_oob_devices, value_oobdevice_input, value_terminal_server, value_interfacePort_input = [], [], [], []
        value_tile_location = []
        # countryname = re.sub(r'\s+', '', request.POST['countryname'])
        # sitename = re.sub(r'\s+', '', request.POST['sitename'])
        countryname = request.POST['countryname']
        sitename = request.POST['sitename']
        print(sitename)
        # Collect the per-device form fields, stripping surrounding whitespace.
        for each in request.POST.getlist('value_oobdevice_input[]'):
            value_oobdevice_input.append(each.strip())
        for each in request.POST.getlist('value_terminal_server[]'):
            value_terminal_server.append(each.strip())
        for i, each in enumerate(value_oobdevice_input):
            value_uplink_oob_devices.append(request.POST.getlist('value_uplink_oob_devices[' + str(i) + '][]'))

        print(value_uplink_oob_devices, "value_uplink_oob_devices")
        for each in request.POST.getlist('value_interfacePort_input[]'):
            value_interfacePort_input.append(each.strip())
        for each in request.POST.getlist('value_tile_location[]'):
            value_tile_location.append(each.strip())
        value_description_input = request.POST.getlist('value_description_input[]')
        value_deviceenv = request.POST.getlist('value_deviceenv[]')
        subnetSize = request.POST.getlist('value_subnetsize[]')
        value_kind_subnet = request.POST.getlist('value_kind_subnet[]')
        value_kind_customer = request.POST.getlist('value_kind_customer[]')
        value_oob_device_type = request.POST.getlist('value_oob_device_type[]')
        ip_block = ''
        if request.POST['checkboxILO'] == '1':
            value_subnet_count = 2
        else:
            value_subnet_count = 1
        username_dceuser, password_dceuser, finalerrorMessage = getCreds_AppViewX(finalerrorMessage)
        username, password, finalerrorMessage = getCreds(sitename, finalerrorMessage)
        if countryname == 'LAB':
            offeringType = 'e0'
        else:
            offeringType = 'e3'

        admin_page_link = request.build_absolute_uri('?').split("/netadc3_2")[0] + "/netadc3_2/admin/campus_networkinventory/campussiteform/"
        admin_page_link_url = "netadc3_2/admin/campus_networkinventory/campussiteform/"
        ######## ip_block ########

        if CampusSiteForm.objects.values_list("oob_ip_dns_block", flat=True).filter(site_name=sitename).count() == 0:
            finalerrorMessage.append("IP Block is Missing. Campus Site Form Should be Filled Out Properly Under the admin page.Link - <a href='/" + admin_page_link_url + "'>" + admin_page_link + "</a>")
            totalDict = {'finalerrorMessage': finalerrorMessage}
            return HttpResponse(json.dumps(totalDict), content_type="application/json")

        ip_block_id = list(CampusSiteForm.objects.values_list("oob_ip_dns_block", flat=True).filter(site_name=sitename))[0]
        ip_block = list(IPBlockDNSMapping.objects.values_list("block_name", flat=True).filter(ip_block_id=ip_block_id))[0]
        print(ip_block, "ip_block")
        ######## End ip_block ########

        if CampusSiteForm.objects.values_list("oob_asn", flat=True).filter(site_name=sitename).count() == 0:
            finalerrorMessage.append("Campus Site Form Should be Filled Out Under the admin page.Link - <a href='/" + admin_page_link_url + "'>" + admin_page_link + "</a>")
            totalDict = {'finalerrorMessage': finalerrorMessage}
            return HttpResponse(json.dumps(totalDict), content_type="application/json")
        else:
            asnnumber = list(CampusSiteForm.objects.values_list("oob_asn", flat=True).filter(site_name=sitename))[0]
        print(asnnumber, "asnnumber")
        # Queue the Celery task and return its id so the front end can poll its progress.
        result = oobsubnet_task.delay(20, countryname, sitename, value_oobdevice_input, value_description_input, value_terminal_server,
            value_uplink_oob_devices, value_interfacePort_input, value_tile_location, subnetSize, value_kind_subnet, value_kind_customer, value_subnet_count,
            value_oob_device_type, value_deviceenv, offeringType, asnnumber, ip_block, username, password, username_dceuser, password_dceuser, request.user.email)

        print(type(result.task_id), "type(result.task_id)")
        data = {'task_id': result.task_id}
        print(data, "data in 1st view", type(result.task_id))
        return HttpResponse(json.dumps(data), content_type="application/json")

    except Exception as e:
        # Report the failing file and line number back to the caller as JSON.
        exc_type, exc_obj, tb = sys.exc_info()
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        finalerrorMessage.append('OOB Subnet Creation Failed:' + str(e) + " Error On file " + filename + " Line Number " + str(lineno))
        print('in exception part')
        totalDict = {'finalerrorMessage': finalerrorMessage}
        print(totalDict)
        return HttpResponse(json.dumps(totalDict), content_type="application/json")
```

The task itself is wrapped entirely in a try/except block, so I would expect any failure inside the task to be caught and returned. Instead, it just fails with the task name as the error.

```python
import linecache
import sys

from celery import shared_task

# close_cmr_networkinventory1, CMR_ENV and okta_access_token come from project code not shown here.


@shared_task(bind=True)
def oobsubnet_task(self, seconds, countryname, sitename, value_oobdevice_input, value_description_input, value_terminal_server,
                   value_uplink_oob_devices,
                   value_interfacePort_input, value_tile_location, subnetSize, value_kind_subnet, value_kind_customer,
                   value_subnet_count, value_oob_device_type, value_deviceenv,
                   offeringType, asnnumber, ip_block, username, password, username_dceuser, password_dceuser, user_email):
    changeRequestNumber, changeTaskNumber = '', ''
    # Initialise here so the except handler never raises a NameError of its own.
    finalerrorMessage = []
    try:
        ## partdatacenter is a temporary list; it should come directly from the OOB subnet form, but the form has no value for it yet ##
        partdatacenter = []
        partdatacenter.append('LAB')

        ################### Lots of code #####################################
    except Exception as e:
        if changeRequestNumber:
            closure_result = 'success'
            # close_cmr_networkinventory(changeRequestNumber,changeTaskNumber,CMR_ENV,closure_result,user_email)
            close_cmr_networkinventory1(changeRequestNumber, changeTaskNumber, CMR_ENV, closure_result,
                                        okta_access_token)
            finalerrorMessage.append('Change Number Created through Automation ' + changeRequestNumber)
        print("**************** Ending Celery Task for OOBSubnet*************************")
        exc_type, exc_obj, tb = sys.exc_info()
        f = tb.tb_frame
        lineno = tb.tb_lineno
        filename = f.f_code.co_filename
        linecache.checkcache(filename)
        line = linecache.getline(filename, lineno, f.f_globals)
        finalerrorMessage.append(
            'OOB Subnet Creation Failed:' + str(e) + " Error On file " + filename + " Line Number " + str(lineno))
        # logger.error('OOB Subnet Creation Failed:' + str(e) + " Error On file "+filename + " Line Number "+str(lineno))
        print('in exception part')
        totalDict = {'finalerrorMessage': finalerrorMessage}
        return totalDict
```
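Why would the task's own try/except not catch this? One plausible explanation, assuming the message was delivered to a worker that never registered this task: the worker fails while looking the task up by name, before the task body starts, so the body's try/except never runs. The plain-Python simulation below is not Celery's code; run_message, guarded_task and the task names are hypothetical. It shows that a failed dictionary lookup yields an error whose message is just the quoted task name.

```python
from typing import Any, Callable

# Hypothetical registry of the tasks one worker knows about: name -> callable.
Registry = dict[str, Callable[..., Any]]


def guarded_task(x: int) -> dict[str, Any]:
    """Hypothetical stand-in for a task whose whole body is wrapped in try/except."""
    try:
        return {"value": 10 // x}
    except Exception as exc:
        return {"finalerrorMessage": [f"Task body failed: {exc}"]}


def run_message(registry: Registry, task_name: str, *args: Any) -> dict[str, Any]:
    """Simulate a worker handling one message: look the task up, then call it."""
    try:
        task = registry[task_name]  # raises KeyError if this worker never registered the name
    except KeyError as exc:
        # The task body, including its own try/except, never runs.
        return {"state": "FAILURE", "error": str(exc)}
    return {"state": "SUCCESS", "result": task(*args)}


if __name__ == "__main__":
    app_a_worker = {"app_a.tasks.oobsubnet_task": guarded_task}
    app_b_worker = {"app_b.tasks.report_task": guarded_task}
    # Right worker: the body's own try/except handles the ZeroDivisionError.
    print(run_message(app_a_worker, "app_a.tasks.oobsubnet_task", 0))
    # Wrong worker: the error message is only the quoted task name.
    print(run_message(app_b_worker, "app_a.tasks.oobsubnet_task", 0))
    # {'state': 'FAILURE', 'error': "'app_a.tasks.oobsubnet_task'"}
```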

sindhujit1, Jun 30 '24 14:06

The error never happens when only one task is running; it happens when two or more tasks are running. These two tasks belong to two different Django applications, so I am not sure how one could affect the other. Is it because both use the same Redis port, 6379?
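If both Django projects point at the same Redis database and both keep Celery's default queue name (celery), their task messages end up in one shared queue, and whichever worker fetches next receives the message. The deterministic round-robin below is a hypothetical stand-in for that race (real workers do not take strict turns); it shows how a message from one project can reach a worker that has never heard of the task. The worker and task names are hypothetical.

```python
from collections import deque
from itertools import cycle


def simulate_shared_queue(messages: list[str], workers: dict[str, set[str]]) -> list[tuple[str, str, bool]]:
    """Hand queued task names to workers in turn; record (worker, task, registered?)."""
    queue = deque(messages)         # one queue shared by both projects
    turns = cycle(workers.items())  # hypothetical stand-in for "whoever fetches next"
    log = []
    while queue:
        worker, known_tasks = next(turns)
        task_name = queue.popleft()
        log.append((worker, task_name, task_name in known_tasks))
    return log


if __name__ == "__main__":
    workers = {
        "app_a_worker": {"app_a.tasks.oobsubnet_task"},
        "app_b_worker": {"app_b.tasks.report_task"},
    }
    messages = ["app_a.tasks.oobsubnet_task", "app_a.tasks.oobsubnet_task"]
    for entry in simulate_shared_queue(messages, workers):
        print(entry)
    # ('app_a_worker', 'app_a.tasks.oobsubnet_task', True)
    # ('app_b_worker', 'app_a.tasks.oobsubnet_task', False)
```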

sindhujit1, Jul 02 '24 17:07

Actually, the line it fails on is:

```python
for key, value in taskresult.get().items():
```

sindhujit1, Jul 02 '24 22:07

Oh, yeah, you may need to add a database number to your Redis URLs, e.g.

redis://localhost:6379/0
redis://localhost:6379/1

and so on, so that they don't conflict with each other.
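A sketch of how that could look per project, assuming each project loads Celery settings from Django settings with namespace='CELERY' (as in Celery's Django integration guide), so CELERY_BROKER_URL, CELERY_RESULT_BACKEND and CELERY_TASK_DEFAULT_QUEUE map to broker_url, result_backend and task_default_queue. The celery_settings helper and the queue names are hypothetical, and the distinct default queue is an extra safeguard added here; the fix confirmed in this thread was the database number alone.

```python
def celery_settings(db: int, queue: str, host: str = "localhost", port: int = 6379) -> dict[str, str]:
    """Hypothetical helper: CELERY_* Django settings pinning one project to its own Redis database."""
    url = f"redis://{host}:{port}/{db}"
    return {
        "CELERY_BROKER_URL": url,            # -> broker_url
        "CELERY_RESULT_BACKEND": url,        # -> result_backend
        "CELERY_TASK_DEFAULT_QUEUE": queue,  # -> task_default_queue (hypothetical queue name)
    }


# Written out by hand, project A's settings.py might contain
#   CELERY_BROKER_URL = "redis://localhost:6379/0"
# and project B's
#   CELERY_BROKER_URL = "redis://localhost:6379/1"
if __name__ == "__main__":
    app_a = celery_settings(0, "app_a")
    app_b = celery_settings(1, "app_b")
    print(app_a["CELERY_BROKER_URL"], app_b["CELERY_BROKER_URL"])
```

A stock Redis server exposes 16 databases (0 to 15) unless the databases setting in redis.conf is changed.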

czue, Jul 03 '24 13:07

OK, thank you. Let me try that and test it thoroughly.

sindhujit1, Jul 03 '24 15:07

This seems to have solved the issue, so it can be closed. Thank you, sir!

sindhujit1, Jul 11 '24 19:07