diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5a2dd502c1..d0926948dc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -227,8 +227,16 @@ typedef struct { } SSubmitBlkIter; typedef struct { - int32_t totalLen; - int32_t len; + int32_t totalLen; + int32_t len; + // head of SSubmitBlk + // int64_t uid; // table unique id + // int64_t suid; // stable id + // int32_t sversion; // data schema version + // int32_t dataLen; // data part length, not including the SSubmitBlk head + // int32_t schemaLen; // schema length, if length is 0, no schema exists + // int16_t numOfRows; // total number of rows in current submit block + // head of SSubmitBlk const void* pMsg; } SSubmitMsgIter; @@ -237,6 +245,15 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter); STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter); +// TODO: KEEP one suite of iterator API finally. +// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts +// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later +// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter +// int32_t tInitSubmitMsgIterEx(const SSubmitReq* pMsg, SSubmitMsgIter* pIter); +// int32_t tGetSubmitMsgNextEx(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock); +// int32_t tInitSubmitBlkIterEx(SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock, SSubmitBlkIter* pIter); +// STSRow* tGetSubmitBlkNextEx(SSubmitBlkIter* pIter); + typedef struct { int32_t index; // index of failed block in submit blocks int32_t vnode; // vnode index of failed block @@ -1496,8 +1513,8 @@ typedef struct { int32_t qmsg1Len; int32_t qmsg2Len; func_id_t* pFuncIds; - char* qmsg1; // not null: pAst1:qmsg1:SRetention1 => trigger aggr task1 - char* qmsg2; // not null: pAst2:qmsg2:SRetention2 => trigger aggr task2 + char* qmsg1; // pAst1:qmsg1:SRetention1 => trigger aggr task1 + char* qmsg2; // pAst2:qmsg2:SRetention2 => trigger aggr task2 int8_t nFuncIds; } SRSmaParam; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index efd4cbfcec..05fd9e301b 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -93,7 +93,87 @@ STSRow *tGetSubmitBlkNext(SSubmitBlkIter *pIter) { return row; } } +#if 0 +// TODO: KEEP one suite of iterator API finally. +// 1) use tInitSubmitMsgIterEx firstly as not decrease the merge conflicts +// 2) replace tInitSubmitMsgIterEx with tInitSubmitMsgIter later +// 3) finally, rename tInitSubmitMsgIterEx to tInitSubmitMsgIter +int32_t tInitSubmitMsgIterEx(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { + if (pMsg == NULL) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + pIter->totalLen = htonl(pMsg->length); + ASSERT(pIter->totalLen > 0); + pIter->len = 0; + pIter->pMsg = pMsg; + if (pIter->totalLen <= sizeof(SSubmitReq)) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + return -1; + } + + return 0; +} + +int32_t tGetSubmitMsgNextEx(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) { + ASSERT(pIter->len >= 0); + + if (pIter->len == 0) { + pIter->len += sizeof(SSubmitReq); + } else { + if (pIter->len >= pIter->totalLen) { + ASSERT(0); + } + + SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->len += (sizeof(SSubmitBlk) + pIter->dataLen + pIter->schemaLen); + ASSERT(pIter->len > 0); + } + + if (pIter->len > pIter->totalLen) { + terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; + *pPBlock = NULL; + return -1; + } + + if (pIter->len == pIter->totalLen) { + *pPBlock = NULL; + } else { + *pPBlock = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len); + pIter->uid = htobe64((*pPBlock)->uid); + pIter->suid = htobe64((*pPBlock)->suid); + pIter->sversion = htonl((*pPBlock)->sversion); + pIter->dataLen = htonl((*pPBlock)->dataLen); + pIter->schemaLen = htonl((*pPBlock)->schemaLen); + pIter->numOfRows = htons((*pPBlock)->numOfRows); + } + return 0; +} + +int32_t tInitSubmitBlkIterEx(SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkIter *pIter) { + if (pMsgIter->dataLen <= 0) return -1; + pIter->totalLen = pMsgIter->dataLen; + pIter->len = 0; + pIter->row = (STSRow *)(pBlock->data + pMsgIter->schemaLen); + return 0; +} + +STSRow *tGetSubmitBlkNextEx(SSubmitBlkIter *pIter) { + STSRow *row = pIter->row; + + if (pIter->len >= pIter->totalLen) { + return NULL; + } else { + pIter->len += TD_ROW_LEN(row); + if (pIter->len < pIter->totalLen) { + pIter->row = POINTER_SHIFT(row, TD_ROW_LEN(row)); + } + return row; + } +} +#endif int32_t tEncodeSEpSet(SCoder *pEncoder, const SEpSet *pEp) { if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index f304e3153d..1e63dd9833 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -453,7 +453,6 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt } } if (pStb->ast2Len > 0) { - int32_t qmsgLen2 = 0; if (mndConvertRSmaTask(pStb->pAst2, 0, 0, &pRSmaParam->qmsg2, &pRSmaParam->qmsg2Len) != TSDB_CODE_SUCCESS) { taosMemoryFreeClear(pRSmaParam->pFuncIds); taosMemoryFreeClear(pRSmaParam->qmsg1); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index fb875a46e0..7647929246 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -51,7 +51,7 @@ static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64() #define META_CHILD_TABLE TD_CHILD_TABLE #define META_NORMAL_TABLE TD_NORMAL_TABLE -int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); +int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); int metaDropTable(SMeta* pMeta, tb_uid_t uid); int metaCommit(SMeta* pMeta); int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); @@ -74,7 +74,7 @@ tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); -int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); +int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index a84776abfd..dddc170813 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -57,7 +57,6 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); - typedef enum { TSDB_FILE_HEAD = 0, // .head TSDB_FILE_DATA, // .data diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h new file mode 100644 index 0000000000..e20d2989b9 --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_TSDB_SMA_H_ +#define _TD_VNODE_TSDB_SMA_H_ + +#include "os.h" +#include "thash.h" +#include "tmsg.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef int32_t (*__tb_ddl_fn_t)(void *ahandle, void **result, void *p1, void *p2); + +struct STbDdlH { + void *ahandle; + void *result; + __tb_ddl_fn_t fp; +}; + +typedef struct { + tb_uid_t suid; + SArray *tbUids; + SHashObj *uidHash; +} STbUidStore; + +static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { + ASSERT(*pStore == NULL); + *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); + if (*pStore == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); +void tsdbUidStoreDestory(STbUidStore *pStore); +void *tsdbUidStoreFree(STbUidStore *pStore); + +int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq); +int32_t tsdbFetchTbUidList(void *pTsdb, void **result, void *suid, void *uid); +int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pUidStore); +int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_TSDB_SMA_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0027424829..0f354401bf 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -103,6 +103,8 @@ struct SVnode { #define TD_VID(PVNODE) (PVNODE)->config.vgId +typedef struct STbDdlH STbDdlH; + // sma void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data); @@ -116,6 +118,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data); #include "vnodeSync.h" +#include "tsdbSma.h" + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index 8389286596..d61d3346a2 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -250,7 +250,7 @@ void metaCloseDB(SMeta *pMeta) { } } -int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { +int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { tb_uid_t uid; SMetaDB *pMetaDb; void *pKey; @@ -349,6 +349,12 @@ int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { if (ret < 0) { return -1; } + // child table handle for rsma + if (pHandle && pHandle->fp) { + if (((*pHandle->fp)(pHandle->ahandle, &pHandle->result, &ctbIdxKey.suid, &uid)) < 0) { + return -1; + }; + } } else if (pTbCfg->type == META_NORMAL_TABLE) { pKey = &uid; kLen = sizeof(uid); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 7f06ba8855..7f8ec3a656 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -15,7 +15,7 @@ #include "vnodeInt.h" -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg, STbDdlH *pHandle) { // Validate the tbOptions // if (metaValidateTbCfg(pMeta, pTbCfg) < 0) { // // TODO: handle error @@ -24,7 +24,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg) { // TODO: add atomicity - if (metaSaveTableToDB(pMeta, pTbCfg) < 0) { + if (metaSaveTableToDB(pMeta, pTbCfg, pHandle) < 0) { // TODO: handle error return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index 2753579e9e..8691920c8f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -81,8 +81,8 @@ static STsdb *tsdbNew(const char *path, SVnode *pVnode, const STsdbCfg *pTsdbCfg static void tsdbFree(STsdb *pTsdb) { if (pTsdb) { - // tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb)); - // tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb)); + tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb)); + tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb)); tsdbFreeFS(pTsdb->fs); taosMemoryFreeClear(pTsdb->path); taosMemoryFree(pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 098b86975b..483475601b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3636,6 +3636,8 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; goto _error; + } else { + tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); } if (pTbCfg->type != META_SUPER_TABLE) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index bc9fe46c0f..9143e8ed12 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -33,6 +33,8 @@ static const char *TSDB_SMA_DNAME[] = { #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test #define SMA_TEST_INDEX_UID 2000000001 // TODO: just for test + +typedef struct SRSmaInfo SRSmaInfo; typedef enum { SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f200.tsma SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2f1906.tsma @@ -46,6 +48,7 @@ typedef struct SPoolMem { struct SSmaEnv { TdThreadRwlock lock; + int8_t type; TXN txn; SPoolMem *pPool; SDiskID did; @@ -55,6 +58,7 @@ struct SSmaEnv { }; #define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_TYPE(env) ((env)->type) #define SMA_ENV_DID(env) ((env)->did) #define SMA_ENV_ENV(env) ((env)->dbEnv) #define SMA_ENV_PATH(env) ((env)->path) @@ -91,16 +95,45 @@ typedef struct { * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, * without information about its previous state. * - TSDB_SMA_STAT_DROPPED: 1)sma dropped + * N.B. only applicable to tsma */ int8_t state; // ETsdbSmaStat SHashObj *expiredWindows; // key: skey of time window, value: N/A STSma *pSma; // cache schema } SSmaStatItem; +#define RSMA_MAX_LEVEL 2 +#define RSMA_TASK_INFO_HASH_SLOT 8 +struct SRSmaInfo { + void *taskInfo[RSMA_MAX_LEVEL]; // qTaskInfo_t +}; + struct SSmaStat { - SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem + union { + SHashObj *smaStatItems; // key: indexUid, value: SSmaStatItem for tsma + SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; + }; T_REF_DECLARE() }; +#define SMA_STAT_ITEMS(s) ((s)->smaStatItems) +#define SMA_STAT_INFO_HASH(s) ((s)->rsmaInfoHash) + +static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) { + // Note: free/kill may in RC + qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); + if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { + qDestroyTask(otaskHandle); + } +} + +static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) { + for (int32_t i = 0; i < RSMA_MAX_LEVEL; ++i) { + if (pInfo->taskInfo[i]) { + tsdbFreeTaskHandle(pInfo->taskInfo[i]); + } + } + return NULL; +} // declaration of static functions @@ -108,11 +141,11 @@ struct SSmaStat { static int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version); -static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType); static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); -static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); -static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did); -static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv); +static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did); +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv); static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t indexUid, TSKEY skey); static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat); @@ -139,6 +172,7 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); +static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -229,7 +263,7 @@ static void *poolMalloc(void *arg, size_t size) { SPoolMem *pMem; pMem = (SPoolMem *)tdbOsMalloc(sizeof(*pMem) + size); - if (pMem == NULL) { + if (!pMem) { assert(0); } @@ -317,15 +351,17 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TSDB_SMA_DNAME[smaType]); } -static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) { +static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did) { SSmaEnv *pEnv = NULL; pEnv = (SSmaEnv *)taosMemoryCalloc(1, sizeof(SSmaEnv)); - if (pEnv == NULL) { + if (!pEnv) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + SMA_ENV_TYPE(pEnv) = smaType; + int code = taosThreadRwlockInit(&(pEnv->lock), NULL); if (code) { terrno = TAOS_SYSTEM_ERROR(code); @@ -334,15 +370,15 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) } ASSERT(path && (strlen(path) > 0)); - pEnv->path = strdup(path); - if (pEnv->path == NULL) { + SMA_ENV_PATH(pEnv) = strdup(path); + if (!SMA_ENV_PATH(pEnv)) { tsdbFreeSmaEnv(pEnv); return NULL; } - pEnv->did = did; + SMA_ENV_DID(pEnv) = did; - if (tsdbInitSmaStat(&pEnv->pStat) != TSDB_CODE_SUCCESS) { + if (tsdbInitSmaStat(&SMA_ENV_STAT(pEnv), smaType) != TSDB_CODE_SUCCESS) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -354,7 +390,7 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) return NULL; } - if ((pEnv->pPool = openPool()) == NULL) { + if (!(pEnv->pPool = openPool())) { tsdbFreeSmaEnv(pEnv); return NULL; } @@ -362,14 +398,14 @@ static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did) return pEnv; } -static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv) { +static int32_t tsdbInitSmaEnv(STsdb *pTsdb, int8_t smaType, const char *path, SDiskID did, SSmaEnv **pEnv) { if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } - if (*pEnv == NULL) { - if ((*pEnv = tsdbNewSmaEnv(pTsdb, path, did)) == NULL) { + if (!(*pEnv)) { + if (!(*pEnv = tsdbNewSmaEnv(pTsdb, smaType, path, did))) { return TSDB_CODE_FAILED; } } @@ -385,7 +421,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaE */ void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv) { if (pSmaEnv) { - tsdbDestroySmaState(pSmaEnv->pStat); + tsdbDestroySmaState(pSmaEnv->pStat, SMA_ENV_TYPE(pSmaEnv)); taosMemoryFreeClear(pSmaEnv->pStat); taosMemoryFreeClear(pSmaEnv->path); taosThreadRwlockDestroy(&(pSmaEnv->lock)); @@ -401,7 +437,7 @@ void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv) { } static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { - if (pStat == NULL) return 0; + if (!pStat) return 0; int ref = T_REF_INC(pStat); tsdbDebug("vgId:%d ref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); @@ -409,17 +445,17 @@ static int32_t tsdbRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { } static int32_t tsdbUnRefSmaStat(STsdb *pTsdb, SSmaStat *pStat) { - if (pStat == NULL) return 0; + if (!pStat) return 0; int ref = T_REF_DEC(pStat); tsdbDebug("vgId:%d unref sma stat:%p, val:%d", REPO_ID(pTsdb), pStat, ref); return 0; } -static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { +static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat, int8_t smaType) { ASSERT(pSmaStat != NULL); - if (*pSmaStat != NULL) { // no lock + if (*pSmaStat) { // no lock return TSDB_CODE_SUCCESS; } @@ -428,19 +464,31 @@ static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { * 2. Currently, there is mutex lock when init SSmaEnv, thus no need add lock on SSmaStat, and please add lock if * tsdbInitSmaStat invoked in other multithread environment later. */ - if (*pSmaStat == NULL) { + if (!(*pSmaStat)) { *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat)); - if (*pSmaStat == NULL) { + if (!(*pSmaStat)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_FAILED; } - (*pSmaStat)->smaStatItems = - taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (smaType == TSDB_SMA_TYPE_ROLLUP) { + SMA_STAT_INFO_HASH(*pSmaStat) = taosHashInit( + RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if ((*pSmaStat)->smaStatItems == NULL) { - taosMemoryFreeClear(*pSmaStat); - return TSDB_CODE_FAILED; + if (!SMA_STAT_INFO_HASH(*pSmaStat)) { + taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + SMA_STAT_ITEMS(*pSmaStat) = + taosHashInit(SMA_STATE_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + + if (!SMA_STAT_ITEMS(*pSmaStat)) { + taosMemoryFreeClear(*pSmaStat); + return TSDB_CODE_FAILED; + } + } else { + ASSERT(0); } } return TSDB_CODE_SUCCESS; @@ -462,7 +510,7 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { } static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { - if (pSmaStatItem != NULL) { + if (pSmaStatItem) { tdDestroyTSma(pSmaStatItem->pSma); taosMemoryFreeClear(pSmaStatItem->pSma); taosHashCleanup(pSmaStatItem->expiredWindows); @@ -477,16 +525,28 @@ static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { * @param pSmaStat * @return int32_t */ -int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { +int32_t tsdbDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { if (pSmaStat) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. - void *item = taosHashIterate(pSmaStat->smaStatItems, NULL); - while (item != NULL) { - SSmaStatItem *pItem = *(SSmaStatItem **)item; - tsdbFreeSmaStatItem(pItem); - item = taosHashIterate(pSmaStat->smaStatItems, item); + if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { + void *item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), NULL); + while (item) { + SSmaStatItem *pItem = *(SSmaStatItem **)item; + tsdbFreeSmaStatItem(pItem); + item = taosHashIterate(SMA_STAT_ITEMS(pSmaStat), item); + } + taosHashCleanup(SMA_STAT_ITEMS(pSmaStat)); + } else if (smaType == TSDB_SMA_TYPE_ROLLUP) { + void *infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), NULL); + while (infoHash) { + SRSmaInfo *pInfoHash = *(SRSmaInfo **)infoHash; + tsdbFreeRSmaInfo(pInfoHash); + infoHash = taosHashIterate(SMA_STAT_INFO_HASH(pSmaStat), infoHash); + } + taosHashCleanup(SMA_STAT_INFO_HASH(pSmaStat)); + } else { + ASSERT(0); } - taosHashCleanup(pSmaStat->smaStatItems); } return TSDB_CODE_SUCCESS; } @@ -497,12 +557,12 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { // return if already init switch (smaType) { case TSDB_SMA_TYPE_TIME_RANGE: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb))) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)))) { return TSDB_CODE_SUCCESS; } break; case TSDB_SMA_TYPE_ROLLUP: - if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb))) != NULL) { + if ((pEnv = (SSmaEnv *)atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)))) { return TSDB_CODE_SUCCESS; } break; @@ -515,7 +575,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { tsdbLockRepo(pTsdb); pEnv = (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_load_ptr(&REPO_TSMA_ENV(pTsdb)) : atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); - if (pEnv == NULL) { + if (!pEnv) { char rname[TSDB_FILENAME_LEN] = {0}; SDiskID did = {0}; @@ -531,7 +591,7 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { return TSDB_CODE_FAILED; } - if (tsdbInitSmaEnv(pTsdb, rname, did, &pEnv) != TSDB_CODE_SUCCESS) { + if (tsdbInitSmaEnv(pTsdb, smaType, rname, did, &pEnv) != TSDB_CODE_SUCCESS) { tsdbUnlockRepo(pTsdb); return TSDB_CODE_FAILED; } @@ -547,10 +607,10 @@ static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) { static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t indexUid, int64_t winSKey, int64_t version) { SSmaStatItem *pItem = taosHashGet(pItemsHash, &indexUid, sizeof(indexUid)); - if (pItem == NULL) { + if (!pItem) { // TODO: use TSDB_SMA_STAT_EXPIRED and update by stream computing later pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_OK); // TODO use the real state - if (pItem == NULL) { + if (!pItem) { // Response to stream computing: OOM // For query, if the indexUid not found, the TSDB should tell query module to query raw TS data. return TSDB_CODE_FAILED; @@ -558,7 +618,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t // cache smaMeta STSma *pSma = metaGetSmaInfoByIndex(REPO_META(pTsdb), indexUid, true); - if (pSma == NULL) { + if (!pSma) { terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META; taosHashCleanup(pItem->expiredWindows); taosMemoryFree(pItem); @@ -574,7 +634,7 @@ static int32_t tsdbSetExpiredWindow(STsdb *pTsdb, SHashObj *pItemsHash, int64_t taosMemoryFree(pItem); return TSDB_CODE_FAILED; } - } else if ((pItem = *(SSmaStatItem **)pItem) == NULL) { + } else if (!(pItem = *(SSmaStatItem **)pItem)) { terrno = TSDB_CODE_INVALID_PTR; return TSDB_CODE_FAILED; } @@ -634,7 +694,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers SSmaStat *pStat = SMA_ENV_STAT(pEnv); SHashObj *pItemsHash = SMA_ENV_STAT_ITEMS(pEnv); - TASSERT(pEnv != NULL && pStat != NULL && pItemsHash != NULL); + TASSERT(pEnv && pStat && pItemsHash); // basic procedure // TODO: optimization @@ -651,7 +711,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers while (true) { tGetSubmitMsgNext(&msgIter, &pBlock); - if (pBlock == NULL) break; + if (!pBlock) break; STSmaWrapper *pSW = NULL; STSma *pTSma = NULL; @@ -664,7 +724,7 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers while (true) { STSRow *row = tGetSubmitBlkNext(&blkIter); - if (row == NULL) { + if (!row) { tdFreeTSmaWrapper(pSW); break; } @@ -672,10 +732,10 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers if (pSW) { pSW = tdFreeTSmaWrapper(pSW); } - if ((pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid)) == NULL) { + if (!(pSW = metaGetSmaInfoByTable(REPO_META(pTsdb), pBlock->suid))) { break; } - if ((pSW->number) <= 0 || (pSW->tSma == NULL)) { + if ((pSW->number) <= 0 || !pSW->tSma) { pSW = tdFreeTSmaWrapper(pSW); break; } @@ -721,10 +781,10 @@ static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, SSmaStat *pStat, int64_t ind tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem != NULL) && ((pItem = *(SSmaStatItem **)pItem) != NULL)) { + if ((pItem) && ((pItem = *(SSmaStatItem **)pItem))) { // pItem resides in hash buffer all the time unless drop sma index // TODO: multithread protect if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { @@ -934,7 +994,7 @@ static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; - ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); + ASSERT(!pSmaH->dFile.path && !pSmaH->dFile.pDB); pSmaH->dFile.fid = fid; char tSmaFile[TSDB_FILENAME_LEN] = {0}; @@ -1004,6 +1064,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char STsdbCfg *pCfg = REPO_CFG(pTsdb); const SArray *pDataBlocks = (const SArray *)msg; + // TODO: destroy SSDataBlocks(msg) + // For super table aggregation, the sma data is stored in vgroup calculated from the hash value of stable name. Thus // the sma data would arrive ahead of the update-expired-window msg. if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { @@ -1011,7 +1073,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char return TSDB_CODE_FAILED; } - if (pDataBlocks == NULL) { + if (!pDataBlocks) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert tSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; @@ -1029,11 +1091,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) { terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; @@ -1061,9 +1123,8 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit); int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); - // key: skey + groupId - char smaKey[SMA_KEY_LEN] = {0}; - char dataBuf[512] = {0}; + char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId + char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer? void *pDataBuf = NULL; int32_t sz = taosArrayGetSize(pDataBlocks); for (int32_t i = 0; i < sz; ++i) { @@ -1228,7 +1289,7 @@ static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid); SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); - if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) { + if ((pItem) || ((pItem = *(SSmaStatItem **)pItem))) { if (tsdbSmaStatIsDropped(pItem)) { tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid); return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg would be intercepted by mnode @@ -1284,19 +1345,13 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { SSmaEnv *pEnv = atomic_load_ptr(&REPO_RSMA_ENV(pTsdb)); int64_t indexUid = SMA_TEST_INDEX_UID; - if (pEnv == NULL) { + if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); return terrno; } - if (pEnv == NULL) { - terrno = TSDB_CODE_INVALID_PTR; - tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); - return terrno; - } - - if (pDataBlocks == NULL) { + if (!pDataBlocks) { terrno = TSDB_CODE_INVALID_PTR; tsdbWarn("vgId:%d insert rSma data failed since pDataBlocks is NULL", REPO_ID(pTsdb)); return terrno; @@ -1313,11 +1368,11 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg) { tsdbRefSmaStat(pTsdb, pStat); - if (pStat && pStat->smaStatItems) { - pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + if (pStat && SMA_STAT_ITEMS(pStat)) { + pItem = taosHashGet(SMA_STAT_ITEMS(pStat), &indexUid, sizeof(indexUid)); } - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem) || tsdbSmaStatIsDropped(pItem)) { terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; tsdbUnRefSmaStat(pTsdb, pStat); return TSDB_CODE_FAILED; @@ -1438,7 +1493,7 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { ++pReadH->smaFsIter.iter; } - if (pReadH->pDFile != NULL) { + if (pReadH->pDFile) { tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); return true; } @@ -1471,7 +1526,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, tsdbRefSmaStat(pTsdb, pStat); SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); - if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL)) { + if (!pItem || !(pItem = *(SSmaStatItem **)pItem)) { // Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if // it's NULL. tsdbUnRefSmaStat(pTsdb, pStat); @@ -1484,7 +1539,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, int32_t nQueryWin = taosArrayGetSize(pQuerySKey); for (int32_t n = 0; n < nQueryWin; ++n) { TSKEY skey = taosArrayGet(pQuerySKey, n); - if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) { + if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY))) { // TODO: mark this window as expired. } } @@ -1500,7 +1555,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, return TSDB_CODE_FAILED; } - if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { + if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY))) { // TODO: mark this window as expired. tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), querySKey, indexUid); @@ -1534,7 +1589,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, void *result = NULL; int32_t valueSize = 0; - if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { + if (!(result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize))) { tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIx64 " since %s", REPO_ID(pTsdb), indexUid, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), tstrerror(terrno)); tsdbCloseDBF(&tReadH.dFile); @@ -1578,7 +1633,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { SSmaCfg vCreateSmaReq = {0}; - if (tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq) == NULL) { + if (!tDeserializeSVCreateTSmaReq(pMsg, &vCreateSmaReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; tsdbWarn("vgId:%d TDMT_VND_CREATE_SMA received but deserialize failed since %s", REPO_ID(pTsdb), terrstr(terrno)); return -1; @@ -1604,7 +1659,7 @@ int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg) { int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { SVDropTSmaReq vDropSmaReq = {0}; - if (tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq) == NULL) { + if (!tDeserializeSVDropTSmaReq(pMsg, &vDropSmaReq)) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1632,6 +1687,395 @@ int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg) { return TSDB_CODE_SUCCESS; } +/** + * @brief Check and init qTaskInfo_t, only applicable to stable with SRSmaParam. + * + * @param pTsdb + * @param pMeta + * @param pReq + * @return int32_t + */ +int32_t tsdbRegisterRSma(STsdb *pTsdb, SMeta *pMeta, SVCreateTbReq *pReq) { + SRSmaParam *param = pReq->stbCfg.pRSmaParam; + + if (!param) { + tsdbDebug("vgId:%d return directly since no rollup for stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, + pReq->stbCfg.suid); + return TSDB_CODE_SUCCESS; + } + + if ((param->qmsg1Len == 0) && (param->qmsg2Len == 0)) { + tsdbWarn("vgId:%d no qmsg1/qmsg2 for rollup stable %s %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); + return TSDB_CODE_SUCCESS; + } + + if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_ROLLUP) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INIT_FAILED; + return TSDB_CODE_FAILED; + } + + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = NULL; + + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t)); + if (pRSmaInfo) { + tsdbWarn("vgId:%d rsma info already exists for stb: %s, %" PRIi64, REPO_ID(pTsdb), pReq->name, pReq->stbCfg.suid); + return TSDB_CODE_SUCCESS; + } + + pRSmaInfo = (SRSmaInfo *)taosMemoryCalloc(1, sizeof(SRSmaInfo)); + if (!pRSmaInfo) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + STqReadHandle *pReadHandle = tqInitSubmitMsgScanner(pMeta); + if (!pReadHandle) { + taosMemoryFree(pRSmaInfo); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + + SReadHandle handle = { + .reader = pReadHandle, + .meta = pMeta, + }; + + if (param->qmsg1) { + pRSmaInfo->taskInfo[0] = qCreateStreamExecTaskInfo(param->qmsg1, &handle); + if (!pRSmaInfo->taskInfo[0]) { + taosMemoryFree(pRSmaInfo); + taosMemoryFree(pReadHandle); + return TSDB_CODE_FAILED; + } + } + + if (param->qmsg2) { + pRSmaInfo->taskInfo[1] = qCreateStreamExecTaskInfo(param->qmsg2, &handle); + if (!pRSmaInfo->taskInfo[1]) { + taosMemoryFree(pRSmaInfo); + taosMemoryFree(pReadHandle); + return TSDB_CODE_FAILED; + } + } + + if (taosHashPut(SMA_STAT_INFO_HASH(pStat), &pReq->stbCfg.suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) != + TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } else { + tsdbDebug("vgId:%d register rsma info succeed for suid:%" PRIi64, REPO_ID(pTsdb), pReq->stbCfg.suid); + } + + return TSDB_CODE_SUCCESS; +} + +/** + * @brief store suid/[uids], prefer to use array and then hash + * + * @param pStore + * @param suid + * @param uid + * @return int32_t + */ +int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid) { + // prefer to store suid/uids in array + if ((suid == pStore->suid) || (pStore->suid == 0)) { + if (pStore->suid == 0) { + pStore->suid = suid; + } + if (uid) { + if (!pStore->tbUids) { + if (!(pStore->tbUids = taosArrayInit(1, sizeof(tb_uid_t)))) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + } + if (!taosArrayPush(pStore->tbUids, &uid)) { + return TSDB_CODE_FAILED; + } + } + } else { + // store other suid/uids in hash when multiple stable/table included in 1 batch of request + if (!pStore->uidHash) { + pStore->uidHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); + if (!pStore->uidHash) { + return TSDB_CODE_FAILED; + } + } + if (uid) { + SArray *uidArray = taosHashGet(pStore->uidHash, &suid, sizeof(tb_uid_t)); + if (uidArray && ((uidArray = *(SArray **)uidArray))) { + taosArrayPush(uidArray, &uid); + } else { + SArray *pUidArray = taosArrayInit(1, sizeof(tb_uid_t)); + if (!pUidArray) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (!taosArrayPush(pUidArray, &uid)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), &pUidArray, sizeof(pUidArray)) != 0) { + return TSDB_CODE_FAILED; + } + } + } else { + if (taosHashPut(pStore->uidHash, &suid, sizeof(suid), NULL, 0) != 0) { + return TSDB_CODE_FAILED; + } + } + } + return TSDB_CODE_SUCCESS; +} + +void tsdbUidStoreDestory(STbUidStore *pStore) { + if (pStore) { + if (pStore->uidHash) { + if (pStore->tbUids) { + void *pIter = taosHashIterate(pStore->uidHash, NULL); + while (pIter) { + SArray *arr = *(SArray **)pIter; + taosArrayDestroy(arr); + pIter = taosHashIterate(pStore->uidHash, pIter); + } + } + taosHashCleanup(pStore->uidHash); + } + taosArrayDestroy(pStore->tbUids); + } +} + +void *tsdbUidStoreFree(STbUidStore *pStore) { + tsdbUidStoreDestory(pStore); + taosMemoryFree(pStore); + return NULL; +} + +/** + * @brief fetch suid/uids when create child tables of rollup SMA + * + * @param pTsdb + * @param ppStore + * @param suid + * @param uid + * @return int32_t + */ +int32_t tsdbFetchTbUidList(void *pTsdb, void **ppStore, void *suid, void *uid) { + SSmaEnv *pEnv = REPO_RSMA_ENV((STsdb *)pTsdb); + + // only applicable to rollup SMA ctables + if (!pEnv) { + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SHashObj *infoHash = NULL; + if (!pStat || !(infoHash = SMA_STAT_INFO_HASH(pStat))) { + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + return TSDB_CODE_FAILED; + } + + // info cached when create rsma stable and return directly for non-rsma ctables + if (!taosHashGet(infoHash, suid, sizeof(tb_uid_t))) { + return TSDB_CODE_SUCCESS; + } + + if (!(*ppStore)) { + if (tsdbUidStoreInit((STbUidStore **)ppStore) != 0) { + return TSDB_CODE_FAILED; + } + } + + if (tsdbUidStorePut(*ppStore, *(tb_uid_t *)suid, (tb_uid_t *)uid) != 0) { + *ppStore = tsdbUidStoreFree(*ppStore); + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids) { + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = NULL; + + if (!suid || !tbUids) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; + } + + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tsdbError("vgId:%d failed to get rsma info for uid:%" PRIi64, REPO_ID(pTsdb), *suid); + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + return TSDB_CODE_FAILED; + } + + if (pRSmaInfo->taskInfo[0] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[0], tbUids, true) != 0)) { + tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; + } else { + tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb), + pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + } + + if (pRSmaInfo->taskInfo[1] && (qUpdateQualifiedTableId(pRSmaInfo->taskInfo[1], tbUids, true) != 0)) { + tsdbError("vgId:%d update tbUidList failed for uid:%" PRIi64 " since %s", REPO_ID(pTsdb), *suid, terrstr(terrno)); + return TSDB_CODE_FAILED; + } else { + tsdbDebug("vgId:%d update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 ", uid:%" PRIi64, REPO_ID(pTsdb), + pRSmaInfo->taskInfo[1], *suid, *(int64_t *)taosArrayGet(tbUids, 0)); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbUpdateTbUidList(STsdb *pTsdb, STbUidStore *pStore) { + if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { + tsdbDebug("vgId:%d no need to update tbUids since empty uidStore", REPO_ID(pTsdb)); + tsdbUidStoreFree(pStore); + return TSDB_CODE_SUCCESS; + } + + if (tsdbUpdateTbUidListImpl(pTsdb, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { + tsdbUidStoreFree(pStore); + return TSDB_CODE_FAILED; + } + + void *pIter = taosHashIterate(pStore->uidHash, NULL); + while (pIter) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + SArray *pTbUids = *(SArray **)pIter; + + if (tsdbUpdateTbUidListImpl(pTsdb, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { + taosHashCancelIterate(pStore->uidHash, pIter); + tsdbUidStoreFree(pStore); + return TSDB_CODE_FAILED; + } + + pIter = taosHashIterate(pStore->uidHash, pIter); + } + + tsdbUidStoreFree(pStore); + + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { + ASSERT(pMsg != NULL); + SSubmitMsgIter msgIter = {0}; + SSubmitBlk *pBlock = NULL; + SSubmitBlkIter blkIter = {0}; + STSRow *row = NULL; + + terrno = TSDB_CODE_SUCCESS; + // pMsg->length = htonl(pMsg->length); + // pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); + + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + while (true) { + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + + if (!pBlock) break; + tsdbUidStorePut(pStore, pBlock->suid, NULL); + } + + if (terrno != TSDB_CODE_SUCCESS) return -1; + return 0; +} + +int32_t tsdbExecuteRSma(STsdb *pTsdb, SMeta *pMeta, const void *pMsg, int32_t inputType, tb_uid_t *suid) { + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + if (!pEnv) { + // only applicable when rsma env exists + return TSDB_CODE_SUCCESS; + } + + SSmaStat *pStat = SMA_ENV_STAT(pEnv); + SRSmaInfo *pRSmaInfo = NULL; + + pRSmaInfo = taosHashGet(SMA_STAT_INFO_HASH(pStat), suid, sizeof(tb_uid_t)); + + if (!pRSmaInfo || !(pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + tsdbDebug("vgId:%d no rsma info for suid:%" PRIu64, REPO_ID(pTsdb), *suid); + return TSDB_CODE_SUCCESS; + } + + SArray *pResult = NULL; + + pResult = taosArrayInit(0, sizeof(SSDataBlock)); + if (!pResult) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + if (pRSmaInfo->taskInfo[0]) { + qSetStreamInput(pRSmaInfo->taskInfo[0], pMsg, inputType); + while (1) { + SSDataBlock *output; + uint64_t ts; + if (qExecTask(pRSmaInfo->taskInfo[0], &output, &ts) < 0) { + ASSERT(false); + } + if (!output) { + break; + } + taosArrayPush(pResult, output); + } + blockDebugShowData(pResult); + } + + // if (pRSmaInfo->taskInfo[1]) { + // qSetStreamInput(pRSmaInfo->taskInfo[1], pMsg, inputType); + // while (1) { + // SSDataBlock *output; + // uint64_t ts; + // if (qExecTask(pRSmaInfo->taskInfo[1], &output, &ts) < 0) { + // ASSERT(false); + // } + // if (!output) { + // break; + // } + // taosArrayPush(pResult, output); + // } + // blockDebugShowData(pResult); + // } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbTriggerRSma(STsdb *pTsdb, SMeta *pMeta, void *pMsg, int32_t inputType) { + SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb); + if (!pEnv) { + // only applicable when rsma env exists + return TSDB_CODE_SUCCESS; + } + + if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { + STbUidStore uidStore = {0}; + tsdbFetchSubmitReqSuids(pMsg, &uidStore); + + if (uidStore.suid != 0) { + tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, &uidStore.suid); + + void *pIter = taosHashIterate(uidStore.uidHash, NULL); + while (pIter) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + tsdbExecuteRSma(pTsdb, pMeta, pMsg, inputType, pTbSuid); + pIter = taosHashIterate(uidStore.uidHash, pIter); + } + + tsdbUidStoreDestory(&uidStore); + } + } + return TSDB_CODE_SUCCESS; +} + #if 0 /** * @brief Get the start TS key of the last data block of one interval/sliding. @@ -1674,6 +2118,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg) { if ((code = tsdbInsertTSmaDataImpl(pTsdb, indexUid, msg)) < 0) { tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } + // TODO: destroy SSDataBlocks(msg) return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index e3c504c93d..66cc928c12 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -83,6 +83,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg case TDMT_VND_SUBMIT: pRsp->msgType = TDMT_VND_SUBMIT_RSP; vnodeProcessSubmitReq(pVnode, ptr, pRsp); + tsdbTriggerRSma(pVnode->pTsdb, pVnode->pMeta, ptr, STREAM_DATA_TYPE_SUBMIT_BLOCK); break; case TDMT_VND_MQ_VG_CHANGE: if (tqProcessVgChangeReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), @@ -101,7 +102,6 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg } } break; case TDMT_VND_CREATE_SMA: { // timeRangeSMA - if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO } @@ -277,19 +277,12 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) { SVCreateTbReq vCreateTbReq = {0}; tDeserializeSVCreateTbReq(pReq, &vCreateTbReq); - if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq)) < 0) { + if (metaCreateTable(pVnode->pMeta, &(vCreateTbReq), NULL) < 0) { // TODO return -1; } - // TODO: remove the debug log - SRSmaParam *param = vCreateTbReq.stbCfg.pRSmaParam; - if (param) { - printf("qmsg1 len = %d, body = %s\n", param->qmsg1 ? (int32_t)strlen(param->qmsg1) : 0, - param->qmsg1 ? param->qmsg1 : ""); - printf("qmsg1 len = %d, body = %s\n", param->qmsg2 ? (int32_t)strlen(param->qmsg2) : 0, - param->qmsg2 ? param->qmsg2 : ""); - } + tsdbRegisterRSma(pVnode->pTsdb, pVnode->pMeta, &vCreateTbReq); taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); @@ -309,6 +302,13 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR SVCreateTbBatchRsp vCreateTbBatchRsp = {0}; tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq); int reqNum = taosArrayGetSize(vCreateTbBatchReq.pArray); + + STbDdlH ddlHandle = { + .ahandle = pVnode->pTsdb, + .result = NULL, + .fp = tsdbFetchTbUidList, + }; + for (int i = 0; i < reqNum; i++) { SVCreateTbReq *pCreateTbReq = taosArrayGet(vCreateTbBatchReq.pArray, i); @@ -324,7 +324,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR taosArrayPush(vCreateTbBatchRsp.rspList, &rsp); } - if (metaCreateTable(pVnode->pMeta, pCreateTbReq) < 0) { + if (metaCreateTable(pVnode->pMeta, pCreateTbReq, &ddlHandle) < 0) { // TODO: handle error vError("vgId:%d, failed to create table: %s", TD_VID(pVnode), pCreateTbReq->name); } @@ -348,6 +348,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR } } + tsdbUpdateTbUidList(pVnode->pTsdb, ddlHandle.result); + vTrace("vgId:%d process create %" PRIzu " tables", TD_VID(pVnode), taosArrayGetSize(vCreateTbBatchReq.pArray)); taosArrayDestroy(vCreateTbBatchReq.pArray); if (vCreateTbBatchRsp.rspList) { diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index b0217a0462..ab617cb186 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -407,7 +407,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) { } } - EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); + // EXPECT_EQ(tdScanAndConvertSubmitMsg(pMsg), TSDB_CODE_SUCCESS); EXPECT_EQ(tsdbUpdateSmaWindow(pTsdb, pMsg, 0), 0);