This commit is contained in:
Cary Xu 2022-04-21 23:55:20 +08:00
parent 1ec6fa12f6
commit 2d328c8930
3 changed files with 22 additions and 21 deletions

View File

@ -25,6 +25,7 @@ extern "C" {
#endif #endif
int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq);
int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, SSubmitReq *pMsg);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -109,12 +109,12 @@ struct SSmaStat {
T_REF_DECLARE() T_REF_DECLARE()
}; };
#define SSMA_STAT_ITEMS(s) ((s)->smaStatItems) #define SMA_STAT_ITEMS(s) ((s)->smaStatItems)
#define SSMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) #define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash)
#define RSMA_MAX_LEVEL 2 #define RSMA_MAX_LEVEL 2
#define RSMA_TASK_INFO_HASH_SLOT 8 #define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo { struct SRSmaInfo {
void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t
}; };
@ -471,18 +471,18 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) {
} }
if (smaType == TSDB_SMA_TYPE_ROLLUP) { if (smaType == TSDB_SMA_TYPE_ROLLUP) {
SSMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit(
RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
if (SSMA_STAT_INFO_HASH(*pSmaStat) == NULL) { if (SMA_STAT_INFO_HASH(*pSmaStat) == NULL) {
taosMemoryFreeClear(*pSmaStat); taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
} else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
SSMA_STAT_ITEMS(*pSmaStat) = SMA_STAT_ITEMS(*pSmaStat) =
taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
if (SSMA_STAT_ITEMS(*pSmaStat) == NULL) { if (SMA_STAT_ITEMS(*pSmaStat) == NULL) {
taosMemoryFreeClear(*pSmaStat); taosMemoryFreeClear(*pSmaStat);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -528,21 +528,21 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) {
if (pSmaStat) { if (pSmaStat) {
// TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready.
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
void *item = taosHashIterate(SSMA_STAT_ITEMS(pSmaStat), NULL); void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL);
while (item != NULL) { while (item != NULL) {
SSmaStatItem *pItem = *(SSmaStatItem **)item; SSmaStatItem *pItem = *(SSmaStatItem **)item;
tsdbFreeSmaStatItem(pItem); tsdbFreeSmaStatItem(pItem);
item = taosHashIterate(SSMA_STAT_ITEMS(pSmaStat), item); item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item);
} }
taosHashCleanup(SSMA_STAT_ITEMS(pSmaStat)); taosHashCleanup(SMA_STAT_ITEMS(pSmaStat));
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) { } else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
void *infoHash = taosHashIterate(SSMA_STAT_INFO_HASH(pSmaStat), NULL); void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL);
while (infoHash != NULL) { while (infoHash != NULL) {
SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash;
tsdbFreeRSmaInfo(pInfoHash); tsdbFreeRSmaInfo(pInfoHash);
infoHash = taosHashIterate(SSMA_STAT_INFO_HASH(pSmaStat), infoHash); infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash);
} }
taosHashCleanup(SSMA_STAT_INFO_HASH(pSmaStat)); taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat));
} else { } else {
ASSERT(0); ASSERT(0);
} }
@ -1687,7 +1687,7 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) {
} }
/** /**
* @brief Only applicable to stable. * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam.
* *
* @param pTsdb * @param pTsdb
* @param pMeta * @param pMeta
@ -1724,15 +1724,15 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo); pRSmaInfo = tsdbFreeRSmaInfo(pRSmaInfo);
} }
STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo));
if (pReadHandle == NULL) { if (pRSmaInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta);
if (pRSmaInfo == NULL) { if (pReadHandle == NULL) {
taosMemoryFree(pReadHandle); taosMemoryFree(pRSmaInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
@ -1760,7 +1760,8 @@ int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) {
} }
} }
if (taosHashPut(SSMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != 0) { if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) !=
0) {
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }

View File

@ -215,7 +215,6 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
return -1; return -1;
} }
// deploy Rollup SMA
tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq); tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq);
taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pSchema);