refine comments and functions
This commit is contained in:
parent
9a92c136ce
commit
de8576a6a1
|
@ -4063,8 +4063,8 @@ static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
typedef struct SVDropTbVgReqs {
|
||||
SVDropTbBatchReq req;
|
||||
SVgroupInfo info;
|
||||
SArray *pBatchReqs;
|
||||
SVgroupInfo info;
|
||||
} SVDropTbVgReqs;
|
||||
|
||||
typedef struct SMDropTbDbInfo {
|
||||
|
@ -4086,16 +4086,17 @@ typedef struct SMDropTbTsmaInfos {
|
|||
} SMDropTbTsmaInfos;
|
||||
|
||||
typedef struct SMndDropTbsWithTsmaCtx {
|
||||
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
||||
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
|
||||
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>, only for non tsma result child table
|
||||
SHashObj *pTsmaTbVgMap; // <vgid, SVDropTbVgReqs>, only for tsma result child table
|
||||
SArray *pResTbNames; // SArray<char*>
|
||||
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
||||
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
|
||||
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>, only for non tsma result child table
|
||||
SHashObj *pTsmaTbVgMap; // <vgid, SVDropTbVgReqs>, only for tsma result child table
|
||||
SArray *pResTbNames; // SArray<char*>
|
||||
} SMndDropTbsWithTsmaCtx;
|
||||
|
||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||
int32_t vgId);
|
||||
|
||||
static void destroySVDropTbBatchReqs(void *p);
|
||||
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
||||
if (!p) return;
|
||||
|
||||
|
@ -4125,7 +4126,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
|||
void *pIter = taosHashIterate(p->pVgMap, NULL);
|
||||
while (pIter) {
|
||||
SVDropTbVgReqs *pReqs = pIter;
|
||||
taosArrayDestroy(pReqs->req.pArray);
|
||||
taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
|
||||
pIter = taosHashIterate(p->pVgMap, pIter);
|
||||
}
|
||||
taosHashCleanup(p->pVgMap);
|
||||
|
@ -4135,7 +4136,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
|||
void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL);
|
||||
while (pIter) {
|
||||
SVDropTbVgReqs *pReqs = pIter;
|
||||
taosArrayDestroy(pReqs->req.pArray);
|
||||
taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
|
||||
pIter = taosHashIterate(p->pTsmaTbVgMap, pIter);
|
||||
}
|
||||
taosHashCleanup(p->pTsmaTbVgMap);
|
||||
|
@ -4219,20 +4220,25 @@ static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SV
|
|||
return mndTransAppendRedoAction(pTrans, &action);
|
||||
}
|
||||
|
||||
static int32_t mndBuildDropTbRedoActions(SMnode* pMnode, STrans* pTrans, SHashObj* pVgMap, tmsg_t msgType) {
|
||||
static int32_t mndBuildDropTbRedoActions(SMnode *pMnode, STrans *pTrans, SHashObj *pVgMap, tmsg_t msgType) {
|
||||
int32_t code = 0;
|
||||
void* pIter = taosHashIterate(pVgMap, NULL);
|
||||
void *pIter = taosHashIterate(pVgMap, NULL);
|
||||
while (pIter) {
|
||||
const SVDropTbVgReqs *pVgReqs = pIter;
|
||||
int32_t len = 0;
|
||||
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
|
||||
if (!p) {
|
||||
taosHashCancelIterate(pVgMap, pIter);
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
break;
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pVgReqs->pBatchReqs) && code == TSDB_CODE_SUCCESS; ++i) {
|
||||
SVDropTbBatchReq *pBatchReq = taosArrayGet(pVgReqs->pBatchReqs, i);
|
||||
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, pBatchReq, &len);
|
||||
if (!p) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
break;
|
||||
}
|
||||
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
if ((code = mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len, msgType)) != 0) {
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
taosHashCancelIterate(pVgMap, pIter);
|
||||
break;
|
||||
}
|
||||
|
@ -4255,9 +4261,7 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx
|
|||
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
//if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TSMA_CTB)) != 0) goto _OVER;
|
||||
if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
|
||||
//if ((code = mndBuildDropTbRedoActions(pMnode, pTrans, pCtx->pTsmaTbVgMap, TDMT_VND_DROP_TABLE)) != 0) goto _OVER;
|
||||
if ((code = mndTransPrepare(pMnode, pTrans)) != 0) goto _OVER;
|
||||
|
||||
_OVER:
|
||||
|
@ -4295,26 +4299,51 @@ _OVER:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t createDropTbBatchReq(const SVDropTbReq *pReq, SVDropTbBatchReq *pBatchReq) {
|
||||
pBatchReq->nReqs = 1;
|
||||
pBatchReq->pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
||||
if (!pBatchReq->pArray) return terrno;
|
||||
if (taosArrayPush(pBatchReq->pArray, pReq) == NULL) {
|
||||
taosArrayDestroy(pBatchReq->pArray);
|
||||
pBatchReq->pArray = NULL;
|
||||
return terrno;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void destroySVDropTbBatchReqs(void *p) {
|
||||
SVDropTbBatchReq *pReq = p;
|
||||
taosArrayDestroy(pReq->pArray);
|
||||
pReq->pArray = NULL;
|
||||
}
|
||||
|
||||
static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupInfo *pVgInfo, char *name, tb_uid_t suid,
|
||||
bool ignoreNotExists) {
|
||||
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists, .uid = 0};
|
||||
|
||||
SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
SVDropTbVgReqs reqs = {0};
|
||||
if (pReqs == NULL) {
|
||||
reqs.info = *pVgInfo;
|
||||
reqs.req.pArray = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbReq));
|
||||
if (reqs.req.pArray == NULL) {
|
||||
SVDropTbVgReqs *pVgReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||
SVDropTbVgReqs vgReqs = {0};
|
||||
if (pVgReqs == NULL) {
|
||||
vgReqs.info = *pVgInfo;
|
||||
vgReqs.pBatchReqs = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SVDropTbBatchReq));
|
||||
if (!vgReqs.pBatchReqs) return terrno;
|
||||
SVDropTbBatchReq batchReq = {0};
|
||||
int32_t code = createDropTbBatchReq(&req, &batchReq);
|
||||
if (TSDB_CODE_SUCCESS != code) return code;
|
||||
if (taosArrayPush(vgReqs.pBatchReqs, &batchReq) == NULL) {
|
||||
taosArrayDestroy(batchReq.pArray);
|
||||
return terrno;
|
||||
}
|
||||
if (taosArrayPush(reqs.req.pArray, &req) == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &reqs, sizeof(reqs)) != 0) {
|
||||
if (taosHashPut(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId), &vgReqs, sizeof(vgReqs)) != 0) {
|
||||
taosArrayDestroyEx(vgReqs.pBatchReqs, destroySVDropTbBatchReqs);
|
||||
return terrno;
|
||||
}
|
||||
} else {
|
||||
if (taosArrayPush(pReqs->req.pArray, &req) == NULL) {
|
||||
SVDropTbBatchReq batchReq = {0};
|
||||
int32_t code = createDropTbBatchReq(&req, &batchReq);
|
||||
if (TSDB_CODE_SUCCESS != code) return code;
|
||||
if (taosArrayPush(pVgReqs->pBatchReqs, &batchReq) == NULL) {
|
||||
taosArrayDestroy(batchReq.pArray);
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3306,10 +3306,8 @@ static int32_t setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock)
|
|||
int32_t rows = pBlock->info.rows;
|
||||
if (!pInfo->partitionSup.needCalc) {
|
||||
for (int32_t i = 0; i < rows; i++) {
|
||||
qInfo("wjm, get uid: %"PRIu64, uidCol[i]);
|
||||
uint64_t groupId = getGroupIdByUid(pInfo, uidCol[i]);
|
||||
qInfo("wjm, get groupid: %"PRIu64, groupId);
|
||||
code = colDataSetVal(pGpCol, i, (const char*)(uidCol + i), false);
|
||||
code = colDataSetVal(pGpCol, i, (const char*)&groupId, false);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
}
|
||||
|
@ -3538,6 +3536,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
// uid is the same as gid
|
||||
SColumnInfoData* pGpIdCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
SColumnInfoData* pTbnameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
int64_t* gpIdCol = (int64_t*)pGpIdCol->pData;
|
||||
|
@ -3563,6 +3562,7 @@ static int32_t deletePartName(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32
|
|||
code = pInfo->stateStore.streamStateDeleteParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, gpIdCol[i]);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
pBlock->info.id.groupId = gpIdCol[i];
|
||||
// currently, only one valid row in pBlock
|
||||
memcpy(pBlock->info.parTbName, varTbName + VARSTR_HEADER_SIZE, TSDB_TABLE_NAME_LEN + 1);
|
||||
}
|
||||
|
||||
|
@ -3814,8 +3814,6 @@ FETCH_NEXT_BLOCK:
|
|||
} break;
|
||||
case STREAM_DROP_CHILD_TABLE: {
|
||||
int32_t deleteNum = 0;
|
||||
code = setBlockGroupIdByUid(pInfo, pBlock);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
code = deletePartName(pInfo, pBlock, &deleteNum);
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
if (deleteNum == 0) goto FETCH_NEXT_BLOCK;
|
||||
|
|
Loading…
Reference in New Issue