fix same timestamp written into same tsma res ctb

This commit is contained in:
wangjiaming0909 2024-10-25 17:42:45 +08:00
parent 9468216991
commit a5a3d1d8ad
11 changed files with 149 additions and 31 deletions

View File

@ -155,6 +155,7 @@ typedef enum EStreamType {
STREAM_MID_RETRIEVE, STREAM_MID_RETRIEVE,
STREAM_PARTITION_DELETE_DATA, STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT, STREAM_GET_RESULT,
STREAM_DELETE_GROUP_DATA,
} EStreamType; } EStreamType;
#pragma pack(push, 1) #pragma pack(push, 1)

View File

@ -316,6 +316,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_ARB_CHECK_SYNC, "vnode-arb-check-sync", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_FETCH_TTL_EXPIRED_TBS, "vnode-fetch-ttl-expired-tbs", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TABLE_NAME, "vnode-table-name", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_TSMA_CTB, "vnode-drop-tsma-ctb", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_VND_MSG) TD_CLOSE_MSG_SEG(TDMT_VND_MSG)
TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8 TD_NEW_MSG_SEG(TDMT_SCH_MSG) // 3<<8

View File

@ -182,6 +182,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TB_WITH_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TABLE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_ANAL_ALGO, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -1014,6 +1014,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_NAME, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TSMA_CTB, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -96,6 +96,7 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TB_WITH_TSMA, mndProcessDropTbWithTsma);
mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs); mndSetMsgHandle(pMnode, TDMT_VND_FETCH_TTL_EXPIRED_TBS_RSP, mndProcessFetchTtlExpiredTbs);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TABLE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TSMA_CTB_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
@ -4088,7 +4089,8 @@ typedef struct SMDropTbTsmaInfos {
typedef struct SMndDropTbsWithTsmaCtx { typedef struct SMndDropTbsWithTsmaCtx {
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos> SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo> SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs> SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>, only for non tsma result child table
SHashObj *pTsmaTbVgMap; // <vgid, SVDropTbVgReqs>, only for tsma result child table
SArray *pResTbNames; // SArray<char*> SArray *pResTbNames; // SArray<char*>
} SMndDropTbsWithTsmaCtx; } SMndDropTbsWithTsmaCtx;
@ -4129,6 +4131,16 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
} }
taosHashCleanup(p->pVgMap); taosHashCleanup(p->pVgMap);
} }
if (p->pTsmaTbVgMap) {
void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL);
while (pIter) {
SVDropTbVgReqs *pReqs = pIter;
taosArrayDestroy(pReqs->req.pArray);
pIter = taosHashIterate(p->pTsmaTbVgMap, pIter);
}
taosHashCleanup(p->pTsmaTbVgMap);
}
taosMemoryFree(p); taosMemoryFree(p);
} }
@ -4154,6 +4166,12 @@ static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
code = terrno; code = terrno;
goto _end; goto _end;
} }
pCtx->pTsmaTbVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (!pCtx->pTsmaTbVgMap) {
code = terrno;
goto _end;
}
*ppCtx = pCtx; *ppCtx = pCtx;
_end: _end:
if (code) mndDestroyDropTbsWithTsmaCtx(pCtx); if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
@ -4192,16 +4210,38 @@ static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, con
} }
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont, static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
int32_t contLen) { int32_t contLen, tmsg_t msgType) {
STransAction action = {0}; STransAction action = {0};
action.epSet = pVgReqs->info.epSet; action.epSet = pVgReqs->info.epSet;
action.pCont = pCont; action.pCont = pCont;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_DROP_TABLE; action.msgType = msgType;
action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST; action.acceptableCode = TSDB_CODE_TDB_TABLE_NOT_EXIST;
return mndTransAppendRedoAction(pTrans, &action); return mndTransAppendRedoAction(pTrans, &action);
} }
static int32_t mndBuildDropTbRedoActions(SMnode* pMnode, STrans* pTrans, SHashObj* pVgMap, tmsg_t msgType) {
int32_t code = 0;
void* pIter = taosHashIterate(pVgMap, NULL);
while (pIter) {
const SVDropTbVgReqs *pVgReqs = pIter;
int32_t len = 0;
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
if (!p) {
taosHashCancelIterate(pVgMap, pIter);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
break;
}
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
taosHashCancelIterate(pVgMap, pIter);
break;
}
pIter = taosHashIterate(pVgMap, pIter);
}
return code;
}
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) { static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
int32_t code = 0; int32_t code = 0;
SMnode *pMnode = pRsp->info.node; SMnode *pMnode = pRsp->info.node;
@ -4216,23 +4256,9 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
void *pIter = taosHashIterate(pCtx->pVgMap, NULL); if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
while (pIter) { if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER;
const SVDropTbVgReqs *pVgReqs = pIter; if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
int32_t len = 0;
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
if (!p) {
taosHashCancelIterate(pCtx->pVgMap, pIter);
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
if (terrno != 0) code = terrno;
goto _OVER;
}
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len)) != 0) {
taosHashCancelIterate(pCtx->pVgMap, pIter);
goto _OVER;
}
pIter = taosHashIterate(pCtx->pVgMap, pIter);
}
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
_OVER: _OVER:
@ -4260,7 +4286,8 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId); code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
if (code) goto _OVER; if (code) goto _OVER;
} }
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) { code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
if (code == 0) {
code = TSDB_CODE_ACTION_IN_PROGRESS; code = TSDB_CODE_ACTION_IN_PROGRESS;
} }
_OVER: _OVER:
@ -4445,7 +4472,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWith
code = terrno; code = terrno;
goto _end; goto _end;
} }
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end); TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pTsmaTbVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end);
} }
} }
_end: _end:
@ -4476,7 +4503,8 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId); code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
if (code) goto _end; if (code) goto _end;
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; code = mndCreateDropTbsTxnPrepare(pRsp, pCtx);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_end: _end:
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx); if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
tDecoderClear(&decoder); tDecoderClear(&decoder);

