From a5a3d1d8ad86aa0ee43f443e98ce34359bfd0afb Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Fri, 25 Oct 2024 17:42:45 +0800 Subject: [PATCH] fix same timestamp written into same tsma res ctb --- include/common/tcommon.h | 1 + include/common/tmsgdef.h | 1 + source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/src/mndStb.c | 74 ++++++++++++------ source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 6 +- source/dnode/vnode/src/tq/tqUtil.c | 4 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 86 +++++++++++++++++++++ source/libs/wal/src/walRead.c | 2 +- 11 files changed, 149 insertions(+), 31 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index ea764e6760..9572bd7aad 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -155,6 +155,7 @@ typedef enum EStreamType { STREAM_MID_RETRIEVE, STREAM_PARTITION_DELETE_DATA, STREAM_GET_RESULT, + STREAM_DELETE_GROUP_DATA, } EStreamType; #pragma pack(push, 1) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index c22a3da5ad..c81d649284 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -316,6 +316,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_TSMA_CTB, "vnode-drop-tsma-ctb", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_VND_MSG) TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 0d804eadf0..4b79ecf43a 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 006f44b349..a0356a6c4d 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -1014,6 +1014,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3725d3a3fc..a52f0656dd 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -96,6 +96,7 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma); mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_DROP_TSMA_CTB_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); @@ -4088,7 +4089,8 @@ typedef struct SMDropTbTsmaInfos { typedef struct SMndDropTbsWithTsmaCtx { SHashObj *pTsmaMap; // SHashObj *pDbMap; // - SHashObj *pVgMap; // + SHashObj *pVgMap; // , only for non tsma result child table + SHashObj *pTsmaTbVgMap; // , only for tsma result child table SArray *pResTbNames; // SArray } SMndDropTbsWithTsmaCtx; @@ -4129,6 +4131,16 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { } taosHashCleanup(p->pVgMap); } + + if (p->pTsmaTbVgMap) { + void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL); + while (pIter) { + SVDropTbVgReqs *pReqs = pIter; + taosArrayDestroy(pReqs->req.pArray); + pIter = taosHashIterate(p->pTsmaTbVgMap, pIter); + } + taosHashCleanup(p->pTsmaTbVgMap); + } taosMemoryFree(p); } @@ -4154,6 +4166,12 @@ static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) { code = terrno; goto _end; } + + pCtx->pTsmaTbVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); + if (!pCtx->pTsmaTbVgMap) { + code = terrno; + goto _end; + } *ppCtx = pCtx; _end: if (code) mndDestroyDropTbsWithTsmaCtx(pCtx); @@ -4192,16 +4210,38 @@ static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, con } static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont, - int32_t contLen) { + int32_t contLen, tmsg_t msgType) { STransAction action = {0}; action.epSet = pVgReqs->info.epSet; action.pCont = pCont; action.contLen = contLen; - action.msgType = TDMT_VND_DROP_TABLE; + action.msgType = msgType; action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST; return mndTransAppendRedoAction(pTrans, &action); } +static int32_t mndBuildDropTbRedoActions(SMnode* pMnode, STrans* pTrans, SHashObj* pVgMap, tmsg_t msgType) { + int32_t code = 0; + void* pIter = taosHashIterate(pVgMap, NULL); + while (pIter) { + const SVDropTbVgReqs *pVgReqs = pIter; + int32_t len = 0; + void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len); + if (!p) { + taosHashCancelIterate(pVgMap, pIter); + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + break; + } + if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) { + taosHashCancelIterate(pVgMap, pIter); + break; + } + pIter = taosHashIterate(pVgMap, pIter); + } + return code; +} + static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) { int32_t code = 0; SMnode *pMnode = pRsp->info.node; @@ -4216,23 +4256,9 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); - void *pIter = taosHashIterate(pCtx->pVgMap, NULL); - while (pIter) { - const SVDropTbVgReqs *pVgReqs = pIter; - int32_t len = 0; - void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len); - if (!p) { - taosHashCancelIterate(pCtx->pVgMap, pIter); - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - goto _OVER; - } - if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len)) != 0) { - taosHashCancelIterate(pCtx->pVgMap, pIter); - goto _OVER; - } - pIter = taosHashIterate(pCtx->pVgMap, pIter); - } + if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; + if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER; + if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; _OVER: @@ -4260,7 +4286,8 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) { code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId); if (code) goto _OVER; } - if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) { + code = mndCreateDropTbsTxnPrepare(pReq, pCtx); + if (code == 0) { code = TSDB_CODE_ACTION_IN_PROGRESS; } _OVER: @@ -4445,7 +4472,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWith code = terrno; goto _end; } - TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end); + TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pTsmaTbVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end); } } _end: @@ -4476,7 +4503,8 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) { code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId); if (code) goto _end; - if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + code = mndCreateDropTbsTxnPrepare(pRsp, pCtx); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _end: if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); tDecoderClear(&decoder); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 653b47ff14..bdb94d4a6e 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -146,7 +146,7 @@ int32_t tqBuildFName(char** data, const char* path, char* name); int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name); // tq util -int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t type, int64_t sver, int64_t ever); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 80c04a3276..bbc58004d9 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1551,7 +1551,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA _resume_delete: version = RSMA_EXEC_MSG_VER(msg); if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, - &packData.pDataBlock, 1))) { + &packData.pDataBlock, 1, STREAM_DELETE_DATA))) { taosFreeQitem(msg); TAOS_CHECK_EXIT(code); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 95955e579f..c8c34b10a4 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -363,11 +363,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con tqError("%s failed to create data submit for stream since out of memory", id); return code; } - } else if (pCont->msgType == TDMT_VND_DELETE) { + } else if (pCont->msgType == TDMT_VND_DELETE || pCont->msgType == TDMT_VND_DROP_TSMA_CTB) { void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); int32_t len = pCont->bodyLen - sizeof(SMsgHead); - - code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0); + EStreamType blockType = pCont->msgType == TDMT_VND_DELETE ? STREAM_DELETE_DATA : STREAM_DELETE_GROUP_DATA; + code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType); if (code == TSDB_CODE_SUCCESS) { if (*pItem == NULL) { tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index e066938fc0..54b063d692 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -572,7 +572,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* return 0; } -int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { +int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) { int32_t code = 0; int32_t line = 0; SDecoder* pCoder = &(SDecoder){0}; @@ -593,7 +593,7 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void* } SSDataBlock* pDelBlock = NULL; - code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); + code = createSpecialDataBlock(blockType, &pDelBlock); TSDB_CHECK_CODE(code, line, END); code = blockDataEnsureCapacity(pDelBlock, numOfTables); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dd13c975cf..1ea2aac809 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -50,6 +50,8 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); +static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, + SRpcMsg *pOriginRpc); static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); @@ -481,6 +483,75 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) { return code; } +static int32_t vnodePreProcessDropTSmaCtbMsg(SVnode *pVnode, SRpcMsg *pMsg) { + SVDropTbBatchReq dropReq = {0}; + int32_t code = 0; + int32_t lino = 0; + SDecoder dc = {0}; + SEncoder ec = {0}; + int32_t nTbs = 0; + SDeleteRes res = {0}; + int32_t size = 0; + uint8_t *pCont = NULL; + tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + if (tDecodeSVDropTbBatchReq(&dc, &dropReq) < 0) { + code = TSDB_CODE_INVALID_MSG; + TSDB_CHECK_CODE(code, lino, _exit); + } + nTbs = dropReq.nReqs; + res.skey = INT64_MIN; + res.ekey = INT64_MAX; + res.affectedRows = 1; + res.uidList = taosArrayInit(nTbs, sizeof(tb_uid_t)); + if (!res.uidList) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + + vDebug("vnode preprocess drop tsma ctb, vgId:%d tb num: %d", TD_VID(pVnode), nTbs); + for (int32_t i = 0; i < nTbs; ++i) { + SVDeleteRsp rsp = {.affectedRows = 1}; + tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, dropReq.pReqs[i].name); + if (uid == 0) { + vWarn("vgId:%d, drop tsma ctb:%s not found", TD_VID(pVnode), dropReq.pReqs[i].name); + continue; + } + if (NULL == taosArrayPush(res.uidList, &uid)) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + } + + tEncodeSize(tEncodeDeleteRes, &res, size, code); + pCont = rpcMallocCont(size + sizeof(SMsgHead)); + if (!pCont) { + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + ((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead); + ((SMsgHead *)pCont)->vgId = TD_VID(pVnode); + + tEncoderInit(&ec, pCont + sizeof(SMsgHead), size); + code = tEncodeDeleteRes(&ec, &res); + tEncoderClear(&ec); + if (code != 0) { + vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__); + TSDB_CHECK_CODE(code, lino, _exit); + } + rpcFreeCont(pMsg->pCont); + pMsg->pCont = pCont; + pCont = NULL; + pMsg->contLen = size + sizeof(SMsgHead); + +_exit: + if (res.uidList) { + taosArrayDestroy(res.uidList); + } + tDecoderClear(&dc); + rpcFreeCont(pCont); + return code; +} + int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -507,6 +578,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_ARB_CHECK_SYNC: { code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg); } break; + case TDMT_VND_DROP_TSMA_CTB: { + code = vnodePreProcessDropTSmaCtbMsg(pVnode, pMsg); + } break; default: break; } @@ -711,6 +785,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_ARB_CHECK_SYNC: vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp); break; + case TDMT_VND_DROP_TSMA_CTB: + if (vnodeProcessDropTSmaCtbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) { + goto _err; + } + break; default: vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); return TSDB_CODE_INVALID_MSG; @@ -2512,3 +2591,10 @@ _OVER: int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } #endif + +static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp, + SRpcMsg *pOriginalMsg) { + pRsp->msgType = TDMT_VND_DROP_TSMA_CTB_RSP; + pRsp->code = TSDB_CODE_SUCCESS; + return pRsp->code; +} diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 610adfb0e1..9a3ea34eff 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { int32_t type = pReader->pHead->head.msgType; if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || - (IS_META_MSG(type) && pReader->cond.scanMeta)) { + (type == TDMT_VND_DROP_TSMA_CTB) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { TAOS_RETURN(walFetchBody(pReader)); } else { TAOS_CHECK_RETURN(walSkipFetchBody(pReader));