Merge pull request #13386 from taosdata/feature/TD-14481-3.0
feat: tsma days refactor
This commit is contained in:
commit
d7288bfa45
|
@ -2354,19 +2354,19 @@ typedef struct {
|
|||
STSma* tSma;
|
||||
} STSmaWrapper;
|
||||
|
||||
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) {
|
||||
static FORCE_INLINE void tDestroyTSma(STSma* pSma) {
|
||||
if (pSma) {
|
||||
taosMemoryFreeClear(pSma->expr);
|
||||
taosMemoryFreeClear(pSma->tagsFilter);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
|
||||
static FORCE_INLINE void tDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
|
||||
if (pSW) {
|
||||
if (pSW->tSma) {
|
||||
if (deepCopy) {
|
||||
for (uint32_t i = 0; i < pSW->number; ++i) {
|
||||
tdDestroyTSma(pSW->tSma + i);
|
||||
tDestroyTSma(pSW->tSma + i);
|
||||
}
|
||||
}
|
||||
taosMemoryFreeClear(pSW->tSma);
|
||||
|
@ -2374,8 +2374,8 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy)
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
|
||||
tdDestroyTSmaWrapper(pSW, deepCopy);
|
||||
static FORCE_INLINE void* tFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
|
||||
tDestroyTSmaWrapper(pSW, deepCopy);
|
||||
taosMemoryFreeClear(pSW);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -180,6 +180,24 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
|
|||
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCreate->vgId);
|
||||
}
|
||||
|
||||
static int32_t vmTsmaAdjustDays(SVnodeCfg *pCfg, SCreateVnodeReq *pReq) {
|
||||
if (pReq->isTsma) {
|
||||
SMsgHead *smaMsg = pReq->pTsma;
|
||||
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
|
||||
return smaGetTSmaDays(pCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &pCfg->tsdbCfg.days);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t vmTsmaProcessCreate(SVnode *pVnode, SCreateVnodeReq *pReq) {
|
||||
if (pReq->isTsma) {
|
||||
SMsgHead *smaMsg = pReq->pTsma;
|
||||
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
|
||||
return vnodeProcessCreateTSma(pVnode, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
SCreateVnodeReq createReq = {0};
|
||||
SVnodeCfg vnodeCfg = {0};
|
||||
|
@ -195,6 +213,13 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
dDebug("vgId:%d, create vnode req is received, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
|
||||
createReq.standby);
|
||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
||||
|
||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
||||
dError("vgId:%d, failed to adjust tsma days since %s", createReq.vgId, terrstr());
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
|
||||
|
||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
|
||||
|
@ -203,14 +228,16 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
tFreeSCreateVnodeReq(&createReq);
|
||||
vmReleaseVnode(pMgmt, pVnode);
|
||||
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||
return -1;
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
|
||||
if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) {
|
||||
tFreeSCreateVnodeReq(&createReq);
|
||||
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
|
||||
return -1;
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
|
||||
|
@ -227,14 +254,11 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
if (createReq.isTsma) {
|
||||
SMsgHead *smaMsg = createReq.pTsma;
|
||||
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
|
||||
if (vnodeProcessCreateTSma(pImpl, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen) < 0) {
|
||||
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
};
|
||||
code = vmTsmaProcessCreate(pImpl, &createReq);
|
||||
if (code != 0) {
|
||||
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
|
||||
code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = vnodeStart(pImpl);
|
||||
|
|
|
@ -32,7 +32,7 @@ target_sources(
|
|||
"src/sma/smaEnv.c"
|
||||
"src/sma/smaOpen.c"
|
||||
"src/sma/smaRollup.c"
|
||||
"src/sma/smaTimeRange.c"
|
||||
"src/sma/smaTimeRange2.c"
|
||||
|
||||
# tsdb
|
||||
"src/tsdb/tsdbCommit.c"
|
||||
|
|
|
@ -147,6 +147,9 @@ bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids
|
|||
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
|
||||
int32_t *pNumOfRows, int16_t *pNumOfCols);
|
||||
|
||||
// sma
|
||||
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||
|
||||
// need to reposition
|
||||
|
||||
// structs
|
||||
|
|
|
@ -223,6 +223,8 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
|
|||
// TODO: This is the basic params, and should wrap the params to a queryHandle.
|
||||
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
|
||||
|
||||
int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -464,7 +464,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
|||
_err:
|
||||
metaReaderClear(&mr);
|
||||
taosArrayDestroy(pSmaIds);
|
||||
tdFreeTSmaWrapper(pSW, deepCopy);
|
||||
tFreeTSmaWrapper(pSW, deepCopy);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,3 +51,11 @@ int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey
|
|||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t *days) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) {
|
||||
smaWarn("vgId:%d get tSma days failed since %s", pCfg->vgId, tstrerror(terrno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -278,7 +278,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
|
|||
|
||||
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
|
||||
if (pSmaStatItem) {
|
||||
tdDestroyTSma(pSmaStatItem->pTSma);
|
||||
tDestroyTSma(pSmaStatItem->pTSma);
|
||||
taosMemoryFreeClear(pSmaStatItem->pTSma);
|
||||
taosHashCleanup(pSmaStatItem->expiredWindows);
|
||||
taosMemoryFreeClear(pSmaStatItem);
|
||||
|
|
|
@ -828,7 +828,7 @@ int32_t tdDropTSma(SSma *pSma, char *pMsg) {
|
|||
|
||||
// TODO: send msg to stream computing to drop tSma
|
||||
// if ((send msg to stream computing) < 0) {
|
||||
// tdDestroyTSma(&vCreateSmaReq);
|
||||
// tDestroyTSma(&vCreateSmaReq);
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
|
@ -982,25 +982,25 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
|
|||
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) {
|
||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||
pSW = tFreeTSmaWrapper(pSW, false);
|
||||
break;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
STSRow *row = tGetSubmitBlkNext(&blkIter);
|
||||
if (!row) {
|
||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||
pSW = tFreeTSmaWrapper(pSW, false);
|
||||
break;
|
||||
}
|
||||
if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) {
|
||||
if (pSW) {
|
||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||
pSW = tFreeTSmaWrapper(pSW, false);
|
||||
}
|
||||
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) {
|
||||
break;
|
||||
}
|
||||
if ((pSW->number) <= 0 || !pSW->tSma) {
|
||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||
pSW = tFreeTSmaWrapper(pSW, false);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -1020,7 +1020,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, const SSubmitReq *pMsg, int64_t ve
|
|||
if (lastWinSKey != winSKey) {
|
||||
lastWinSKey = winSKey;
|
||||
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
|
||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||
pSW = tFreeTSmaWrapper(pSW, false);
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -147,8 +147,8 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
|||
|
||||
// resource release
|
||||
taosMemoryFreeClear(pSW);
|
||||
tdDestroyTSma(&tSma);
|
||||
tdDestroyTSmaWrapper(&dstTSmaWrapper);
|
||||
tDestroyTSma(&tSma);
|
||||
tDestroyTSmaWrapper(&dstTSmaWrapper);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
@ -218,7 +218,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
||||
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
||||
tdDestroyTSma(qSmaCfg);
|
||||
tDestroyTSma(qSmaCfg);
|
||||
taosMemoryFreeClear(qSmaCfg);
|
||||
|
||||
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2, true);
|
||||
|
@ -229,7 +229,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
||||
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
|
||||
tdDestroyTSma(qSmaCfg);
|
||||
tDestroyTSma(qSmaCfg);
|
||||
taosMemoryFreeClear(qSmaCfg);
|
||||
|
||||
// get index name by table uid
|
||||
|
@ -265,7 +265,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
|
||||
EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid);
|
||||
|
||||
tdDestroyTSmaWrapper(pSW);
|
||||
tDestroyTSmaWrapper(pSW);
|
||||
taosMemoryFreeClear(pSW);
|
||||
|
||||
// get all sma table uids
|
||||
|
@ -282,7 +282,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
metaRemoveSmaFromDb(pMeta, indexUid1);
|
||||
metaRemoveSmaFromDb(pMeta, indexUid2);
|
||||
|
||||
tdDestroyTSma(&tSma);
|
||||
tDestroyTSma(&tSma);
|
||||
metaClose(pMeta);
|
||||
}
|
||||
#endif
|
||||
|
@ -576,7 +576,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
taosArrayDestroy(pDataBlocks);
|
||||
|
||||
// release meta
|
||||
tdDestroyTSma(&tSma);
|
||||
tDestroyTSma(&tSma);
|
||||
tfsClose(pTsdb->pTfs);
|
||||
tsdbClose(pTsdb);
|
||||
metaClose(pMeta);
|
||||
|
|
Loading…
Reference in New Issue