diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d1e02af287..65860d4959 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1866,10 +1866,59 @@ typedef struct { uint64_t tableUid; // super/common table uid int64_t interval; int64_t sliding; - col_id_t* colIds; // N.B. sorted column ids - uint16_t* funcIds; // N.B. sorted sma function ids + col_id_t* colIds; // sorted column ids + uint16_t* funcIds; // sorted sma function ids } STSma; // Time-range-wise SMA +typedef struct { + int8_t msgType; // 0 create, 1 recreate + STSma tSma; + STimeWindow window; +} SCreateTSmaMsg; + +typedef struct { + STimeWindow window; + char indexName[TSDB_INDEX_NAME_LEN + 1]; +} SDropTSmaMsg; + +typedef struct { + STimeWindow tsWindow; // [skey, ekey] + uint64_t tableUid; // sub/common table uid + int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId + int32_t dataLen; // total data length + col_id_t* colIds; // e.g. 2,4,9,10 + col_id_t numOfColIds; // e.g. 4 + char data[]; // the sma blocks +} STSmaData; + +// TODO: move to the final location afte schema of STSma/STSmaData defined +static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) { + if (pSmaData) { + if (pSmaData->colIds) { + tfree(pSmaData->colIds); + } + tfree(pSmaData); + } +} + +// RSma: Time-range-wise Rollup SMA +// TODO: refactor when rSma grammar defined finally => +typedef struct { + int64_t interval; + int32_t retention; // unit: day + uint16_t days; // unit: day + int8_t intervalUnit; +} SSmaParams; +// TODO: refactor when rSma grammar defined finally <= + +typedef struct { + // TODO: refactor to use the real schema => + STSma tsma; + float xFilesFactor; + SArray* smaParams; // SSmaParams + // TODO: refactor to use the real schema <= +} SRSma; + typedef struct { uint32_t number; STSma* tSma; @@ -1885,12 +1934,17 @@ static FORCE_INLINE void tdDestroyTSma(STSma* pSma, bool releaseSelf) { } } -static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { - if (pSW && pSW->tSma) { - for (uint32_t i = 0; i < pSW->number; ++i) { - tdDestroyTSma(pSW->tSma + i, false); +static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW, bool releaseSelf) { + if (pSW) { + if (pSW->tSma) { + for (uint32_t i = 0; i < pSW->number; ++i) { + tdDestroyTSma(pSW->tSma + i, false); + } + tfree(pSW->tSma); + } + if (releaseSelf) { + free(pSW); } - tfree(pSW->tSma); } } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index cae186ba16..0f0c4729bc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -184,6 +184,9 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) // Requests handled by QNODE TD_NEW_MSG_SEG(TDMT_QND_MSG) diff --git a/include/common/trow.h b/include/common/trow.h index 49bc2f3515..ef30796d78 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -118,6 +118,8 @@ typedef struct { } SKvRow; typedef struct { + /// timestamp + TSKEY ts; union { /// union field for encode and decode uint32_t info; @@ -138,8 +140,6 @@ typedef struct { uint32_t len; /// row version uint64_t ver; - /// timestamp - TSKEY ts; /// the inline data, maybe a tuple or a k-v tuple char data[]; } STSRow; @@ -173,7 +173,7 @@ typedef struct { #define TD_ROW_DATA(r) ((r)->data) #define TD_ROW_LEN(r) ((r)->len) #define TD_ROW_KEY(r) ((r)->ts) -#define TD_ROW_KEY_ADDR(r) POINTER_SHIFT((r), 16) +#define TD_ROW_KEY_ADDR(r) (r) // N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and // (int32_t)ceil((double)nCols/TD_VTYPE_PARTS) should be added if TD_SUPPORT_BITMAP defined. diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index e5ad43a4ee..b20be691ef 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -57,6 +57,7 @@ STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); SSmaCfg * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); +STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); void metaCloseTbCursor(SMTbCursor *pTbCur); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 6bc89ddd66..5513742c73 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -87,6 +87,27 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); +/** + * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData); + +/** + * @brief Insert RSma(Time-range-wise Rollup SMA) data. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbDef.h b/source/dnode/vnode/src/inc/tsdbDef.h index 98c0de32a8..96a76ea7d4 100644 --- a/source/dnode/vnode/src/inc/tsdbDef.h +++ b/source/dnode/vnode/src/inc/tsdbDef.h @@ -35,6 +35,7 @@ #include "tsdbMemory.h" #include "tsdbOptions.h" #include "tsdbReadImpl.h" +#include "tsdbSma.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index 71f35a9eca..641255a294 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,6 +42,7 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array + SArray * smaf; // sma data file array } SFSStatus; typedef struct { diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h new file mode 100644 index 0000000000..2a326eece8 --- /dev/null +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TSDB_SMA_H_ +#define _TD_TSDB_SMA_H_ + +// insert/update interface +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); + + +// query interface +// TODO: This is the basic params, and should wrap the params to a queryHandle. +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); + +// management interface +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void* result); +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); + + + + +// internal func +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { + int32_t len = 0; + len += taosEncodeFixedU64(pData, tableUid); + len += taosEncodeFixedU16(pData, colId); + len += taosEncodeFixedI64(pData, tsKey); + return len; +} + +#if 0 + +typedef struct { + int minFid; + int midFid; + int maxFid; + TSKEY minKey; +} SRtn; + +typedef struct { + uint64_t uid; + int64_t offset; + int64_t size; +} SKVRecord; + +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); + +static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) { + if (key < 0) { + return (int)((key + 1) / tsTickPerDay[precision] / days - 1); + } else { + return (int)((key / tsTickPerDay[precision] / days)); + } +} + +static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { + if (fid >= pRtn->maxFid) { + return 0; + } else if (fid >= pRtn->midFid) { + return 1; + } else if (fid >= pRtn->minFid) { + return 2; + } else { + return -1; + } +} + +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + +int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); +void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); +void *tsdbCommitData(STsdbRepo *pRepo); +int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); +int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, + bool isLast, bool isSuper, void **ppBuf, void **ppCBuf); +int tsdbApplyRtn(STsdbRepo *pRepo); + +#endif + +#endif /* _TD_TSDB_SMA_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index c31f28d983..03a2937679 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -833,6 +833,7 @@ SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid) { } pCur->uid = uid; + // TODO: lock? ret = pDB->pCtbIdx->cursor(pDB->pSmaIdx, NULL, &(pCur->pCur), 0); if (ret != 0) { free(pCur); @@ -852,25 +853,68 @@ void metaCloseSmaCurosr(SMSmaCursor *pCur) { } } -const char* metaSmaCursorNext(SMSmaCursor *pCur) { - DBT skey = {0}; - DBT pkey = {0}; - DBT pval = {0}; - void *pBuf; +const char *metaSmaCursorNext(SMSmaCursor *pCur) { + DBT skey = {0}; + DBT pkey = {0}; + DBT pval = {0}; // Set key skey.data = &(pCur->uid); skey.size = sizeof(pCur->uid); - + // TODO: lock? if (pCur->pCur->pget(pCur->pCur, &skey, &pkey, &pval, DB_NEXT) == 0) { - const char* indexName = (const char *)pkey.data; + const char *indexName = (const char *)pkey.data; assert(indexName != NULL); return indexName; } else { - return 0; + return NULL; } } +STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { + STSmaWrapper *pSW = NULL; + + pSW = calloc(sizeof(*pSW), 1); + if (pSW == NULL) { + return NULL; + } + + SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid); + if (pCur == NULL) { + free(pSW); + return NULL; + } + + DBT skey = {.data = &(pCur->uid)}; + DBT pval = {.size = sizeof(pCur->uid)}; + void *pBuf = NULL; + + while (true) { + // TODO: lock? + if (pCur->pCur->pget(pCur->pCur, &skey, NULL, &pval, DB_NEXT) == 0) { + ++pSW->number; + STSma *tptr = (STSma *)realloc(pSW->tSma, pSW->number * sizeof(STSma)); + if (tptr == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + pSW->tSma = tptr; + pBuf = pval.data; + if (tDecodeTSma(pBuf, pSW->tSma + pSW->number - 1) == NULL) { + metaCloseSmaCurosr(pCur); + tdDestroyTSmaWrapper(pSW, true); + return NULL; + } + continue; + } + break; + } + + metaCloseSmaCurosr(pCur); + + return pSW; +} static void metaDBWLock(SMetaDB *pDB) { #if IMPL_WITH_LOCK diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c new file mode 100644 index 0000000000..297c2783ec --- /dev/null +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -0,0 +1,2180 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tsdbDef.h" + +#define SMA_STORAGE_TSDB_DAYS 30 +#define SMA_STORAGE_SPLIT_HOURS 24 +#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 + +#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks + +typedef enum { + SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma + SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} +} ESmaStorageLevel; + +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + // TODO +} STSmaWriteH; + +typedef struct { + int32_t iter; +} SmaFsIter; +typedef struct { + STsdb * pTsdb; + char * pDFile; // TODO: use the real DFile type, not char* + int32_t interval; // interval with the precision of DB + int32_t blockSize; // size of SMA block item + int8_t storageLevel; + int8_t days; + SmaFsIter smaFsIter; + // TODO +} STSmaReadH; + +// declaration of static functions +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t sectionDataLen, int32_t nBlocks); +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks); +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid); + +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); + +/** + * @brief Judge the tSma storage level + * + * @param interval + * @param intervalUnit + * @return int32_t + */ +static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { + // TODO: configurable for SMA_STORAGE_SPLIT_HOURS? + switch (intervalUnit) { + case TD_TIME_UNIT_HOUR: + if (interval < SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MINUTE: + if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_SEC: + if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MILLISEC: + if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) { + return SMA_STORAGE_LEVEL_DFILESET; + } + break; + default: + break; + } + return SMA_STORAGE_LEVEL_TSDB; +} + +/** + * @brief Insert TSma data blocks to B+Tree + * + * @param bTree + * @param smaKey + * @param pData + * @param dataLen + * @return int32_t + */ +static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { + // TODO: insert sma data blocks into B+Tree + printf("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d\n", + *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + return TSDB_CODE_SUCCESS; +} + +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) { + if (intervalUnit < TD_TIME_UNIT_MILLISEC) { + switch (intervalUnit) { + case TD_TIME_UNIT_YEAR: + case TD_TIME_UNIT_SEASON: + case TD_TIME_UNIT_MONTH: + case TD_TIME_UNIT_WEEK: + // illegal time unit + tsdbError("invalid interval unit: %d\n", intervalUnit); + TASSERT(0); + break; + case TD_TIME_UNIT_DAY: // the interval for tSma calculation must <= day + interval *= 86400 * 1e3; + break; + case TD_TIME_UNIT_HOUR: + interval *= 3600 * 1e3; + break; + case TD_TIME_UNIT_MINUTE: + interval *= 60 * 1e3; + break; + case TD_TIME_UNIT_SEC: + interval *= 1e3; + break; + default: + break; + } + } + + switch (intervalUnit) { + case TD_TIME_UNIT_MILLISEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e3; + } else { // nano second + return interval * 1e6; + } + break; + case TD_TIME_UNIT_MICROSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval; + } else { // nano second + return interval * 1e3; + } + break; + case TD_TIME_UNIT_NANOSEC: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval / 1e6; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval / 1e3; + } else { // nano second + return interval; + } + break; + default: + if (TSDB_TIME_PRECISION_MILLI == precision) { + return interval * 1e3; + } else if (TSDB_TIME_PRECISION_MICRO == precision) { + return interval * 1e6; + } else { // nano second + return interval * 1e9; + } + break; + } + return interval; +} + +/** + * @brief Split the TSma data blocks into expected size and insert into B+Tree. + * + * @param pSmaH + * @param pData + * @param nOffset The nOffset of blocks since fid changes. + * @param nBlocks The nBlocks with the same fid since nOffset. + * @return int32_t + */ +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t nOffset, int32_t nBlocks) { + STsdb *pTsdb = pSmaH->pTsdb; + + TASSERT(pData->colIds != NULL); + + tsdbDebug("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d", nOffset, nBlocks); + printf("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d\n", nOffset, nBlocks); + + int32_t colDataLen = pData->dataLen / pData->numOfColIds; + int32_t sectionDataLen = pSmaH->blockSize * nBlocks; + + for (col_id_t i = 0; i < pData->numOfColIds; ++i) { + // param: pointer of B+Tree, key, value, dataLen + void *bTree = pSmaH->pDFile; +#ifndef SMA_STORE_SINGLE_BLOCKS + // save tSma data blocks as a whole + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + nOffset * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + nOffset * pSmaH->blockSize, sectionDataLen) < + 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } +#else + // save tSma data blocks separately + for (int32_t n = 0; n < nBlocks; ++n) { + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + (nOffset + n) * pSmaH->interval, + (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + (nOffset + n) * pSmaH->blockSize, + pSmaH->blockSize) < 0) { + tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + } +#endif + } + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, + int32_t fid) { + // TODO + pSmaH->pDFile = "tSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} /** + * @brief Split the sma data blocks by fid. + * + * @param pSmaH + * @param param + * @param pData + * @param nOffset + * @param fid + * @param nSmaBlocks + * @return int32_t + */ +static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, + int32_t fid, int32_t *nSmaBlocks) { + STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); + + // TODO: use binary search + for (int32_t n = nOffset + 1; n < pData->numOfBlocks; ++n) { + // TODO: The tsWindow.skey should use the precision of DB. + int32_t tFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey + pSmaH->interval * n, days, pCfg->precision)); + if (tFid > fid) { + *nSmaBlocks = n - nOffset; + break; + } + } + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Insert/Update Time-range-wise SMA data. + * - If interval < SMA_STORAGE_SPLIT_HOURS(e.g. 24), save the SMA data as a part of DFileSet to e.g. + * v3f1900.tsma.${sma_index_name}. The days is the same with that for TS data files. + * - If interval >= SMA_STORAGE_SPLIT_HOURS, save the SMA data to e.g. vnode3/tsma/v3f632.tsma.${sma_index_name}. The + * days is 30 times of the interval, and the minimum days is SMA_STORAGE_TSDB_DAYS(30d). + * - The destination file of one data block for some interval is determined by its start TS key. + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, param, pData); + + if (pData->numOfBlocks <= 0 || pData->numOfColIds <= 0 || pData->dataLen <= 0) { + TASSERT(0); + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + // Step 1: Judge the storage level + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file + + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, minFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, 0, pData->numOfBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tFid = minFid; + int32_t nOffset = 0; + int32_t nSmaBlocks = 0; + do { + tsdbTSmaDataSplit(&tSmaH, param, pData, daysPerFile, nOffset, tFid, &nSmaBlocks); + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + if (tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, nSmaBlocks) < 0) { + return terrno; + } + + ++tFid; + nOffset += nSmaBlocks; + + if (tFid == maxFid) { + tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); + tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, pData->numOfBlocks - nOffset); + break; + } + } while (true); + + // TODO:tsdbEndTSmaCommit(); + } else { + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, SRSma *param, STSmaData *pData, int32_t fid) { + // TODO + pSmaH->pDFile = "rSma_interval_file_name"; + + return TSDB_CODE_SUCCESS; +} + +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + STsdbCfg * pCfg = REPO_CFG(pTsdb); + STSma * tParam = ¶m->tsma; + STSmaData * curData = pData; + STSmaWriteH tSmaH = {0}; + + tsdbInitTSmaWriteH(&tSmaH, pTsdb, tParam, pData); + + int32_t nSmaBlocks = pData->numOfBlocks; + int32_t colDataLen = pData->dataLen / nSmaBlocks; + + // Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET + // TODO: Use the daysPerFile for rSma data, not for TS data. + // TODO: The lifecycle of rSma data should be processed like the TS data files. + int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, pCfg->daysPerFile, pCfg->precision)); + int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, pCfg->daysPerFile, pCfg->precision)); + + if (minFid == maxFid) { + // Save all the TSma data to one file + tsdbSetRSmaDataFile(&tSmaH, param, pData, minFid); + // TODO: tsdbStartTSmaCommit(); + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else if (minFid < maxFid) { + // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files + // actually. + // TODO: tsdbStartTSmaCommit(); + int32_t tmpFid = 0; + int32_t step = 0; + for (int32_t n = 0; n < pData->numOfBlocks; ++n) { + } + tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); + // TODO:tsdbEndTSmaCommit(); + } else { + TASSERT(0); + return TSDB_CODE_INVALID_PARA; + } + // Step 4: finish + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Init of tSma ReadH + * + * @param pSmaH + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { + pSmaH->pTsdb = pTsdb; + pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); +} + +/** + * @brief Init of tSma FS + * + * @param pReadH + * @param param + * @param queryWin + * @return int32_t + */ +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t daysPerFile = + storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; + pReadH->storageLevel = storageLevel; + pReadH->days = daysPerFile; + pReadH->smaFsIter.iter = 0; +} + +/** + * @brief Set and open tSma file if it has key locates in queryWin. + * + * @param pReadH + * @param param + * @param queryWin + * @return true + * @return false + */ +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { + SArray *smaFs = pReadH->pTsdb->fs->cstatus->smaf; + int32_t nSmaFs = taosArrayGetSize(smaFs); + + pReadH->pDFile = NULL; + + while (pReadH->smaFsIter.iter < nSmaFs) { + void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); + if (pSmaFile) { // match(indexName, queryWindow) + // TODO: select the file by index_name ... + pReadH->pDFile = pSmaFile; + ++pReadH->smaFsIter.iter; + break; + } + ++pReadH->smaFsIter.iter; + } + + if (pReadH->pDFile != NULL) { + tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); + return true; + } + + return false; +} + +/** + * @brief Return the data between queryWin and fill the pData. + * + * @param pTsdb + * @param param + * @param pData + * @param queryWin + * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. + * @return int32_t + */ +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { + STSmaReadH tReadH = {0}; + tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); + + tsdbInitTSmaFile(&tReadH, param, queryWin); + + int32_t nResult = 0; + int64_t lastKey = 0; + + while (true) { + if (nResult >= nMaxResult) { + break; + } + + // set and open the file according to the STSma param + if (tsdbSetAndOpenTSmaFile(&tReadH, param, queryWin)) { + char bTree[100] = "\0"; + while (strncmp(bTree, "has more nodes", 100) == 0) { + if (nResult >= nMaxResult) { + break; + } + // tsdbGetDataFromBTree(bTree, queryWin, lastKey) + // fill the pData + ++nResult; + } + } + } + + // read data from file and fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Get the start TS key of the last data block of one interval/sliding. + * + * @param pTsdb + * @param param + * @param result + * @return int32_t + * 1) Return 0 and fill the result if the check procedure is normal; + * 2) Return -1 if error occurs during the check procedure. + */ +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { + const char *procedure = ""; + if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) { + return -1; + } + // fill the result + return TSDB_CODE_SUCCESS; +} + +/** + * @brief Remove the tSma data files related to param between pWin. + * + * @param pTsdb + * @param param + * @param pWin + * @return int32_t + */ +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin) { + // for ("tSmaFiles of param-interval-sliding between pWin") { + // // remove the tSmaFile + // } + return TSDB_CODE_SUCCESS; +} + +#if 0 +#define TSDB_MAX_SUBBLOCKS 8 + +typedef struct { + STable *pTable; + SSkipListIterator *pIter; +} SCommitIter; + +typedef struct { + SRtn rtn; // retention snapshot + SFSIter fsIter; // tsdb file iterator + int niters; // memory iterators + SCommitIter *iters; + bool isRFileSet; // read and commit FSET + SReadH readh; + SDFileSet wSet; + bool isDFileSame; + bool isLFileSame; + TSKEY minKey; + TSKEY maxKey; + SArray *aBlkIdx; // SBlockIdx array + STable *pTable; + SArray *aSupBlk; // Table super-block array + SArray *aSubBlk; // table sub-block array + SDataCols *pDataCols; +} STSmaWriteH; + +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + +#define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) +#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) +#define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) +#define TSDB_COMMIT_TABLE(ch) ((ch)->pTable) +#define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) +#define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) +#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) +#define TSDB_COMMIT_SMAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAD) +#define TSDB_COMMIT_SMAL_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_SMAL) +#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh)) +#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh)) +#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) +#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) + +static void tsdbStartCommit(STsdb *pRepo); +static void tsdbEndCommit(STsdb *pTsdb, int eno); +static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); +static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); +static int tsdbNextCommitFid(SCommitH *pCommith); +static void tsdbDestroyCommitH(SCommitH *pCommith); +static int tsdbCreateCommitIters(SCommitH *pCommith); +static void tsdbDestroyCommitIters(SCommitH *pCommith); +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static void tsdbResetCommitFile(SCommitH *pCommith); +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +// static int tsdbCommitMeta(STsdbRepo *pRepo); +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +static int tsdbWriteBlockInfo(SCommitH *pCommih); +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitH *pCommith, int bidx); +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock); +static void tsdbResetCommitTable(SCommitH *pCommith); +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); + +int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { + SDiskID did; + SDFileSet nSet = {0}; + STsdbFS *pfs = REPO_FS(pRepo); + int level; + + ASSERT(pSet->fid >= pRtn->minFid); + + level = tsdbGetFidLevel(pSet->fid, pRtn); + + if (tfsAllocDisk(pRepo->pTfs, level, &did) < 0) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + if (did.level > TSDB_FSET_LEVEL(pSet)) { + // Need to move the FSET to higher level + tsdbInitDFileSet(pRepo, &nSet, did, pSet->fid, FS_TXN_VERSION(pfs)); + + if (tsdbCopyDFileSet(pSet, &nSet) < 0) { + tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno)); + return -1; + } + + if (tsdbUpdateDFileSet(pfs, &nSet) < 0) { + return -1; + } + + tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id); + } else { + // On a correct level + if (tsdbUpdateDFileSet(pfs, pSet) < 0) { + return -1; + } + } + + return 0; +} + +int tsdbPrepareCommit(STsdb *pTsdb) { + if (pTsdb->mem == NULL) return 0; + + ASSERT(pTsdb->imem == NULL); + + pTsdb->imem = pTsdb->mem; + pTsdb->mem = NULL; + return 0; +} + +int tsdbCommit(STsdb *pRepo) { + STsdbMemTable *pMem = pRepo->imem; + SCommitH commith = {0}; + SDFileSet *pSet = NULL; + int fid; + + if (pRepo->imem == NULL) return 0; + + tsdbStartCommit(pRepo); + // Resource initialization + if (tsdbInitCommitH(&commith, pRepo) < 0) { + return -1; + } + + // Skip expired memory data and expired FSET + tsdbSeekCommitIter(&commith, commith.rtn.minKey); + while ((pSet = tsdbFSIterNext(&(commith.fsIter)))) { + if (pSet->fid < commith.rtn.minFid) { + tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + } else { + break; + } + } + + // Loop to commit to each file + fid = tsdbNextCommitFid(&(commith)); + while (true) { + // Loop over both on disk and memory + if (pSet == NULL && fid == TSDB_IVLD_FID) break; + + if (pSet && (fid == TSDB_IVLD_FID || pSet->fid < fid)) { + // Only has existing FSET but no memory data to commit in this + // existing FSET, only check if file in correct retention + if (tsdbApplyRtnOnFSet(pRepo, pSet, &(commith.rtn)) < 0) { + tsdbDestroyCommitH(&commith); + return -1; + } + + pSet = tsdbFSIterNext(&(commith.fsIter)); + } else { + // Has memory data to commit + SDFileSet *pCSet; + int cfid; + + if (pSet == NULL || pSet->fid > fid) { + // Commit to a new FSET with fid: fid + pCSet = NULL; + cfid = fid; + } else { + // Commit to an existing FSET + pCSet = pSet; + cfid = pSet->fid; + pSet = tsdbFSIterNext(&(commith.fsIter)); + } + + if (tsdbCommitToFile(&commith, pCSet, cfid) < 0) { + tsdbDestroyCommitH(&commith); + return -1; + } + + fid = tsdbNextCommitFid(&commith); + } + } + + tsdbDestroyCommitH(&commith); + tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); + + return 0; +} + +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + TSKEY minKey, midKey, maxKey, now; + + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision]; + + pRtn->minKey = minKey; + pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision)); + pRtn->midFid = (int)(TSDB_KEY_FID(midKey, pCfg->daysPerFile, pCfg->precision)); + pRtn->maxFid = (int)(TSDB_KEY_FID(maxKey, pCfg->daysPerFile, pCfg->precision)); + tsdbDebug("vgId:%d now:%" PRId64 " minKey:%" PRId64 " minFid:%d, midFid:%d, maxFid:%d", REPO_ID(pRepo), now, minKey, + pRtn->minFid, pRtn->midFid, pRtn->maxFid); +} + +static void tsdbStartCommit(STsdb *pRepo) { + STsdbMemTable *pMem = pRepo->imem; + + tsdbInfo("vgId:%d start to commit", REPO_ID(pRepo)); + + tsdbStartFSTxn(pRepo, 0, 0); +} + +static void tsdbEndCommit(STsdb *pTsdb, int eno) { + tsdbEndFSTxn(pTsdb); + tsdbFreeMemTable(pTsdb, pTsdb->imem); + pTsdb->imem = NULL; + tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed"); +} + +static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + + memset(pCommith, 0, sizeof(*pCommith)); + tsdbGetRtnSnap(pRepo, &(pCommith->rtn)); + + TSDB_FSET_SET_CLOSED(TSDB_COMMIT_WRITE_FSET(pCommith)); + + // Init read handle + if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) { + return -1; + } + + // Init file iterator + tsdbFSIterInit(&(pCommith->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD); + + if (tsdbCreateCommitIters(pCommith) < 0) { + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx)); + if (pCommith->aBlkIdx == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->aSupBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCommith->aSupBlk == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->aSubBlk = taosArrayInit(1024, sizeof(SBlock)); + if (pCommith->aSubBlk == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); + return -1; + } + + pCommith->pDataCols = tdNewDataCols(0, pCfg->maxRowsPerFileBlock); + if (pCommith->pDataCols == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + tsdbDestroyCommitH(pCommith); + return -1; + } + + return 0; +} + +// Skip all keys until key (not included) +static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { + for (int i = 0; i < pCommith->niters; i++) { + SCommitIter *pIter = pCommith->iters + i; + if (pIter->pTable == NULL || pIter->pIter == NULL) continue; + + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL); + } +} + +static int tsdbNextCommitFid(SCommitH *pCommith) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + int fid = TSDB_IVLD_FID; + + for (int i = 0; i < pCommith->niters; i++) { + SCommitIter *pIter = pCommith->iters + i; + // if (pIter->pTable == NULL || pIter->pIter == NULL) continue; + + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + if (nextKey == TSDB_DATA_TIMESTAMP_NULL) { + continue; + } else { + int tfid = (int)(TSDB_KEY_FID(nextKey, pCfg->daysPerFile, pCfg->precision)); + if (fid == TSDB_IVLD_FID || fid > tfid) { + fid = tfid; + } + } + } + + return fid; +} + +static void tsdbDestroyCommitH(SCommitH *pCommith) { + pCommith->pDataCols = tdFreeDataCols(pCommith->pDataCols); + pCommith->aSubBlk = taosArrayDestroy(pCommith->aSubBlk); + pCommith->aSupBlk = taosArrayDestroy(pCommith->aSupBlk); + pCommith->aBlkIdx = taosArrayDestroy(pCommith->aBlkIdx); + tsdbDestroyCommitIters(pCommith); + tsdbDestroyReadH(&(pCommith->readh)); + tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); +} + +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + + ASSERT(pSet == NULL || pSet->fid == fid); + + tsdbResetCommitFile(pCommith); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); + + // Set and open files + if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { + return -1; + } + + // Loop to commit each table data + for (int tid = 0; tid < pCommith->niters; tid++) { + SCommitIter *pIter = pCommith->iters + tid; + + if (pIter->pTable == NULL) continue; + + if (tsdbCommitToTable(pCommith, tid) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + } + + if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < + 0) { + tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + + if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { + tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + + // Close commit file + tsdbCloseCommitFile(pCommith, false); + + if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { + return -1; + } + + return 0; +} + +static int tsdbCreateCommitIters(SCommitH *pCommith) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbMemTable *pMem = pRepo->imem; + SSkipListIterator *pSlIter; + SCommitIter *pCommitIter; + SSkipListNode *pNode; + STbData *pTbData; + + pCommith->niters = SL_SIZE(pMem->pSlIdx); + pCommith->iters = (SCommitIter *)calloc(pCommith->niters, sizeof(SCommitIter)); + if (pCommith->iters == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + // Loop to create iters for each skiplist + pSlIter = tSkipListCreateIter(pMem->pSlIdx); + if (pSlIter == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + for (int i = 0; i < pCommith->niters; i++) { + tSkipListIterNext(pSlIter); + pNode = tSkipListIterGet(pSlIter); + pTbData = (STbData *)pNode->pData; + + pCommitIter = pCommith->iters + i; + pCommitIter->pIter = tSkipListCreateIter(pTbData->pData); + tSkipListIterNext(pCommitIter->pIter); + + pCommitIter->pTable = (STable *)malloc(sizeof(STable)); + pCommitIter->pTable->uid = pTbData->uid; + pCommitIter->pTable->tid = pTbData->uid; + pCommitIter->pTable->pSchema = metaGetTbTSchema(pRepo->pMeta, pTbData->uid, 0); + } + + return 0; +} + +static void tsdbDestroyCommitIters(SCommitH *pCommith) { + if (pCommith->iters == NULL) return; + + for (int i = 1; i < pCommith->niters; i++) { + tSkipListDestroyIter(pCommith->iters[i].pIter); + tdFreeSchema(pCommith->iters[i].pTable->pSchema); + free(pCommith->iters[i].pTable); + } + + free(pCommith->iters); + pCommith->iters = NULL; + pCommith->niters = 0; +} + +static void tsdbResetCommitFile(SCommitH *pCommith) { + pCommith->isRFileSet = false; + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; + taosArrayClear(pCommith->aBlkIdx); +} + +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + SDiskID did; + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); + + if (tfsAllocDisk(pRepo->pTfs, tsdbGetFidLevel(fid, &(pCommith->rtn)), &did) < 0) { + terrno = TSDB_CODE_TDB_NO_AVAIL_DISK; + return -1; + } + + // Open read FSET + if (pSet) { + if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) { + return -1; + } + + pCommith->isRFileSet = true; + + if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is opened to read to commit", REPO_ID(pRepo), TSDB_FSET_FID(pSet), + TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); + } else { + pCommith->isRFileSet = false; + } + + // Set and open commit FSET + if (pSet == NULL || did.level > TSDB_FSET_LEVEL(pSet)) { + // Create a new FSET to write data + tsdbInitDFileSet(pRepo, pWSet, did, fid, FS_TXN_VERSION(REPO_FS(pRepo))); + + if (tsdbCreateDFileSet(pRepo, pWSet, true) < 0) { + tsdbError("vgId:%d failed to create FSET %d at level %d disk id %d since %s", REPO_ID(pRepo), + TSDB_FSET_FID(pWSet), TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet), tstrerror(terrno)); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + return -1; + } + + pCommith->isDFileSame = false; + pCommith->isLFileSame = false; + + tsdbDebug("vgId:%d FSET %d at level %d disk id %d is created to commit", REPO_ID(pRepo), TSDB_FSET_FID(pWSet), + TSDB_FSET_LEVEL(pWSet), TSDB_FSET_ID(pWSet)); + } else { + did.level = TSDB_FSET_LEVEL(pSet); + did.id = TSDB_FSET_ID(pSet); + + pCommith->wSet.fid = fid; + pCommith->wSet.state = 0; + + // TSDB_FILE_HEAD + SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith); + tsdbInitDFile(pRepo, pWHeadf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_HEAD); + if (tsdbCreateDFile(pRepo, pWHeadf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWHeadf), + tstrerror(terrno)); + + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + + // TSDB_FILE_DATA + SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh)); + SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith); + tsdbInitDFileEx(pWDataf, pRDataf); + if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWDataf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + pCommith->isDFileSame = true; + + // TSDB_FILE_LAST + SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh)); + SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith); + if (pRLastf->info.size < 32 * 1024) { + tsdbInitDFileEx(pWLastf, pRLastf); + pCommith->isLFileSame = true; + + if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) { + tsdbError("vgId:%d failed to open file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } else { + tsdbInitDFile(pRepo, pWLastf, did, fid, FS_TXN_VERSION(REPO_FS(pRepo)), TSDB_FILE_LAST); + pCommith->isLFileSame = false; + + if (tsdbCreateDFile(pRepo, pWLastf, true) < 0) { + tsdbError("vgId:%d failed to create file %s to commit since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pWLastf), + tstrerror(terrno)); + + tsdbCloseDFileSet(pWSet); + (void)tsdbRemoveDFile(pWHeadf); + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + return -1; + } + } + } + } + + return 0; +} + +// extern int32_t tsTsdbMetaCompactRatio; + +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, + SBlockIdx *pIdx) { + size_t nSupBlocks; + size_t nSubBlocks; + uint32_t tlen; + SBlockInfo *pBlkInfo; + int64_t offset; + SBlock *pBlock; + + memset(pIdx, 0, sizeof(*pIdx)); + + nSupBlocks = taosArrayGetSize(pSupA); + nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); + + if (nSupBlocks <= 0) { + // No data (data all deleted) + return 0; + } + + tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + pBlkInfo = *ppBuf; + + pBlkInfo->delimiter = TSDB_FILE_DELIMITER; + pBlkInfo->tid = TABLE_TID(pTable); + pBlkInfo->uid = TABLE_UID(pTable); + + memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); + if (nSubBlocks > 0) { + memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); + + for (int i = 0; i < nSupBlocks; i++) { + pBlock = pBlkInfo->blocks + i; + + if (pBlock->numOfSubBlocks > 1) { + pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + } + } + } + + taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + + if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + + // Set pIdx + pBlock = taosArrayGetLast(pSupA); + + pIdx->tid = TABLE_TID(pTable); + pIdx->uid = TABLE_UID(pTable); + pIdx->hasLast = pBlock->last ? 1 : 0; + pIdx->maxKey = pBlock->keyLast; + pIdx->numOfBlocks = (uint32_t)nSupBlocks; + pIdx->len = tlen; + pIdx->offset = (uint32_t)offset; + + return 0; +} + +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { + SBlockIdx *pBlkIdx; + size_t nidx = taosArrayGetSize(pIdxA); + int tlen = 0, size; + int64_t offset; + + if (nidx <= 0) { + // All data are deleted + pHeadf->info.offset = 0; + pHeadf->info.len = 0; + return 0; + } + + for (size_t i = 0; i < nidx; i++) { + pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); + + size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); + if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; + + void *ptr = POINTER_SHIFT(*ppBuf, tlen); + tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + + tlen += size; + } + + tlen += sizeof(TSCKSUM); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); + + if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); + pHeadf->info.offset = (uint32_t)offset; + pHeadf->info.len = tlen; + + return 0; +} + +// // =================== Commit Meta Data +// static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMFile * pOMFile = pfs->cstatus->pmf; +// SDiskID did; + +// // Create/Open a meta file or open the existing file +// if (pOMFile == NULL) { +// // Create a new meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + +// if (open && tsdbCreateMFile(pMf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } + +// tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); +// } else { +// tsdbInitMFileEx(pMf, pOMFile); +// if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { +// tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } +// } + +// return 0; +// } + +// static int tsdbCommitMeta(STsdbRepo *pRepo) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMemTable *pMem = pRepo->imem; +// SMFile * pOMFile = pfs->cstatus->pmf; +// SMFile mf; +// SActObj * pAct = NULL; +// SActCont * pCont = NULL; +// SListNode *pNode = NULL; + +// ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); + +// if (listNEles(pMem->actList) <= 0) { +// // no meta data to commit, just keep the old meta file +// tsdbUpdateMFile(pfs, pOMFile); +// if (tsTsdbMetaCompactRatio > 0) { +// if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) { +// return -1; +// } +// int ret = tsdbCompactMetaFile(pRepo, pfs, &mf); +// if (ret < 0) tsdbError("compact meta file error"); + +// return ret; +// } +// return 0; +// } else { +// if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) { +// return -1; +// } +// } + +// // Loop to write +// while ((pNode = tdListPopHead(pMem->actList)) != NULL) { +// pAct = (SActObj *)pNode->data; +// if (pAct->act == TSDB_UPDATE_META) { +// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); +// if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// (void)tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else if (pAct->act == TSDB_DROP_META) { +// if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { +// tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else { +// ASSERT(false); +// } +// } + +// if (tsdbUpdateMFileHeader(&mf) < 0) { +// tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } + +// TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbUpdateMFile(pfs, &mf); + +// if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { +// tsdbError("compact meta file error"); +// } + +// return 0; +// } + +// int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { +// int tlen = 0; +// tlen += taosEncodeFixedU64(buf, pRecord->uid); +// tlen += taosEncodeFixedI64(buf, pRecord->offset); +// tlen += taosEncodeFixedI64(buf, pRecord->size); + +// return tlen; +// } + +// void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { +// buf = taosDecodeFixedU64(buf, &(pRecord->uid)); +// buf = taosDecodeFixedI64(buf, &(pRecord->offset)); +// buf = taosDecodeFixedI64(buf, &(pRecord->size)); + +// return buf; +// } + +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { +// char buf[64] = "\0"; +// void * pBuf = buf; +// SKVRecord rInfo; +// int64_t offset; + +// // Seek to end of meta file +// offset = tsdbSeekMFile(pMFile, 0, SEEK_END); +// if (offset < 0) { +// return -1; +// } + +// rInfo.offset = offset; +// rInfo.uid = uid; +// rInfo.size = contLen; + +// int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); +// if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { +// return -1; +// } + +// if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { +// return -1; +// } + +// tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); + +// SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; + +// pMFile->info.nRecords++; + +// SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); +// if (pRecord != NULL) { +// pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); +// } else { +// pMFile->info.nRecords++; +// } +// taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); + +// return 0; +// } + +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { +// SKVRecord rInfo = {0}; +// char buf[128] = "\0"; + +// SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// if (pRecord == NULL) { +// tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); +// return -1; +// } + +// rInfo.offset = -pRecord->offset; +// rInfo.uid = pRecord->uid; +// rInfo.size = pRecord->size; + +// void *pBuf = buf; +// tsdbEncodeKVRecord(&pBuf, &rInfo); + +// if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { +// return -1; +// } + +// pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); +// pMFile->info.nDels++; +// pMFile->info.nRecords--; +// pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + +// taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// return 0; +// } + +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { +// float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); +// float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); +// float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; + +// if (delPercent < compactRatio && tombPercent < compactRatio) { +// return 0; +// } + +// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { +// tsdbError("open meta file %s compact fail", pMFile->f.rname); +// return -1; +// } + +// tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 +// ",size:%" PRId64, +// tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); + +// SMFile mf; +// SDiskID did; + +// // first create tmp meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1); + +// if (tsdbCreateMFile(&mf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } + +// tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); + +// // second iterator metaCache +// int code = -1; +// int64_t maxBufSize = 1024; +// SKVRecord *pRecord; +// void *pBuf = NULL; + +// pBuf = malloc((size_t)maxBufSize); +// if (pBuf == NULL) { +// goto _err; +// } + +// // init Comp +// assert(pfs->metaCacheComp == NULL); +// pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); +// if (pfs->metaCacheComp == NULL) { +// goto _err; +// } + +// pRecord = taosHashIterate(pfs->metaCache, NULL); +// while (pRecord) { +// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { +// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } +// if (pRecord->size > maxBufSize) { +// maxBufSize = pRecord->size; +// void* tmp = realloc(pBuf, (size_t)maxBufSize); +// if (tmp == NULL) { +// goto _err; +// } +// pBuf = tmp; +// } +// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); +// if (nread < 0) { +// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } + +// if (nread < pRecord->size) { +// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", +// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); +// goto _err; +// } + +// if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, +// tstrerror(terrno)); +// goto _err; +// } + +// pRecord = taosHashIterate(pfs->metaCache, pRecord); +// } +// code = 0; + +// _err: +// if (code == 0) TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbCloseMFile(pMFile); + +// if (code == 0) { +// // rename meta.tmp -> meta +// tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), +// TSDB_FILE_FULL_NAME(pMFile)); taosRename(mf.f.aname,pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname, +// TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); +// // update current meta file info +// pfs->nstatus->pmf = NULL; +// tsdbUpdateMFile(pfs, &mf); + +// taosHashCleanup(pfs->metaCache); +// pfs->metaCache = pfs->metaCacheComp; +// pfs->metaCacheComp = NULL; +// } else { +// // remove meta.tmp file +// remove(mf.f.aname); +// taosHashCleanup(pfs->metaCacheComp); +// pfs->metaCacheComp = NULL; +// } + +// tfree(pBuf); + +// ASSERT(mf.info.nDels == 0); +// ASSERT(mf.info.tombSize == 0); + +// tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, +// code,mf.info.nRecords,mf.info.size); +// return code; +// } + +// // =================== Commit Time-Series Data +// #if 0 +// static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { +// for (int i = 0; i < nIters; i++) { +// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); +// if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true; +// } +// return false; +// } +// #endif + +static int tsdbCommitToTable(SCommitH *pCommith, int tid) { + SCommitIter *pIter = pCommith->iters + tid; + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + + tsdbResetCommitTable(pCommith); + + // Set commit table + if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { + return -1; + } + + // No disk data and no memory data, just return + if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { + return 0; + } + + // Must has disk data or has memory data + int nBlocks; + int bidx = 0; + SBlock *pBlock; + + if (pCommith->readh.pBlkIdx) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { + return -1; + } + + nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + } else { + nBlocks = 0; + } + + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + + while (true) { + if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; + + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || + (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { + // merge pBlock data and memory data + if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { + return -1; + } + + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + nextKey = tsdbNextIterKey(pIter->pIter); + } else { + // Only commit memory data + if (pBlock == NULL) { + if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { + return -1; + } + } else { + if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { + return -1; + } + } + nextKey = tsdbNextIterKey(pIter->pIter); + } + } + + if (tsdbWriteBlockInfo(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + return -1; + } + + return 0; +} + +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + + pCommith->pTable = pTable; + + if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pCommith->isRFileSet) { + if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { + return -1; + } + } else { + pCommith->readh.pBlkIdx = NULL; + } + return 0; +} + +static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SBlock *pBlock = (SBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } else { + return 0; + } +} + +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper, void **ppBuf, void **ppCBuf) { + STsdbCfg *pCfg = REPO_CFG(pRepo); + SBlockData *pBlockData; + int64_t offset = 0; + int rowsToWrite = pDataCols->numOfRows; + + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); + ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); + + // Make buffer space + if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { + return -1; + } + pBlockData = (SBlockData *)(*ppBuf); + + // Get # of cols not all NULL(not including key column) + int nColsNotAllNull = 0; + for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column + SDataCol *pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; + + if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it + continue; + } + + memset(pBlockCol, 0, sizeof(*pBlockCol)); + + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; + if (tDataTypes[pDataCol->type].statisFunc) { + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), + &(pBlockCol->numOfNull)); + if (pBlockCol->numOfNull == 0) { + TD_SET_COL_ROWS_NORM(pBlockCol); + } else { + TD_SET_COL_ROWS_MISC(pBlockCol); + } + } else { + TD_SET_COL_ROWS_MISC(pBlockCol); + } + nColsNotAllNull++; + } + + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); + + // Compress the data if neccessary + int tcol = 0; // counter of not all NULL and written columns + uint32_t toffset = 0; + int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); + int32_t lsize = tsize; + int32_t keyLen = 0; + int32_t nBitmaps = (int32_t)TD_BITMAP_BYTES(rowsToWrite); + int32_t tBitmaps = 0; + + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + // All not NULL columns finish + if (ncol != 0 && tcol >= nColsNotAllNull) break; + + SDataCol *pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + tcol; + + if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; + + int32_t flen; // final length + int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + +#ifdef TD_SUPPORT_BITMAP + int32_t tBitmaps = 0; + if ((ncol != 0) && !TD_COL_ROWS_NORM(pBlockCol)) { + if (IS_VAR_DATA_TYPE(pDataCol->type)) { + tBitmaps = nBitmaps; + tlen += tBitmaps; + } else { + tBitmaps = (int32_t)ceil((double)nBitmaps / TYPE_BYTES[pDataCol->type]); + tlen += tBitmaps * TYPE_BYTES[pDataCol->type]; + } + // move bitmap parts ahead + // TODO: put bitmap part to the 1st location(pBitmap points to pData) to avoid the memmove + memcpy(POINTER_SHIFT(pDataCol->pData, pDataCol->len), pDataCol->pBitmap, nBitmaps); + } +#endif + + void *tptr; + + // Make room + if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { + return -1; + } + pBlockData = (SBlockData *)(*ppBuf); + pBlockCol = pBlockData->cols + tcol; + tptr = POINTER_SHIFT(pBlockData, lsize); + + if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { + return -1; + } + + // Compress or just copy + if (pCfg->compression) { + flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite + tBitmaps, tptr, + tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, + tlen + COMP_OVERFLOW_BYTES); + } else { + flen = tlen; + memcpy(tptr, pDataCol->pData, flen); + } + + // Add checksum + ASSERT(flen > 0); + flen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); + + if (ncol != 0) { + tsdbSetBlockColOffset(pBlockCol, toffset); + pBlockCol->len = flen; + tcol++; + } else { + keyLen = flen; + } + + toffset += flen; + lsize += flen; + } + + pBlockData->delimiter = TSDB_FILE_DELIMITER; + pBlockData->uid = TABLE_UID(pTable); + pBlockData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); + + // Write the whole block to file + if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { + return -1; + } + + // Update pBlock membership vairables + pBlock->last = isLast; + pBlock->offset = offset; + pBlock->algorithm = pCfg->compression; + pBlock->numOfRows = rowsToWrite; + pBlock->len = lsize; + pBlock->keyLen = keyLen; + pBlock->numOfSubBlocks = isSuper ? 1 : 0; + pBlock->numOfCols = nColsNotAllNull; + pBlock->keyFirst = dataColsKeyFirst(pDataCols); + pBlock->keyLast = dataColsKeyLast(pDataCols); + + tsdbDebug("vgId:%d uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64 + " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, + REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, + pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); + + return 0; +} + +static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper) { + return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, + isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), + (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); +} + +static int tsdbWriteBlockInfo(SCommitH *pCommih) { + SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); + SBlockIdx blkIdx; + STable *pTable = TSDB_COMMIT_TABLE(pCommih); + + if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), + &blkIdx) < 0) { + return -1; + } + + if (blkIdx.numOfBlocks == 0) { + return 0; + } + + if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + SMergeInfo mInfo; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + SDFile *pDFile; + bool isLast; + SBlock block; + + while (true) { + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, + pCfg->update, &mInfo); + + if (pCommith->pDataCols->numOfRows <= 0) break; + + if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } else { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { + return -1; + } + } + + return 0; +} + +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + TSKEY keyLimit; + int16_t colId = PRIMARYKEY_TIMESTAMP_COL_ID; + SMergeInfo mInfo; + SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; + SBlock block, supBlock; + SDFile *pDFile; + + if (bidx == nBlocks - 1) { + keyLimit = pCommith->maxKey; + } else { + keyLimit = pBlock[1].keyFirst - 1; + } + + SSkipListIterator titer = *(pIter->pIter); + if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; + + tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, + pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); + + if (mInfo.nOperations == 0) { + // no new data to insert (all updates denied) + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + *(pIter->pIter) = titer; + } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { + // Ignore the block + ASSERT(0); + *(pIter->pIter) = titer; + } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { + // Add a sub-block + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, + pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, + &mInfo); + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; + + if (pBlock->numOfSubBlocks == 1) { + subBlocks[0] = *pBlock; + subBlocks[0].numOfSubBlocks = 0; + } else { + memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + sizeof(SBlock) * pBlock->numOfSubBlocks); + } + subBlocks[pBlock->numOfSubBlocks] = block; + supBlock = *pBlock; + supBlock.keyFirst = mInfo.keyFirst; + supBlock.keyLast = mInfo.keyLast; + supBlock.numOfSubBlocks++; + supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; + supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); + + if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; + } + + return 0; +} + +static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + SDFile *pDFile; + SBlock block; + bool isSameFile; + + ASSERT(pBlock->numOfSubBlocks > 0); + + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isSameFile = pCommith->isLFileSame; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isSameFile = pCommith->isDFileSame; + } + + if (isSameFile) { + if (pBlock->numOfSubBlocks == 1) { + if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { + return -1; + } + } else { + block = *pBlock; + block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); + + if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + pBlock->numOfSubBlocks) < 0) { + return -1; + } + } + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } + + return 0; +} + +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { + if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + SBlock block; + SDFile *pDFile; + bool isLast; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + + int biter = 0; + while (true) { + tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, + pCfg->update); + + if (pCommith->pDataCols->numOfRows == 0) break; + + if (isLastOneBlock) { + if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } + + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } + + return 0; +} + +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update) { + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; + + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); + + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + STSRow *row = tsdbNextIterRow(pCommitIter->pIter); + if (row == NULL || TD_ROW_KEY(row) > maxKey) { + key2 = INT64_MAX; + } else { + key2 = TD_ROW_KEY(row); + } + + if (key1 == INT64_MAX && key2 == INT64_MAX) break; + + if (key1 < key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + // TODO: dataColAppendVal may fail + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints); + } + + pTarget->numOfRows++; + (*iter)++; + } else if (key1 > key2) { + if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); + ASSERT(pSchema != NULL); + } + + tdAppendSTSRowToDataCol(row, pSchema, pTarget, true); + + tSkipListIterNext(pCommitIter->pIter); + } else { + if (update != TD_ROW_OVERWRITE_UPDATE) { + // copy disk data + for (int i = 0; i < pDataCols->numOfCols; i++) { + // TODO: dataColAppendVal may fail + SCellVal sVal = {0}; + if (tdGetColDataOfRow(&sVal, pDataCols->cols + i, *iter) < 0) { + TASSERT(0); + } + tdAppendValToDataCol(pTarget->cols + i, sVal.valType, sVal.val, pTarget->numOfRows, pTarget->maxPoints); + } + + if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + } + if (update != TD_ROW_DISCARD_UPDATE) { + // copy mem data + if (pSchema == NULL || schemaVersion(pSchema) != TD_ROW_SVER(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, TD_ROW_SVER(row)); + ASSERT(pSchema != NULL); + } + + tdAppendSTSRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); + } + (*iter)++; + tSkipListIterNext(pCommitIter->pIter); + } + + if (pTarget->numOfRows >= maxRows) break; + } +} + +static void tsdbResetCommitTable(SCommitH *pCommith) { + taosArrayClear(pCommith->aSubBlk); + taosArrayClear(pCommith->aSupBlk); + pCommith->pTable = NULL; +} + +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { + if (pCommith->isRFileSet) { + tsdbCloseAndUnsetFSet(&(pCommith->readh)); + } + + if (!hasError) { + TSDB_FSET_FSYNC(TSDB_COMMIT_WRITE_FSET(pCommith)); + } + tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); +} + +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; + + ASSERT(mergeRows > 0); + + if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { + if (pBlock->last) { + if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; + } else { + if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; + } + } + + return false; +} + +// int tsdbApplyRtn(STsdbRepo *pRepo) { +// SRtn rtn; +// SFSIter fsiter; +// STsdbFS * pfs = REPO_FS(pRepo); +// SDFileSet *pSet; + +// // Get retention snapshot +// tsdbGetRtnSnap(pRepo, &rtn); + +// tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); +// while ((pSet = tsdbFSIterNext(&fsiter))) { +// if (pSet->fid < rtn.minFid) { +// tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, +// TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); +// continue; +// } + +// if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { +// return -1; +// } +// } + +// return 0; +// } +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 78067f8f83..ba8eea809e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -15,6 +15,14 @@ #include "tsdbDef.h" +/** + * @brief insert TS data + * + * @param pTsdb + * @param pMsg + * @param pRsp + * @return int + */ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { // Check if mem is there. If not, create one. if (pTsdb->mem == NULL) { @@ -24,4 +32,37 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { } } return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); +} + +/** + * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + * TODO: Who is responsible for resource allocate and release? + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +/** + * @brief Insert Time-range-wise Rollup Sma(RSma) data + * + * @param pTsdb + * @param param + * @param pData + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) { + tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; } \ No newline at end of file diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index a2616307ff..f3f21dc9c0 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -132,6 +132,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tqProcessRebReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { } } break; + case TDMT_VND_CREATE_SMA: { // timeRangeSMA + // 1. tdCreateSmaMeta(pVnode->pMeta,...); + // 2. tdCreateSmaDataInit(); + // 3. tdCreateSmaData + } break; + case TDMT_VND_CANCEL_SMA: { // timeRangeSMA + } break; + case TDMT_VND_DROP_SMA: { // timeRangeSMA + } break; default: ASSERT(0); break; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 986986aa70..97157fc49c 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -95,7 +95,7 @@ TEST(testCase, tSmaEncodeDecodeTest) { // resource release tdDestroyTSma(&tSma, false); - tdDestroyTSmaWrapper(&dstTSmaWrapper); + tdDestroyTSmaWrapper(&dstTSmaWrapper, false); } TEST(testCase, tSma_DB_Put_Get_Del_Test) { @@ -161,7 +161,7 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(qSmaCfg->interval, tSma.interval); tdDestroyTSma(qSmaCfg, true); - // get value by table uid + // get index name by table uid SMSmaCursor *pSmaCur = metaOpenSmaCursor(pMeta, tbUid); assert(pSmaCur != NULL); uint32_t indexCnt = 0; @@ -176,6 +176,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { EXPECT_EQ(indexCnt, 2); metaCloseSmaCurosr(pSmaCur); + // get wrapper by table uid + STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); + assert(pSW != NULL); + EXPECT_EQ(pSW->number, 2); + EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); + EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); + EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); + EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); + // resource release metaRemoveSmaFromDb(pMeta, smaIndexName1); metaRemoveSmaFromDb(pMeta, smaIndexName2); @@ -197,15 +206,15 @@ TEST(testCase, tSmaInsertTest) { int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); int32_t numOfColIds = 3; - int32_t numOfSmaBlocks = 10; + int32_t numOfBlocks = 10; - int32_t dataLen = numOfColIds * numOfSmaBlocks * blockSize; + int32_t dataLen = numOfColIds * numOfBlocks * blockSize; pSmaData = (STSmaData*)malloc(sizeof(STSmaData) + dataLen); ASSERT_EQ(pSmaData != NULL, true); pSmaData->tableUid = 3232329230; pSmaData->numOfColIds = numOfColIds; - pSmaData->numOfSmaBlocks = numOfSmaBlocks; + pSmaData->numOfBlocks = numOfBlocks; pSmaData->dataLen = dataLen; pSmaData->tsWindow.skey = 1640000000; pSmaData->tsWindow.ekey = 1645788649;