quickjs-go icon indicating copy to clipboard operation
quickjs-go copied to clipboard

使用 Invoke 调用函数，时而成功，时而失败。

Open Mr-TianLang opened this issue 1 year ago • 5 comments

我使用Eval声明了一个方法,然后通过ctx.Invoke 调用它,时而成功,时而失败. reqRT, err := DefaultContext.Eval((request) => { + l.config.ReqJsScript + return request;}) defer reqRT.Free() if err != nil || DefaultContext.Exception() != nil { log.Error(err) log.Error(DefaultContext.Exception()) } reqVal := DefaultContext.Object() defer reqVal.Free() headersVal := DefaultContext.Map() defer headersVal.Free() for key, _ := range ctx.Request().Header().Headers() { headersVal.Put(DefaultContext.String(key), DefaultContext.String(ctx.Request().Header().GetHeader(key))) } reqVal.Set("headers", headersVal.ToValue()) RawBody, _ := ctx.Request().Body().RawBody() reqVal.Set("body", DefaultContext.String(string(RawBody))) reqVal.Set("uri", DefaultContext.String(ctx.Request().URI().Path()))

request := DefaultContext.Invoke(reqRT, DefaultContext.Null(), reqVal) defer request.Free() log.Info(request.PropertyNames()) log.Info(request.IsObject())

request.Get("headers").ToMap().ForEach(func(key, value quickjs.Value) { ctx.Proxy().Header().SetHeader(key.String(), value.String()) }) contentType, _, _ := mime.ParseMediaType(ctx.Proxy().Body().ContentType()) ctx.Proxy().Body().SetRaw(contentType, []byte(request.Get("body").String())) ctx.Proxy().URI().SetRawQuery(request.Get("uri").String()) 失败的时候会在request.Get 时,报空指针异常. 这是什么原因

其中DefaultContext 是全局变量

Mr-TianLang avatar Aug 06 '24 09:08 Mr-TianLang

请注意 quickjs context 并非线程安全,你需要在每次请求创建新的 runtime 和 context,更好的做法是使用线程池,需要时从池中取~

buke avatar Aug 06 '24 09:08 buke

原来如此,我去试试感谢

Mr-TianLang avatar Aug 06 '24 11:08 Mr-TianLang

我编写了一个工作池 `var taskQueue = make(chan *Task, 100) type Task struct { id string script string reqJSBody *JSBody respJSBody chan *JSBody }

func WorkPoolInit() { // 启动 Worker Goroutine 处理任务 for i := 0; i < 10; i++ { go Worker(i, taskQueue) } }

func Worker(id int, taskQueue <-chan Task) { rt := quickjs.NewRuntime( quickjs.WithExecuteTimeout(30), quickjs.WithMemoryLimit(1010241024), quickjs.WithGCThreshold(2581024), quickjs.WithMaxStackSize(65534), quickjs.WithCanBlock(true), quickjs.WithModuleImport(true), ) ctx := rt.NewContext() defer ctx.Close() num := 0 for task := range taskQueue { runJS(task, ctx) } }

func AddTask(jsBody *JSBody, jsScript string, id string) *JSBody { task := &Task{ id: id, reqJSBody: jsBody, respJSBody: make(chan *JSBody, 1), script: jsScript, } taskQueue <- task return <-task.respJSBody }

func runJS(task *Task, ctx *quickjs.Context) { result := &JSBody{ Header: make(map[string]string), } defer func(result *JSBody) { if err := recover(); err != nil { log.Errorf("js脚本运行失败:%v", err) result.Error = fmt.Errorf("%v", err) } task.respJSBody <- result }(result)

reqRT, err := ctx.Eval(`(request) => {` + task.script + `return request;}`)
defer reqRT.Free()
errorStr := fmt.Sprint(ctx.Exception())
if err != nil || (errorStr != "" && errorStr != "<nil>") {
	panic(fmt.Sprintf("代码加载失败:%v:%s", err, errorStr))
}

reqVal := ctx.Object()
// defer reqVal.Free()
headersVal := ctx.Map()
// defer headersVal.Free()
for key, value := range task.reqJSBody.Header {
	headersVal.Put(ctx.String(key), ctx.String(value))
}
reqVal.Set("headers", headersVal.ToValue())
reqVal.Set("body", ctx.String(task.reqJSBody.Body))
reqVal.Set("uri", ctx.String(task.reqJSBody.URI))

request := ctx.Invoke(reqRT, ctx.Null(), reqVal)
defer request.Free()
errorStr = fmt.Sprint(ctx.Exception())
if errorStr != "" && errorStr != "<nil>" {
	panic(errorStr)
}

request.Get("headers").ToMap().ForEach(func(key, value quickjs.Value) {
	result.Header[key.String()] = value.String()
})
result.Body = request.Get("body").String()
result.URI = request.Get("uri").String()

} ` 我使用 fortio -qps 300000 -c 30 进行压测，几轮之后报下面的错误，程序异常退出。这大概是什么原因导致的？我应该怎么排查呢？ SIGSEGV: segmentation violation PC=0x34bdc0a m=14 sigcode=1 signal arrived during cgo execution

