feat: tsma days refactor

This commit is contained in:
Cary Xu 2022-06-01 19:06:58 +08:00
parent 169b3fc1d1
commit e88abb16c2
13 changed files with 1136 additions and 30 deletions

View File

@ -2352,19 +2352,19 @@ typedef struct {
STSma* tSma; STSma* tSma;
} STSmaWrapper; } STSmaWrapper;
static FORCE_INLINE void tdDestroyTSma(STSma* pSma) { static FORCE_INLINE void tDestroyTSma(STSma* pSma) {
if (pSma) { if (pSma) {
taosMemoryFreeClear(pSma->expr); taosMemoryFreeClear(pSma->expr);
taosMemoryFreeClear(pSma->tagsFilter); taosMemoryFreeClear(pSma->tagsFilter);
} }
} }
static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) { static FORCE_INLINE void tDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
if (pSW) { if (pSW) {
if (pSW->tSma) { if (pSW->tSma) {
if (deepCopy) { if (deepCopy) {
for (uint32_t i = 0; i < pSW->number; ++i) { for (uint32_t i = 0; i < pSW->number; ++i) {
tdDestroyTSma(pSW->tSma + i); tDestroyTSma(pSW->tSma + i);
} }
} }
taosMemoryFreeClear(pSW->tSma); taosMemoryFreeClear(pSW->tSma);
@ -2372,8 +2372,8 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool deepCopy)
} }
} }
static FORCE_INLINE void* tdFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) { static FORCE_INLINE void* tFreeTSmaWrapper(STSmaWrapper* pSW, bool deepCopy) {
tdDestroyTSmaWrapper(pSW, deepCopy); tDestroyTSmaWrapper(pSW, deepCopy);
taosMemoryFreeClear(pSW); taosMemoryFreeClear(pSW);
return NULL; return NULL;
} }

View File

@ -194,6 +194,15 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SVnodeCfg vnodeCfg = {0}; SVnodeCfg vnodeCfg = {0};
vmGenerateVnodeCfg(&createReq, &vnodeCfg); vmGenerateVnodeCfg(&createReq, &vnodeCfg);
if (createReq.isTsma) {
SMsgHead *smaMsg = createReq.pTsma;
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
if (smaGetTSmaDays(&vnodeCfg, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen, &vnodeCfg.tsdbCfg.days) < 0) {
dError("vgId:%d, failed to get tsma days since %s", createReq.vgId, terrstr());
return -1;
}
}
SWrapperCfg wrapperCfg = {0}; SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg); vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);

View File

@ -32,7 +32,7 @@ target_sources(
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
"src/sma/smaOpen.c" "src/sma/smaOpen.c"
"src/sma/smaRollup.c" "src/sma/smaRollup.c"
"src/sma/smaTimeRange.c" "src/sma/smaTimeRange2.c"
# tsdb # tsdb
# "src/tsdb/tsdbTDBImpl.c" # "src/tsdb/tsdbTDBImpl.c"

View File

@ -146,6 +146,9 @@ bool tqNextDataBlockFilterOut(STqReadHandle *pHandle, SHashObj *filterOutUids
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid, int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, uint64_t *pUid,
int32_t *pNumOfRows, int16_t *pNumOfCols); int32_t *pNumOfRows, int16_t *pNumOfCols);
// sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
// need to reposition // need to reposition
// structs // structs

View File

@ -223,6 +223,8 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
// TODO: This is the basic params, and should wrap the params to a queryHandle. // TODO: This is the basic params, and should wrap the params to a queryHandle.
int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult); int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
int32_t tdGetTSmaDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -464,7 +464,7 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
_err: _err:
metaReaderClear(&mr); metaReaderClear(&mr);
taosArrayDestroy(pSmaIds); taosArrayDestroy(pSmaIds);
tdFreeTSmaWrapper(pSW, deepCopy); tFreeTSmaWrapper(pSW, deepCopy);
return NULL; return NULL;
} }

View File

