diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c new file mode 100644 index 0000000000..bca5b1543e --- /dev/null +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" +#include "tsdb.h" + +#define SMA_STORAGE_MINUTES_MAX 86400 +#define SMA_STORAGE_MINUTES_DAY 1440 +#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file + +/** + * @brief Judge the tsma file split days + * + * @param pCfg + * @param pCont + * @param contLen + * @param days unit is minute + * @return int32_t + */ +int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { + SDecoder coder = {0}; + tDecoderInit(&coder, pCont, contLen); + + STSma tsma = {0}; + if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) { + terrno = TSDB_CODE_MSG_DECODE_ERROR; + goto _err; + } + STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; + int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND); + if (sInterval <= 0) { + *days = pTsdbCfg->days; + return 0; + } + int64_t records = pTsdbCfg->days * 60 / sInterval; + if (records >= SMA_STORAGE_SPLIT_FACTOR) { + *days = pTsdbCfg->days; + } else { + int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE); + int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; + + if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { + *days = SMA_STORAGE_MINUTES_MAX; + } else { + *days = (int32_t)daysPerFile; + } + + if (*days < pTsdbCfg->days) { + *days = pTsdbCfg->days; + } + } + tDecoderClear(&coder); + return 0; +_err: + tDecoderClear(&coder); + return -1; +} + +/** + * @brief create tsma meta and result stable + * + * @param pSma + * @param version + * @param pMsg + * @return int32_t + */ +int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { + SSmaCfg *pCfg = (SSmaCfg *)pMsg; + + if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { + // create tsma meta in dstVgId + if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { + return -1; + } + + // create stable to save tsma result in dstVgId + SVCreateStbReq pReq = {0}; + pReq.name = pCfg->dstTbName; + pReq.suid = pCfg->dstTbUid; + pReq.schemaRow = pCfg->schemaRow; + pReq.schemaTag = pCfg->schemaTag; + + if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { + return -1; + } + } + + return 0; +} + +/** + * @brief Insert/Update Time-range-wise SMA data. + * + * @param pSma + * @param msg + * @return int32_t + */ +int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { + const SArray *pDataBlocks = (const SArray *)msg; + // TODO: destroy SSDataBlocks(msg) + if (!pDataBlocks) { + terrno = TSDB_CODE_TSMA_INVALID_PTR; + smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); + return terrno; + } + + if (taosArrayGetSize(pDataBlocks) <= 0) { + terrno = TSDB_CODE_TSMA_INVALID_PARA; + smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma)); + return TSDB_CODE_FAILED; + } + + if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) { + terrno = TSDB_CODE_TSMA_INIT_FAILED; + return TSDB_CODE_FAILED; + } + + SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); + SSmaStat *pStat = NULL; + SSmaStatItem *pItem = NULL; + + if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { + terrno = TSDB_CODE_TSMA_INVALID_STAT; + return TSDB_CODE_FAILED; + } + + tdRefSmaStat(pSma, pStat); + pItem = &pStat->tsmaStatItem; + + ASSERT(pItem); + + if (!pItem->pTSma) { + // cache smaMeta + STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); + if (!pTSma) { + terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META; + smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + pItem->pTSma = pTSma; + } + + STSma *pTSma = pItem->pTSma; + + ASSERT(pTSma->indexUid == indexUid); + + SMetaReader mr = {0}; + + const char *dbName = "testDb"; + if (metaGetTableEntryByName(&mr, dbName) != 0) { + smaDebug("vgId:%d, tsma no table testTb exists for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); + SVCreateStbReq pReq = {0}; + pReq.name = dbName; + pReq.suid = pTSma->dstTbUid; + pReq.schemaRow = pCfg->schemaRow; + pReq.schemaTag = pCfg->schemaTag; + } + + SSubmitReq *pSubmitReq = NULL; + buildSubmitReqFromDataBlock(&pSubmitReq, (const SArray *)msg, NULL, pItem->pTSma->dstVgId, + pItem->pTSma->dstTbUid); + + tdUnRefSmaStat(pSma, pStat); + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 90093f2510..fe89321ae9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -110,6 +110,8 @@ int32_t tsdbBegin(STsdb *pTsdb) { } int32_t tsdbCommit(STsdb *pTsdb) { + if (!pTsdb) return 0; + int32_t code = 0; SCommitH commith = {0}; SDFileSet *pSet = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index f1941a3bad..055b6c62de 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -20,6 +20,7 @@ extern const char *TSDB_LEVEL_DNAME[]; typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; static const char *tsdbTxnFname[] = {"current.t", "current"}; #define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3) +#define TSDB_MAX_INIT_FSETS (365000) static int tsdbComparFidFSet(const void *arg1, const void *arg2); static void tsdbResetFSStatus(SFSStatus *pStatus); @@ -210,6 +211,10 @@ STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) { return NULL; } + if (maxFSet > TSDB_MAX_INIT_FSETS) { + maxFSet = TSDB_MAX_INIT_FSETS; + } + pfs->cstatus = tsdbNewFSStatus(maxFSet); if (pfs->cstatus == NULL) { tsdbFreeFS(pfs);