goroutine 213 [syscall, locked to thread]: runtime.cgocall(0x34742d3, 0xc000e433f0) /usr/lib/go-1.21/src/runtime/cgocall.go:157 +0x4b fp=0xc000e433a0 sp=0xc000e43368 pc=0x1073bab github.com/buke/quickjs-go._Cfunc_JS_Eval(0x77865c001780, 0x77865c982b30, 0x7a, 0x77865ca249e0, 0x0 ) _cgo_gotypes.go:383 +0x4d fp=0xc000e433f0 sp= 0xc000e433a0 pc=0x328434d github.com/buke/quickjs-go.(*Context).Eval.func4(0xc000d8e000, 0x77865c982b30, {0xc000bfc280, 0x7a}, 0x77865ca249e0, 0x0) /workspaces/gateway-node/vendor/github.com/buke/quickjs-go/context.go: 345 +0x110 fp=0xc000e43490 sp=0xc000e433f0 pc=0x328edb0 github.com/buke/quickjs-go.(*Context).Eval(0xc000d8e000, { 0xc000bfc280, 0x7a}, {0x0, 0x0, 0x0}) /workspaces/gateway-node/vendor/github.com/buke/quickjs-go/context.go :345 +0x36b fp=0xc000e43650 sp= 0xc000e43490 pc=0x328eaab apinto/gateway/node/drivers/plugins/js-script.runJS(0xc006f26b10, 0xc000d8e000) /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:93 +0x27c fp=0xc000e43bf0 sp=0xc000e43650 pc= 0x32a34dc apinto/gateway/node/drivers/plugins/js-script.Worker( 0x8, 0xc000280c00)

/workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:46 + 0xa45 fp=0xc000e43fb0 sp=0xc000e43bf0 pc=0x32a2c65 apinto/gateway/node/drivers/plugins/js-script.WorkPoolInit.func1 () /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go: 66 +0x33 fp=0xc000e43fe0 sp=0xc000e43fb0 pc=0x32a3073 runtime.goexit() /usr/lib/go-1.21/src/runtime/asm_amd64.s:1650 +0x1 fp=0xc000e43fe8 sp= 0xc000e43fe0 pc=0x10ea321 created by apinto/gateway/node/drivers/plugins/js-script.WorkPoolInit in goroutine 156 /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:66 + 0x252