@ -52,3 +52,11 @@ int32_t tdGetTSmaData(SSma* pSma, char* pData, int64_t indexUid, TSKEY querySKey
} }
return code; return code;
} }
int32_t smaGetTSmaDays(SVnodeCfg* pCfg, void* pCont, uint32_t contLen, int32_t *days) {
int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdGetTSmaDaysImpl(pCfg, pCont, contLen, days)) < 0) {
smaWarn("vgId:%d get tSma days failed since %s", pCfg->vgId, tstrerror(terrno));
}
return code;
}

View File

@ -278,7 +278,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { void *tdFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem) { if (pSmaStatItem) {
tdDestroyTSma(pSmaStatItem->pTSma); tDestroyTSma(pSmaStatItem->pTSma);
taosMemoryFreeClear(pSmaStatItem->pTSma); taosMemoryFreeClear(pSmaStatItem->pTSma);
taosHashCleanup(pSmaStatItem->expiredWindows); taosHashCleanup(pSmaStatItem->expiredWindows);
taosMemoryFreeClear(pSmaStatItem); taosMemoryFreeClear(pSmaStatItem);

View File

@ -828,7 +828,7 @@ int32_t tdDropTSma(SSma *pSma, char *pMsg) {
// TODO: send msg to stream computing to drop tSma // TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) { // if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq); // tDestroyTSma(&vCreateSmaReq);
// return -1; // return -1;
// } // }
// //
@ -982,25 +982,25 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) { if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) < 0) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tFreeTSmaWrapper(pSW, false);
break; break;
} }
while (true) { while (true) {
STSRow *row = tGetSubmitBlkNext(&blkIter); STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) { if (!row) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tFreeTSmaWrapper(pSW, false);
break; break;
} }
if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) { if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) {
if (pSW) { if (pSW) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tFreeTSmaWrapper(pSW, false);
} }
if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) { if (!(pSW = metaGetSmaInfoByTable(SMA_META(pSma), msgIter.suid, false))) {
break; break;
} }
if ((pSW->number) <= 0 || !pSW->tSma) { if ((pSW->number) <= 0 || !pSW->tSma) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tFreeTSmaWrapper(pSW, false);
break; break;
} }
@ -1020,7 +1020,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
if (lastWinSKey != winSKey) { if (lastWinSKey != winSKey) {
lastWinSKey = winSKey; lastWinSKey = winSKey;
if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) { if (tdSetExpiredWindow(pSma, pItemsHash, pTSma->indexUid, winSKey, version) < 0) {
pSW = tdFreeTSmaWrapper(pSW, false); pSW = tFreeTSmaWrapper(pSW, false);
tdUnRefSmaStat(pSma, pStat); tdUnRefSmaStat(pSma, pStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

File diff suppressed because it is too large Load Diff

View File

@ -229,7 +229,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
} }
memcpy(data, msg, msgLen); memcpy(data, msg, msgLen);
tqProcessStreamTrigger(pTq, data); // tqProcessStreamTrigger(pTq, data);
} }
return 0; return 0;

View File

