diff --git a/source/common/src/trow.c b/source/common/src/trow.c index c8a28d7f28..fb4413478d 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -523,8 +523,24 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols SCellVal sVal = {0}; if (pRowCol->colId == pDataCol->colId) { if (tdGetTpRowValOfCol(&sVal, pRow, pBitmap, pRowCol->type, pRowCol->offset - sizeof(TSKEY), rcol - 1) < 0) { + ASSERT(0); return terrno; } + + if (pRowCol->colId == 2) { + ASSERT(sVal.valType == 0); + int32_t val = *(int32_t *)sVal.val; + ASSERT(val == 7); + } else if (pRowCol->colId == 3) { + ASSERT(sVal.valType == 0); + int64_t val = *(int64_t *)sVal.val; + ASSERT(val == 77777); + } else if (pRowCol->colId == 4) { + ASSERT(sVal.valType == 0); + int16_t val = *(int16_t *)sVal.val; + ASSERT(val == 777); + } + tdAppendValToDataCol(pDataCol, sVal.valType, sVal.val, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, isMerge); ++dcol; @@ -535,6 +551,7 @@ static int32_t tdAppendTpRowToDataCol(STSRow *pRow, STSchema *pSchema, SDataCols tdAppendValToDataCol(pDataCol, TD_VTYPE_NULL, NULL, pCols->numOfRows, pCols->maxPoints, pCols->bitmapMode, isMerge); ++dcol; + ASSERT(0); } } #if 0 diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c new file mode 100644 index 0000000000..bca5b1543e --- /dev/null +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "sma.h" +#include "tsdb.h" + +#define SMA_STORAGE_MINUTES_MAX 86400 +#define SMA_STORAGE_MINUTES_DAY 1440 +#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file + +/** + * @brief Judge the tsma file split days + * + * @param pCfg + * @param pCont + * @param contLen + * @param days unit is minute + * @return int32_t + */ +int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) { + SDecoder coder = {0}; + tDecoderInit(&coder, pCont, contLen); + + STSma tsma = {0}; + if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) { + terrno = TSDB_CODE_MSG_DECODE_ERROR; + goto _err; + } + STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg; + int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND); + if (sInterval <= 0) { + *days = pTsdbCfg->days; + return 0; + } + int64_t records = pTsdbCfg->days * 60 / sInterval; + if (records >= SMA_STORAGE_SPLIT_FACTOR) { + *days = pTsdbCfg->days; + } else { + int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE); + int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2; + + if (daysPerFile > SMA_STORAGE_MINUTES_MAX) { + *days = SMA_STORAGE_MINUTES_MAX; + } else { + *days = (int32_t)daysPerFile; + } + + if (*days < pTsdbCfg->days) { + *days = pTsdbCfg->days; + } + } + tDecoderClear(&coder); + return 0; +_err: + tDecoderClear(&coder); + return -1; +} + +/** + * @brief create tsma meta and result stable + * + * @param pSma + * @param version + * @param pMsg + * @return int32_t + */ +int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) { + SSmaCfg *pCfg = (SSmaCfg *)pMsg; + + if (TD_VID(pSma->pVnode) == pCfg->dstVgId) { + // create tsma meta in dstVgId + if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) { + return -1; + } + + // create stable to save tsma result in dstVgId + SVCreateStbReq pReq = {0}; + pReq.name = pCfg->dstTbName; + pReq.suid = pCfg->dstTbUid; + pReq.schemaRow = pCfg->schemaRow; + pReq.schemaTag = pCfg->schemaTag; + + if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) { + return -1; + } + } + + return 0; +} + +/** + * @brief Insert/Update Time-range-wise SMA data. + * + * @param pSma + * @param msg + * @return int32_t + */ +int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { + const SArray *pDataBlocks = (const SArray *)msg; + // TODO: destroy SSDataBlocks(msg) + if (!pDataBlocks) { + terrno = TSDB_CODE_TSMA_INVALID_PTR; + smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); + return terrno; + } + + if (taosArrayGetSize(pDataBlocks) <= 0) { + terrno = TSDB_CODE_TSMA_INVALID_PARA; + smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma)); + return TSDB_CODE_FAILED; + } + + if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) { + terrno = TSDB_CODE_TSMA_INIT_FAILED; + return TSDB_CODE_FAILED; + } + + SSmaEnv *pEnv = SMA_TSMA_ENV(pSma); + SSmaStat *pStat = NULL; + SSmaStatItem *pItem = NULL; + + if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) { + terrno = TSDB_CODE_TSMA_INVALID_STAT; + return TSDB_CODE_FAILED; + } + + tdRefSmaStat(pSma, pStat); + pItem = &pStat->tsmaStatItem; + + ASSERT(pItem); + + if (!pItem->pTSma) { + // cache smaMeta + STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid); + if (!pTSma) { + terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META; + smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + pItem->pTSma = pTSma; + } + + STSma *pTSma = pItem->pTSma; + + ASSERT(pTSma->indexUid == indexUid); + + SMetaReader mr = {0}; + + const char *dbName = "testDb"; + if (metaGetTableEntryByName(&mr, dbName) != 0) { + smaDebug("vgId:%d, tsma no table testTb exists for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno)); + SVCreateStbReq pReq = {0}; + pReq.name = dbName; + pReq.suid = pTSma->dstTbUid; + pReq.schemaRow = pCfg->schemaRow; + pReq.schemaTag = pCfg->schemaTag; + } + + SSubmitReq *pSubmitReq = NULL; + buildSubmitReqFromDataBlock(&pSubmitReq, (const SArray *)msg, NULL, pItem->pTSma->dstVgId, + pItem->pTSma->dstTbUid); + + tdUnRefSmaStat(pSma, pStat); + + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ab2efa4791..9bb0834547 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -15,6 +15,8 @@ #include "vnd.h" +int32_t gForceCommit = 1; + static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -182,7 +184,8 @@ int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } // commit if need - if (vnodeShouldCommit(pVnode)) { + if ((gForceCommit == 1) || vnodeShouldCommit(pVnode)) { + gForceCommit = 0; vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); // commit current change vnodeCommit(pVnode); @@ -719,6 +722,7 @@ static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const } static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { + gForceCommit = 1; SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0};