goroutine 1 [chan receive, 1 minutes]: runtime.gopark(0x3c7df50, 0xc0001459d8, 0xe, 0x17, 0x2) /usr/lib/go-1.21/src/runtime/proc.go:398 +0xfc fp=0xc000b8da48 sp=0xc000b8da18 pc=0x10b4bfc runtime.chanrecv (0xc000145980, 0xc000b07b70, 0x1) /usr/lib/go-1.21/src/runtime/chan.go:583 +0x2a9 fp=0xc000b8dac0 sp=0xc000b8da48 pc=0x1075bc9 runtime.chanrecv1(0xc000145980? , 0xc000b07be0?) /usr/lib/go-1.21/src/runtime/chan.go :442 +0x12 fp= 0xc000b8dae8 sp=0xc000b8dac0 pc=0x10758f2 apinto/gateway/node/eosc/process-worker.(*ProcessWorker).wait(0xc000a56ae0 ) /workspaces/gateway-node/eosc/process-worker/process.go:78 +0x1d9 fp=0xc000b8dc40 sp= 0xc000b8dae8 pc=0x346a499 apinto/gateway/node/eosc/process-worker.Process() /workspaces/gateway-node/eosc/process-worker/process.go:51 + 0x35d fp=0xc000b8dd90 sp= 0xc000b8dc40 pc=0x3469f7d main.ProcessWorker() /workspaces/gateway-node/app/apinto/worker.go:11 +0x14 fp=0xc000b8dda0 sp=0xc000b8dd90 pc=0x3473034

apinto/gateway/node/eosc/process.Run( ) /workspaces/gateway-node/eosc/process/process.go:102 + 0x187 fp=0xc000b8de70 sp=0xc000b8dda0 pc=0x193d6c7 main.main () /workspaces/gateway-node/app/apinto/main.go: 35 +0x1f fp=0xc000b8df68 sp=0xc000b8de70 pc=0x346e0bf runtime.main() /usr/lib/go-1.21/src/runtime/proc.go:267 + 0x267 fp=0xc000b8dfe0 sp=0xc000b8df68 pc=0x10b4787 runtime.goexit( ) /usr/lib/go-1.21/src/runtime/asm_amd64.s:1650 + 0x1 fp=0xc000b8dfe8 sp=0xc000b8dfe0 pc= 0x10ea321

goroutine 2 [force gc (idle), 1 minutes]: runtime.gopark(0x3c7e2b0, 0x4f7b420, 0x11, 0x14 , 0x1) /usr/lib/go-1.21/src/runtime/proc.go :398 +0xfc fp= 0xc00008ef80 sp=0xc00008ef50 pc=0x10b4bfc

runtime.goparkunlock( 0x0?, 0x0? , 0x0?, 0x0?) /usr/lib/go-1.21/src/runtime/proc.go: 404 +0x25 fp=0xc00008efb0 sp=0xc00008ef80 pc=0x10b4c85 runtime.forcegchelper() /usr/lib/go-1.21/src/runtime/proc.go:322 +0xb5 fp=0xc00008efe0 sp= 0xc00008efb0 pc=0x10b4a15 runtime.goexit( )

Mr-TianLang avatar Aug 07 '24 09:08 Mr-TianLang

建议是排除法吧, 1,先不要用工作池,尝试每次请求新建 runtime & context,看看是否还有报错 2,工作池参考 https://github.com/vijayviji/executor 改一个吧,我自己使用没什么问题,参考代码如下

package engine

import (
	"fmt"
	"runtime"
	"strconv"
	"sync/atomic"

	"github.com/project-oryon/oryon/logger"
)

// TaskStatus describes where a task is in its lifecycle.
type TaskStatus int

// Lifecycle states, listed in the order a task moves through them.
const (
	// TaskNotStarted is the state a task is created in.
	TaskNotStarted TaskStatus = iota
	// TaskStarted is set when the task begins running.
	TaskStarted
	// TaskDone is set once the task has finished running.
	TaskDone
)

// executorTask is one unit of work for an executor thread: the request to run
// plus the channel on which its result is delivered.
type executorTask struct {
	jsRpcReq     *JsRpcReq      // request passed to the thread's JS engine
	jsRpcResChan chan *JsRpcRes // carries the engine's result back to the future
	taskStatus   TaskStatus     // lifecycle state recorded on this task value
}