View File

@ -146,7 +146,7 @@ int32_t tqBuildFName(char** data, const char* path, char* name);
int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name); int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util // tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type); int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId, int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever); int32_t type, int64_t sver, int64_t ever);

View File

@ -1551,7 +1551,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA
_resume_delete: _resume_delete:
version = RSMA_EXEC_MSG_VER(msg); version = RSMA_EXEC_MSG_VER(msg);
if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version, if ((code = tqExtractDelDataBlock(RSMA_EXEC_MSG_BODY(msg), RSMA_EXEC_MSG_LEN(msg), version,
&packData.pDataBlock, 1))) { &packData.pDataBlock, 1, STREAM_DELETE_DATA))) {
taosFreeQitem(msg); taosFreeQitem(msg);
TAOS_CHECK_EXIT(code); TAOS_CHECK_EXIT(code);
} }

View File

@ -363,11 +363,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
tqError("%s failed to create data submit for stream since out of memory", id); tqError("%s failed to create data submit for stream since out of memory", id);
return code; return code;
} }
} else if (pCont->msgType == TDMT_VND_DELETE) { } else if (pCont->msgType == TDMT_VND_DELETE || pCont->msgType == TDMT_VND_DROP_TSMA_CTB) {
void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead));
int32_t len = pCont->bodyLen - sizeof(SMsgHead); int32_t len = pCont->bodyLen - sizeof(SMsgHead);
EStreamType blockType = pCont->msgType == TDMT_VND_DELETE ? STREAM_DELETE_DATA : STREAM_DELETE_GROUP_DATA;
code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0); code = tqExtractDelDataBlock(pBody, len, ver, (void**)pItem, 0, blockType);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
if (*pItem == NULL) { if (*pItem == NULL) {
tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver); tqDebug("s-task:%s empty delete msg, discard it, len:%d, ver:%" PRId64, id, len, ver);

View File

@ -572,7 +572,7 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
return 0; return 0;
} }
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType) {
int32_t code = 0; int32_t code = 0;
int32_t line = 0; int32_t line = 0;
SDecoder* pCoder = &(SDecoder){0}; SDecoder* pCoder = &(SDecoder){0};
@ -593,7 +593,7 @@ int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void*
} }
SSDataBlock* pDelBlock = NULL; SSDataBlock* pDelBlock = NULL;
code = createSpecialDataBlock(STREAM_DELETE_DATA, &pDelBlock); code = createSpecialDataBlock(blockType, &pDelBlock);
TSDB_CHECK_CODE(code, line, END); TSDB_CHECK_CODE(code, line, END);
code = blockDataEnsureCapacity(pDelBlock, numOfTables); code = blockDataEnsureCapacity(pDelBlock, numOfTables);

View File

