使用Invoke调用函数,时成功时失败.
我使用Eval声明了一个方法,然后通过ctx.Invoke 调用它,时而成功,时而失败.
reqRT, err := DefaultContext.Eval((request) => { + l.config.ReqJsScript + return request;})
defer reqRT.Free()
if err != nil || DefaultContext.Exception() != nil {
log.Error(err)
log.Error(DefaultContext.Exception())
}
reqVal := DefaultContext.Object()
defer reqVal.Free()
headersVal := DefaultContext.Map()
defer headersVal.Free()
for key, _ := range ctx.Request().Header().Headers() {
headersVal.Put(DefaultContext.String(key), DefaultContext.String(ctx.Request().Header().GetHeader(key)))
}
reqVal.Set("headers", headersVal.ToValue())
RawBody, _ := ctx.Request().Body().RawBody()
reqVal.Set("body", DefaultContext.String(string(RawBody)))
reqVal.Set("uri", DefaultContext.String(ctx.Request().URI().Path()))
request := DefaultContext.Invoke(reqRT, DefaultContext.Null(), reqVal) defer request.Free() log.Info(request.PropertyNames()) log.Info(request.IsObject())
request.Get("headers").ToMap().ForEach(func(key, value quickjs.Value) { ctx.Proxy().Header().SetHeader(key.String(), value.String()) }) contentType, _, _ := mime.ParseMediaType(ctx.Proxy().Body().ContentType()) ctx.Proxy().Body().SetRaw(contentType, []byte(request.Get("body").String())) ctx.Proxy().URI().SetRawQuery(request.Get("uri").String()) 失败的时候会在request.Get 时,报空指针异常. 这是什么原因
其中DefaultContext 是全局变量
请注意 quickjs context 并非线程安全,你需要在每次请求创建新的 runtime 和 context,更好的做法是使用线程池,需要时从池中取~
原来如此,我去试试感谢
我编写了一个工作池 `var taskQueue = make(chan *Task, 100) type Task struct { id string script string reqJSBody *JSBody respJSBody chan *JSBody }
func WorkPoolInit() { // 启动 Worker Goroutine 处理任务 for i := 0; i < 10; i++ { go Worker(i, taskQueue) } }
func Worker(id int, taskQueue <-chan Task) { rt := quickjs.NewRuntime( quickjs.WithExecuteTimeout(30), quickjs.WithMemoryLimit(1010241024), quickjs.WithGCThreshold(2581024), quickjs.WithMaxStackSize(65534), quickjs.WithCanBlock(true), quickjs.WithModuleImport(true), ) ctx := rt.NewContext() defer ctx.Close() num := 0 for task := range taskQueue { runJS(task, ctx) } }
func AddTask(jsBody *JSBody, jsScript string, id string) *JSBody { task := &Task{ id: id, reqJSBody: jsBody, respJSBody: make(chan *JSBody, 1), script: jsScript, } taskQueue <- task return <-task.respJSBody }
func runJS(task *Task, ctx *quickjs.Context) { result := &JSBody{ Header: make(map[string]string), } defer func(result *JSBody) { if err := recover(); err != nil { log.Errorf("js脚本运行失败:%v", err) result.Error = fmt.Errorf("%v", err) } task.respJSBody <- result }(result)
reqRT, err := ctx.Eval(`(request) => {` + task.script + `return request;}`)
defer reqRT.Free()
errorStr := fmt.Sprint(ctx.Exception())
if err != nil || (errorStr != "" && errorStr != "<nil>") {
panic(fmt.Sprintf("代码加载失败:%v:%s", err, errorStr))
}
reqVal := ctx.Object()
// defer reqVal.Free()
headersVal := ctx.Map()
// defer headersVal.Free()
for key, value := range task.reqJSBody.Header {
headersVal.Put(ctx.String(key), ctx.String(value))
}
reqVal.Set("headers", headersVal.ToValue())
reqVal.Set("body", ctx.String(task.reqJSBody.Body))
reqVal.Set("uri", ctx.String(task.reqJSBody.URI))
request := ctx.Invoke(reqRT, ctx.Null(), reqVal)
defer request.Free()
errorStr = fmt.Sprint(ctx.Exception())
if errorStr != "" && errorStr != "<nil>" {
panic(errorStr)
}
request.Get("headers").ToMap().ForEach(func(key, value quickjs.Value) {
result.Header[key.String()] = value.String()
})
result.Body = request.Get("body").String()
result.URI = request.Get("uri").String()
} ` 我使用 fortio -qps 300000 -c 30 进行压测, 几轮之后,报下面错误,程序异常退出,这预估是什么原因导致的,我应该怎么排查一下那 SIGSEGV: segmentation violation PC=0x34bdc0a m=14 sigcode=1 signal arrived during cgo execution
goroutine 213 [syscall, locked to thread]: runtime.cgocall(0x34742d3, 0xc000e433f0) /usr/lib/go-1.21/src/runtime/cgocall.go:157 +0x4b fp=0xc000e433a0 sp=0xc000e43368 pc=0x1073bab github.com/buke/quickjs-go._Cfunc_JS_Eval(0x77865c001780, 0x77865c982b30, 0x7a, 0x77865ca249e0, 0x0 ) _cgo_gotypes.go:383 +0x4d fp=0xc000e433f0 sp= 0xc000e433a0 pc=0x328434d github.com/buke/quickjs-go.(*Context).Eval.func4(0xc000d8e000, 0x77865c982b30, {0xc000bfc280, 0x7a}, 0x77865ca249e0, 0x0) /workspaces/gateway-node/vendor/github.com/buke/quickjs-go/context.go: 345 +0x110 fp=0xc000e43490 sp=0xc000e433f0 pc=0x328edb0 github.com/buke/quickjs-go.(*Context).Eval(0xc000d8e000, { 0xc000bfc280, 0x7a}, {0x0, 0x0, 0x0}) /workspaces/gateway-node/vendor/github.com/buke/quickjs-go/context.go :345 +0x36b fp=0xc000e43650 sp= 0xc000e43490 pc=0x328eaab apinto/gateway/node/drivers/plugins/js-script.runJS(0xc006f26b10, 0xc000d8e000) /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:93 +0x27c fp=0xc000e43bf0 sp=0xc000e43650 pc= 0x32a34dc apinto/gateway/node/drivers/plugins/js-script.Worker( 0x8, 0xc000280c00)
/workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:46 + 0xa45 fp=0xc000e43fb0 sp=0xc000e43bf0 pc=0x32a2c65 apinto/gateway/node/drivers/plugins/js-script.WorkPoolInit.func1 () /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go: 66 +0x33 fp=0xc000e43fe0 sp=0xc000e43fb0 pc=0x32a3073 runtime.goexit() /usr/lib/go-1.21/src/runtime/asm_amd64.s:1650 +0x1 fp=0xc000e43fe8 sp= 0xc000e43fe0 pc=0x10ea321 created by apinto/gateway/node/drivers/plugins/js-script.WorkPoolInit in goroutine 156 /workspaces/gateway-node/drivers/plugins/js-script/work_pool.go:66 + 0x252
goroutine 1 [chan receive, 1 minutes]: runtime.gopark(0x3c7df50, 0xc0001459d8, 0xe, 0x17, 0x2) /usr/lib/go-1.21/src/runtime/proc.go:398 +0xfc fp=0xc000b8da48 sp=0xc000b8da18 pc=0x10b4bfc runtime.chanrecv (0xc000145980, 0xc000b07b70, 0x1) /usr/lib/go-1.21/src/runtime/chan.go:583 +0x2a9 fp=0xc000b8dac0 sp=0xc000b8da48 pc=0x1075bc9 runtime.chanrecv1(0xc000145980? , 0xc000b07be0?) /usr/lib/go-1.21/src/runtime/chan.go :442 +0x12 fp= 0xc000b8dae8 sp=0xc000b8dac0 pc=0x10758f2 apinto/gateway/node/eosc/process-worker.(*ProcessWorker).wait(0xc000a56ae0 ) /workspaces/gateway-node/eosc/process-worker/process.go:78 +0x1d9 fp=0xc000b8dc40 sp= 0xc000b8dae8 pc=0x346a499 apinto/gateway/node/eosc/process-worker.Process() /workspaces/gateway-node/eosc/process-worker/process.go:51 + 0x35d fp=0xc000b8dd90 sp= 0xc000b8dc40 pc=0x3469f7d main.ProcessWorker() /workspaces/gateway-node/app/apinto/worker.go:11 +0x14 fp=0xc000b8dda0 sp=0xc000b8dd90 pc=0x3473034
apinto/gateway/node/eosc/process.Run( ) /workspaces/gateway-node/eosc/process/process.go:102 + 0x187 fp=0xc000b8de70 sp=0xc000b8dda0 pc=0x193d6c7 main.main () /workspaces/gateway-node/app/apinto/main.go: 35 +0x1f fp=0xc000b8df68 sp=0xc000b8de70 pc=0x346e0bf runtime.main() /usr/lib/go-1.21/src/runtime/proc.go:267 + 0x267 fp=0xc000b8dfe0 sp=0xc000b8df68 pc=0x10b4787 runtime.goexit( ) /usr/lib/go-1.21/src/runtime/asm_amd64.s:1650 + 0x1 fp=0xc000b8dfe8 sp=0xc000b8dfe0 pc= 0x10ea321
goroutine 2 [force gc (idle), 1 minutes]: runtime.gopark(0x3c7e2b0, 0x4f7b420, 0x11, 0x14 , 0x1) /usr/lib/go-1.21/src/runtime/proc.go :398 +0xfc fp= 0xc00008ef80 sp=0xc00008ef50 pc=0x10b4bfc
runtime.goparkunlock( 0x0?, 0x0? , 0x0?, 0x0?) /usr/lib/go-1.21/src/runtime/proc.go: 404 +0x25 fp=0xc00008efb0 sp=0xc00008ef80 pc=0x10b4c85 runtime.forcegchelper() /usr/lib/go-1.21/src/runtime/proc.go:322 +0xb5 fp=0xc00008efe0 sp= 0xc00008efb0 pc=0x10b4a15 runtime.goexit( )
建议是排除法吧, 1,先不要用工作池,尝试每次请求新建 runtime & context,看看是否还有报错 2,工作池参考 https://github.com/vijayviji/executor 改一个吧,我自己使用没什么问题,参考代码如下
package engine
import (
"fmt"
"runtime"
"strconv"
"sync/atomic"
"github.com/project-oryon/oryon/logger"
)
type TaskStatus int
const (
// TaskNotStarted - Task not started
TaskNotStarted TaskStatus = iota
// TaskStarted - Task is running
TaskStarted TaskStatus = iota
// TaskDone - Task is done
TaskDone TaskStatus = iota
)
type executorTask struct {
jsRpcReq *JsRpcReq
jsRpcResChan chan *JsRpcRes // channel used by the task to communicate the return value to the future.
taskStatus TaskStatus // task's current status
}
func (task *executorTask) run(th *executorThread, taskID uint64) {
logger.GetLogger().Debug(fmt.Sprintf("JsExecutor running task %d in thread %s", taskID, th.name))
task.taskStatus = TaskStarted
req, err := th.jsEngine.Call(task.jsRpcReq)
if err != nil {
logger.GetLogger().Error(fmt.Sprintf("Error in task %d in thread %s", taskID, th.name), "error", err)
}
task.jsRpcResChan <- req
if err := th.jsEngine.Reset(); err != nil {
logger.GetLogger().Error(fmt.Sprintf("Error in task %d in thread %s", taskID, th.name), "error", err)
}
task.taskStatus = TaskDone
}
// Gets the status of a task
func (task *executorTask) getStatus() TaskStatus {
return task.taskStatus
}
type Future struct {
task executorTask // task represented by this future
}
// Get - Gets the return value of a task this future is pointing to.
func (fut *Future) Get() *JsRpcRes {
result := <-fut.task.jsRpcResChan
return result
}
// GetStatus - Gets the status of the task this future is pointing to.
func (fut *Future) GetStatus() interface{} {
return fut.task.getStatus()
}
type executorThread struct {
name string // name of this thread
jsEngineFactory JsEngineFactory
jsEngineOptions []JsEngineOption
jsEngine JavascriptEngine
taskQueue chan executorTask // task queue for this thread
quit chan bool // to signal to the thread to quit
quitDone chan bool // to signal to the executor that thread quitting is done.
}
// The main loop executed by the executor. Listens to the task queue and executes the tasks.
func (th *executorThread) startTaskLoop() {
runtime.LockOSThread() // lock the thread to the OS thread to prevent the scheduler from moving it around.
jsEngine := th.jsEngineFactory(th.jsEngineOptions...)
th.jsEngine = jsEngine
th.name = jsEngine.Name() + th.name
taskID := uint64(0)
for {
select {
case task := <-th.taskQueue:
// this is uint64. no need to wrap.
taskID++
task.run(th, taskID)
case <-th.quit:
jsEngine.Close()
// logger.GetLogger().Debug(fmt.Sprintf("Quitting jsExecutorThread %s", th.name))
th.quitDone <- true
return
}
}
}
type Executor struct {
// name string // name of this executor
threads []executorThread // threads backing this executor
nextThreadToQueue uint64 // next thread to queue the incoming task. RR policy.
}
func (ex *Executor) Shutdown() {
for _, th := range ex.threads {
th.quit <- true
}
// wait for all the threads to quit
for _, th := range ex.threads {
<-th.quitDone
}
logger.GetLogger().Debug("Quitting jsExecutor done")
}
func (ex *Executor) Call(jsRpcReq *JsRpcReq) *Future {
task := executorTask{
jsRpcResChan: make(chan *JsRpcRes, 1),
jsRpcReq: jsRpcReq,
taskStatus: TaskNotStarted,
}
val := atomic.AddUint64(&ex.nextThreadToQueue, 1)
val %= uint64(len(ex.threads))
ex.threads[val].taskQueue <- task
future := &Future{
task: task,
}
return future
}
// NewExecutorPool - Creates an executor backed by multiple threads.
// poolSize - Number of threads in the pool.
// qSize - Queue size of every thread in the pool.
func NewExecutorPool(poolSize int, queueSize int, jsEngineFactory JsEngineFactory, jsEngineOptions ...JsEngineOption) *Executor {
ex := &Executor{
nextThreadToQueue: 0,
}
for i := 0; i < poolSize; i++ {
th := executorThread{
/*
* syscall.Gettid() is not defined for mac os. Let's use simple counter and the name to define unique
* ID for executor.
*/
name: "-thread-" + strconv.FormatUint(uint64(i+1), 10),
jsEngineFactory: jsEngineFactory,
jsEngineOptions: jsEngineOptions,
taskQueue: make(chan executorTask, queueSize),
quit: make(chan bool),
quitDone: make(chan bool),
}
ex.threads = append(ex.threads, th)
go th.startTaskLoop()
}
logger.GetLogger().Debug(fmt.Sprintf("JsExecutor started with %d threads", poolSize))
return ex
}
刚才测了一下 每次请求新建 runtime & context 没有报错,不过 qps 才 100 哈哈,继续改造