From 36454c73edd829a46b459e295360932176ce36df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 15 May 2023 18:38:14 +0800 Subject: [PATCH] fix:add lock to pHandle for safety --- include/libs/wal/wal.h | 3 +- source/dnode/vnode/src/inc/tq.h | 57 ++++++++++++++++------------ source/dnode/vnode/src/tq/tq.c | 52 ++++++++++++++----------- source/dnode/vnode/src/tq/tqMeta.c | 6 ++- source/dnode/vnode/src/tq/tqOffset.c | 4 +- source/dnode/vnode/src/tq/tqRead.c | 2 + source/dnode/vnode/src/tq/tqUtil.c | 22 ++--------- source/libs/executor/src/executor.c | 3 ++ source/libs/wal/src/walRef.c | 22 +---------- 9 files changed, 80 insertions(+), 91 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 46dc179295..8d4f6af577 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -208,8 +208,7 @@ SWalRef *walRefCommittedVer(SWal *); SWalRef *walOpenRef(SWal *); void walCloseRef(SWal *pWal, int64_t refId); -int32_t walRefVer(SWalRef *, int64_t ver); -void walUnrefVer(SWalRef *); +int32_t walSetRefVer(SWalRef *, int64_t ver); // helper function for raft bool walLogExist(SWal *, int64_t ver); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index dece28de6b..5686bb256b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -46,23 +46,23 @@ typedef struct STqOffsetStore STqOffsetStore; // tqPush -typedef struct { - // msg info - int64_t consumerId; - int64_t reqOffset; - int64_t processedVer; - int32_t epoch; - // rpc info - int64_t reqId; - SRpcHandleInfo rpcInfo; - tmr_h timerId; - int8_t tmrStopped; - // exec - int8_t inputStatus; - int8_t execStatus; - SStreamQueue inputQ; - SRWLatch lock; -} STqPushHandle; +//typedef struct { +// // msg info +// int64_t consumerId; +// int64_t reqOffset; +// int64_t processedVer; +// int32_t epoch; +// // rpc info +// int64_t reqId; +// SRpcHandleInfo rpcInfo; +// tmr_h timerId; +// int8_t tmrStopped; +// // exec +// int8_t inputStatus; +// int8_t execStatus; +// SStreamQueue inputQ; +// SRWLatch lock; +//} STqPushHandle; // tqExec @@ -90,6 +90,11 @@ typedef struct { int32_t numOfCols; // number of out pout column, temporarily used } STqExecHandle; +typedef enum tq_handle_status{ + TMQ_HANDLE_STATUS_IDLE = 0, + TMQ_HANDLE_STATUS_EXEC = 1, +}tq_handle_status; + typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int64_t consumerId; @@ -98,18 +103,18 @@ typedef struct { int64_t snapshotVer; SWalReader* pWalReader; SWalRef* pRef; - STqPushHandle pushHandle; // push +// STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; int32_t noDataPollCnt; - int8_t exec; + tq_handle_status status; } STqHandle; -typedef struct { - SMqDataRsp* pDataRsp; - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - SRpcHandleInfo info; -} STqPushEntry; +//typedef struct { +// SMqDataRsp* pDataRsp; +// char subKey[TSDB_SUBSCRIBE_KEY_LEN]; +// SRpcHandleInfo info; +//} STqPushEntry; struct STQ { SVnode* pVnode; @@ -185,7 +190,9 @@ int32_t tqStreamTasksScanWal(STQ* pTq); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); -bool tqIsHandleExecuting(STqHandle* pHandle); +FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_STATUS_EXEC == pHandle->status; } +FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_EXEC;} +FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) {pHandle->status = TMQ_HANDLE_STATUS_IDLE;} #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f3da17553b..70ab29ecf3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -61,6 +61,9 @@ void tqCleanUp() { static void destroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); + if (pData->pRef) { + walCloseRef(pData->pWalReader->pWal, pData->pRef->refId); + } if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { taosMemoryFreeClear(pData->execHandle.execCol.qmsg); } else if (pData->execHandle.subType == TOPIC_SUB_TYPE__DB) { @@ -292,10 +295,13 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t } if (offset.val.type == TMQ_OFFSET__LOG) { + taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, offset.subKey, strlen(offset.subKey)); - if (pHandle && (walRefVer(pHandle->pRef, offset.val.version) < 0)) { + if (pHandle && (walSetRefVer(pHandle->pRef, offset.val.version) < 0)) { + taosWUnLockLatch(&pTq->lock); return -1; } + taosWUnLockLatch(&pTq->lock); } return 0; @@ -340,34 +346,38 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { STqOffsetVal reqOffset = req.reqOffset; int32_t vgId = TD_VID(pTq->pVnode); + taosWLockLatch(&pTq->lock); // 1. find handle STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; + taosWUnLockLatch(&pTq->lock); return -1; } + while (tqIsHandleExec(pHandle)) { + tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", consumerId, vgId, req.subKey); + taosMsleep(5); + } + tqSetHandleExec(pHandle); + taosWUnLockLatch(&pTq->lock); + // 2. check re-balance status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - taosRUnLockLatch(&pTq->lock); return -1; } - taosRUnLockLatch(&pTq->lock); // 3. update the epoch value - taosWLockLatch(&pTq->lock); int32_t savedEpoch = pHandle->epoch; if (savedEpoch < reqEpoch) { tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); pHandle->epoch = reqEpoch; } - taosWUnLockLatch(&pTq->lock); char buf[80]; tFormatOffset(buf, 80, &reqOffset); @@ -384,18 +394,13 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey); int32_t code = 0; + taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { - // walCloseRef(pHandle->pWalReader->pWal, pHandle->pRef->refId); - if (pHandle->pRef) { - walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); - } - - while (tqIsHandleExecuting(pHandle)) { + while (tqIsHandleExec(pHandle)) { tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); taosMsleep(5); } - code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (code != 0) { tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); @@ -410,6 +415,8 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (tqMetaDeleteHandle(pTq, pReq->subKey) < 0) { tqError("cannot process tq delete req %s, since no such offset in tdb", pReq->subKey); } + taosWUnLockLatch(&pTq->lock); + return 0; } @@ -456,6 +463,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey, req.oldConsumerId, req.newConsumerId); + taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { if (req.oldConsumerId != -1) { @@ -507,7 +515,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.pTqReader = tqReaderOpen(pVnode); pHandle->execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, (SSnapContext**)(&handle.sContext)); @@ -538,6 +546,11 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } else { + while (tqIsHandleExec(pHandle)) { + tqDebug("sub req vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); + taosMsleep(5); + } + if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); @@ -553,22 +566,17 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (pTaskInfo != NULL) { qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); } - - taosWLockLatch(&pTq->lock); - // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, pHandle); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pTaskInfo); } - - taosWUnLockLatch(&pTq->lock); + // remove if it has been register in the push manager, and return one empty block to consumer + tqUnregisterPushHandle(pTq, pHandle); ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); goto end; } end: + taosWUnLockLatch(&pTq->lock); taosMemoryFree(req.qmsg); return ret; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index f3ecaa08f6..5654147b6d 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -54,7 +54,7 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle) { if (tDecodeCStrAlloc(pDecoder, &pHandle->execHandle.execCol.qmsg) < 0) return -1; } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->execHandle.execDb.pFilterOutTbUid = - taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); + taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); int32_t size = 0; if (tDecodeI32(pDecoder, &size) < 0) return -1; for (int32_t i = 0; i < size; i++) { @@ -295,7 +295,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { code = -1; goto end; } - walRefVer(handle.pRef, handle.snapshotVer); + walSetRefVer(handle.pRef, handle.snapshotVer); SReadHandle reader = { .meta = pTq->pVnode->pMeta, @@ -352,7 +352,9 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { handle.execHandle.task = qCreateQueueExecTaskInfo(NULL, &reader, vgId, NULL, 0); } tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle.subKey, handle.consumerId, vgId); + taosWLockLatch(&pTq->lock); taosHashPut(pTq->pHandle, pKey, kLen, &handle, sizeof(STqHandle)); + taosWUnLockLatch(&pTq->lock); } end: diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 34e93cec2d..377a5d1887 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -78,13 +78,15 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname) { // todo remove this if (offset.val.type == TMQ_OFFSET__LOG) { + taosWLockLatch(&pStore->pTq->lock); STqHandle* pHandle = taosHashGet(pStore->pTq->pHandle, offset.subKey, strlen(offset.subKey)); if (pHandle) { - if (walRefVer(pHandle->pRef, offset.val.version) < 0) { + if (walSetRefVer(pHandle->pRef, offset.val.version) < 0) { // tqError("vgId: %d, tq handle %s ref ver %" PRId64 "error", pStore->pTq->pVnode->config.vgId, pHandle->subKey, // offset.val.version); } } + taosWUnLockLatch(&pStore->pTq->lock); } taosMemoryFree(pMemBuf); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1fbdb25528..0a49b3c441 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1017,6 +1017,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle + taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1073,6 +1074,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } + taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index c5bf4268a7..f424f45a29 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,8 +162,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -bool tqIsHandleExecuting(STqHandle* pHandle) { return 1 == atomic_load_8(&pHandle->exec); } - static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; @@ -173,12 +171,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); - while(tqIsHandleExecuting(pHandle)){ - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); - taosMsleep(5); - } - atomic_store_8(&pHandle->exec, 1); - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { @@ -193,9 +185,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // lock taosWLockLatch(&pTq->lock); code = tqRegisterPushHandle(pTq, pHandle, pMsg); + tqSetHandleIdle(pHandle); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); - atomic_store_8(&pHandle->exec, 0); return code; } else{ @@ -214,7 +206,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); tDeleteSMqDataRsp(&dataRsp); } - atomic_store_8(&pHandle->exec, 0); + taosWLockLatch(&pTq->lock); + tqSetHandleIdle(pHandle); + taosWUnLockLatch(&pTq->lock); return code; } @@ -228,13 +222,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); - while(tqIsHandleExecuting(pHandle)){ - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 5ms and retry", vgId, pHandle->subKey); - taosMsleep(5); - } - - atomic_store_8(&pHandle->exec, 1); - if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { code = -1; @@ -329,7 +316,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - atomic_store_8(&pHandle->exec, 0); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 8bbbd3524d..a02abd1728 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1060,7 +1060,10 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ // if offset version is small than first version , let's seek to first version + taosThreadMutexLock(&((SWalReader*)pWalReader)->pWal->mutex); int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + taosThreadMutexUnlock(&((SWalReader*)pWalReader)->pWal->mutex); + if (pOffset->version + 1 < firstVer){ pOffset->version = firstVer - 1; } diff --git a/source/libs/wal/src/walRef.c b/source/libs/wal/src/walRef.c index 4d451db0c0..6aba661926 100644 --- a/source/libs/wal/src/walRef.c +++ b/source/libs/wal/src/walRef.c @@ -45,7 +45,7 @@ void walCloseRef(SWal *pWal, int64_t refId) { taosMemoryFree(pRef); } -int32_t walRefVer(SWalRef *pRef, int64_t ver) { +int32_t walSetRefVer(SWalRef *pRef, int64_t ver) { SWal *pWal = pRef->pWal; wDebug("vgId:%d, wal ref version %" PRId64 ", refId %" PRId64, pWal->cfg.vgId, ver, pRef->refId); if (pRef->refVer != ver) { @@ -57,26 +57,12 @@ int32_t walRefVer(SWalRef *pRef, int64_t ver) { } pRef->refVer = ver; - // bsearch in fileSet - // SWalFileInfo tmpInfo; - // tmpInfo.firstVer = ver; - // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - // ASSERT(pRet != NULL); - // pRef->refFile = pRet->firstVer; - taosThreadMutexUnlock(&pWal->mutex); } return 0; } -#if 1 -void walUnrefVer(SWalRef *pRef) { - pRef->refId = -1; - // pRef->refFile = -1; -} -#endif - SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { if (pRef == NULL) { pRef = walOpenRef(pWal); @@ -87,12 +73,6 @@ SWalRef *walRefFirstVer(SWal *pWal, SWalRef *pRef) { taosThreadMutexLock(&pWal->mutex); int64_t ver = walGetFirstVer(pWal); pRef->refVer = ver; - // bsearch in fileSet - // SWalFileInfo tmpInfo; - // tmpInfo.firstVer = ver; - // SWalFileInfo *pRet = taosArraySearch(pWal->fileInfoSet, &tmpInfo, compareWalFileInfo, TD_LE); - // ASSERT(pRet != NULL); - // pRef->refFile = pRet->firstVer; taosThreadMutexUnlock(&pWal->mutex); wDebug("vgId:%d, wal ref version %" PRId64 " for first", pWal->cfg.vgId, ver);