// run executes the task on th's JS engine, sends the result on jsRpcResChan
// and then resets the engine. taskID is used only in log messages.
//
// A failing engine call is logged but not propagated: whatever the call
// returned is still sent on the channel. The engine is reset only after the
// result has been handed over, and a failing reset is likewise only logged.
func (task *executorTask) run(th *executorThread, taskID uint64) {
	// Both failure paths report with the same message, so share one helper.
	logFailure := func(err error) {
		logger.GetLogger().Error(fmt.Sprintf("Error in task %d in thread %s", taskID, th.name), "error", err)
	}

	logger.GetLogger().Debug(fmt.Sprintf("JsExecutor running task %d in thread %s", taskID, th.name))
	task.taskStatus = TaskStarted

	res, callErr := th.jsEngine.Call(task.jsRpcReq)
	if callErr != nil {
		logFailure(callErr)
	}
	task.jsRpcResChan <- res

	if resetErr := th.jsEngine.Reset(); resetErr != nil {
		logFailure(resetErr)
	}
	task.taskStatus = TaskDone
}

// getStatus returns the lifecycle state stored on this task value.
func (task *executorTask) getStatus() TaskStatus {
	return task.taskStatus
}

// Future is the caller's handle to a task submitted through Executor.Call.
//
// NOTE(review): task is held by value. Executor.Call puts one copy of the
// executorTask on the worker's queue and stores another copy here, so status
// changes made by the worker are not visible through this struct; only the
// result channel is shared between the two copies.
type Future struct {
	task executorTask // task represented by this future
}

// Get blocks until the task's result is available and returns it.
//
// Get may be called any number of times, also from several goroutines; every
// call returns the same result. The worker sends exactly one value on the
// result channel, so after receiving it Get puts the value back for the next
// caller.
func (fut *Future) Get() *JsRpcRes {
	result, ok := <-fut.task.jsRpcResChan
	if !ok {
		// Channel was closed: nothing to re-queue, and sending would panic.
		return result
	}

	// Re-queue the result without blocking. With the one-slot buffer created
	// by Executor.Call this always succeeds; the default branch keeps Get from
	// hanging if the channel has no free slot.
	select {
	case fut.task.jsRpcResChan <- result:
	default:
	}

	return result
}

// GetStatus returns the TaskStatus stored on this future's copy of the task,
// boxed in an interface{}.
//
// NOTE(review): because the worker updates its own copy of the task, the
// value seen here stays at whatever it was when the future was created
// (TaskNotStarted for futures built by Executor.Call).
func (fut *Future) GetStatus() interface{} {
	return fut.task.getStatus()
}

// executorThread is one worker of an Executor. It owns a JS engine and a task
// queue, and is controlled through the quit / quitDone channels.
type executorThread struct {
	name            string            // name of this thread
	jsEngineFactory JsEngineFactory   // builds the engine when the loop starts
	jsEngineOptions []JsEngineOption  // options handed to jsEngineFactory
	jsEngine        JavascriptEngine  // engine created by startTaskLoop
	taskQueue       chan executorTask // task queue for this thread
	quit            chan bool         // to signal to the thread to quit
	quitDone        chan bool         // to signal to the executor that thread quitting is done.
}

// startTaskLoop is the worker's main loop. It creates the JS engine, then
// runs tasks from taskQueue one at a time until a value arrives on quit, at
// which point it closes the engine, acknowledges on quitDone and returns.
//
// Tasks still waiting in taskQueue when quit is handled are not run. If a task
// and a quit signal are ready at the same time, select may pick either.
func (th *executorThread) startTaskLoop() {
	// Wire this goroutine to its current OS thread so the engine is created
	// and used on one thread for the whole life of the loop.
	runtime.LockOSThread()

	engine := th.jsEngineFactory(th.jsEngineOptions...)
	th.jsEngine = engine
	th.name = engine.Name() + th.name

	// Per-thread task counter, used for logging inside run. It is a uint64,
	// so wrap-around is not handled.
	var taskID uint64
	for {
		select {
		case <-th.quit:
			engine.Close()
			// logger.GetLogger().Debug(fmt.Sprintf("Quitting jsExecutorThread %s", th.name))
			th.quitDone <- true
			return

		case task := <-th.taskQueue:
			taskID++
			task.run(th, taskID)
		}
	}
}

