refactor code
This commit is contained in:
parent
072733d3f7
commit
7ddfe085f1
|
@ -97,5 +97,5 @@ int8_t validColCompress(uint8_t type, uint8_t l2);
|
||||||
int8_t validColEncode(uint8_t type, uint8_t l1);
|
int8_t validColEncode(uint8_t type, uint8_t l1);
|
||||||
|
|
||||||
uint32_t createDefaultColCmprByType(uint8_t type);
|
uint32_t createDefaultColCmprByType(uint8_t type);
|
||||||
bool validColCmprByType(uint8_t type, uint32_t cmpr);
|
int32_t validColCmprByType(uint8_t type, uint32_t cmpr);
|
||||||
#endif /*_TD_TCOL_H_*/
|
#endif /*_TD_TCOL_H_*/
|
||||||
|
|
|
@ -397,10 +397,17 @@ uint32_t createDefaultColCmprByType(uint8_t type) {
|
||||||
SET_COMPRESS(encode, compress, lvl, ret);
|
SET_COMPRESS(encode, compress, lvl, ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
bool validColCmprByType(uint8_t type, uint32_t cmpr) {
|
int32_t validColCmprByType(uint8_t type, uint32_t cmpr) {
|
||||||
DEFINE_VAR(cmpr);
|
DEFINE_VAR(cmpr);
|
||||||
if (validColEncode(type, l1) && validColCompress(type, l2) && validColCompressLevel(type, lvl)) {
|
if (!validColEncode(type, l1)) {
|
||||||
return true;
|
return TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
||||||
}
|
}
|
||||||
return false;
|
if (!validColCompress(type, l2)) {
|
||||||
|
return TSDB_CODE_TSC_COMPRESS_PARAM_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!validColCompressLevel(type, lvl)) {
|
||||||
|
return TSDB_CODE_TSC_COMPRESS_LEVEL_ERROR;
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
|
||||||
|
|
||||||
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
|
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq);
|
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
|
static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pReq);
|
||||||
|
|
||||||
int32_t mndInitStb(SMnode *pMnode) {
|
int32_t mndInitStb(SMnode *pMnode) {
|
||||||
|
@ -1006,7 +1006,8 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
|
||||||
pHead->vgId = htonl(pVgroup->vgId);
|
pHead->vgId = htonl(pVgroup->vgId);
|
||||||
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
|
tSerializeSVDropTtlTableReq((char *)pHead + sizeof(SMsgHead), reqLen, &ttlReq);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
SRpcMsg rpcMsg = {
|
||||||
|
.msgType = TDMT_VND_FETCH_TTL_EXPIRED_TBS, .pCont = pHead, .contLen = contLen, .info = pReq->info};
|
||||||
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -1752,9 +1753,10 @@ static int32_t mndUpdateSuperTableColumnCompress(SMnode *pMnode, const SStbObj *
|
||||||
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
if (mndAllocStbSchemas(pOld, pNew) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (!validColCmprByType(pTarget->type, p->bytes)) {
|
code = validColCmprByType(pTarget->type, p->bytes);
|
||||||
terrno = TSDB_CODE_TSC_ENCODE_PARAM_ERROR;
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return -1;
|
terrno = code;
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int8_t updated = 0;
|
int8_t updated = 0;
|
||||||
|
@ -3892,25 +3894,26 @@ typedef struct SMDropTbTsmaInfo {
|
||||||
} SMDropTbTsmaInfo;
|
} SMDropTbTsmaInfo;
|
||||||
|
|
||||||
typedef struct SMDropTbTsmaInfos {
|
typedef struct SMDropTbTsmaInfos {
|
||||||
SArray* pTsmaInfos; // SMDropTbTsmaInfo
|
SArray *pTsmaInfos; // SMDropTbTsmaInfo
|
||||||
} SMDropTbTsmaInfos;
|
} SMDropTbTsmaInfos;
|
||||||
|
|
||||||
typedef struct SMndDropTbsWithTsmaCtx {
|
typedef struct SMndDropTbsWithTsmaCtx {
|
||||||
SHashObj* pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
||||||
SHashObj* pDbMap; // <dbuid, SMDropTbDbInfo>
|
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
|
||||||
SHashObj* pVgMap; // <vgId, SVDropTbVgReqs>
|
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
|
||||||
SArray* pResTbNames; // SArray<char*>
|
SArray *pResTbNames; // SArray<char*>
|
||||||
} SMndDropTbsWithTsmaCtx;
|
} SMndDropTbsWithTsmaCtx;
|
||||||
|
|
||||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId);
|
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||||
|
int32_t vgId);
|
||||||
|
|
||||||
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
||||||
if (!p) return;
|
if (!p) return;
|
||||||
|
|
||||||
if (p->pDbMap) {
|
if (p->pDbMap) {
|
||||||
void* pIter = taosHashIterate(p->pDbMap, NULL);
|
void *pIter = taosHashIterate(p->pDbMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SMDropTbDbInfo* pInfo = pIter;
|
SMDropTbDbInfo *pInfo = pIter;
|
||||||
taosArrayDestroy(pInfo->dbVgInfos);
|
taosArrayDestroy(pInfo->dbVgInfos);
|
||||||
pIter = taosHashIterate(p->pDbMap, pIter);
|
pIter = taosHashIterate(p->pDbMap, pIter);
|
||||||
}
|
}
|
||||||
|
@ -3920,9 +3923,9 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
|
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
|
||||||
}
|
}
|
||||||
if (p->pTsmaMap) {
|
if (p->pTsmaMap) {
|
||||||
void* pIter = taosHashIterate(p->pTsmaMap, NULL);
|
void *pIter = taosHashIterate(p->pTsmaMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SMDropTbTsmaInfos* pInfos = pIter;
|
SMDropTbTsmaInfos *pInfos = pIter;
|
||||||
taosArrayDestroy(pInfos->pTsmaInfos);
|
taosArrayDestroy(pInfos->pTsmaInfos);
|
||||||
pIter = taosHashIterate(p->pTsmaMap, pIter);
|
pIter = taosHashIterate(p->pTsmaMap, pIter);
|
||||||
}
|
}
|
||||||
|
@ -3930,7 +3933,7 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->pVgMap) {
|
if (p->pVgMap) {
|
||||||
void* pIter = taosHashIterate(p->pVgMap, NULL);
|
void *pIter = taosHashIterate(p->pVgMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SVDropTbVgReqs *pReqs = pIter;
|
SVDropTbVgReqs *pReqs = pIter;
|
||||||
taosArrayDestroy(pReqs->req.pArray);
|
taosArrayDestroy(pReqs->req.pArray);
|
||||||
|
@ -3941,9 +3944,9 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx* p) {
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx** ppCtx) {
|
static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SMndDropTbsWithTsmaCtx* pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
|
SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
|
||||||
if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY;
|
if (!pCtx) return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (!pCtx->pTsmaMap) {
|
if (!pCtx->pTsmaMap) {
|
||||||
|
@ -3969,8 +3972,8 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *mndBuildVDropTbsReq(SMnode *pMnode, const SVgroupInfo *pVgInfo, const SVDropTbBatchReq *pReq,
|
||||||
static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, const SVDropTbBatchReq* pReq, int32_t *len) {
|
int32_t *len) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
SMsgHead *pHead = NULL;
|
SMsgHead *pHead = NULL;
|
||||||
|
@ -3999,7 +4002,8 @@ static void* mndBuildVDropTbsReq(SMnode* pMnode, const SVgroupInfo* pVgInfo, con
|
||||||
return pHead;
|
return pHead;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SVDropTbVgReqs* pVgReqs, void* pCont, int32_t contLen) {
|
static int32_t mndSetDropTbsRedoActions(SMnode *pMnode, STrans *pTrans, const SVDropTbVgReqs *pVgReqs, void *pCont,
|
||||||
|
int32_t contLen) {
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = pVgReqs->info.epSet;
|
action.epSet = pVgReqs->info.epSet;
|
||||||
action.pCont = pCont;
|
action.pCont = pCont;
|
||||||
|
@ -4009,7 +4013,7 @@ static int32_t mndSetDropTbsRedoActions(SMnode* pMnode, STrans* pTrans, const SV
|
||||||
return mndTransAppendRedoAction(pTrans, &action);
|
return mndTransAppendRedoAction(pTrans, &action);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx* pCtx) {
|
static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg *pRsp, SMndDropTbsWithTsmaCtx *pCtx) {
|
||||||
SMnode *pMnode = pRsp->info.node;
|
SMnode *pMnode = pRsp->info.node;
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pRsp, "drop-tbs");
|
||||||
mndTransSetChangeless(pTrans);
|
mndTransSetChangeless(pTrans);
|
||||||
|
@ -4017,11 +4021,11 @@ static int32_t mndCreateDropTbsTxnPrepare(SRpcMsg* pRsp, SMndDropTbsWithTsmaCtx*
|
||||||
|
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
void* pIter = taosHashIterate(pCtx->pVgMap, NULL);
|
void *pIter = taosHashIterate(pCtx->pVgMap, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
const SVDropTbVgReqs* pVgReqs = pIter;
|
const SVDropTbVgReqs *pVgReqs = pIter;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
void* p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
|
void *p = mndBuildVDropTbsReq(pMnode, &pVgReqs->info, &pVgReqs->req, &len);
|
||||||
if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) {
|
if (!p || mndSetDropTbsRedoActions(pMnode, pTrans, pVgReqs, p, len) != 0) {
|
||||||
taosHashCancelIterate(pCtx->pVgMap, pIter);
|
taosHashCancelIterate(pCtx->pVgMap, pIter);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
@ -4035,7 +4039,7 @@ _OVER:
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
|
static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SDbObj *pDb = NULL;
|
SDbObj *pDb = NULL;
|
||||||
|
@ -4047,16 +4051,15 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg* pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMndDropTbsWithTsmaCtx* pCtx = NULL;
|
SMndDropTbsWithTsmaCtx *pCtx = NULL;
|
||||||
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
|
terrno = mndInitDropTbsWithTsmaCtx(&pCtx);
|
||||||
if (terrno) goto _OVER;
|
if (terrno) goto _OVER;
|
||||||
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
||||||
SMDropTbReqsOnSingleVg* pReq = taosArrayGet(dropReq.pVgReqs, i);
|
SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
|
||||||
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||||
if (terrno) goto _OVER;
|
if (terrno) goto _OVER;
|
||||||
}
|
}
|
||||||
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0)
|
if (mndCreateDropTbsTxnPrepare(pReq, pCtx) == 0) code = 0;
|
||||||
code = 0;
|
|
||||||
_OVER:
|
_OVER:
|
||||||
tFreeSMDropTbsReq(&dropReq);
|
tFreeSMDropTbsReq(&dropReq);
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
|
@ -4067,7 +4070,7 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
|
||||||
bool ignoreNotExists) {
|
bool ignoreNotExists) {
|
||||||
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
|
SVDropTbReq req = {.name = name, .suid = suid, .igNotExists = ignoreNotExists};
|
||||||
|
|
||||||
SVDropTbVgReqs * pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
SVDropTbVgReqs *pReqs = taosHashGet(pVgHashMap, &pVgInfo->vgId, sizeof(pVgInfo->vgId));
|
||||||
SVDropTbVgReqs reqs = {0};
|
SVDropTbVgReqs reqs = {0};
|
||||||
if (pReqs == NULL) {
|
if (pReqs == NULL) {
|
||||||
reqs.info = *pVgInfo;
|
reqs.info = *pVgInfo;
|
||||||
|
@ -4080,16 +4083,16 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGetDbVgInfoForTsma(SMnode* pMnode, const char* dbname, SMDropTbTsmaInfo* pInfo) {
|
static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDbObj* pDb = mndAcquireDb(pMnode, dbname);
|
SDbObj *pDb = mndAcquireDb(pMnode, dbname);
|
||||||
if (!pDb) {
|
if (!pDb) {
|
||||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
if ( !pInfo->dbInfo.dbVgInfos) {
|
if (!pInfo->dbInfo.dbVgInfos) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
@ -4108,9 +4111,9 @@ _end:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vgHashValCmp(const void* lp, const void* rp) {
|
int32_t vgHashValCmp(const void *lp, const void *rp) {
|
||||||
uint32_t* key = (uint32_t*)lp;
|
uint32_t *key = (uint32_t *)lp;
|
||||||
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
SVgroupInfo *pVg = (SVgroupInfo *)rp;
|
||||||
|
|
||||||
if (*key < pVg->hashBegin) {
|
if (*key < pVg->hashBegin) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -4121,23 +4124,26 @@ int32_t vgHashValCmp(const void* lp, const void* rp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWithTsmaCtx* pCtx, SArray* pTbs, int32_t vgId) {
|
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||||
|
int32_t vgId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SVgObj* pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
if (!pVgObj) {
|
if (!pVgObj) {
|
||||||
code = 0;
|
code = 0;
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin, .hashEnd = pVgObj->hashEnd, .numOfTable = pVgObj->numOfTables, .vgId = pVgObj->vgId};
|
SVgroupInfo vgInfo = {.hashBegin = pVgObj->hashBegin,
|
||||||
|
.hashEnd = pVgObj->hashEnd,
|
||||||
|
.numOfTable = pVgObj->numOfTables,
|
||||||
|
.vgId = pVgObj->vgId};
|
||||||
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
|
||||||
// get all stb uids
|
// get all stb uids
|
||||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||||
const SVDropTbReq* pTb = taosArrayGet(pTbs, i);
|
const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||||
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
|
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
SMDropTbTsmaInfos infos = {0};
|
SMDropTbTsmaInfos infos = {0};
|
||||||
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
|
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
|
||||||
|
@ -4156,14 +4162,14 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
if (!pIter) break;
|
if (!pIter) break;
|
||||||
SMDropTbTsmaInfos* pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
|
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
|
||||||
if (pInfos) {
|
if (pInfos) {
|
||||||
SMDropTbTsmaInfo info = {0};
|
SMDropTbTsmaInfo info = {0};
|
||||||
int32_t len = sprintf(buf, "%s", pSma->name);
|
int32_t len = sprintf(buf, "%s", pSma->name);
|
||||||
len = taosCreateMD5Hash(buf, len);
|
len = taosCreateMD5Hash(buf, len);
|
||||||
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
|
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
|
||||||
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_NAME_LEN, "%s", buf);
|
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_NAME_LEN, "%s", buf);
|
||||||
SMDropTbDbInfo* pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
|
SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
|
||||||
info.suid = pSma->dstTbUid;
|
info.suid = pSma->dstTbUid;
|
||||||
if (!pDbInfo) {
|
if (!pDbInfo) {
|
||||||
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
|
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
|
||||||
|
@ -4183,7 +4189,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
|
|
||||||
// generate vg req map
|
// generate vg req map
|
||||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||||
SVDropTbReq* pTb = taosArrayGet(pTbs, i);
|
SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||||
mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists);
|
mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists);
|
||||||
|
|
||||||
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
|
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
|
||||||
|
@ -4195,7 +4201,7 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode* pMnode, SMndDropTbsWith
|
||||||
uint32_t hashVal =
|
uint32_t hashVal =
|
||||||
taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix);
|
taosGetTbHashVal(buf, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix, pInfo->dbInfo.hashSuffix);
|
||||||
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
|
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
|
||||||
void* p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
|
void *p = taosStrdup(buf + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
|
||||||
taosArrayPush(pCtx->pResTbNames, &p);
|
taosArrayPush(pCtx->pResTbNames, &p);
|
||||||
mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true);
|
mndDropTbAdd(pMnode, pCtx->pVgMap, pVgInfo, p, pInfo->suid, true);
|
||||||
}
|
}
|
||||||
|
@ -4225,8 +4231,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
||||||
|
|
||||||
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
terrno = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||||
if (terrno) goto _end;
|
if (terrno) goto _end;
|
||||||
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0)
|
if (mndCreateDropTbsTxnPrepare(pRsp, pCtx) == 0) code = 0;
|
||||||
code = 0;
|
|
||||||
_end:
|
_end:
|
||||||
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
if (pCtx) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
Loading…
Reference in New Issue