From cd105381a9a3f10c0e14af2386ea5c5e259981bf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 12 May 2023 11:41:59 +0800 Subject: [PATCH] fix:[TD-24111]avoid exec pHandle task in multi query thread --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b2223f9d64..85415fbe34 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -102,7 +102,7 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; int32_t noDataPollCnt; - int8_t sendRsp; + int8_t exec; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 128f83bf53..32819a5924 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -163,7 +163,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } static bool isHandleExecuting(STqHandle* pHandle){ - return 0 == atomic_load_8(&pHandle->sendRsp); + return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, @@ -185,7 +185,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -203,7 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } else{ @@ -224,7 +224,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } @@ -248,7 +248,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -345,7 +345,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead);