diff --git a/include/common/taosdef.h b/include/common/taosdef.h index f252143fa6..4d307def10 100644 --- a/include/common/taosdef.h +++ b/include/common/taosdef.h @@ -63,9 +63,11 @@ typedef enum { } ETsdbStatisStatus; typedef enum { - TSDB_SMA_STAT_OK = 0, // ready to provide service - TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired -} ETsdbSmaStat; + TSDB_SMA_STAT_UNKNOWN = -1, // unknown + TSDB_SMA_STAT_OK = 0, // ready to provide service + TSDB_SMA_STAT_EXPIRED = 1, // not ready or expired + TSDB_SMA_STAT_DROPPED = 2, // sma dropped +} ETsdbSmaStat; // bit operation typedef enum { TSDB_SMA_TYPE_BLOCK = 0, // Block-wise SMA diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 9e1e1c53dc..4391d58a64 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -336,8 +336,9 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615) #define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616) #define TSDB_CODE_TDB_TABLE_RECREATED TAOS_DEF_ERROR_CODE(0, 0x0617) -#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0618) -#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0619) +#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618) +#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0619) +#define TSDB_CODE_TDB_INVALID_SMA_STAT TAOS_DEF_ERROR_CODE(0, 0x0620) // query #define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 1ea00566f4..e78015d56f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -313,7 +313,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + if(pReq->rollup && pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); @@ -336,12 +336,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { tlen += taosEncodeFixedI32(buf, pReq->ntbCfg.pSchema[i].bytes); tlen += taosEncodeString(buf, pReq->ntbCfg.pSchema[i].name); } - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nBSmaCols); - for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { - tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); + tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.nBSmaCols); + for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) { + tlen += taosEncodeFixedI16(buf, pReq->ntbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { - SRSmaParam *param = pReq->stbCfg.pRSmaParam; + if(pReq->rollup && pReq->ntbCfg.pRSmaParam) { + SRSmaParam *param = pReq->ntbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->nFuncIds); @@ -424,18 +424,18 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeFixedI32(buf, &pReq->ntbCfg.pSchema[i].bytes); buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); } - buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); - if(pReq->stbCfg.nBSmaCols > 0) { - pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); - for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { - buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); + buf = taosDecodeFixedI16(buf, &(pReq->ntbCfg.nBSmaCols)); + if(pReq->ntbCfg.nBSmaCols > 0) { + pReq->ntbCfg.pBSmaCols = (col_id_t *)malloc(pReq->ntbCfg.nBSmaCols * sizeof(col_id_t)); + for (col_id_t i = 0; i < pReq->ntbCfg.nBSmaCols; ++i) { + buf = taosDecodeFixedI16(buf, pReq->ntbCfg.pBSmaCols + i); } } else { - pReq->stbCfg.pBSmaCols = NULL; + pReq->ntbCfg.pBSmaCols = NULL; } if(pReq->rollup) { - pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); - SRSmaParam *param = pReq->stbCfg.pRSmaParam; + pReq->ntbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); + SRSmaParam *param = pReq->ntbCfg.pRSmaParam; buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); @@ -448,7 +448,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } buf = taosDecodeFixedI64(buf, ¶m->delay); } else { - pReq->stbCfg.pRSmaParam = NULL; + pReq->ntbCfg.pRSmaParam = NULL; } break; default: diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index a48f437c97..149aac1206 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -51,7 +51,7 @@ int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); int metaDropTable(SMeta *pMeta, tb_uid_t uid); int metaCommit(SMeta *pMeta); int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); -int32_t metaDropTSma(SMeta *pMeta, char *indexName); +int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); // For Query STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 7b93f7580d..2b10c885b9 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -96,6 +96,7 @@ int tsdbCommit(STsdb *pTsdb); */ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); +int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); /** * @brief Insert RSma(Time-range-wise Rollup SMA) data. diff --git a/source/dnode/vnode/src/inc/metaDef.h b/source/dnode/vnode/src/inc/metaDef.h index 16a53baef0..bc1017f0c7 100644 --- a/source/dnode/vnode/src/inc/metaDef.h +++ b/source/dnode/vnode/src/inc/metaDef.h @@ -34,7 +34,7 @@ void metaCloseDB(SMeta* pMeta); int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); -int metaRemoveSmaFromDb(SMeta* pMeta, const char* indexName); +int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); // SMetaCache int metaOpenCache(SMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index f934b0263d..874f85994a 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -16,15 +16,15 @@ #ifndef _TD_TSDB_SMA_H_ #define _TD_TSDB_SMA_H_ -typedef struct SSmaStat SSmaStat; -typedef struct SSmaEnv SSmaEnv; +typedef struct SSmaStat SSmaStat; +typedef struct SSmaEnv SSmaEnv; struct SSmaEnv { TdThreadRwlock lock; - SDiskID did; - TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level? - char * path; // relative path - SSmaStat * pStat; + SDiskID did; + TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level? + char *path; // relative path + SSmaStat *pStat; }; #define SMA_ENV_LOCK(env) ((env)->lock) @@ -77,4 +77,6 @@ static FORCE_INLINE int tsdbUnLockSma(SSmaEnv *pEnv) { return 0; } + + #endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index 2dd8386d7a..e4bad9e94b 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -259,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { return 0; } -int metaRemoveSmaFromDb(SMeta *pMeta, const char *indexName) { +int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) { // TODO #if 0 DBT key = {0}; diff --git a/source/dnode/vnode/src/meta/metaIdx.c b/source/dnode/vnode/src/meta/metaIdx.c index 881ea4f46d..818da14738 100644 --- a/source/dnode/vnode/src/meta/metaIdx.c +++ b/source/dnode/vnode/src/meta/metaIdx.c @@ -121,11 +121,11 @@ int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg) { return TSDB_CODE_SUCCESS; } -int32_t metaDropTSma(SMeta *pMeta, char* indexName) { +int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid) { // TODO: Validate the cfg // TODO: add atomicity - if (metaRemoveSmaFromDb(pMeta, indexName) < 0) { + if (metaRemoveSmaFromDb(pMeta, indexUid) < 0) { // TODO: handle error return -1; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 0eb2d525b3..502ba60709 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -25,6 +25,7 @@ static const char *TSDB_SMA_DNAME[] = { #define SMA_STORAGE_TSDB_TIMES 10 #define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 +#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds #define SMA_STATE_HASH_SLOT 4 #define SMA_STATE_ITEM_HASH_SLOT 32 @@ -60,10 +61,11 @@ typedef struct { typedef struct { /** * @brief The field 'state' is here to demonstrate if one smaIndex is ready to provide service. - * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, - * without information about its previous state. * - TSDB_SMA_STAT_OK: 1) The sma calculation of history data is finished; 2) Or recevied information from * Streaming Module or TSDB local persistence. + * - TSDB_SMA_STAT_EXPIRED: 1) If sma calculation of history TS data is not finished; 2) Or if the TSDB is open, + * without information about its previous state. + * - TSDB_SMA_STAT_DROPPED: 1)sma dropped */ int8_t state; // ETsdbSmaStat SHashObj *expiredWindows; // key: skey of time window, value: N/A @@ -80,6 +82,7 @@ struct SSmaStat { // expired window static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, ETsdbSmaType smaType, char *msg); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); +static void * tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path, SDiskID did); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SDiskID did, SSmaEnv **pEnv); @@ -109,7 +112,55 @@ static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +// mgmt interface +static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); + // implementation +static FORCE_INLINE int8_t tsdbSmaStat(SSmaStatItem *pStatItem) { + if (pStatItem) { + return atomic_load_8(&pStatItem->state); + } + return TSDB_SMA_STAT_UNKNOWN; +} + +static FORCE_INLINE bool tsdbSmaStatIsOK(SSmaStatItem *pStatItem, int8_t *state) { + if(!pStatItem) { + return false; + } + + if (state) { + *state = atomic_load_8(&pStatItem->state); + return *state == TSDB_SMA_STAT_OK; + } + return atomic_load_8(&pStatItem->state) == TSDB_SMA_STAT_OK; +} + +static FORCE_INLINE bool tsdbSmaStatIsExpired(SSmaStatItem *pStatItem) { + return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_EXPIRED) : true; +} + +static FORCE_INLINE bool tsdbSmaStatIsDropped(SSmaStatItem *pStatItem) { + return pStatItem ? (atomic_load_8(&pStatItem->state) & TSDB_SMA_STAT_DROPPED) : true; +} + +static FORCE_INLINE void tsdbSmaStatSetOK(SSmaStatItem *pStatItem) { + if (pStatItem) { + atomic_store_8(&pStatItem->state, TSDB_SMA_STAT_OK); + } +} + +static FORCE_INLINE void tsdbSmaStatSetExpired(SSmaStatItem *pStatItem) { + if (pStatItem) { + atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_EXPIRED); + } +} + +static FORCE_INLINE void tsdbSmaStatSetDropped(SSmaStatItem *pStatItem) { + if (pStatItem) { + atomic_or_fetch_8(&pStatItem->state, TSDB_SMA_STAT_DROPPED); + } +} + static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode%svnode%d%stsdb%s%s", TD_DIRSEP, vgId, TD_DIRSEP, TD_DIRSEP, TSDB_SMA_DNAME[smaType]); @@ -252,6 +303,16 @@ static SSmaStatItem *tsdbNewSmaStatItem(int8_t state) { return pItem; } +static void *tsdbFreeSmaStatItem(SSmaStatItem *pSmaStatItem) { + if (pSmaStatItem != NULL) { + tdDestroyTSma(pSmaStatItem->pSma); + tfree(pSmaStatItem->pSma); + taosHashCleanup(pSmaStatItem->expiredWindows); + tfree(pSmaStatItem); + } + return NULL; +} + /** * @brief Release resources allocated for its member fields, not including itself. * @@ -264,12 +325,7 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { void *item = taosHashIterate(pSmaStat->smaStatItems, NULL); while (item != NULL) { SSmaStatItem *pItem = *(SSmaStatItem **)item; - if (pItem != NULL) { - tdDestroyTSma(pItem->pSma); - tfree(pItem->pSma); - taosHashCleanup(pItem->expiredWindows); - tfree(pItem); - } + tsdbFreeSmaStatItem(pItem); item = taosHashIterate(pSmaStat->smaStatItems, item); } taosHashCleanup(pSmaStat->smaStatItems); @@ -711,6 +767,148 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + int64_t indexUid = SMA_TEST_INDEX_UID; + + + if (pEnv == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb)); + return terrno; + } + + if (pData->dataLen <= 0) { + TASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return TSDB_CODE_FAILED; + } + + STSmaWriteH tSmaH = {0}; + + if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) { + return TSDB_CODE_FAILED; + } + + SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv); + SSmaStatItem *pItem = NULL; + + tsdbRefSmaStat(pTsdb, pStat); + + if (pStat && pStat->smaStatItems) { + pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid)); + } + if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) { + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + + char rPath[TSDB_FILENAME_LEN] = {0}; + char aPath[TSDB_FILENAME_LEN] = {0}; + snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid); + tfsAbsoluteName(REPO_TFS(pTsdb), SMA_ENV_DID(pEnv), rPath, aPath); + if (!taosCheckExistFile(aPath)) { + if (tfsMkdirRecurAt(REPO_TFS(pTsdb), rPath, SMA_ENV_DID(pEnv)) != TSDB_CODE_SUCCESS) { + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + } + + // Step 1: Judge the storage level and days + int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); + int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); + int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, pData, indexUid, fid); + if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { + tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), + tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + + if (tsdbInsertTSmaDataSection(&tSmaH, pData) != 0) { + tsdbWarn("vgId:%d insert tSma data section failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_FAILED; + } + // TODO:tsdbEndTSmaCommit(); + + // Step 3: reset the SSmaStat + tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey); + + tsdbDestroyTSmaWriteH(&tSmaH); + tsdbUnRefSmaStat(pTsdb, pStat); + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Drop tSma data and local cache + * - insert/query reference + * @param pTsdb + * @param msg + * @return int32_t + */ +static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid) { + SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv); + + // clear local cache + if (pEnv) { + tsdbDebug("vgId:%d drop tSma local cache for %" PRIi64, REPO_ID(pTsdb), indexUid); + + SSmaStatItem *pItem = taosHashGet(SMA_ENV_STAT_ITEMS(pEnv), &indexUid, sizeof(indexUid)); + if ((pItem != NULL) || ((pItem = *(SSmaStatItem **)pItem) != NULL)) { + if (tsdbSmaStatIsDropped(pItem)) { + tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid); + return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg should be intercept by mnode + } + + tsdbWLockSma(pEnv); + if (tsdbSmaStatIsDropped(pItem)) { + tsdbUnLockSma(pEnv); + tsdbDebug("vgId:%d tSma stat is already dropped for %" PRIi64, REPO_ID(pTsdb), indexUid); + return TSDB_CODE_TDB_INVALID_ACTION; // TODO: duplicate drop msg should be intercept by mnode + } + tsdbSmaStatSetDropped(pItem); + tsdbUnLockSma(pEnv); + + int32_t nSleep = 0; + while (true) { + if (T_REF_VAL_GET(SMA_ENV_STAT(pEnv)) <= 0) { + break; + } + taosSsleep(1); + if (++nSleep > SMA_DROP_EXPIRED_TIME) { + break; + }; + } + + tsdbFreeSmaStatItem(pItem); + tsdbDebug("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid); + } + } + // clear sma data files + // TODO: +} + +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { + STsdb *pTsdb = pSmaH->pTsdb; + + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); + pSmaH->dFile.path = strdup(tSmaFile); + + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; + SSmaEnv * pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv); if (pEnv == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -771,51 +969,6 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { return TSDB_CODE_SUCCESS; } -static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { - STsdb *pTsdb = pSmaH->pTsdb; - - char tSmaFile[TSDB_FILENAME_LEN] = {0}; - snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); - pSmaH->dFile.path = strdup(tSmaFile); - - return TSDB_CODE_SUCCESS; -} - -static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { - STsdbCfg * pCfg = REPO_CFG(pTsdb); - STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; - STSmaWriteH tSmaH = {0}; - - tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); - - if (pData->dataLen <= 0) { - TASSERT(0); - terrno = TSDB_CODE_INVALID_PARA; - return terrno; - } - - // Step 1: Judge the storage level - int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); - int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; - - // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file - // - Set and open the DFile or the B+Tree file - - int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); - - // Save all the TSma data to one file - // TODO: tsdbStartTSmaCommit(); - tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); - - tsdbInsertTSmaDataSection(&tSmaH, pData); - // TODO:tsdbEndTSmaCommit(); - - // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pRSmaEnv), pData->indexUid, pData->skey); - - return TSDB_CODE_SUCCESS; -} - /** * @brief * @@ -934,6 +1087,15 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_ #endif #if 1 + int8_t smaStat = 0; + if (!tsdbSmaStatIsOK(pItem, &smaStat)) { // TODO: multiple check for large scale sma query + tsdbUnRefSmaStat(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv)); + terrno = TSDB_CODE_TDB_INVALID_SMA_STAT; + tsdbDebug("vgId:%d getTSmaDataImpl failed from index %" PRIi64 " since %s %" PRIi8, REPO_ID(pTsdb), indexUid, + tstrerror(terrno), smaStat); + return TSDB_CODE_FAILED; + } + if (taosHashGet(pItem->expiredWindows, &querySKey, sizeof(TSKEY)) != NULL) { // TODO: mark this window as expired. tsdbDebug("vgId:%d skey %" PRIi64 " of window exists in expired window for index %" PRIi64, REPO_ID(pTsdb), @@ -1086,6 +1248,20 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { return code; } +/** + * @brief Get tSma data + * + * @param pTsdb + * @param pData + * @param indexUid + * @param interval + * @param intervalUnit + * @param tableUid + * @param colId + * @param querySKey + * @param nMaxResult + * @return int32_t + */ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySKey, int32_t nMaxResult) { int32_t code = TSDB_CODE_SUCCESS; @@ -1094,4 +1270,19 @@ int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; +} + +/** + * @brief Drop tSma Data and caches + * + * @param pTsdb + * @param msg + * @return int32_t + */ +int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbDropTSmaDataImpl(pTsdb, indexUid)) < 0) { + tsdbWarn("vgId:%d drop tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ade9adb1e1..f8234de813 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -199,15 +199,23 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return -1; } - if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexName) < 0) { - // TODO: handle error - return -1; - } // TODO: send msg to stream computing to drop tSma // if ((send msg to stream computing) < 0) { // tdDestroyTSma(&vCreateSmaReq); // return -1; // } + // + + if (metaDropTSma(pVnode->pMeta, vDropSmaReq.indexUid) < 0) { + // TODO: handle error + return -1; + } + + if(tsdbDropTSmaData(pVnode->pTsdb, vDropSmaReq.indexUid) < 0) { + // TODO: handle error + return -1; + } + // TODO: return directly or go on follow steps? } break; default: diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 3c51ad5d71..3fa5799f54 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -272,8 +272,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { taosArrayDestroy(pUids); // resource release - metaRemoveSmaFromDb(pMeta, smaIndexName1); - metaRemoveSmaFromDb(pMeta, smaIndexName2); + metaRemoveSmaFromDb(pMeta, indexUid1); + metaRemoveSmaFromDb(pMeta, indexUid2); tdDestroyTSma(&tSma); metaClose(pMeta); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b7ab007cf5..55fef59ae1 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -332,8 +332,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECREATED, "Table re-created") -TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta") TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta") +TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_SMA_STAT, "Invalid sma state") // query TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")