diff --git a/src/vnode/inc/vnodeCancel.h b/src/vnode/inc/vnodeCancel.h index 7459e0707c..32096739ac 100644 --- a/src/vnode/inc/vnodeCancel.h +++ b/src/vnode/inc/vnodeCancel.h @@ -24,7 +24,7 @@ extern "C" { int32_t vnodeInitCWorker(); void vnodeCleanupCWorker(); -int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead); +int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead); #ifdef __cplusplus } diff --git a/src/vnode/src/vnodeCancel.c b/src/vnode/src/vnodeCancel.c index 2239e38474..5f422d798c 100644 --- a/src/vnode/src/vnodeCancel.c +++ b/src/vnode/src/vnodeCancel.c @@ -120,17 +120,21 @@ void vnodeCleanupCWorker() { vnodeStopCWorker(); } -int32_t vnodeWriteIntoCQueue(SVReadMsg *pRead) { - vTrace("msg:%p, write into vcqueue", pRead); +int32_t vnodeWriteIntoCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) { + atomic_add_fetch_32(&pVnode->refCount, 1); + pRead->pVnode = pVnode; + + vTrace("vgId:%d, write into vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); return taosWriteQitem(tsVCWorkerQueue, pRead->qtype, pRead); } -static void vnodeFreeFromCQueue(SVReadMsg *pRead) { - vTrace("msg:%p, free from vcqueue", pRead); +static void vnodeFreeFromCQueue(SVnodeObj *pVnode, SVReadMsg *pRead) { + vTrace("vgId:%d, free from vcqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedRMsg); taosFreeQitem(pRead); + vnodeRelease(pVnode); } -static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) { +static void vnodeSendVCancelRpcRsp(SVnodeObj *pVnode, SVReadMsg *pRead, int32_t code) { SRpcMsg rpcRsp = { .handle = pRead->rpcHandle, .pCont = pRead->rspRet.rsp, @@ -139,7 +143,7 @@ static void vnodeSendVCancelRpcRsp(SVReadMsg *pRead, int32_t code) { }; rpcSendResponse(&rpcRsp); - vnodeFreeFromCQueue(pRead); + vnodeFreeFromCQueue(pVnode, pRead); } static void *vnodeCWorkerFunc(void *param) { @@ -153,13 +157,12 @@ static void *vnodeCWorkerFunc(void *param) { break; } - vTrace("msg:%p will be processed in vcworker queue", pRead); - assert(qtype == TAOS_QTYPE_RPC); assert(pVnode == NULL); - - int32_t code = vnodeProcessRead(NULL, pRead); - vnodeSendVCancelRpcRsp(pRead, code); + assert(pRead->pVnode != NULL); + + int32_t code = vnodeProcessRead(pRead->pVnode, pRead); + vnodeSendVCancelRpcRsp(pRead->pVnode, pRead, code); } return NULL; diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index f7a7afd9db..ec5ba8f352 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -116,9 +116,9 @@ int32_t vnodeWriteToRQueue(void *vparam, void *pCont, int32_t contLen, int8_t qt } pRead->qtype = qtype; - + if (pRead->msgType == TSDB_MSG_TYPE_CM_KILL_QUERY) { - return vnodeWriteIntoCQueue(pRead); + return vnodeWriteIntoCQueue(pVnode, pRead); } else { atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);