@ -50,6 +50,8 @@ static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq,
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessConfigChangeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessArbCheckSyncReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
SRpcMsg *pOriginRpc);
static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodePreCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token); static int32_t vnodeCheckAssignedLogSyncd(SVnode *pVnode, char *member0Token, char *member1Token);
@ -481,6 +483,75 @@ static int32_t vnodePreProcessArbCheckSyncMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return code; return code;
} }
static int32_t vnodePreProcessDropTSmaCtbMsg(SVnode *pVnode, SRpcMsg *pMsg) {
SVDropTbBatchReq dropReq = {0};
int32_t code = 0;
int32_t lino = 0;
SDecoder dc = {0};
SEncoder ec = {0};
int32_t nTbs = 0;
SDeleteRes res = {0};
int32_t size = 0;
uint8_t *pCont = NULL;
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (tDecodeSVDropTbBatchReq(&dc, &dropReq) < 0) {
code = TSDB_CODE_INVALID_MSG;
TSDB_CHECK_CODE(code, lino, _exit);
}
nTbs = dropReq.nReqs;
res.skey = INT64_MIN;
res.ekey = INT64_MAX;
res.affectedRows = 1;
res.uidList = taosArrayInit(nTbs, sizeof(tb_uid_t));
if (!res.uidList) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
vDebug("vnode preprocess drop tsma ctb, vgId:%d tb num: %d", TD_VID(pVnode), nTbs);
for (int32_t i = 0; i < nTbs; ++i) {
SVDeleteRsp rsp = {.affectedRows = 1};
tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, dropReq.pReqs[i].name);
if (uid == 0) {
vWarn("vgId:%d, drop tsma ctb:%s not found", TD_VID(pVnode), dropReq.pReqs[i].name);
continue;
}
if (NULL == taosArrayPush(res.uidList, &uid)) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
tEncodeSize(tEncodeDeleteRes, &res, size, code);
pCont = rpcMallocCont(size + sizeof(SMsgHead));
if (!pCont) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
((SMsgHead *)pCont)->contLen = size + sizeof(SMsgHead);
((SMsgHead *)pCont)->vgId = TD_VID(pVnode);
tEncoderInit(&ec, pCont + sizeof(SMsgHead), size);
code = tEncodeDeleteRes(&ec, &res);
tEncoderClear(&ec);
if (code != 0) {
vError("vgId:%d %s failed to encode delete response", TD_VID(pVnode), __func__);
TSDB_CHECK_CODE(code, lino, _exit);
}
rpcFreeCont(pMsg->pCont);
pMsg->pCont = pCont;
pCont = NULL;
pMsg->contLen = size + sizeof(SMsgHead);
_exit:
if (res.uidList) {
taosArrayDestroy(res.uidList);
}
tDecoderClear(&dc);
rpcFreeCont(pCont);
return code;
}
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
@ -507,6 +578,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_ARB_CHECK_SYNC: { case TDMT_VND_ARB_CHECK_SYNC: {
code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg); code = vnodePreProcessArbCheckSyncMsg(pVnode, pMsg);
} break; } break;
case TDMT_VND_DROP_TSMA_CTB: {
code = vnodePreProcessDropTSmaCtbMsg(pVnode, pMsg);
} break;
default: default:
break; break;
} }
@ -711,6 +785,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_ARB_CHECK_SYNC: case TDMT_VND_ARB_CHECK_SYNC:
vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp); vnodeProcessArbCheckSyncReq(pVnode, pReq, len, pRsp);
break; break;
case TDMT_VND_DROP_TSMA_CTB:
if (vnodeProcessDropTSmaCtbReq(pVnode, ver, pReq, len, pRsp, pMsg) < 0) {
goto _err;
}
break;
default: default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
@ -2512,3 +2591,10 @@ _OVER:
int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; } int32_t vnodeAsyncCompact(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp) { return 0; }
int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; } int32_t tsdbAsyncCompact(STsdb *tsdb, const STimeWindow *tw, bool sync) { return 0; }
#endif #endif
static int32_t vnodeProcessDropTSmaCtbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp,
SRpcMsg *pOriginalMsg) {
pRsp->msgType = TDMT_VND_DROP_TSMA_CTB_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
return pRsp->code;
}

View File

@ -87,7 +87,7 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int32_t type = pReader->pHead->head.msgType; int32_t type = pReader->pHead->head.msgType;
if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) ||
(IS_META_MSG(type) && pReader->cond.scanMeta)) { (type == TDMT_VND_DROP_TSMA_CTB) || (IS_META_MSG(type) && pReader->cond.scanMeta)) {
TAOS_RETURN(walFetchBody(pReader)); TAOS_RETURN(walFetchBody(pReader));
} else { } else {
TAOS_CHECK_RETURN(walSkipFetchBody(pReader)); TAOS_CHECK_RETURN(walSkipFetchBody(pReader));