// Executor distributes incoming calls over a fixed set of executor threads.
type Executor struct {
	// name              string           // name of this executor
	threads           []executorThread // threads backing this executor
	nextThreadToQueue uint64           // next thread to queue the incoming task. RR policy.
}

// Shutdown asks every thread to quit and returns once all of them have
// acknowledged. Each quit send blocks until that thread is ready to receive
// it, i.e. until it has finished the task it is currently running.
func (ex *Executor) Shutdown() {
	// First pass: deliver the quit signal to every thread.
	for i := range ex.threads {
		ex.threads[i].quit <- true
	}

	// Second pass: wait for each thread to confirm it has stopped.
	for i := range ex.threads {
		<-ex.threads[i].quitDone
	}

	logger.GetLogger().Debug("Quitting jsExecutor done")
}

// Call queues jsRpcReq on one of the threads, chosen round-robin, and returns
// a Future for its result.
//
// Call blocks while the chosen thread's queue is full, and panics (integer
// divide by zero) if the executor has no threads.
func (ex *Executor) Call(jsRpcReq *JsRpcReq) *Future {
	task := executorTask{
		jsRpcReq: jsRpcReq,
		// One slot of buffer lets the worker deliver the result without
		// waiting for the caller to read it.
		jsRpcResChan: make(chan *JsRpcRes, 1),
		taskStatus:   TaskNotStarted,
	}

	// The atomic increment gives every concurrent caller a distinct ticket;
	// the modulo maps that ticket onto a thread index.
	slot := atomic.AddUint64(&ex.nextThreadToQueue, 1) % uint64(len(ex.threads))
	ex.threads[slot].taskQueue <- task

	return &Future{task: task}
}

// NewExecutorPool creates an Executor backed by poolSize threads and starts
// them.
//
//   - poolSize: number of threads in the pool. Values below 1 are treated as
//     1, because Executor.Call selects a thread with a modulo over the thread
//     count and would panic on an empty pool.
//   - queueSize: capacity of every thread's task queue.
//   - jsEngineFactory, jsEngineOptions: used by each thread to build its own
//     JS engine when its loop starts.
func NewExecutorPool(poolSize int, queueSize int, jsEngineFactory JsEngineFactory, jsEngineOptions ...JsEngineOption) *Executor {
	if poolSize < 1 {
		poolSize = 1
	}

	ex := &Executor{
		nextThreadToQueue: 0,
		threads:           make([]executorThread, 0, poolSize),
	}

	for i := 0; i < poolSize; i++ {

		th := executorThread{
			/*
			 * syscall.Gettid() is not defined for mac os. Let's use simple counter and the name to define unique
			 * ID for executor.
			 */
			name:            "-thread-" + strconv.FormatUint(uint64(i+1), 10),
			jsEngineFactory: jsEngineFactory,
			jsEngineOptions: jsEngineOptions,
			taskQueue:       make(chan executorTask, queueSize),
			quit:            make(chan bool),
			quitDone:        make(chan bool),
		}

		// ex.threads keeps a copy of th while the goroutine runs on th itself.
		// The channels are shared by both, which is all Call and Shutdown use;
		// the name and engine set later by startTaskLoop exist only on th.
		ex.threads = append(ex.threads, th)

		go th.startTaskLoop()
	}

	logger.GetLogger().Debug(fmt.Sprintf("JsExecutor started with %d threads", poolSize))

	return ex
}

buke avatar Aug 07 '24 10:08 buke

刚才测了一下 每次请求新建 runtime & context 没有报错,不过 qps 才 100 哈哈,继续改造

Mr-TianLang avatar Aug 07 '24 12:08 Mr-TianLang