diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index fb403f79a7..ebf8c0fb9b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -89,7 +89,7 @@ int metaBegin(SMeta* pMeta); int metaCommit(SMeta* pMeta); int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); -int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); +int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 71345df747..1327a77cfd 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -212,7 +212,7 @@ _err: return -1; } -int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { +int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq, SArray *tbUidList) { void *pKey = NULL; int nKey = 0; void *pData = NULL; @@ -228,8 +228,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { } // drop all child tables - TBC *pCtbIdxc = NULL; - SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t)); + TBC *pCtbIdxc = NULL; tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn); rc = tdbTbcMoveTo(pCtbIdxc, &(SCtbIdxKey){.suid = pReq->suid, .uid = INT64_MIN}, sizeof(SCtbIdxKey), &c); @@ -249,20 +248,18 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) { break; } - taosArrayPush(pArray, &(((SCtbIdxKey *)pKey)->uid)); + taosArrayPush(tbUidList, &(((SCtbIdxKey *)pKey)->uid)); } tdbTbcClose(pCtbIdxc); metaWLock(pMeta); - for (int32_t iChild = 0; iChild < taosArrayGetSize(pArray); iChild++) { - tb_uid_t uid = *(tb_uid_t *)taosArrayGet(pArray, iChild); + for (int32_t iChild = 0; iChild < taosArrayGetSize(tbUidList); iChild++) { + tb_uid_t uid = *(tb_uid_t *)taosArrayGet(tbUidList, iChild); metaDropTableByUid(pMeta, uid, NULL); } - taosArrayDestroy(pArray); - // drop super table _drop_super_table: tdbTbGet(pMeta->pUidIdx, &pReq->suid, sizeof(tb_uid_t), &pData, &nData); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f18b25bef4..4e5762b5f1 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -49,8 +49,8 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, int32_t workerI static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + // TODO add reference to gurantee success if (metaGetTableEntryByUid(&mr, uid) < 0) { - ASSERT(0); return -1; } char* tbName = strdup(mr.me.name); @@ -87,16 +87,18 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa tqDebug("task execute end, get %p", pDataBlock); if (pDataBlock != NULL) { - tqAddBlockDataToRsp(pDataBlock, pRsp); - pRsp->blockNum++; if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader[0]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } else { pRsp->withTbName = 0; } } + tqAddBlockDataToRsp(pDataBlock, pRsp); + pRsp->blockNum++; if (pOffset->type == TMQ_OFFSET__LOG) { continue; } else { @@ -193,13 +195,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; - ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } + tqAddBlockDataToRsp(&block, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; } @@ -211,13 +214,14 @@ int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataR SSDataBlock block = {0}; if (tqRetrieveDataBlock(&block, pReader) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; - ASSERT(0); } - tqAddBlockDataToRsp(&block, pRsp); if (pRsp->withTbName) { int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; - tqAddTbNameToRsp(pTq, uid, pRsp); + if (tqAddTbNameToRsp(pTq, uid, pRsp) < 0) { + continue; + } } + tqAddBlockDataToRsp(&block, pRsp); tqAddBlockSchemaToRsp(pExec, workerId, pRsp); pRsp->blockNum++; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5f730bcfa5..8e59d97286 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -557,6 +557,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe SVDropStbReq req = {0}; int32_t rcode = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; + SArray *tbUidList = NULL; pRsp->msgType = TDMT_VND_CREATE_STB_RSP; pRsp->pCont = NULL; @@ -570,7 +571,14 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe } // process request - if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { + tbUidList = taosArrayInit(8, sizeof(int64_t)); + if (tbUidList == NULL) goto _exit; + if (metaDropSTable(pVnode->pMeta, version, &req, tbUidList) < 0) { + rcode = terrno; + goto _exit; + } + + if (tqUpdateTbUidList(pVnode->pTq, tbUidList, false) < 0) { rcode = terrno; goto _exit; } @@ -582,6 +590,7 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe // return rsp _exit: + if (tbUidList) taosArrayDestroy(tbUidList); pRsp->code = rcode; tDecoderClear(&decoder); return 0; diff --git a/tests/system-test/7-tmq/subscribeDb4.py b/tests/system-test/7-tmq/subscribeDb4.py index b99704b602..145cbbbbf5 100644 --- a/tests/system-test/7-tmq/subscribeDb4.py +++ b/tests/system-test/7-tmq/subscribeDb4.py @@ -88,7 +88,7 @@ class TDTestCase: tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) tdLog.info("After waiting for a period of time, drop one stable") - time.sleep(10) + time.sleep(3) tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) tdLog.info("wait result from consumer, then check it")