diff --git a/source/dnode/vnode/inc/tsdb.h b/source/dnode/vnode/inc/tsdb.h index 25bac86f71..87edfb8dde 100644 --- a/source/dnode/vnode/inc/tsdb.h +++ b/source/dnode/vnode/inc/tsdb.h @@ -95,7 +95,6 @@ int tsdbCommit(STsdb *pTsdb); * @return int32_t */ int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg); - int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); /** @@ -107,6 +106,12 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg); */ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); +// TODO: This is the basic params, and should wrap the params to a queryHandle. +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult); + + // STsdbCfg int tsdbOptionsInit(STsdbCfg *); void tsdbOptionsClear(STsdbCfg *); diff --git a/source/dnode/vnode/src/inc/tsdbDBDef.h b/source/dnode/vnode/src/inc/tsdbDBDef.h index 7740dd0fab..2e37b0ba45 100644 --- a/source/dnode/vnode/src/inc/tsdbDBDef.h +++ b/source/dnode/vnode/src/inc/tsdbDBDef.h @@ -35,6 +35,7 @@ void tsdbCloseDBF(SDBFile* pDBF); int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path); void tsdbCloseBDBEnv(DB_ENV* pEnv); int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize); +void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index c54fdf85a3..649b5a2d47 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -27,29 +27,18 @@ struct SSmaEnv { }; #define SMA_ENV_LOCK(env) ((env)->lock) +#define SMA_ENV_ENV(env) ((env)->dbEnv) #define SMA_ENV_PATH(env) ((env)->path) #define SMA_ENV_STAT(env) ((env)->pStat) #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) -// insert/update interface -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); - -// query interface -// TODO: This is the basic params, and should wrap the params to a queryHandle. -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult); - -// management interface -int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); -void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); -void * tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); +void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); +void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); #if 0 int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result); int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin); #endif - - // internal func static FORCE_INLINE int32_t tsdbEncodeTSmaKey(tb_uid_t tableUid, col_id_t colId, TSKEY tsKey, void **pData) { int32_t len = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c index 4fc415cfd1..cf3351c5d8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -29,7 +29,7 @@ static void tsdbCloseBDBDb(DB *pDB); #define BDB_PERR(info, code) fprintf(stderr, "%s:%d " info " reason: %s\n", __FILE__, __LINE__, db_strerror(code)) -int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { +int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { // TDBEnv is shared by a group of SDBFile if (!pEnv) { terrno = TSDB_CODE_INVALID_PTR; @@ -46,18 +46,12 @@ int tsdbOpenDBF(TDBEnv pEnv, SDBFile *pDBF) { return 0; } -static void *tsdbFreeDBF(SDBFile *pDBF) { - if (pDBF) { - free(pDBF); - } - return NULL; -} - void tsdbCloseDBF(SDBFile *pDBF) { if (pDBF->pDB) { tsdbCloseBDBDb(pDBF->pDB); - pDBF->pDB = tsdbFreeDBF(pDBF); + pDBF->pDB = NULL; } + tfree(pDBF->path); } int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) { @@ -145,4 +139,35 @@ int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, // TODO: unlock return 0; +} + +void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) { + void *result = NULL; + DBT key1 = {0}; + DBT value1 = {0}; + int ret; + + // Set key/value + key1.data = key; + key1.size = keySize; + + // Query + // TODO: lock + ret = pDBF->pDB->get(pDBF->pDB, NULL, &key1, &value1, 0); + // TODO: unlock + if (ret != 0) { + return NULL; + } + + result = calloc(1, value1.size); + + if (result == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + *valueSize = value1.size; + memcpy(result, value1.data, value1.size); + + return result; } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index f96ea644b4..0dbcb29f80 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -15,6 +15,7 @@ #include "tsdbDef.h" +#undef SMA_PRINT_DEBUG_LOG #define SMA_STORAGE_TSDB_DAYS 30 #define SMA_STORAGE_TSDB_TIMES 30 #define SMA_STORAGE_SPLIT_HOURS 24 @@ -31,24 +32,23 @@ typedef enum { } ESmaStorageLevel; typedef struct { - STsdb * pTsdb; - SDBFile *pDFile; - int32_t interval; // interval with the precision of DB - // TODO + STsdb * pTsdb; + SDBFile dFile; + int32_t interval; // interval with the precision of DB } STSmaWriteH; typedef struct { int32_t iter; + int32_t fid; } SmaFsIter; typedef struct { STsdb * pTsdb; - char * pDFile; // TODO: use the real DFile type, not char* + SDBFile dFile; int32_t interval; // interval with the precision of DB int32_t blockSize; // size of SMA block item int8_t storageLevel; int8_t days; SmaFsIter smaFsIter; - // TODO } STSmaReadH; typedef struct { @@ -69,23 +69,30 @@ struct SSmaStat { }; // declaration of static functions +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg); +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg); +// TODO: This is the basic params, and should wrap the params to a queryHandle. +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult); +static int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg); + static int32_t tsdbInitSmaStat(SSmaStat **pSmaStat); static int32_t tsdbDestroySmaState(SSmaStat *pSmaStat); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path); static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv); static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit); static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit); static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *pData); -static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); +static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen); static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision); -static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel); +static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel); static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid); -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData); -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin); +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey); +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey); static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) { SSmaEnv *pEnv = NULL; @@ -408,25 +415,35 @@ static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) { } /** - * @brief Insert TSma data blocks to B+Tree + * @brief Insert TSma data blocks to DB File build by B+Tree * - * @param bTree + * @param pSmaH * @param smaKey + * @param keyLen * @param pData * @param dataLen * @return int32_t */ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t keyLen, void *pData, uint32_t dataLen) { - SDBFile *pDBFile = pSmaH->pDFile; + SDBFile *pDBFile = &pSmaH->dFile; // TODO: insert sma data blocks into B+Tree - tsdbDebug("insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", pDBFile->path, - *(uint64_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); + tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d", + REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen); - if(tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0){ + if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) { return TSDB_CODE_FAILED; } +#ifdef SMA_PRINT_DEBUG_LOG + uint32_t valueSize = 0; + void * data = tsdbGetSmaDataByKey(pDBFile, smaKey, keyLen, &valueSize); + ASSERT(data != NULL); + for (uint32_t v = 0; v < valueSize; v += 8) { + tsdbWarn("vgId:%d sma data - val[%d] is %" PRIi64, REPO_ID(pSmaH->pTsdb), v, *(int64_t *)POINTER_SHIFT(data, v)); + } +#endif return TSDB_CODE_SUCCESS; } @@ -458,41 +475,41 @@ static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit } } - switch (intervalUnit) { - case TD_TIME_UNIT_MILLISEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { - return interval; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval * 1e3; - } else { // nano second - return interval * 1e6; - } - break; - case TD_TIME_UNIT_MICROSEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { + switch (precision) { + case TSDB_TIME_PRECISION_MILLI: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us return interval / 1e3; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval; - } else { // nano second - return interval * 1e3; - } - break; - case TD_TIME_UNIT_NANOSEC: - if (TSDB_TIME_PRECISION_MILLI == precision) { + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second return interval / 1e6; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { - return interval / 1e3; - } else { // nano second + } else { return interval; } break; - default: - if (TSDB_TIME_PRECISION_MILLI == precision) { + case TSDB_TIME_PRECISION_MICRO: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us + return interval; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval / 1e3; + } else { return interval * 1e3; - } else if (TSDB_TIME_PRECISION_MICRO == precision) { + } + break; + case TSDB_TIME_PRECISION_NANO: + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { + return interval * 1e3; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval; + } else { return interval * 1e6; - } else { // nano second - return interval * 1e9; + } + break; + default: // ms + if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us + return interval / 1e3; + } else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second + return interval / 1e6; + } else { + return interval; } break; } @@ -551,37 +568,38 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { pSmaH->pTsdb = pTsdb; pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); - pSmaH->pDFile = (SDBFile *)calloc(1, sizeof(SDBFile *)); - if (!pSmaH->pDFile) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } return TSDB_CODE_SUCCESS; } static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH) { if (pSmaH) { - if (pSmaH->pDFile) { - tsdbCloseDBF(pSmaH->pDFile); - } + tsdbCloseDBF(&pSmaH->dFile); } } static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid) { STsdb *pTsdb = pSmaH->pTsdb; - ASSERT(pSmaH->pDFile->path == NULL && pSmaH->pDFile->pDB == NULL); + ASSERT(pSmaH->dFile.path == NULL && pSmaH->dFile.pDB == NULL); char tSmaFile[TSDB_FILENAME_LEN] = {0}; snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pTsdb), fid); - pSmaH->pDFile->path = strdup(tSmaFile); + pSmaH->dFile.path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; } -static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { - STsdbCfg *pCfg = REPO_CFG(pSmaH->pTsdb); +/** + * @brief + * + * @param pTsdb + * @param interval Interval calculated by DB's precision + * @param storageLevel + * @return int32_t + */ +static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLevel) { + STsdbCfg *pCfg = REPO_CFG(pTsdb); int32_t daysPerFile = pCfg->daysPerFile; if (storageLevel == SMA_STORAGE_LEVEL_TSDB) { - int32_t days = 30 * (pSmaH->interval / tsTickPerDay[pCfg->precision]); + int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerDay[pCfg->precision]); daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS; } @@ -600,7 +618,7 @@ static int32_t tsdbGetTSmaDays(STSmaWriteH *pSmaH, int32_t storageLevel) { * @param msg * @return int32_t */ -int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { +static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; @@ -624,16 +642,16 @@ int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) { // Step 1: Judge the storage level and days int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit); - int32_t daysPerFile = tsdbGetTSmaDays(&tSmaH, storageLevel); + int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel); int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision)); // Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file // - Set and open the DFile or the B+Tree file // TODO: tsdbStartTSmaCommit(); tsdbSetTSmaDataFile(&tSmaH, pData, storageLevel, fid); - if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, tSmaH.pDFile) != 0) { + if (tsdbOpenDBF(pTsdb->pTSmaEnv->dbEnv, &tSmaH.dFile) != 0) { tsdbWarn("vgId:%d open DB file %s failed since %s", REPO_ID(pTsdb), - tSmaH.pDFile->path ? tSmaH.pDFile->path : "path is NULL", tstrerror(terrno)); + tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno)); tsdbDestroyTSmaWriteH(&tSmaH); return TSDB_CODE_FAILED; } @@ -657,12 +675,12 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, char tSmaFile[TSDB_FILENAME_LEN] = {0}; snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.rsma", REPO_ID(pTsdb), fid); - pSmaH->pDFile->path = strdup(tSmaFile); + pSmaH->dFile.path = strdup(tSmaFile); return TSDB_CODE_SUCCESS; } -int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { +static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { STsdbCfg * pCfg = REPO_CFG(pTsdb); STSmaDataWrapper *pData = (STSmaDataWrapper *)msg; STSmaWriteH tSmaH = {0}; @@ -698,35 +716,35 @@ int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) { } /** - * @brief Init of tSma ReadH + * @brief * * @param pSmaH * @param pTsdb - * @param param - * @param pData + * @param interval + * @param intervalUnit * @return int32_t */ -static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) { +static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit) { pSmaH->pTsdb = pTsdb; - pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision); - // pSmaH->blockSize = param->numOfFuncIds * sizeof(int64_t); + pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision); + pSmaH->storageLevel = tsdbGetSmaStorageLevel(interval, intervalUnit); + pSmaH->days = tsdbGetTSmaDays(pTsdb, pSmaH->interval, pSmaH->storageLevel); } /** * @brief Init of tSma FS * * @param pReadH - * @param param - * @param queryWin + * @param skey * @return int32_t */ -static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { - int32_t storageLevel = 0; // tsdbGetSmaStorageLevel(param->interval, param->intervalUnit); - int32_t daysPerFile = - storageLevel == SMA_STORAGE_LEVEL_TSDB ? SMA_STORAGE_TSDB_DAYS : REPO_CFG(pReadH->pTsdb)->daysPerFile; - pReadH->storageLevel = storageLevel; - pReadH->days = daysPerFile; - pReadH->smaFsIter.iter = 0; +static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey) { + int32_t fid = (int32_t)(TSDB_KEY_FID(skey, pSmaH->days, REPO_CFG(pSmaH->pTsdb)->precision)); + char tSmaFile[TSDB_FILENAME_LEN] = {0}; + snprintf(tSmaFile, TSDB_FILENAME_LEN, "v%df%d.tsma", REPO_ID(pSmaH->pTsdb), fid); + pSmaH->dFile.path = strdup(tSmaFile); + pSmaH->smaFsIter.iter = 0; + pSmaH->smaFsIter.fid = fid; } /** @@ -738,17 +756,18 @@ static int32_t tsdbInitTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { * @return true * @return false */ -static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { +static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) { SArray *smaFs = pReadH->pTsdb->fs->cstatus->sf; int32_t nSmaFs = taosArrayGetSize(smaFs); - pReadH->pDFile = NULL; + tsdbCloseDBF(&pReadH->dFile); +#if 0 while (pReadH->smaFsIter.iter < nSmaFs) { void *pSmaFile = taosArrayGet(smaFs, pReadH->smaFsIter.iter); if (pSmaFile) { // match(indexName, queryWindow) // TODO: select the file by index_name ... - pReadH->pDFile = pSmaFile; + pReadH->dFile = pSmaFile; ++pReadH->smaFsIter.iter; break; } @@ -759,41 +778,82 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, STimeWindow *queryWin) { tsdbDebug("vg%d: smaFile %s matched", REPO_ID(pReadH->pTsdb), "[pSmaFile dir]"); return true; } +#endif return false; } /** - * @brief Return the data between queryWin and fill the pData. + * @brief * - * @param pTsdb - * @param param + * @param pTsdb Return the data between queryWin and fill the pData. * @param pData - * @param queryWin + * @param indexUid + * @param interval + * @param intervalUnit + * @param tableUid + * @param colId + * @param pQuerySKey * @param nMaxResult The query invoker should control the nMaxResult need to return to avoid OOM. * @return int32_t */ -int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow *queryWin, int32_t nMaxResult) { - SSmaStatItem *pItem = - (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &pData->indexUid, sizeof(pData->indexUid)); +static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, + int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, + int32_t nMaxResult) { + SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid)); if (pItem == NULL) { // mark all window as expired and notify query module to query raw TS data. return TSDB_CODE_SUCCESS; } - int32_t nQueryWin = 0; +#if 0 + int32_t nQueryWin = taosArrayGetSize(pQuerySKey); for (int32_t n = 0; n < nQueryWin; ++n) { - TSKEY thisWindow = n; - if (taosHashGet(pItem->expiredWindows, &thisWindow, sizeof(thisWindow)) != NULL) { + TSKEY skey = taosArrayGet(pQuerySKey, n); + if (taosHashGet(pItem->expiredWindows, &skey, sizeof(TSKEY)) != NULL) { // TODO: mark this window as expired. } } - +#endif +#if 0 + if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) { + // TODO: mark this window as expired. + } +#endif STSmaReadH tReadH = {0}; - tsdbInitTSmaReadH(&tReadH, pTsdb, pData); + tsdbInitTSmaReadH(&tReadH, pTsdb, interval, intervalUnit); + tsdbCloseDBF(&tReadH.dFile); - tsdbInitTSmaFile(&tReadH, queryWin); + tsdbInitTSmaFile(&tReadH, querySkey); + if (tsdbOpenDBF(SMA_ENV_ENV(pTsdb->pTSmaEnv), &tReadH.dFile) != 0) { + tsdbWarn("vgId:%d open DBF %s failed since %s", REPO_ID(pTsdb), tReadH.dFile.path, tstrerror(terrno)); + return TSDB_CODE_FAILED; + } + char smaKey[SMA_KEY_LEN] = {0}; + void *pSmaKey = &smaKey; + tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey); + + tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb), + tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN); + + void * result = NULL; + uint32_t valueSize = 0; + if ((result = tsdbGetSmaDataByKey(&tReadH.dFile, smaKey, SMA_KEY_LEN, &valueSize)) == NULL) { + tsdbWarn("vgId:%d get sma data failed from smaIndex %" PRIi64 ", smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 + " since %s", + REPO_ID(pTsdb), indexUid, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8), + *(int64_t *)POINTER_SHIFT(smaKey, 10), tstrerror(terrno)); + tsdbCloseDBF(&tReadH.dFile); + return TSDB_CODE_FAILED; + } +#ifdef SMA_PRINT_DEBUG_LOG + for (uint32_t v = 0; v < valueSize; v += 8) { + tsdbWarn("vgId:%d v[%d]=%" PRIi64, REPO_ID(pTsdb), v, *(int64_t *)POINTER_SHIFT(result, v)); + } +#endif +#if 0 int32_t nResult = 0; int64_t lastKey = 0; @@ -815,8 +875,9 @@ int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, STimeWindow * } } } - +#endif // read data from file and fill the result + tsdbCloseDBF(&tReadH.dFile); return TSDB_CODE_SUCCESS; } @@ -854,4 +915,55 @@ int32_t tsdbRemoveTSmaData(STsdb *pTsdb, void *smaIndex, STimeWindow *pWin) { // } return TSDB_CODE_SUCCESS; } -#endif \ No newline at end of file +#endif + +/** + * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine + * + * @param pTsdb + * @param param + * @param msg + * @return int32_t + * TODO: Who is responsible for resource allocate and release? + */ +int32_t tsdbInsertTSmaData(STsdb *pTsdb, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertTSmaDataImpl(pTsdb, msg)) < 0) { + tsdbWarn("vgId:%d insert tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbUpdateExpiredWindow(pTsdb, smaType, msg)) < 0) { + tsdbWarn("vgId:%d update expired sma window failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +/** + * @brief Insert Time-range-wise Rollup Sma(RSma) data + * + * @param pTsdb + * @param param + * @param msg + * @return int32_t + */ +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbInsertRSmaDataImpl(pTsdb, msg)) < 0) { + tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} + +int32_t tsdbGetTSmaData(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval, int8_t intervalUnit, + tb_uid_t tableUid, col_id_t colId, TSKEY querySkey, int32_t nMaxResult) { + int32_t code = TSDB_CODE_SUCCESS; + if ((code = tsdbGetTSmaDataImpl(pTsdb, pData, indexUid, interval, intervalUnit, tableUid, colId, querySkey, + nMaxResult)) < 0) { + tsdbWarn("vgId:%d get tSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); + } + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 9cccea9853..3ccb483fe4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -34,6 +34,7 @@ int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp) { return tsdbMemTableInsert(pTsdb, pTsdb->mem, pMsg, NULL); } +#if 0 /** * @brief Insert/Update tSma(Time-range-wise SMA) data from stream computing engine * @@ -73,4 +74,6 @@ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg) { tsdbWarn("vgId:%d insert rSma data failed since %s", REPO_ID(pTsdb), tstrerror(terrno)); } return code; -} \ No newline at end of file +} + +#endif \ No newline at end of file diff --git a/source/dnode/vnode/test/tsdbSmaTest.cpp b/source/dnode/vnode/test/tsdbSmaTest.cpp index f815291c77..18dca33bda 100644 --- a/source/dnode/vnode/test/tsdbSmaTest.cpp +++ b/source/dnode/vnode/test/tsdbSmaTest.cpp @@ -33,7 +33,7 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } -TEST(testCase, tSmaEncodeDecodeTest) { +TEST(testCase, tSma_Meta_Encode_Decode_Test) { // encode STSma tSma = {0}; tSma.version = 0; @@ -87,8 +87,9 @@ TEST(testCase, tSmaEncodeDecodeTest) { tdDestroyTSma(&tSma); tdDestroyTSmaWrapper(&dstTSmaWrapper); } + #if 1 -TEST(testCase, tSma_DB_Put_Get_Del_Test) { +TEST(testCase, tSma_metaDB_Put_Get_Del_Test) { const char * smaIndexName1 = "sma_index_test_1"; const char * smaIndexName2 = "sma_index_test_2"; const char * timezone = "Asia/Shanghai"; @@ -220,16 +221,21 @@ TEST(testCase, tSma_DB_Put_Get_Del_Test) { #endif #if 1 -TEST(testCase, tSmaInsertTest) { - // prepare meta +TEST(testCase, tSma_Data_Insert_Query_Test) { + // step 1: prepare meta const char * smaIndexName1 = "sma_index_test_1"; const char * timezone = "Asia/Shanghai"; const char * expr = "select count(a,b, top 20), from table interval 1d, sliding 1h;"; - const char * tagsFilter = "I'm tags filter"; + const char * tagsFilter = "where tags.location='Beijing' and tags.district='ChaoYang'"; const char * smaTestDir = "./smaTest"; const tb_uid_t tbUid = 1234567890; const int64_t indexUid1 = 2000000001; + const int64_t interval1 = 1; + const int8_t intervalUnit1 = TD_TIME_UNIT_DAY; const uint32_t nCntTSma = 2; + TSKEY skey1 = 1646987196; + const int64_t testSmaData1 = 100; + const int64_t testSmaData2 = 200; // encode STSma tSma = {0}; tSma.version = 0; @@ -261,7 +267,7 @@ TEST(testCase, tSmaInsertTest) { // save index 1 EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0); - // insert data + // step 2: insert data STSmaDataWrapper *pSmaData = NULL; STsdb tsdb = {0}; STsdbCfg * pCfg = &tsdb.config; @@ -276,6 +282,21 @@ TEST(testCase, tSmaInsertTest) { tsdb.config.update = TD_ROW_OVERWRITE_UPDATE; tsdb.config.compression = TWO_STAGE_COMP; + switch (tsdb.config.precision) { + case TSDB_TIME_PRECISION_MILLI: + skey1 *= 1e3; + break; + case TSDB_TIME_PRECISION_MICRO: + skey1 *= 1e6; + break; + case TSDB_TIME_PRECISION_NANO: + skey1 *= 1e9; + break; + default: // ms + skey1 *= 1e3; + break; + } + char *msg = (char *)calloc(100, 1); EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0); @@ -286,21 +307,21 @@ TEST(testCase, tSmaInsertTest) { void * buf = NULL; EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0); int32_t bufSize = taosTSizeof(buf); - int32_t numOfTables = 5; - col_id_t numOfCols = 10; + int32_t numOfTables = 10; + col_id_t numOfCols = 4096; EXPECT_GT(numOfCols, 0); pSmaData = (STSmaDataWrapper *)buf; printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData); - pSmaData->skey = 1646987196000; - pSmaData->interval = 10; - pSmaData->intervalUnit = TD_TIME_UNIT_MINUTE; + pSmaData->skey = skey1; + pSmaData->interval = interval1; + pSmaData->intervalUnit = intervalUnit1; pSmaData->indexUid = indexUid1; int32_t len = sizeof(STSmaDataWrapper); for (int32_t t = 0; t < numOfTables; ++t) { STSmaTbData *pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len); - pTbData->tableUid = t; + pTbData->tableUid = tbUid + t; int32_t tableDataLen = sizeof(STSmaTbData); for (col_id_t c = 0; c < numOfCols; ++c) { @@ -313,8 +334,17 @@ TEST(testCase, tSmaInsertTest) { } STSmaColData *pColData = (STSmaColData *)POINTER_SHIFT(pSmaData, len + tableDataLen); pColData->colId = c + PRIMARYKEY_TIMESTAMP_COL_ID; - pColData->blockSize = ((c & 1) == 0) ? 8 : 16; + // TODO: fill col data + if ((c & 1) == 0) { + pColData->blockSize = 8; + memcpy(pColData->data, &testSmaData1, 8); + } else { + pColData->blockSize = 16; + memcpy(pColData->data, &testSmaData1, 8); + memcpy(POINTER_SHIFT(pColData->data, 8), &testSmaData2, 8); + } + tableDataLen += (sizeof(STSmaColData) + pColData->blockSize); } pTbData->dataLen = (tableDataLen - sizeof(STSmaTbData)); @@ -328,6 +358,19 @@ TEST(testCase, tSmaInsertTest) { // execute EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS); + // step 3: query + uint32_t checkDataCnt = 0; + for (int32_t t = 0; t < numOfTables; ++t) { + for (col_id_t c = 0; c < numOfCols; ++c) { + EXPECT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t, + c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1), + TSDB_CODE_SUCCESS); + ++checkDataCnt; + } + } + + printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt); + // release data taosTZfree(buf); // release meta