refine unused codes
This commit is contained in:
parent
2ddd07142a
commit
274a7fd876
|
@ -4086,42 +4086,16 @@ typedef struct SMDropTbTsmaInfos {
|
|||
} SMDropTbTsmaInfos;
|
||||
|
||||
typedef struct SMndDropTbsWithTsmaCtx {
|
||||
SHashObj *pTsmaMap; // <suid, SMDropTbTsmaInfos>
|
||||
SHashObj *pDbMap; // <dbuid, SMDropTbDbInfo>
|
||||
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>, only for non tsma result child table
|
||||
SHashObj *pTsmaTbVgMap; // <vgid, SVDropTbVgReqs>, only for tsma result child table
|
||||
SArray *pResTbNames; // SArray<char*>
|
||||
SHashObj *pVgMap; // <vgId, SVDropTbVgReqs>
|
||||
} SMndDropTbsWithTsmaCtx;
|
||||
|
||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||
int32_t vgId);
|
||||
|
||||
static void destroySVDropTbBatchReqs(void *p);
|
||||
static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
||||
if (!p) return;
|
||||
|
||||
if (p->pDbMap) {
|
||||
void *pIter = taosHashIterate(p->pDbMap, NULL);
|
||||
while (pIter) {
|
||||
SMDropTbDbInfo *pInfo = pIter;
|
||||
taosArrayDestroy(pInfo->dbVgInfos);
|
||||
pIter = taosHashIterate(p->pDbMap, pIter);
|
||||
}
|
||||
taosHashCleanup(p->pDbMap);
|
||||
}
|
||||
if (p->pResTbNames) {
|
||||
taosArrayDestroyP(p->pResTbNames, taosMemoryFree);
|
||||
}
|
||||
if (p->pTsmaMap) {
|
||||
void *pIter = taosHashIterate(p->pTsmaMap, NULL);
|
||||
while (pIter) {
|
||||
SMDropTbTsmaInfos *pInfos = pIter;
|
||||
taosArrayDestroy(pInfos->pTsmaInfos);
|
||||
pIter = taosHashIterate(p->pTsmaMap, pIter);
|
||||
}
|
||||
taosHashCleanup(p->pTsmaMap);
|
||||
}
|
||||
|
||||
if (p->pVgMap) {
|
||||
void *pIter = taosHashIterate(p->pVgMap, NULL);
|
||||
while (pIter) {
|
||||
|
@ -4131,16 +4105,6 @@ static void mndDestroyDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx *p) {
|
|||
}
|
||||
taosHashCleanup(p->pVgMap);
|
||||
}
|
||||
|
||||
if (p->pTsmaTbVgMap) {
|
||||
void *pIter = taosHashIterate(p->pTsmaTbVgMap, NULL);
|
||||
while (pIter) {
|
||||
SVDropTbVgReqs *pReqs = pIter;
|
||||
taosArrayDestroyEx(pReqs->pBatchReqs, destroySVDropTbBatchReqs);
|
||||
pIter = taosHashIterate(p->pTsmaTbVgMap, pIter);
|
||||
}
|
||||
taosHashCleanup(p->pTsmaTbVgMap);
|
||||
}
|
||||
taosMemoryFree(p);
|
||||
}
|
||||
|
||||
|
@ -4148,18 +4112,6 @@ static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
|
|||
int32_t code = 0;
|
||||
SMndDropTbsWithTsmaCtx *pCtx = taosMemoryCalloc(1, sizeof(SMndDropTbsWithTsmaCtx));
|
||||
if (!pCtx) return terrno;
|
||||
pCtx->pTsmaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
if (!pCtx->pTsmaMap) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pCtx->pDbMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (!pCtx->pDbMap) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
pCtx->pResTbNames = taosArrayInit(TARRAY_MIN_SIZE, POINTER_BYTES);
|
||||
|
||||
pCtx->pVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (!pCtx->pVgMap) {
|
||||
|
@ -4167,11 +4119,6 @@ static int32_t mndInitDropTbsWithTsmaCtx(SMndDropTbsWithTsmaCtx **ppCtx) {
|
|||
goto _end;
|
||||
}
|
||||
|
||||
pCtx->pTsmaTbVgMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
if (!pCtx->pTsmaTbVgMap) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
*ppCtx = pCtx;
|
||||
_end:
|
||||
if (code) mndDestroyDropTbsWithTsmaCtx(pCtx);
|
||||
|
@ -4286,7 +4233,7 @@ static int32_t mndProcessDropTbWithTsma(SRpcMsg *pReq) {
|
|||
if (code) goto _OVER;
|
||||
for (int32_t i = 0; i < dropReq.pVgReqs->size; ++i) {
|
||||
SMDropTbReqsOnSingleVg *pReq = taosArrayGet(dropReq.pVgReqs, i);
|
||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||
code = mndDropTbForSingleVg(pMnode, pCtx, pReq->pTbs, pReq->vgInfo.vgId);
|
||||
if (code) goto _OVER;
|
||||
}
|
||||
code = mndCreateDropTbsTxnPrepare(pReq, pCtx);
|
||||
|
@ -4350,61 +4297,7 @@ static int32_t mndDropTbAdd(SMnode *pMnode, SHashObj *pVgHashMap, const SVgroupI
|
|||
return 0;
|
||||
}
|
||||
|
||||
int vgInfoCmp(const void *lp, const void *rp) {
|
||||
SVgroupInfo *pLeft = (SVgroupInfo *)lp;
|
||||
SVgroupInfo *pRight = (SVgroupInfo *)rp;
|
||||
if (pLeft->hashBegin < pRight->hashBegin) {
|
||||
return -1;
|
||||
} else if (pLeft->hashBegin > pRight->hashBegin) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndGetDbVgInfoForTsma(SMnode *pMnode, const char *dbname, SMDropTbTsmaInfo *pInfo) {
|
||||
int32_t code = 0;
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, dbname);
|
||||
if (!pDb) {
|
||||
code = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
pInfo->dbInfo.dbVgInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||
if (!pInfo->dbInfo.dbVgInfos) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
mndBuildDBVgroupInfo(pDb, pMnode, pInfo->dbInfo.dbVgInfos);
|
||||
taosArraySort(pInfo->dbInfo.dbVgInfos, vgInfoCmp);
|
||||
|
||||
pInfo->dbInfo.hashPrefix = pDb->cfg.hashPrefix;
|
||||
pInfo->dbInfo.hashSuffix = pDb->cfg.hashSuffix;
|
||||
pInfo->dbInfo.hashMethod = pDb->cfg.hashMethod;
|
||||
|
||||
_end:
|
||||
if (pDb) mndReleaseDb(pMnode, pDb);
|
||||
if (code && pInfo->dbInfo.dbVgInfos) {
|
||||
taosArrayDestroy(pInfo->dbInfo.dbVgInfos);
|
||||
pInfo->dbInfo.dbVgInfos = NULL;
|
||||
}
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t vgHashValCmp(const void *lp, const void *rp) {
|
||||
uint32_t *key = (uint32_t *)lp;
|
||||
SVgroupInfo *pVg = (SVgroupInfo *)rp;
|
||||
|
||||
if (*key < pVg->hashBegin) {
|
||||
return -1;
|
||||
} else if (*key > pVg->hashEnd) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||
static int32_t mndDropTbForSingleVg(SMnode *pMnode, SMndDropTbsWithTsmaCtx *pCtx, SArray *pTbs,
|
||||
int32_t vgId) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -4420,88 +4313,9 @@ static int32_t mndDropTbAddTsmaResTbsForSingleVg(SMnode *pMnode, SMndDropTbsWith
|
|||
vgInfo.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
||||
// get all stb uids
|
||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||
const SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||
if (taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid))) {
|
||||
} else {
|
||||
SMDropTbTsmaInfos infos = {0};
|
||||
infos.pTsmaInfos = taosArrayInit(2, sizeof(SMDropTbTsmaInfo));
|
||||
if (!infos.pTsmaInfos) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
if (taosHashPut(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid), &infos, sizeof(infos)) != 0) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void *pIter = NULL;
|
||||
SSmaObj *pSma = NULL;
|
||||
char buf[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
// get used tsmas and it's dbs
|
||||
while (1) {
|
||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (!pIter) break;
|
||||
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pSma->stbUid, sizeof(pSma->stbUid));
|
||||
if (pInfos) {
|
||||
SMDropTbTsmaInfo info = {0};
|
||||
int32_t len = sprintf(buf, "%s", pSma->name);
|
||||
sprintf(info.tsmaResTbDbFName, "%s", pSma->db);
|
||||
snprintf(info.tsmaResTbNamePrefix, TSDB_TABLE_FNAME_LEN, "%s", buf);
|
||||
SMDropTbDbInfo *pDbInfo = taosHashGet(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN);
|
||||
info.suid = pSma->dstTbUid;
|
||||
if (!pDbInfo) {
|
||||
code = mndGetDbVgInfoForTsma(pMnode, pSma->db, &info);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
goto _end;
|
||||
}
|
||||
if (taosHashPut(pCtx->pDbMap, pSma->db, TSDB_DB_FNAME_LEN, &info.dbInfo, sizeof(SMDropTbDbInfo)) != 0) {
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
goto _end;
|
||||
}
|
||||
} else {
|
||||
info.dbInfo = *pDbInfo;
|
||||
}
|
||||
if (taosArrayPush(pInfos->pTsmaInfos, &info) == NULL) {
|
||||
code = terrno;
|
||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
sdbRelease(pMnode->pSdb, pSma);
|
||||
}
|
||||
|
||||
// generate vg req map
|
||||
for (int32_t i = 0; i < pTbs->size; ++i) {
|
||||
SVDropTbReq *pTb = taosArrayGet(pTbs, i);
|
||||
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pVgMap, &vgInfo, pTb->name, pTb->suid, pTb->igNotExists), NULL, _end);
|
||||
|
||||
SMDropTbTsmaInfos *pInfos = taosHashGet(pCtx->pTsmaMap, &pTb->suid, sizeof(pTb->suid));
|
||||
SArray *pVgInfos = NULL;
|
||||
char buf[TSDB_TABLE_FNAME_LEN + TSDB_TABLE_NAME_LEN + 1];
|
||||
char resTbFullName[TSDB_TABLE_FNAME_LEN + 1] = {0};
|
||||
for (int32_t j = 0; j < pInfos->pTsmaInfos->size; ++j) {
|
||||
SMDropTbTsmaInfo *pInfo = taosArrayGet(pInfos->pTsmaInfos, j);
|
||||
int32_t len = sprintf(buf, "%s_%s", pInfo->tsmaResTbNamePrefix, pTb->name);
|
||||
len = taosCreateMD5Hash(buf, len);
|
||||
len = snprintf(resTbFullName, TSDB_TABLE_FNAME_LEN + 1, "%s.%s", pInfo->tsmaResTbDbFName, buf);
|
||||
uint32_t hashVal = taosGetTbHashVal(resTbFullName, len, pInfo->dbInfo.hashMethod, pInfo->dbInfo.hashPrefix,
|
||||
pInfo->dbInfo.hashSuffix);
|
||||
const SVgroupInfo *pVgInfo = taosArraySearch(pInfo->dbInfo.dbVgInfos, &hashVal, vgHashValCmp, TD_EQ);
|
||||
void *p = taosStrdup(resTbFullName + strlen(pInfo->tsmaResTbDbFName) + TSDB_NAME_DELIMITER_LEN);
|
||||
if (taosArrayPush(pCtx->pResTbNames, &p) == NULL) {
|
||||
code = terrno;
|
||||
goto _end;
|
||||
}
|
||||
TAOS_CHECK_GOTO(mndDropTbAdd(pMnode, pCtx->pTsmaTbVgMap, pVgInfo, p, pInfo->suid, true), NULL, _end);
|
||||
}
|
||||
}
|
||||
_end:
|
||||
return code;
|
||||
|
@ -4529,7 +4343,7 @@ static int32_t mndProcessFetchTtlExpiredTbs(SRpcMsg *pRsp) {
|
|||
code = mndInitDropTbsWithTsmaCtx(&pCtx);
|
||||
if (code) goto _end;
|
||||
|
||||
code = mndDropTbAddTsmaResTbsForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||
code = mndDropTbForSingleVg(pMnode, pCtx, rsp.pExpiredTbs, rsp.vgId);
|
||||
if (code) goto _end;
|
||||
code = mndCreateDropTbsTxnPrepare(pRsp, pCtx);
|
||||
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
|
|
|
@ -451,20 +451,19 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
|||
req.igNotExists = true;
|
||||
|
||||
SColumnInfoData* pTbNameCol = taosArrayGet(pDataBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
void* pData = colDataGetVarData(pTbNameCol, i);
|
||||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||
tbName[varDataLen(pData) + 1] = 0;
|
||||
req.name = tbName;
|
||||
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
|
||||
TSDB_CHECK_CODE(terrno, lino, _exit);
|
||||
}
|
||||
char tbName[TSDB_TABLE_NAME_LEN + 1] = {0};
|
||||
int32_t i = 0;
|
||||
void* pData = colDataGetVarData(pTbNameCol, i);
|
||||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||
tbName[varDataLen(pData) + 1] = 0;
|
||||
req.name = tbName;
|
||||
if (taosArrayPush(batchReq.pArray, &req) == NULL) {
|
||||
TSDB_CHECK_CODE(terrno, lino, _exit);
|
||||
}
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderDoInit(&mr, pVnode->pMeta, META_READER_LOCK);
|
||||
// only one row
|
||||
|
||||
code = metaGetTableEntryByName(&mr, tbName);
|
||||
if (TSDB_CODE_SUCCESS == code && isValidDstChildTable(&mr, TD_VID(pVnode), tbName, pTask->outputInfo.tbSink.stbUid)) {
|
||||
STableSinkInfo* pTableSinkInfo = NULL;
|
||||
|
@ -478,13 +477,9 @@ static int32_t doBuildAndSendDropTableMsg(SVnode* pVnode, char* pStbFullname, SS
|
|||
code = tqPutReqToQueue(pVnode, &batchReq, encodeDropChildTableForRPC, TDMT_VND_DROP_TABLE);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
for (int32_t i = 0; i < rows; ++i) {
|
||||
void* pData = colDataGetVarData(pTbNameCol, i);
|
||||
memcpy(tbName, varDataVal(pData), varDataLen(pData));
|
||||
tbName[varDataLen(pData) + 1] = 0;
|
||||
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = doWaitForDstTableDropped(pVnode, pTask, tbName);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (batchReq.pArray) {
|
||||
|
|
|
@ -527,9 +527,13 @@ _end:
|
|||
|
||||
int32_t streamStateDeleteParName(SStreamState* pState, int64_t groupId) {
|
||||
int32_t code = tSimpleHashRemove(pState->parNameMap, &groupId, sizeof(int64_t));
|
||||
qTrace("catche %s at line %d res %d", __func__, __LINE__, code);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qWarn("failed to remove parname from cache, code:%d", code);
|
||||
}
|
||||
code = streamStateDeleteParName_rocksdb(pState, groupId);
|
||||
qTrace("disk %s at line %d res %d", __func__, __LINE__, code);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
qWarn("failed to remove parname from rocksdb, code:%d", code);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue