diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9572bd7aad..8a8948fb17 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -156,6 +156,7 @@ typedef enum EStreamType { STREAM_PARTITION_DELETE_DATA, STREAM_GET_RESULT, STREAM_DELETE_GROUP_DATA, + STREAM_DROP_CHILD_TABLE, } EStreamType; #pragma pack(push, 1) @@ -402,6 +403,7 @@ int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol); #define TSMA_RES_STB_EXTRA_COLUMN_NUM 4 // 3 columns: _wstart, _wend, _wduration, 1 tag: tbname static inline bool isTsmaResSTb(const char* stbName) { + return false; const char* pos = strstr(stbName, TSMA_RES_STB_POSTFIX); if (pos && strlen(stbName) == (pos - stbName) + strlen(TSMA_RES_STB_POSTFIX)) { return true; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7ff70b243a..3c7d715aa8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3220,6 +3220,7 @@ int tDecodeSVCreateTbBatchRsp(SDecoder* pCoder, SVCreateTbBatchRsp* pRsp); typedef struct { char* name; uint64_t suid; // for tmq in wal format + int64_t uid; int8_t igNotExists; } SVDropTbReq; diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index f95b3f20ca..999adc2eff 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -138,6 +138,7 @@ typedef struct { int8_t scanMeta; int8_t deleteMsg; int8_t enableRef; + int8_t scanDropCtb; } SWalFilterCond; // todo hide this struct diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6d1699b911..7e79ef48b8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10277,6 +10277,7 @@ static int32_t tEncodeSVDropTbReq(SEncoder *pCoder, const SVDropTbReq *pReq) { TAOS_CHECK_RETURN(tStartEncode(pCoder)); TAOS_CHECK_RETURN(tEncodeCStr(pCoder, pReq->name)); TAOS_CHECK_RETURN(tEncodeU64(pCoder, pReq->suid)); + TAOS_CHECK_RETURN(tEncodeI64(pCoder, pReq->uid)); TAOS_CHECK_RETURN(tEncodeI8(pCoder, pReq->igNotExists)); tEndEncode(pCoder); @@ -10287,6 +10288,7 @@ static int32_t tDecodeSVDropTbReq(SDecoder *pCoder, SVDropTbReq *pReq) { TAOS_CHECK_RETURN(tStartDecode(pCoder)); TAOS_CHECK_RETURN(tDecodeCStr(pCoder, &pReq->name)); TAOS_CHECK_RETURN(tDecodeU64(pCoder, &pReq->suid)); + TAOS_CHECK_RETURN(tDecodeI64(pCoder, &pReq->uid)); TAOS_CHECK_RETURN(tDecodeI8(pCoder, &pReq->igNotExists)); tEndDecode(pCoder); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 6efc58119b..a1044a9f86 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -4256,9 +4256,9 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); - if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER; + //if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER; if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; - if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; + //if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; _OVER: @@ -4298,7 +4298,7 @@ _OVER: static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid, bool ignoreNotExists) { - SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists}; + SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0}; SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); SVDropTbVgReqs reqs = {0}; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index bdb94d4a6e..3c40100f9d 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -158,6 +158,7 @@ int32_t doMergeExistedRows(SSubmitTbData* pExisted, const SSubmitTbData* pNew, c int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t numOfCols, SSDataBlock* pDataBlock, SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq); +int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type); #define TQ_ERR_GO_TO_END(c) \ do { \ diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 5c3516a962..93324d6eb4 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -1233,6 +1233,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi metaWLock(pMeta); rc = metaDropTableByUid(pMeta, uid, &type, &suid, &sysTbl); + metaInfo("wjm meta drop table by uid: %"PRId64, uid); metaULock(pMeta); if (rc < 0) goto _exit; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd78f62cae..6195899566 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -758,7 +758,8 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files + bool scanDropCtb = pTask->subtableWithoutMd5 ? true : false; + SWalFilterCond cond = {.deleteMsg = 1, .scanDropCtb = scanDropCtb}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); if (pTask->exec.pWalReader == NULL) { tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno)); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c8c34b10a4..391cbe78fb 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -382,6 +382,20 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con return code; } + } else if (pCont->msgType == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) { + void* pBody = POINTER_SHIFT(pCont->body, sizeof(SMsgHead)); + int32_t len = pCont->bodyLen - sizeof(SMsgHead); + code = tqExtractDropCtbDataBlock(pBody, len, ver, (void**)pItem, 0); + if (TSDB_CODE_SUCCESS == code) { + if (!*pItem) { + continue; + } else { + tqDebug("s-task:%s drop ctb msg extract from WAL, len:%d, ver:%"PRId64, id, len, ver); + } + } else { + terrno = code; + return code; + } } else { tqError("s-task:%s invalid msg type:%d, ver:%" PRId64, id, pCont->msgType, ver); return TSDB_CODE_STREAM_INTERNAL_ERROR; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e972c928f0..3c14870d92 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -53,6 +53,7 @@ static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode); static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs); +static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid); int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr, bool newSubTableRule) { @@ -138,7 +139,7 @@ int32_t tqBuildDeleteReq(STQ* pTq, const char* stbFullName, const SSDataBlock* p return 0; } -static int32_t encodeCreateChildTableForRPC(SVCreateTbBatchReq* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { +static int32_t encodeCreateChildTableForRPC(void* pReqs, int32_t vgId, void** pBuf, int32_t* contLen) { int32_t ret = 0; tEncodeSize(tEncodeSVCreateTbBatchReq, pReqs, *contLen, ret); @@ -170,17 +171,50 @@ end: return ret; } -static int32_t tqPutReqToQueue(SVnode* pVnode, SVCreateTbBatchReq* pReqs) { +static int32_t encodeDropChildTableForRPC(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen) { + int32_t code = 0; + SEncoder ec = {0}; + tEncodeSize(tEncodeSVDropTbBatchReq, pReqs, *contLen, code); + if (code < 0) { + code = TSDB_CODE_INVALID_MSG; + goto end; + } + *contLen += sizeof(SMsgHead); + *ppBuf = rpcMallocCont(*contLen); + + if (!*ppBuf) { + code = terrno; + goto end; + } + + ((SMsgHead*)(*ppBuf))->vgId = vgId; + ((SMsgHead*)(*ppBuf))->contLen = htonl(*contLen); + + tEncoderInit(&ec, POINTER_SHIFT(*ppBuf, sizeof(SMsgHead)), (*contLen) - sizeof(SMsgHead)); + code = tEncodeSVDropTbBatchReq(&ec, pReqs); + tEncoderClear(&ec); + if (code < 0) { + rpcFreeCont(*ppBuf); + *ppBuf = NULL; + *contLen = 0; + code = TSDB_CODE_INVALID_MSG; + goto end; + } +end: + return code; +} + +static int32_t tqPutReqToQueue(SVnode* pVnode, void* pReqs, int32_t(*encoder)(void* pReqs, int32_t vgId, void** ppBuf, int32_t *contLen), tmsg_t msgType) { void* buf = NULL; int32_t tlen = 0; - int32_t code = encodeCreateChildTableForRPC(pReqs, TD_VID(pVnode), &buf, &tlen); + int32_t code = encoder(pReqs, TD_VID(pVnode), &buf, &tlen); if (code) { tqError("vgId:%d failed to encode create table msg, create table failed, code:%s", TD_VID(pVnode), tstrerror(code)); return code; } - SRpcMsg msg = {.msgType = TDMT_VND_CREATE_TABLE, .pCont = buf, .contLen = tlen}; + SRpcMsg msg = {.msgType = msgType, .pCont = buf, .contLen = tlen}; code = tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg); if (code) { tqError("failed to put into write-queue since %s", terrstr()); @@ -388,7 +422,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S } reqs.nReqs = taosArrayGetSize(reqs.pArray); - code = tqPutReqToQueue(pVnode, &reqs); + code = tqPutReqToQueue(pVnode, &reqs, encodeCreateChildTableForRPC, TDMT_VND_CREATE_TABLE); if (code != TSDB_CODE_SUCCESS) { tqError("s-task:%s failed to send create table msg", id); } @@ -399,6 +433,58 @@ _end: return code; } +static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SSDataBlock* pDataBlock, + SStreamTask* pTask, int64_t suid) { + int32_t lino = 0; + int32_t code = 0; + int32_t rows = pDataBlock->info.rows; + const char* id = pTask->id.idStr; + SVDropTbBatchReq batchReq = {0}; + SVDropTbReq req = {0}; + + if (rows <= 0 || pTask->subtableWithoutMd5 == 0) return TSDB_CODE_SUCCESS; + + batchReq.pArray = taosArrayInit(rows, sizeof(SVDropTbReq)); + if (!batchReq.pArray) return terrno; + batchReq.nReqs = rows; + req.suid = suid; + req.igNotExists = true; + + SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + SColumnInfoData* pUidCol = taosArrayGet(pDataBlock->pDataBlock, UID_COLUMN_INDEX); + char tbName[TSDB_TABLE_NAME_LEN + 1] = {0}; + for (int32_t i = 0; i < rows; ++i) { + void* pData = colDataGetVarData(pTbNameCol, i); + memcpy(tbName, varDataVal(pData), varDataLen(pData)); + tbName[varDataLen(pData) + 1] = 0; + req.name = tbName; + // TODO wjm remove uid, it's not my uid + req.uid = *(int64_t*)colDataGetData(pUidCol, i); + if (taosArrayPush(batchReq.pArray, &req) == NULL) { + TSDB_CHECK_CODE(terrno, lino, _exit); + } + } + tqDebug("s-task:%s build drop %d table(s) msg", id, rows); + code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE); + TSDB_CHECK_CODE(code, lino, _exit); + + for (int32_t i = 0; i < rows; ++i) { + void* pData = colDataGetVarData(pTbNameCol, i); + memcpy(tbName, varDataVal(pData), varDataLen(pData)); + tbName[varDataLen(pData) + 1] = 0; + int64_t uid = *(int64_t*)colDataGetData(pUidCol, i); + code = doWaitForDstTableDropped(pVnode, pTask, tbName, uid); + TSDB_CHECK_CODE(code, lino, _exit); + } + return code; + +_exit: + if (batchReq.pArray) { + taosArrayDestroy(batchReq.pArray); + } + return code; +} + int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* pReq, int32_t numOfBlocks) { const char* id = pTask->id.idStr; int32_t vgId = TD_VID(pVnode); @@ -807,6 +893,42 @@ int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkI return TSDB_CODE_SUCCESS; } +static int32_t doWaitForDstTableDropped(SVnode* pVnode, SStreamTask* pTask, const char* dstTableName, int64_t uid) { + int32_t vgId = TD_VID(pVnode); + int64_t suid = pTask->outputInfo.tbSink.stbUid; + const char* id = pTask->id.idStr; + + while (1) { + if (streamTaskShouldStop(pTask)) { + tqDebug("s-task:%s task will stop, quit from waiting for table:%s drop", id, dstTableName); + return TSDB_CODE_STREAM_EXEC_CANCELLED; + } + SMetaReader mr = {0}; + metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK); + int32_t code = metaGetTableEntryByName(&mr, dstTableName); + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + tqDebug("wjm s-task:%s table:%s has been dropped", id, dstTableName); + metaReaderClear(&mr); + break; + } else if (TSDB_CODE_SUCCESS == code) { + if (isValidDstChildTable(&mr, vgId, dstTableName, suid)) { + metaReaderClear(&mr); + taosMsleep(100); + tqDebug("s-task:%s wait 100ms for table:%s drop", id, dstTableName); + } else { + tqDebug("wjm s-task:%s table:%s exist, but not mine", id, dstTableName); + metaReaderClear(&mr); + break; + } + } else { + tqError("s-task:%s failed to wait for table:%s drop", id, dstTableName); + metaReaderClear(&mr); + return terrno; + } + } + return TSDB_CODE_SUCCESS; +} + int32_t doCreateSinkTableInfo(const char* pDstTableName, STableSinkInfo** pInfo) { int32_t nameLen = strlen(pDstTableName); (*pInfo) = taosMemoryCalloc(1, sizeof(STableSinkInfo) + nameLen + 1); @@ -1052,6 +1174,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { code = doBuildAndSendCreateTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); } else if (pDataBlock->info.type == STREAM_CHECKPOINT) { continue; + } else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) { + code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid); } else { code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 54b063d692..0b3d7b180a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -751,3 +751,54 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b return TSDB_CODE_SUCCESS; } + +int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type) { + int32_t code = 0; + int32_t lino = 0; + SDecoder dc = {0}; + SVDropTbBatchReq batchReq = {0}; + tDecoderInit(&dc, (uint8_t*)data, len); + code = tDecodeSVDropTbBatchReq(&dc, &batchReq); + TSDB_CHECK_CODE(code, lino, _exit); + if (batchReq.nReqs <= 0) goto _exit; + + SSDataBlock* pBlock = NULL; + code = createSpecialDataBlock(STREAM_DROP_CHILD_TABLE, &pBlock); + TSDB_CHECK_CODE(code, lino, _exit); + + code = blockDataEnsureCapacity(pBlock, batchReq.nReqs); + TSDB_CHECK_CODE(code, lino, _exit); + + pBlock->info.rows = batchReq.nReqs; + pBlock->info.version = ver; + for (int32_t i = 0; i < batchReq.nReqs; ++i) { + SVDropTbReq* pReq = batchReq.pReqs + i; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); + TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno); + code = colDataSetVal(pCol, i, (const char* )&pReq->uid, false); + TSDB_CHECK_CODE(code, lino, _exit); + + /* + pCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + TSDB_CHECK_NULL(pCol, code, lino, _exit, terrno); + char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0}; + varDataSetLen(varTbName, strlen(pReq->name)); + tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pReq->name); + code = colDataSetVal(pCol, i, varTbName, false); + */ + } + + code = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0, pRefBlock); + TSDB_CHECK_CODE(code, lino, _exit); + ((SStreamRefDataBlock*)(*pRefBlock))->type = STREAM_INPUT__REF_DATA_BLOCK; + ((SStreamRefDataBlock*)(*pRefBlock))->pBlock = pBlock; + +_exit: + tDecoderClear(&dc); + if (TSDB_CODE_SUCCESS != code) { + tqError("faled to extract drop ctb data block, line:%d code:%s", lino, tstrerror(code)); + blockDataCleanup(pBlock); + taosMemoryFree(pBlock); + } + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1ea2aac809..a2b5f49b5c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -552,6 +552,61 @@ _exit: return code; } +int32_t vnodePreProcessDropTbMsg(SVnode* pVnode, SRpcMsg* pMsg) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t size = 0; + SDecoder dc = {0}; + SEncoder ec = {0}; + SVDropTbBatchReq receivedBatchReqs = {0}; + SVDropTbBatchReq sentBatchReqs = {0}; + + tDecoderInit(&dc, pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead)); + + code = tDecodeSVDropTbBatchReq(&dc, &receivedBatchReqs); + if (code < 0) { + terrno = code; + TSDB_CHECK_CODE(code, lino, _exit); + } + sentBatchReqs.pArray = taosArrayInit(receivedBatchReqs.nReqs, sizeof(SVDropTbReq)); + if (!sentBatchReqs.pArray) { + code = terrno; + goto _exit; + } + + for (int32_t i = 0; i < receivedBatchReqs.nReqs; ++i) { + SVDropTbReq* pReq = receivedBatchReqs.pReqs + i; + tb_uid_t uid = metaGetTableEntryUidByName(pVnode->pMeta, pReq->name); + if (uid == 0) { + vWarn("vgId:%d, preprocess drop ctb: %s not found", TD_VID(pVnode), pReq->name); + continue; + } + pReq->uid = uid; + vDebug("vgId:%d %s for: %s, uid: %"PRId64, TD_VID(pVnode), __func__, pReq->name, pReq->uid); + if (taosArrayPush(sentBatchReqs.pArray, pReq) == NULL) { + code = terrno; + goto _exit; + } + } + sentBatchReqs.nReqs = sentBatchReqs.pArray->size; + + tEncodeSize(tEncodeSVDropTbBatchReq, &sentBatchReqs, size, code); + tEncoderInit(&ec, pMsg->pCont + sizeof(SMsgHead), size); + code = tEncodeSVDropTbBatchReq(&ec, &sentBatchReqs); + tEncoderClear(&ec); + if (code != TSDB_CODE_SUCCESS) { + vError("vgId:%d %s failed to encode drop tb batch req: %s", TD_VID(pVnode), __func__, tstrerror(code)); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + tDecoderClear(&dc); + if (sentBatchReqs.pArray) { + taosArrayDestroy(sentBatchReqs.pArray); + } + return code; +} + int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t code = 0; @@ -581,6 +636,9 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_DROP_TSMA_CTB: { code = vnodePreProcessDropTSmaCtbMsg(pVnode, pMsg); } break; + case TDMT_VND_DROP_TABLE: { + code = vnodePreProcessDropTbMsg(pVnode, pMsg); + } break; default: break; } @@ -1189,7 +1247,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, STbUidStore *pStore = NULL; SArray *tbUids = NULL; SArray *tbNames = NULL; - pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; pRsp->pCont = NULL; @@ -1245,9 +1302,11 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, continue; } + vInfo("wjm process create tb req:%s, uid: %"PRId64, pCreateReq->name, pCreateReq->uid); // do create table if (metaCreateTable(pVnode->pMeta, ver, pCreateReq, &cRsp.pMeta) < 0) { if (pCreateReq->flags & TD_CREATE_IF_NOT_EXISTS && terrno == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { + vInfo("wjm already exists-----------------"); cRsp.code = TSDB_CODE_SUCCESS; } else { cRsp.code = terrno; @@ -1324,6 +1383,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, } _exit: + vInfo("wjm process create table request exit"); tDeleteSVCreateTbBatchReq(&req); taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp); taosArrayDestroy(tbUids); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0cdb0e7954..b5cb22cf12 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3309,7 +3309,9 @@ static int32_t setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) int32_t rows = pBlock->info.rows; if (!pInfo->partitionSup.needCalc) { for (int32_t i = 0; i < rows; i++) { + qInfo("wjm, get uid: %"PRIu64, uidCol[i]); uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]); + qInfo("wjm, get groupid: %"PRIu64, groupId); code = colDataSetVal(pGpCol, i, (const char*)&groupId, false); QUERY_CHECK_CODE(code, lino, _end); } @@ -3535,12 +3537,32 @@ static int32_t copyGetResultBlock(SSDataBlock* dest, TSKEY start, TSKEY end) { return appendDataToSpecialBlock(dest, &start, &end, NULL, NULL, NULL); } -static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { +static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t *deleteNum) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; for (int32_t i = 0; i < pBlock->info.rows; i++) { SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); int64_t* gpIdCol = (int64_t*)pGpIdCol->pData; + void* pParName = NULL; + int32_t winCode = 0; + // TODO wjm test remove non stream child tables + code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i], + &pParName, false, &winCode); + if (TSDB_CODE_SUCCESS == code && winCode != 0) { + qInfo("delete stream part Name for:%"PRId64 " not found", gpIdCol[i]); + colDataSetNULL(pTbnameCol, i); + continue; + } + (*deleteNum)++; + QUERY_CHECK_CODE(code, lino, _end); + char varTbName[TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE + 1] = {0}; + varDataSetLen(varTbName, strlen(pParName)); + tsnprintf(varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1, "%s", pParName); + code = colDataSetVal(pTbnameCol, i, varTbName, false); + qDebug("delete stream part for:%"PRId64 " res tb: %s", gpIdCol[i], (char*)pParName); + pInfo->stateStore.streamStateFreeVal(pParName); + QUERY_CHECK_CODE(code, lino, _end); code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]); QUERY_CHECK_CODE(code, lino, _end); } @@ -3791,15 +3813,13 @@ FETCH_NEXT_BLOCK: prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; } break; - case STREAM_DELETE_GROUP_DATA: { - printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "delete group recv", - GET_TASKID(pTaskInfo)); + case STREAM_DROP_CHILD_TABLE: { + int32_t deleteNum = 0; code = setBlockGroupIdByUid(pInfo, pBlock); QUERY_CHECK_CODE(code, lino, _end); - - code = deletePartName(pInfo, pBlock); + code = deletePartName(pInfo, pBlock, &deleteNum); QUERY_CHECK_CODE(code, lino, _end); - goto FETCH_NEXT_BLOCK; + if (deleteNum == 0) goto FETCH_NEXT_BLOCK; } break; case STREAM_CHECKPOINT: { qError("stream check point error. msg type: STREAM_INPUT__DATA_BLOCK"); diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 8fd00e9313..2e906d2ba6 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -5215,7 +5215,7 @@ static int32_t doStreamIntervalAggNext(SOperatorInfo* pOperator, SSDataBlock** p code = getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); QUERY_CHECK_CODE(code, lino, _end); continue; - } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_DROP_CHILD_TABLE) { printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); (*ppRes) = pBlock; return code; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 26c97cbb6e..91ed919446 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -480,6 +480,7 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) { if (tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) { code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN); + qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap)); QUERY_CHECK_CODE(code, lino, _end); } code = streamStatePutParName_rocksdb(pState, groupId, tbname); @@ -505,6 +506,7 @@ int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal (*pWinCode) = streamStateGetParName_rocksdb(pState, groupId, pVal); if ((*pWinCode) == TSDB_CODE_SUCCESS && tSimpleHashGetSize(pState->parNameMap) < MAX_TABLE_NAME_NUM) { code = tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN); + qInfo("wjm put group id into parnamemap: %"PRId64 " cur mapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap)); QUERY_CHECK_CODE(code, lino, _end); } goto _end; @@ -526,6 +528,7 @@ _end: } int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) { + qTrace("wjm delete par for group:%"PRId64 " parnameMapsize: %d", groupId, tSimpleHashGetSize(pState->parNameMap)); int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t)); qTrace("catche %s at line %d res %d", __func__, __LINE__, code); code = streamStateDeleteParName_rocksdb(pState, groupId); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 9a3ea34eff..94dff3f71c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -89,6 +89,8 @@ int32_t walNextValidMsg(SWalReader *pReader) { if (type == TDMT_VND_SUBMIT || ((type == TDMT_VND_DELETE) && (pReader->cond.deleteMsg == 1)) || (type == TDMT_VND_DROP_TSMA_CTB) || (IS_META_MSG(type) && pReader->cond.scanMeta)) { TAOS_RETURN(walFetchBody(pReader)); + } else if (type == TDMT_VND_DROP_TABLE && pReader->cond.scanDropCtb) { + TAOS_RETURN(walFetchBody(pReader)); } else { TAOS_CHECK_RETURN(walSkipFetchBody(pReader)); diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index a1638ae4cb..78a3c1406e 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -604,7 +604,7 @@ class TSMATestSQLGenerator: class TDTestCase: - updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3} + updatecfgDict = {'asynclog': 0, 'ttlUnit': 1, 'ttlPushInterval': 5, 'ratioOfVnodeStreamThrea': 4, 'maxTsmaNum': 3, 'debugFlag': 143} def __init__(self): self.vgroups = 4 @@ -804,9 +804,10 @@ class TDTestCase: self.tsma_tester.check_sql(ctx.sql, ctx) def test_query_with_tsma(self): - self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '5m') - self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)'], '30m') - self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') + self.create_tsma('tsma1', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '5m') + #self.create_tsma('tsma2', 'test', 'meters', ['avg(c1)', 'avg(c2)', 'count(ts)'], '30m') + #self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') + return self.test_query_with_tsma_interval() self.test_query_with_tsma_agg() @@ -1227,17 +1228,28 @@ class TDTestCase: def run(self): self.init_data() - self.test_ddl() + #self.test_ddl() self.test_query_with_tsma() # bug to fix - self.test_flush_query() + #self.test_flush_query() #cluster test cluster_dnode_list = tdSql.get_cluseter_dnodes() clust_dnode_nums = len(cluster_dnode_list) if clust_dnode_nums > 1: self.test_redistribute_vgroups() - + self.test_td_32519() + + def test_td_32519(self): + tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:45:00", 1,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:55:00", 2,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('DROP TABLE t1', queryTimes=1) + tdSql.execute('CREATE TABLE t1 USING meters TAGS(1, "a", "b", 1,1,1)') + tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 11:59:00", 3,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:10:00", 4,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('INSERT INTO t1 VALUES("2024-10-24 12:20:00", 5,1,1,1,1,1,1, "a", "a")', queryTimes=1) + tdSql.execute('FLUSH DATABASE test', queryTimes=1) + def test_create_tsma(self): function_name = sys._getframe().f_code.co_name tdLog.debug(f'-----{function_name}------')