diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 0dcf554433..ec74b6e8b9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1895,24 +1895,19 @@ typedef enum { } ETDTimeUnit; typedef struct { - uint16_t funcId; - uint16_t nColIds; - col_id_t* colIds; // sorted colIds -} SFuncColIds; - -typedef struct { - uint8_t version; // for compatibility - uint8_t intervalUnit; - uint8_t slidingUnit; - char indexName[TSDB_INDEX_NAME_LEN]; - char timezone[TD_TIMEZONE_LEN]; - uint16_t nFuncColIds; - uint16_t tagsFilterLen; - tb_uid_t tableUid; // super/common table uid - int64_t interval; - int64_t sliding; - SFuncColIds* funcColIds; // sorted funcIds - char* tagsFilter; + int8_t version; // for compatibility(default 0) + int8_t intervalUnit; + int8_t slidingUnit; + char indexName[TSDB_INDEX_NAME_LEN]; + char timezone[TD_TIMEZONE_LEN]; // sma data is invalid if timezone change. + uint16_t exprLen; + uint16_t tagsFilterLen; + int64_t indexUid; + tb_uid_t tableUid; // super/child/common table uid + int64_t interval; + int64_t sliding; + char* expr; // sma expression + char* tagsFilter; } STSma; // Time-range-wise SMA typedef struct { @@ -1939,24 +1934,30 @@ int32_t tSerializeSVDropTSmaReq(void** buf, SVDropTSmaReq* pReq); void* tDeserializeSVDropTSmaReq(void* buf, SVDropTSmaReq* pReq); typedef struct { - STimeWindow tsWindow; // [skey, ekey] - uint64_t tableUid; // sub/common table uid - int32_t numOfBlocks; // number of sma blocks for each column, total number is numOfBlocks*numOfColId - int32_t dataLen; // total data length - col_id_t* colIds; // e.g. 2,4,9,10 - col_id_t numOfColIds; // e.g. 4 - char data[]; // the sma blocks -} STSmaData; + col_id_t colId; + uint16_t blockSize; // sma data block size + char data[]; +} STSmaColData; -// TODO: move to the final location afte schema of STSma/STSmaData defined -static FORCE_INLINE void tdDestroySmaData(STSmaData* pSmaData) { - if (pSmaData) { - if (pSmaData->colIds) { - tfree(pSmaData->colIds); - } - tfree(pSmaData); - } -} +typedef struct { + tb_uid_t tableUid; // super/child/normal table uid + int32_t dataLen; // not including head + char data[]; +} STSmaTbData; + +typedef struct { + int64_t indexUid; + TSKEY skey; // startTS of one interval/sliding + int64_t interval; + int32_t dataLen; // not including head + int8_t intervalUnit; + char data[]; +} STSmaDataWrapper; // sma data for a interval/sliding window + +// interval/sliding => window + +// => window->table->colId +// => 当一个window下所有的表均计算完成时,流计算告知tsdb清除window的过期标记 // RSma: Rollup SMA typedef struct { @@ -1979,13 +1980,7 @@ typedef struct { static FORCE_INLINE void tdDestroyTSma(STSma* pSma) { if (pSma) { - if (pSma->funcColIds != NULL) { - for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) { - tfree((pSma->funcColIds + i)->colIds); - } - tfree(pSma->funcColIds); - } - + tfree(pSma->expr); tfree(pSma->tagsFilter); } } @@ -2004,24 +1999,20 @@ static FORCE_INLINE void tdDestroyTSmaWrapper(STSmaWrapper* pSW) { static FORCE_INLINE int32_t tEncodeTSma(void** buf, const STSma* pSma) { int32_t tlen = 0; - tlen += taosEncodeFixedU8(buf, pSma->version); - tlen += taosEncodeFixedU8(buf, pSma->intervalUnit); - tlen += taosEncodeFixedU8(buf, pSma->slidingUnit); + tlen += taosEncodeFixedI8(buf, pSma->version); + tlen += taosEncodeFixedI8(buf, pSma->intervalUnit); + tlen += taosEncodeFixedI8(buf, pSma->slidingUnit); tlen += taosEncodeString(buf, pSma->indexName); tlen += taosEncodeString(buf, pSma->timezone); - tlen += taosEncodeFixedU16(buf, pSma->nFuncColIds); + tlen += taosEncodeFixedU16(buf, pSma->exprLen); tlen += taosEncodeFixedU16(buf, pSma->tagsFilterLen); + tlen += taosEncodeFixedI64(buf, pSma->indexUid); tlen += taosEncodeFixedI64(buf, pSma->tableUid); tlen += taosEncodeFixedI64(buf, pSma->interval); tlen += taosEncodeFixedI64(buf, pSma->sliding); - - for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) { - SFuncColIds* funcColIds = pSma->funcColIds + i; - tlen += taosEncodeFixedU16(buf, funcColIds->funcId); - tlen += taosEncodeFixedU16(buf, funcColIds->nColIds); - for (uint16_t j = 0; j < funcColIds->nColIds; ++j) { - tlen += taosEncodeFixedU16(buf, *(funcColIds->colIds + j)); - } + + if (pSma->exprLen > 0) { + tlen += taosEncodeString(buf, pSma->expr); } if (pSma->tagsFilterLen > 0) { @@ -2042,43 +2033,30 @@ static FORCE_INLINE int32_t tEncodeTSmaWrapper(void** buf, const STSmaWrapper* p } static FORCE_INLINE void* tDecodeTSma(void* buf, STSma* pSma) { - buf = taosDecodeFixedU8(buf, &pSma->version); - buf = taosDecodeFixedU8(buf, &pSma->intervalUnit); - buf = taosDecodeFixedU8(buf, &pSma->slidingUnit); + buf = taosDecodeFixedI8(buf, &pSma->version); + buf = taosDecodeFixedI8(buf, &pSma->intervalUnit); + buf = taosDecodeFixedI8(buf, &pSma->slidingUnit); buf = taosDecodeStringTo(buf, pSma->indexName); buf = taosDecodeStringTo(buf, pSma->timezone); - buf = taosDecodeFixedU16(buf, &pSma->nFuncColIds); + buf = taosDecodeFixedU16(buf, &pSma->exprLen); buf = taosDecodeFixedU16(buf, &pSma->tagsFilterLen); + buf = taosDecodeFixedI64(buf, &pSma->indexUid); buf = taosDecodeFixedI64(buf, &pSma->tableUid); buf = taosDecodeFixedI64(buf, &pSma->interval); buf = taosDecodeFixedI64(buf, &pSma->sliding); - if (pSma->nFuncColIds > 0) { - pSma->funcColIds = (SFuncColIds*)calloc(pSma->nFuncColIds, sizeof(SFuncColIds)); - if (pSma->funcColIds == NULL) { + + if (pSma->exprLen > 0) { + pSma->expr = (char*)calloc(pSma->exprLen, 1); + if (pSma->expr != NULL) { + buf = taosDecodeStringTo(buf, pSma->expr); + } else { tdDestroyTSma(pSma); return NULL; } - for (uint16_t i = 0; i < pSma->nFuncColIds; ++i) { - SFuncColIds* funcColIds = pSma->funcColIds + i; - buf = taosDecodeFixedU16(buf, &funcColIds->funcId); - buf = taosDecodeFixedU16(buf, &funcColIds->nColIds); - if (funcColIds->nColIds > 0) { - funcColIds->colIds = (col_id_t*)calloc(funcColIds->nColIds, sizeof(col_id_t)); - if (funcColIds->colIds != NULL) { - for (uint16_t j = 0; j < funcColIds->nColIds; ++j) { - buf = taosDecodeFixedU16(buf, funcColIds->colIds + j); - } - } else { - tdDestroyTSma(pSma); - return NULL; - } - } else { - funcColIds->colIds = NULL; - } - } + } else { - pSma->funcColIds = NULL; + pSma->expr = NULL; } if (pSma->tagsFilterLen > 0) { diff --git a/source/dnode/vnode/inc/meta.h b/source/dnode/vnode/inc/meta.h index 2d747d0e80..05749884d3 100644 --- a/source/dnode/vnode/inc/meta.h +++ b/source/dnode/vnode/inc/meta.h @@ -58,8 +58,8 @@ STbCfg * metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); STbCfg * metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema * metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -STSma * metaGetSmaInfoByName(SMeta *pMeta, const char *indexName); -STSmaWrapper * metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid); +STSma * metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid); +STSmaWrapper * metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); SArray * metaGetSmaTbUids(SMeta *pMeta, bool isDup); SMTbCursor *metaOpenTbCursor(SMeta *pMeta); diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 5513742c73..e67e0cae4b 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -89,24 +89,21 @@ int tsdbCommit(STsdb *pTsdb); /** * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine - * - * @param pTsdb - * @param param - * @param pData - * @return int32_t + * + * @param pTsdb + * @param msg + * @return int32_t */ -int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData); +int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); /** * @brief Insert RSma(Time-range-wise Rollup SMA) data. - * - * @param pTsdb - * @param param - * @param pData - * @return int32_t + * + * @param pTsdb + * @param msg + * @return int32_t */ -int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData); - +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); // STsdbCfg int tsdbOptionsInit(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbFS.h b/source/dnode/vnode/src/inc/tsdbFS.h index a7275f57fd..96c5f8468f 100644 --- a/source/dnode/vnode/src/inc/tsdbFS.h +++ b/source/dnode/vnode/src/inc/tsdbFS.h @@ -42,17 +42,14 @@ typedef struct { typedef struct { STsdbFSMeta meta; // FS meta SArray * df; // data file array - - // SArray * v2t100.index_name - - SArray * smaf; // sma data file array v2t1900.index_name + SArray * sf; // sma data file array v2(t|r)1900.index_name_1 } SFSStatus; /** * @brief Directory structure of .tsma data files. - * - * root@cary /vnode2/tsdb $ tree .tsma/ - * .tsma/ + * + * /vnode2/tsdb $ tree .sma/ + * .sma/ * ├── v2t100.index_name_1 * ├── v2t101.index_name_1 * ├── v2t102.index_name_1 @@ -66,7 +63,7 @@ typedef struct { * 0 directories, 9 files */ - typedef struct { +typedef struct { pthread_rwlock_t lock; SFSStatus *cstatus; // current status diff --git a/source/dnode/vnode/src/inc/tsdbFile.h b/source/dnode/vnode/src/inc/tsdbFile.h index d6ce33c389..5cc8cc045e 100644 --- a/source/dnode/vnode/src/inc/tsdbFile.h +++ b/source/dnode/vnode/src/inc/tsdbFile.h @@ -335,6 +335,17 @@ typedef struct { SDFile files[TSDB_FILE_MAX]; } SDFileSet; +typedef struct { + int fid; + int8_t state; + uint8_t ver; +#if 0 + SDFInfo info; +#endif + STfsFile f; + TdFilePtr pFile; +} SSFile; // files split by days with fid + #define TSDB_LATEST_FSET_VER 0 #define TSDB_FSET_FID(s) ((s)->fid) diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index 1a5ccbdc21..7fceb580d6 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -19,26 +19,28 @@ typedef struct SSmaStat SSmaStat; // insert/update interface -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData); -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData); +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); // query interface // TODO: This is the basic params, and should wrap the params to a queryHandle. -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult); +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult); // management interface int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg); +int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +#if 0 int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); -int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); +#endif // internal func -static FORCE_INLINE int32_t tsdbEncodeTSmaKey(uint64_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { +static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; - len += taosEncodeFixedU64(pData, tableUid); + len += taosEncodeFixedI64(pData, tableUid); len += taosEncodeFixedU16(pData, colId); len += taosEncodeFixedI64(pData, tsKey); return len; diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index efdb3e0fe4..d9af526c2a 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -227,21 +227,27 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { } int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { - char buf[512] = {0}; // TODO: may overflow - void *pBuf = NULL; + // char buf[512] = {0}; // TODO: may overflow + void *pBuf = NULL, *qBuf = NULL; DBT key1 = {0}, value1 = {0}; { // save sma info - pBuf = buf; + int32_t len = tEncodeTSma(NULL, pSmaCfg); + pBuf = calloc(len, 1); + if (pBuf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } - key1.data = pSmaCfg->indexName; - key1.size = strlen(key1.data); + key1.data = (void *)&pSmaCfg->indexUid; + key1.size = sizeof(pSmaCfg->indexUid); - tEncodeTSma(&pBuf, pSmaCfg); + qBuf = pBuf; + tEncodeTSma(&qBuf, pSmaCfg); - value1.data = buf; - value1.size = POINTER_DISTANCE(pBuf, buf); + value1.data = pBuf; + value1.size = POINTER_DISTANCE(qBuf, pBuf); value1.app_data = pSmaCfg; } @@ -609,7 +615,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { return pTbCfg; } -STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { +STSma *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid) { STSma * pCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; @@ -617,8 +623,8 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { int ret; // Set key/value - key.data = (void *)indexName; - key.size = strlen(indexName); + key.data = (void *)&indexUid; + key.size = sizeof(indexUid); // Query metaDBRLock(pDB); @@ -634,7 +640,10 @@ STSma *metaGetSmaInfoByName(SMeta *pMeta, const char *indexName) { return NULL; } - tDecodeTSma(value.data, pCfg); + if (tDecodeTSma(value.data, pCfg) == NULL) { + tfree(pCfg); + return NULL; + } return pCfg; } @@ -871,7 +880,7 @@ const char *metaSmaCursorNext(SMSmaCursor *pCur) { } } -STSmaWrapper *metaGetSmaInfoByUid(SMeta *pMeta, tb_uid_t uid) { +STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { STSmaWrapper *pSW = NULL; pSW = calloc(sizeof(*pSW), 1); diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 51abcbd9dd..dc0d262725 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -19,22 +19,20 @@ #define SMA_STORAGE_SPLIT_HOURS 24 #define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8 -#define SMA_STORE_SINGLE_BLOCKS // store SMA data by single block or multiple blocks - #define SMA_STATE_HASH_SLOT 4 #define SMA_STATE_ITEM_HASH_SLOT 32 #define SMA_TEST_INDEX_NAME "smaTestIndexName" // TODO: just for test +#define SMA_TEST_INDEX_UID 123456 // TODO: just for test typedef enum { - SMA_STORAGE_LEVEL_TSDB = 0, // store TSma in dir e.g. vnode${N}/tsdb/.tsma - SMA_STORAGE_LEVEL_DFILESET = 1 // store TSma in file e.g. vnode${N}/tsdb/v2f1900.tsma.${sma_index_name} + SMA_STORAGE_LEVEL_TSDB = 0, // use days of self-defined e.g. vnode${N}/tsdb/tsma/sma_index_uid/v2t200.dat + SMA_STORAGE_LEVEL_DFILESET = 1 // use days of TS data e.g. vnode${N}/tsdb/rsma/sma_index_uid/v2r200.dat } ESmaStorageLevel; typedef struct { STsdb * pTsdb; char * pDFile; // TODO: use the real DFile type, not char* int32_t interval; // interval with the precision of DB - int32_t blockSize; // size of SMA block item // TODO } STSmaWriteH; @@ -62,6 +60,7 @@ typedef struct { */ int8_t state; // ETsdbSmaStat SHashObj *expiredWindows; // key: skey of time window, value: N/A + STSma * pSma; } SSmaStatItem; struct SSmaStat { @@ -69,20 +68,18 @@ struct SSmaStat { }; // declaration of static functions -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit); -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t sectionDataLen, int32_t nBlocks); +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen); -static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, - int32_t fid, int32_t *nSmaBlocks); -static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, - int32_t fid); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData); -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin); +static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); + +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat) { ASSERT(pSmaStat != NULL); @@ -133,10 +130,10 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) { // TODO: use taosHashSetFreeFp when taosHashSetFreeFp is ready. SSmaStatItem *item = taosHashIterate(pSmaStat->smaStatItems, NULL); while (item != NULL) { + tfree(item->pSma); taosHashCleanup(item->expiredWindows); item = taosHashIterate(pSmaStat->smaStatItems, item); } - taosHashCleanup(pSmaStat->smaStatItems); free(pSmaStat); } @@ -154,9 +151,13 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { return TSDB_CODE_FAILED; } - tsdbInitSmaStat(&pTsdb->pSmaStat); // lazy mode + // lazy mode + if (tsdbInitSmaStat(&pTsdb->pSmaStat) != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } // TODO: decode the msg => start + int64_t indexUid = SMA_TEST_INDEX_UID; const char * indexName = SMA_TEST_INDEX_NAME; const int32_t SMA_TEST_EXPIRED_WINDOW_SIZE = 10; TSKEY expiredWindows[SMA_TEST_EXPIRED_WINDOW_SIZE]; @@ -169,14 +170,24 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { SHashObj *pItemsHash = pTsdb->pSmaStat->smaStatItems; SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pItemsHash, indexName, strlen(indexName)); - if (!pItem) { + if (pItem == NULL) { pItem = tsdbNewSmaStatItem(TSDB_SMA_STAT_EXPIRED); // TODO use the real state - if (!pItem) { + if (pItem == NULL) { // Response to stream computing: OOM // For query, if the indexName not found, the TSDB should tell query module to query raw TS data. return TSDB_CODE_FAILED; } + // cache smaMeta + STSma *pSma = metaGetSmaInfoByIndex(pTsdb->pMeta, indexUid); + if (pSma == NULL) { + taosHashCleanup(pItem->expiredWindows); + free(pItem); + return TSDB_CODE_FAILED; + } + pItem->pSma = pSma; + + // TODO: change indexName to indexUid if (taosHashPut(pItemsHash, indexName, strnlen(indexName, TSDB_INDEX_NAME_LEN), &pItem, sizeof(pItem)) != 0) { // If error occurs during put smaStatItem, free the resources of pItem taosHashCleanup(pItem->expiredWindows); @@ -195,6 +206,7 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { // 2) This would solve the inconsistency to some extent, but not completely, unless we record all expired // windows failed to put into hash table. taosHashCleanup(pItem->expiredWindows); + tfree(pItem->pSma); taosHashRemove(pItemsHash, indexName, sizeof(indexName)); return TSDB_CODE_FAILED; } @@ -203,19 +215,21 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, char *msg) { return TSDB_CODE_SUCCESS; } -static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, const char *indexName, void *timeWindow) { +static int32_t tsdbResetExpiredWindow(STsdb *pTsdb, int64_t indexUid, TSKEY skey) { SSmaStatItem *pItem = NULL; if (pTsdb->pSmaStat && pTsdb->pSmaStat->smaStatItems) { - pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName)); + pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &indexUid, sizeof(indexUid)); } if (pItem != NULL) { - // TODO: reset time windows for the sma data blocks - while (true) { - TSKEY thisWindow = 0; - taosHashRemove(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)); + // TODO: reset time window for the sma data blocks + if (taosHashRemove(pItem->expiredWindows, &skey, sizeof(TSKEY)) != 0) { + // error handling } + + } else { + // error handling } return TSDB_CODE_SUCCESS; } @@ -277,7 +291,7 @@ static int32_t tsdbJudgeStorageLevel(int64_t interval, int8_t intervalUnit) { */ static int32_t tsdbInsertTSmaBlocks(void *bTree, const char *smaKey, const char *pData, int32_t dataLen) { // TODO: insert sma data blocks into B+Tree - printf("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d\n", + tsdbDebug("insert sma data blocks into B+Tree: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); return TSDB_CODE_SUCCESS; } @@ -360,85 +374,60 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit * @param nBlocks The nBlocks with the same fid since nOffset. * @return int32_t */ -static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaData *pData, int32_t nOffset, int32_t nBlocks) { +static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData) { STsdb *pTsdb = pSmaH->pTsdb; - TASSERT(pData->colIds != NULL); + tsdbDebug("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64, pData->indexUid, pData->skey); - tsdbDebug("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d", nOffset, nBlocks); - printf("tsdbInsertTSmaDataSection: nOffset %d, nBlocks %d\n", nOffset, nBlocks); + // TODO: check the data integrity - int32_t colDataLen = pData->dataLen / pData->numOfColIds; - int32_t sectionDataLen = pSmaH->blockSize * nBlocks; + void *bTree = pSmaH->pDFile; - for (col_id_t i = 0; i < pData->numOfColIds; ++i) { - // param: pointer of B+Tree, key, value, dataLen - void *bTree = pSmaH->pDFile; -#ifndef SMA_STORE_SINGLE_BLOCKS - // save tSma data blocks as a whole - char smaKey[SMA_KEY_LEN] = {0}; - void *pSmaKey = &smaKey; - tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + nOffset * pSmaH->interval, - (void **)&pSmaKey); - if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + nOffset * pSmaH->blockSize, sectionDataLen) < - 0) { - tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + int32_t len = 0; + while (true) { + if (len >= pData->dataLen) { + break; } -#else - // save tSma data blocks separately - for (int32_t n = 0; n < nBlocks; ++n) { - char smaKey[SMA_KEY_LEN] = {0}; - void *pSmaKey = &smaKey; - tsdbEncodeTSmaKey(pData->tableUid, *(pData->colIds + i), pData->tsWindow.skey + (nOffset + n) * pSmaH->interval, - (void **)&pSmaKey); - if (tsdbInsertTSmaBlocks(bTree, smaKey, pData->data + i * colDataLen + (nOffset + n) * pSmaH->blockSize, - pSmaH->blockSize) < 0) { + assert(pData->dataLen > 0); + STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pData->data, len); + + int32_t tbLen = 0; + while (true) { + if (tbLen >= pTbData->dataLen) { + break; + } + assert(pTbData->dataLen > 0); + STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pTbData->data, tbLen); + char smaKey[SMA_KEY_LEN] = {0}; + void * pSmaKey = &smaKey; +#if 0 + printf("tsdbInsertTSmaDataSection: index %" PRIi64 ", skey %" PRIi64 " table[%" PRIi64 "]col[%" PRIu16 "]\n", + pData->indexUid, pData->skey, pTbData->tableUid, pColData->colId); +#endif + tsdbEncodeTSmaKey(pTbData->tableUid, pColData->colId, pData->skey, (void **)&pSmaKey); + if (tsdbInsertTSmaBlocks(bTree, smaKey, pColData->data, pColData->blockSize) < 0) { tsdbWarn("vgId:%d insert tSma blocks failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } + tbLen += (sizeof(STSmaColData) + pColData->blockSize); } -#endif + len += (sizeof(STSmaTbData) + pTbData->dataLen); } + return TSDB_CODE_SUCCESS; } -static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { +static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); - // pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); + pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); } -static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t storageLevel, - int32_t fid) { +static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { // TODO pSmaH->pDFile = "tSma_interval_file_name"; return TSDB_CODE_SUCCESS; -} /** - * @brief Split the sma data blocks by fid. - * - * @param pSmaH - * @param param - * @param pData - * @param nOffset - * @param fid - * @param nSmaBlocks - * @return int32_t - */ -static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pData, int32_t days, int32_t nOffset, - int32_t fid, int32_t *nSmaBlocks) { - STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); +} - // TODO: use binary search - for (int32_t n = nOffset + 1; n < pData->numOfBlocks; ++n) { - // TODO: The tsWindow.skey should use the precision of DB. - int32_t tFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey + pSmaH->interval * n, days, pCfg->precision)); - if (tFid > fid) { - *nSmaBlocks = n - nOffset; - break; - } - } - return TSDB_CODE_SUCCESS; -} /** * @brief Insert/Update Time-range-wise SMA data. @@ -449,124 +438,81 @@ static int32_t tsdbTSmaDataSplit(STSmaWriteH *pSmaH, STSma *param, STSmaData *pD * - The destination file of one data block for some interval is determined by its start TS key. * * @param pTsdb - * @param param - * @param pData + * @param msg * @return int32_t */ -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData) { +int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); - STSmaData * curData = pData; + STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaWriteH tSmaH = {0}; - tsdbInitTSmaWriteH(&tSmaH, pTsdb, param, pData); + tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); - if (pData->numOfBlocks <= 0 || pData->numOfColIds <= 0 || pData->dataLen <= 0) { + if (pData->dataLen <= 0) { TASSERT(0); terrno = TSDB_CODE_INVALID_PARA; return terrno; } // Step 1: Judge the storage level - int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); + int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // - Set and open the DFile or the B+Tree file - int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, daysPerFile, pCfg->precision)); - int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, daysPerFile, pCfg->precision)); + int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); - if (minFid == maxFid) { - // Save all the TSma data to one file - // TODO: tsdbStartTSmaCommit(); - tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, minFid); - tsdbInsertTSmaDataSection(&tSmaH, pData, 0, pData->numOfBlocks); - // TODO:tsdbEndTSmaCommit(); - } else if (minFid < maxFid) { - // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files - // actually. - // TODO: tsdbStartTSmaCommit(); - int32_t tFid = minFid; - int32_t nOffset = 0; - int32_t nSmaBlocks = 0; - do { - tsdbTSmaDataSplit(&tSmaH, param, pData, daysPerFile, nOffset, tFid, &nSmaBlocks); - tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); - if (tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, nSmaBlocks) < 0) { - return terrno; - } - - ++tFid; - nOffset += nSmaBlocks; - - if (tFid == maxFid) { - tsdbSetTSmaDataFile(&tSmaH, param, pData, storageLevel, tFid); - tsdbInsertTSmaDataSection(&tSmaH, pData, nOffset, pData->numOfBlocks - nOffset); - break; - } - } while (true); - - // TODO:tsdbEndTSmaCommit(); - } else { - terrno = TSDB_CODE_INVALID_PARA; - return terrno; - } + // Save all the TSma data to one file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbInsertTSmaDataSection(&tSmaH, pData); + // TODO:tsdbEndTSmaCommit(); // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, param->indexName, &pData->tsWindow); + tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); return TSDB_CODE_SUCCESS; } -static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, SRSma *param, STSmaData *pData, int32_t fid) { +static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t fid) { // TODO pSmaH->pDFile = "rSma_interval_file_name"; return TSDB_CODE_SUCCESS; } -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { +int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); - STSma * tParam = ¶m->tsma; - STSmaData * curData = pData; + STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaWriteH tSmaH = {0}; - tsdbInitTSmaWriteH(&tSmaH, pTsdb, tParam, pData); + tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData); - int32_t nSmaBlocks = pData->numOfBlocks; - int32_t colDataLen = pData->dataLen / nSmaBlocks; - - // Step 2.2: Storage of SMA_STORAGE_LEVEL_DFILESET - // TODO: Use the daysPerFile for rSma data, not for TS data. - // TODO: The lifecycle of rSma data should be processed like the TS data files. - int32_t minFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.skey, pCfg->daysPerFile, pCfg->precision)); - int32_t maxFid = (int32_t)(TSDB_KEY_FID(pData->tsWindow.ekey, pCfg->daysPerFile, pCfg->precision)); - - if (minFid == maxFid) { - // Save all the TSma data to one file - tsdbSetRSmaDataFile(&tSmaH, param, pData, minFid); - // TODO: tsdbStartTSmaCommit(); - tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); - // TODO:tsdbEndTSmaCommit(); - } else if (minFid < maxFid) { - // Split the TSma data and save to multiple files. As there is limit for the span, it can't span more than 2 files - // actually. - // TODO: tsdbStartTSmaCommit(); - int32_t tmpFid = 0; - int32_t step = 0; - for (int32_t n = 0; n < pData->numOfBlocks; ++n) { - } - tsdbInsertTSmaDataSection(&tSmaH, pData, colDataLen, nSmaBlocks); - // TODO:tsdbEndTSmaCommit(); - } else { + if (pData->dataLen <= 0) { TASSERT(0); - return TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_INVALID_PARA; + return terrno; } - // reset the SSmaStat - tsdbResetExpiredWindow(pTsdb, param->tsma.indexName, &pData->tsWindow); + // Step 1: Judge the storage level + int32_t storageLevel = tsdbJudgeStorageLevel(pData->interval, pData->intervalUnit); + int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : pCfg->daysPerFile; + + // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file + // - Set and open the DFile or the B+Tree file + + int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); + + // Save all the TSma data to one file + // TODO: tsdbStartTSmaCommit(); + tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); + tsdbInsertTSmaDataSection(&tSmaH, pData); + // TODO:tsdbEndTSmaCommit(); + + // reset the SSmaStat + tsdbResetExpiredWindow(pTsdb, pData->indexUid, pData->skey); - // Step 4: finish return TSDB_CODE_SUCCESS; } @@ -579,9 +525,9 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, SRSma *param, STSmaData *pData) { * @param pData * @return int32_t */ -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, STSmaData *pData) { +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(param->interval, param->intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); // pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); } @@ -593,8 +539,8 @@ static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSma *param, * @param queryWin * @return int32_t */ -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { - int32_t storageLevel = tsdbJudgeStorageLevel(param->interval, param->intervalUnit); +static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { + int32_t storageLevel = 0; //tsdbJudgeStorageLevel(param->interval, param->intervalUnit); int32_t daysPerFile = storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; pReadH->storageLevel = storageLevel; @@ -611,8 +557,8 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *q * @return true * @return false */ -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow *queryWin) { - SArray *smaFs = pReadH->pTsdb->fs->cstatus->smaf; +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { + SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf; int32_t nSmaFs = taosArrayGetSize(smaFs); pReadH->pDFile = NULL; @@ -646,10 +592,9 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STSma *param, STimeWindow * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @return int32_t */ -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeWindow *queryWin, int32_t nMaxResult) { - const char *indexName = param->indexName; - - SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, indexName, strlen(indexName)); +int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { + SSmaStatItem *pItem = + (SSmaStatItem *)taosHashGet(pTsdb->pSmaStat->smaStatItems, &pData->indexUid, sizeof(pData->indexUid)); if (pItem == NULL) { // mark all window as expired and notify query module to query raw TS data. return TSDB_CODE_SUCCESS; @@ -664,9 +609,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW } STSmaReadH tReadH = {0}; - tsdbInitTSmaReadH(&tReadH, pTsdb, param, pData); + tsdbInitTSmaReadH(&tReadH, pTsdb, pData); - tsdbInitTSmaFile(&tReadH, param, queryWin); + tsdbInitTSmaFile(&tReadH, queryWin); int32_t nResult = 0; int64_t lastKey = 0; @@ -677,7 +622,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW } // set and open the file according to the STSma param - if (tsdbSetAndOpenTSmaFile(&tReadH, param, queryWin)) { + if (tsdbSetAndOpenTSmaFile(&tReadH, queryWin)) { char bTree[100] = "\0"; while (strncmp(bTree, "has more nodes", 100) == 0) { if (nResult >= nMaxResult) { @@ -694,6 +639,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW return TSDB_CODE_SUCCESS; } +#if 0 /** * @brief Get the start TS key of the last data block of one interval/sliding. * @@ -704,7 +650,7 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSma *param, STSmaData *pData, STimeW * 1) Return 0 and fill the result if the check procedure is normal; * 2) Return -1 if error occurs during the check procedure. */ -int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { +int32_t tsdbGetTSmaStatus(STsdb *pTsdb, void *smaIndex, void *result) { const char *procedure = ""; if (strncmp(procedure, "get the start TS key of the last data block", 100) != 0) { return -1; @@ -721,9 +667,10 @@ int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result) { * @param pWin * @return int32_t */ -int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin) { +int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) { // for ("tSmaFiles of param-interval-sliding between pWin") { // // remove the tSmaFile // } return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index ba8eea809e..26d31af4f3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -39,13 +39,13 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { * * @param pTsdb * @param param - * @param pData + * @param msg * @return int32_t * TODO: Who is responsible for resource allocate and release? */ -int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { +int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbInsertTSmaDataImpl(pTsdb, param, pData)) < 0) { + if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) { tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; @@ -56,12 +56,12 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, STSma *param, STSmaData *pData) { * * @param pTsdb * @param param - * @param pData + * @param msg * @return int32_t */ -int32_t tsdbInsertRSmaData(STsdb *pTsdb, SRSma *param, STSmaData *pData) { +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { int32_t code = TSDB_CODE_SUCCESS; - if ((code = tsdbInsertRSmaDataImpl(pTsdb, param, pData)) < 0) { + if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) { tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index 5a5c5e1530..ac9a8fd3d0 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -43,20 +43,8 @@ TEST(testCase, tSmaEncodeDecodeTest) { tSma.sliding = 0; tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN); tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN); + tSma.indexUid = 2345678910; tSma.tableUid = 1234567890; - tSma.nFuncColIds = 5; - tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds)); - ASSERT(tSma.funcColIds != NULL); - for (int32_t n = 0; n < tSma.nFuncColIds; ++n) { - SFuncColIds *funcColIds = tSma.funcColIds + n; - funcColIds->funcId = n; - funcColIds->nColIds = 10; - funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t)); - ASSERT(funcColIds->colIds != NULL); - for (int32_t i = 0; i < funcColIds->nColIds; ++i) { - *(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); - } - } STSmaWrapper tSmaWrapper = {.number = 1, .tSma = &tSma}; uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper); @@ -85,35 +73,31 @@ TEST(testCase, tSmaEncodeDecodeTest) { EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit); EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName); EXPECT_STRCASEEQ(pSma->timezone, qSma->timezone); - EXPECT_EQ(pSma->nFuncColIds, qSma->nFuncColIds); + EXPECT_EQ(pSma->indexUid, qSma->indexUid); EXPECT_EQ(pSma->tableUid, qSma->tableUid); EXPECT_EQ(pSma->interval, qSma->interval); EXPECT_EQ(pSma->sliding, qSma->sliding); + EXPECT_EQ(pSma->exprLen, qSma->exprLen); + EXPECT_STRCASEEQ(pSma->expr, qSma->expr); EXPECT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen); EXPECT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter); - for (uint32_t j = 0; j < pSma->nFuncColIds; ++j) { - SFuncColIds *pFuncColIds = pSma->funcColIds + j; - SFuncColIds *qFuncColIds = qSma->funcColIds + j; - EXPECT_EQ(pFuncColIds->funcId, qFuncColIds->funcId); - EXPECT_EQ(pFuncColIds->nColIds, qFuncColIds->nColIds); - for (uint32_t k = 0; k < pFuncColIds->nColIds; ++k) { - EXPECT_EQ(*(pFuncColIds->colIds + k), *(qFuncColIds->colIds + k)); - } - } } // resource release tdDestroyTSma(&tSma); tdDestroyTSmaWrapper(&dstTSmaWrapper); } - +#if 1 TEST(testCase, tSma_DB_Put_Get_Del_Test) { const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName2 = "sma_index_test_2"; - const char * timeZone = "Asia/Shanghai"; + const char * timezone = "Asia/Shanghai"; + const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; const char * tagsFilter = "I'm tags filter"; const char * smaTestDir = "./smaTest"; - const uint64_t tbUid = 1234567890; + const tb_uid_t tbUid = 1234567890; + const int64_t indexUid1 = 2000000001; + const int64_t indexUid2 = 2000000002; const uint32_t nCntTSma = 2; // encode STSma tSma = {0}; @@ -122,22 +106,15 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { tSma.interval = 1; tSma.slidingUnit = TD_TIME_UNIT_HOUR; tSma.sliding = 0; + tSma.indexUid = indexUid1; tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN); - tstrncpy(tSma.timezone, timeZone, TD_TIMEZONE_LEN); + tstrncpy(tSma.timezone, timezone, TD_TIMEZONE_LEN); tSma.tableUid = tbUid; - tSma.nFuncColIds = 5; - tSma.funcColIds = (SFuncColIds *)calloc(tSma.nFuncColIds, sizeof(SFuncColIds)); - ASSERT(tSma.funcColIds != NULL); - for (int32_t n = 0; n < tSma.nFuncColIds; ++n) { - SFuncColIds *funcColIds = tSma.funcColIds + n; - funcColIds->funcId = n; - funcColIds->nColIds = 10; - funcColIds->colIds = (col_id_t *)calloc(funcColIds->nColIds, sizeof(col_id_t)); - ASSERT(funcColIds->colIds != NULL); - for (int32_t i = 0; i < funcColIds->nColIds; ++i) { - *(funcColIds->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); - } - } + + tSma.exprLen = strlen(expr); + tSma.expr = (char *)calloc(tSma.exprLen + 1, 1); + tstrncpy(tSma.expr, expr, tSma.exprLen + 1); + tSma.tagsFilterLen = strlen(tagsFilter); tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1); tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1); @@ -151,8 +128,9 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { pMeta = metaOpen(smaTestDir, pMetaCfg, NULL); assert(pMeta != NULL); // save index 1 - metaSaveSmaToDB(pMeta, pSmaCfg); + EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); + pSmaCfg->indexUid = indexUid2; tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN); pSmaCfg->version = 1; pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR; @@ -161,24 +139,26 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { pSmaCfg->sliding = 5; // save index 2 - metaSaveSmaToDB(pMeta, pSmaCfg); + EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); // get value by indexName STSma *qSmaCfg = NULL; - qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName1); + qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid1); assert(qSmaCfg != NULL); printf("name1 = %s\n", qSmaCfg->indexName); printf("timezone1 = %s\n", qSmaCfg->timezone); + printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : ""); printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1); EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid); tdDestroyTSma(qSmaCfg); tfree(qSmaCfg); - qSmaCfg = metaGetSmaInfoByName(pMeta, smaIndexName2); + qSmaCfg = metaGetSmaInfoByIndex(pMeta, indexUid2); assert(qSmaCfg != NULL); printf("name2 = %s\n", qSmaCfg->indexName); printf("timezone2 = %s\n", qSmaCfg->timezone); + printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : ""); printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : ""); EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2); EXPECT_EQ(qSmaCfg->interval, tSma.interval); @@ -201,17 +181,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { metaCloseSmaCurosr(pSmaCur); // get wrapper by table uid - STSmaWrapper *pSW = metaGetSmaInfoByUid(pMeta, tbUid); + STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid); assert(pSW != NULL); EXPECT_EQ(pSW->number, nCntTSma); EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1); - EXPECT_STRCASEEQ(pSW->tSma->timezone, timeZone); + EXPECT_STRCASEEQ(pSW->tSma->timezone, timezone); + EXPECT_STRCASEEQ(pSW->tSma->expr, expr); EXPECT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter); - EXPECT_EQ(pSW->tSma->tableUid, tSma.tableUid); + EXPECT_EQ(pSW->tSma->indexUid, indexUid1); + EXPECT_EQ(pSW->tSma->tableUid, tbUid); EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2); - EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timeZone); + EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone); + EXPECT_STRCASEEQ((pSW->tSma + 1)->expr, expr); EXPECT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter); - EXPECT_EQ((pSW->tSma + 1)->tableUid, tSma.tableUid); + EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2); + EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid); tdDestroyTSmaWrapper(pSW); tfree(pSW); @@ -233,44 +217,68 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { tdDestroyTSma(&tSma); metaClose(pMeta); } +#endif -#if 0 +#if 1 TEST(testCase, tSmaInsertTest) { - STSma tSma = {0}; - STSmaData *pSmaData = NULL; - STsdb tsdb = {0}; + const int64_t indexUid = 2000000002; + STSmaDataWrapper *pSmaData = NULL; + STsdb tsdb = {0}; + STsdbCfg * pCfg = &tsdb.config; + + pCfg->daysPerFile = 1; // init - tSma.intervalUnit = TD_TIME_UNIT_DAY; - tSma.interval = 1; - tSma.numOfFuncIds = 5; // sum/min/max/avg/last + int32_t allocCnt = 0; + int32_t allocStep = 40960; + int32_t buffer = 4096; + void * buf = NULL; + EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); + int32_t bufSize = taosTSizeof(buf); + int32_t numOfTables = 25; + col_id_t numOfCols = 4096; + EXPECT_GT(numOfCols, 0); - int32_t blockSize = tSma.numOfFuncIds * sizeof(int64_t); - int32_t numOfColIds = 3; - int32_t numOfBlocks = 10; + pSmaData = (STSmaDataWrapper *)buf; + printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); + pSmaData->skey = 1646987196; + pSmaData->interval = 10; + pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; + pSmaData->indexUid = indexUid; - int32_t dataLen = numOfColIds * numOfBlocks * blockSize; + int32_t len = sizeof(STSmaDataWrapper); + for (int32_t t = 0; t < numOfTables; ++t) { + STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); + pTbData->tableUid = t; - pSmaData = (STSmaData *)malloc(sizeof(STSmaData) + dataLen); - ASSERT_EQ(pSmaData != NULL, true); - pSmaData->tableUid = 3232329230; - pSmaData->numOfColIds = numOfColIds; - pSmaData->numOfBlocks = numOfBlocks; - pSmaData->dataLen = dataLen; - pSmaData->tsWindow.skey = 1640000000; - pSmaData->tsWindow.ekey = 1645788649; - pSmaData->colIds = (col_id_t *)malloc(sizeof(col_id_t) * numOfColIds); - ASSERT_EQ(pSmaData->colIds != NULL, true); - - for (int32_t i = 0; i < numOfColIds; ++i) { - *(pSmaData->colIds + i) = (i + PRIMARYKEY_TIMESTAMP_COL_ID); + int32_t tableDataLen = sizeof(STSmaTbData); + for (col_id_t c = 0; c < numOfCols; ++c) { + if (bufSize - len - tableDataLen < buffer) { + EXPECT_EQ(tsdbMakeRoom(&buf, bufSize + allocStep), 0); + pSmaData = (STSmaDataWrapper *)buf; + pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); + bufSize = taosTSizeof(buf); + printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); + } + STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen); + pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID; + pColData->blockSize = ((c & 1) == 0) ? 8 : 16; + // TODO: fill col data + tableDataLen += (sizeof(STSmaColData) + pColData->blockSize); + } + pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); + len += tableDataLen; + // printf("bufSize=%d, len=%d, len of table[%d]=%d\n", bufSize, len, t, tableDataLen); } + pSmaData->dataLen = (len - sizeof(STSmaDataWrapper)); + + EXPECT_GE(bufSize, pSmaData->dataLen); // execute - EXPECT_EQ(tsdbInsertTSmaData(&tsdb, &tSma, pSmaData), TSDB_CODE_SUCCESS); + EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); // release - tdDestroySmaData(pSmaData); + taosTZfree(buf); } #endif diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index fb926610bd..b565a512f3 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -265,7 +265,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) { return NULL; } #if FILE_WITH_LOCK - pthread_rwlock_init(&(pFile->rwlock),NULL); + pthread_rwlock_init(&(pFile->rwlock), NULL); #endif pFile->fd = fd; pFile->fp = fp;