@ -515,7 +515,7 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) {
static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) {
if (pSmaStatItem) { if (pSmaStatItem) {
tdDestroyTSma(pSmaStatItem->pSma); tDestroyTSma(pSmaStatItem->pSma);
taosMemoryFreeClear(pSmaStatItem->pSma); taosMemoryFreeClear(pSmaStatItem->pSma);
taosHashCleanup(pSmaStatItem->expiredWindows); taosHashCleanup(pSmaStatItem->expiredWindows);
taosMemoryFreeClear(pSmaStatItem); taosMemoryFreeClear(pSmaStatItem);
@ -718,25 +718,25 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
SSubmitBlkIter blkIter = {0}; SSubmitBlkIter blkIter = {0};
if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) { if (tInitSubmitBlkIter(&msgIter, pBlock, &blkIter) != TSDB_CODE_SUCCESS) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tFreeTSmaWrapper(pSW);
break; break;
} }
while (true) { while (true) {
STSRow *row = tGetSubmitBlkNext(&blkIter); STSRow *row = tGetSubmitBlkNext(&blkIter);
if (!row) { if (!row) {
tdFreeTSmaWrapper(pSW); tFreeTSmaWrapper(pSW);
break; break;
} }
if (!pSW || (pTSma->tableUid != pBlock->suid)) { if (!pSW || (pTSma->tableUid != pBlock->suid)) {
if (pSW) { if (pSW) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tFreeTSmaWrapper(pSW);
} }
if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) { if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) {
break; break;
} }
if ((pSW->number) <= 0 || !pSW->tSma) { if ((pSW->number) <= 0 || !pSW->tSma) {
pSW = tdFreeTSmaWrapper(pSW); pSW = tFreeTSmaWrapper(pSW);
break; break;
} }
@ -1649,13 +1649,13 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) {
// TODO: handle error // TODO: handle error
tsdbWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", REPO_ID(pTsdb), tsdbWarn("vgId:%d tsma %s:%" PRIi64 " create failed for table %" PRIi64 " since %s", REPO_ID(pTsdb),
vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno)); vCreateSmaReq.tSma.indexName, vCreateSmaReq.tSma.indexUid, vCreateSmaReq.tSma.tableUid, terrstr(terrno));
tdDestroyTSma(&vCreateSmaReq.tSma); tDestroyTSma(&vCreateSmaReq.tSma);
return -1; return -1;
} }
tsdbTSmaAdd(pTsdb, 1); tsdbTSmaAdd(pTsdb, 1);
tdDestroyTSma(&vCreateSmaReq.tSma); tDestroyTSma(&vCreateSmaReq.tSma);
// TODO: return directly or go on follow steps? // TODO: return directly or go on follow steps?
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1669,7 +1669,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
// TODO: send msg to stream computing to drop tSma // TODO: send msg to stream computing to drop tSma
// if ((send msg to stream computing) < 0) { // if ((send msg to stream computing) < 0) {
// tdDestroyTSma(&vCreateSmaReq); // tDestroyTSma(&vCreateSmaReq);
// return -1; // return -1;
// } // }
// //

View File

@ -147,8 +147,8 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
// resource release // resource release
taosMemoryFreeClear(pSW); taosMemoryFreeClear(pSW);
tdDestroyTSma(&tSma); tDestroyTSma(&tSma);
tdDestroyTSmaWrapper(&dstTSmaWrapper); tDestroyTSmaWrapper(&dstTSmaWrapper);
} }
#endif #endif
@ -218,7 +218,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid); EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
tdDestroyTSma(qSmaCfg); tDestroyTSma(qSmaCfg);
taosMemoryFreeClear(qSmaCfg); taosMemoryFreeClear(qSmaCfg);
qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2, true); qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2, true);
@ -229,7 +229,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
EXPECT_EQ(qSmaCfg->interval, tSma.interval); EXPECT_EQ(qSmaCfg->interval, tSma.interval);
tdDestroyTSma(qSmaCfg); tDestroyTSma(qSmaCfg);
taosMemoryFreeClear(qSmaCfg); taosMemoryFreeClear(qSmaCfg);
// get index name by table uid // get index name by table uid
@ -265,7 +265,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2); EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid); EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid);
tdDestroyTSmaWrapper(pSW); tDestroyTSmaWrapper(pSW);
taosMemoryFreeClear(pSW); taosMemoryFreeClear(pSW);
// get all sma table uids // get all sma table uids
@ -282,7 +282,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
metaRemoveSmaFromDb(pMeta, indexUid1); metaRemoveSmaFromDb(pMeta, indexUid1);
metaRemoveSmaFromDb(pMeta, indexUid2); metaRemoveSmaFromDb(pMeta, indexUid2);
tdDestroyTSma(&tSma); tDestroyTSma(&tSma);
metaClose(pMeta); metaClose(pMeta);
} }
#endif #endif
@ -576,7 +576,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
taosArrayDestroy(pDataBlocks); taosArrayDestroy(pDataBlocks);
// release meta // release meta
tdDestroyTSma(&tSma); tDestroyTSma(&tSma);
tfsClose(pTsdb->pTfs); tfsClose(pTsdb->pTfs);
tsdbClose(pTsdb); tsdbClose(pTsdb);
metaClose(pMeta); metaClose(pMeta);