diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e782d505a9..3712196f38 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -4063,8 +4063,8 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) { } typedef struct SVDropTbVgReqs { - SVDropTbBatchReq req; - SVgroupInfo info; + SArray *pBatchReqs; + SVgroupInfo info; } SVDropTbVgReqs; typedef struct SMDropTbDbInfo { @@ -4086,16 +4086,17 @@ typedef struct SMDropTbTsmaInfos { } SMDropTbTsmaInfos; typedef struct SMndDropTbsWithTsmaCtx { - SHashObj *pTsmaMap; // - SHashObj *pDbMap; // - SHashObj *pVgMap; // , only for non tsma result child table - SHashObj *pTsmaTbVgMap; // , only for tsma result child table - SArray *pResTbNames; // SArray + SHashObj *pTsmaMap; // + SHashObj *pDbMap; // + SHashObj *pVgMap; // , only for non tsma result child table + SHashObj *pTsmaTbVgMap; // , only for tsma result child table + SArray *pResTbNames; // SArray } SMndDropTbsWithTsmaCtx; static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs, int32_t vgId); +static void destroySVDropTbBatchReqs(void *p); static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { if (!p) return; @@ -4125,7 +4126,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { void *pIter = taosHashIterate(p->pVgMap, NULL); while (pIter) { SVDropTbVgReqs *pReqs = pIter; - taosArrayDestroy(pReqs->req.pArray); + taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs); pIter = taosHashIterate(p->pVgMap, pIter); } taosHashCleanup(p->pVgMap); @@ -4135,7 +4136,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) { void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL); while (pIter) { SVDropTbVgReqs *pReqs = pIter; - taosArrayDestroy(pReqs->req.pArray); + taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs); pIter = taosHashIterate(p->pTsmaTbVgMap, pIter); } taosHashCleanup(p->pTsmaTbVgMap); @@ -4219,20 +4220,25 @@ static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SV return mndTransAppendRedoAction(pTrans, &action); } -static int32_t mndBuildDropTbRedoActions(SMnode* pMnode, STrans* pTrans, SHashObj* pVgMap, tmsg_t msgType) { +static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) { int32_t code = 0; - void* pIter = taosHashIterate(pVgMap, NULL); + void *pIter = taosHashIterate(pVgMap, NULL); while (pIter) { const SVDropTbVgReqs *pVgReqs = pIter; int32_t len = 0; - void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len); - if (!p) { - taosHashCancelIterate(pVgMap, pIter); - code = TSDB_CODE_MND_RETURN_VALUE_NULL; - if (terrno != 0) code = terrno; - break; + for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs) && code == TSDB_CODE_SUCCESS; ++i) { + SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i); + void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len); + if (!p) { + code = TSDB_CODE_MND_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + break; + } + if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) { + break; + } } - if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) { + if (TSDB_CODE_SUCCESS != code) { taosHashCancelIterate(pVgMap, pIter); break; } @@ -4255,9 +4261,7 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); - //if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER; if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; - //if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER; if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER; _OVER: @@ -4295,26 +4299,51 @@ _OVER: TAOS_RETURN(code); } +static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) { + pBatchReq->nReqs = 1; + pBatchReq->pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); + if (!pBatchReq->pArray) return terrno; + if (taosArrayPush(pBatchReq->pArray, pReq) == NULL) { + taosArrayDestroy(pBatchReq->pArray); + pBatchReq->pArray = NULL; + return terrno; + } + return TSDB_CODE_SUCCESS; +} + +static void destroySVDropTbBatchReqs(void *p) { + SVDropTbBatchReq *pReq = p; + taosArrayDestroy(pReq->pArray); + pReq->pArray = NULL; +} + static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid, bool ignoreNotExists) { SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0}; - SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); - SVDropTbVgReqs reqs = {0}; - if (pReqs == NULL) { - reqs.info = *pVgInfo; - reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq)); - if (reqs.req.pArray == NULL) { + SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId)); + SVDropTbVgReqs vgReqs = {0}; + if (pVgReqs == NULL) { + vgReqs.info = *pVgInfo; + vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq)); + if (!vgReqs.pBatchReqs) return terrno; + SVDropTbBatchReq batchReq = {0}; + int32_t code = createDropTbBatchReq(&req, &batchReq); + if (TSDB_CODE_SUCCESS != code) return code; + if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) { + taosArrayDestroy(batchReq.pArray); return terrno; } - if (taosArrayPush(reqs.req.pArray, &req) == NULL) { - return terrno; - } - if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs)) != 0) { + if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) { + taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs); return terrno; } } else { - if (taosArrayPush(pReqs->req.pArray, &req) == NULL) { + SVDropTbBatchReq batchReq = {0}; + int32_t code = createDropTbBatchReq(&req, &batchReq); + if (TSDB_CODE_SUCCESS != code) return code; + if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) { + taosArrayDestroy(batchReq.pArray); return terrno; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4cd32589d8..b13572d5b2 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3306,10 +3306,8 @@ static int32_t setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) int32_t rows = pBlock->info.rows; if (!pInfo->partitionSup.needCalc) { for (int32_t i = 0; i < rows; i++) { - qInfo("wjm, get uid: %"PRIu64, uidCol[i]); uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]); - qInfo("wjm, get groupid: %"PRIu64, groupId); - code = colDataSetVal(pGpCol, i, (const char*)(uidCol + i), false); + code = colDataSetVal(pGpCol, i, (const char*)&groupId, false); QUERY_CHECK_CODE(code, lino, _end); } } @@ -3538,6 +3536,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32 int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; for (int32_t i = 0; i < pBlock->info.rows; i++) { + // uid is the same as gid SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX); SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); int64_t* gpIdCol = (int64_t*)pGpIdCol->pData; @@ -3563,6 +3562,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32 code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]); QUERY_CHECK_CODE(code, lino, _end); pBlock->info.id.groupId = gpIdCol[i]; + // currently, only one valid row in pBlock memcpy(pBlock->info.parTbName, varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1); } @@ -3814,8 +3814,6 @@ FETCH_NEXT_BLOCK: } break; case STREAM_DROP_CHILD_TABLE: { int32_t deleteNum = 0; - code = setBlockGroupIdByUid(pInfo, pBlock); - QUERY_CHECK_CODE(code, lino, _end); code = deletePartName(pInfo, pBlock, &deleteNum); QUERY_CHECK_CODE(code, lino, _end); if (deleteNum == 0) goto FETCH_NEXT_BLOCK;