Merge pull request #10779 from taosdata/feature/TD-11463-3.0
Feature/td 11463 3.0
This commit is contained in:
commit
b50ecc4e45
|
@ -1890,33 +1890,18 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
|
|||
}
|
||||
return buf;
|
||||
}
|
||||
typedef enum {
|
||||
TD_TIME_UNIT_UNKNOWN = -1,
|
||||
TD_TIME_UNIT_YEAR = 0,
|
||||
TD_TIME_UNIT_SEASON = 1,
|
||||
TD_TIME_UNIT_MONTH = 2,
|
||||
TD_TIME_UNIT_WEEK = 3,
|
||||
TD_TIME_UNIT_DAY = 4,
|
||||
TD_TIME_UNIT_HOUR = 5,
|
||||
TD_TIME_UNIT_MINUTE = 6,
|
||||
TD_TIME_UNIT_SEC = 7,
|
||||
TD_TIME_UNIT_MILLISEC = 8,
|
||||
TD_TIME_UNIT_MICROSEC = 9,
|
||||
TD_TIME_UNIT_NANOSEC = 10
|
||||
} ETDTimeUnit;
|
||||
|
||||
typedef struct {
|
||||
int8_t version; // for compatibility(default 0)
|
||||
int8_t intervalUnit;
|
||||
int8_t slidingUnit;
|
||||
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
|
||||
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
|
||||
char indexName[TSDB_INDEX_NAME_LEN];
|
||||
char timezone[TD_TIMEZONE_LEN]; // sma data is invalid if timezone change.
|
||||
char timezone[TD_TIMEZONE_LEN]; // sma data expired if timezone changes.
|
||||
int32_t exprLen;
|
||||
int32_t tagsFilterLen;
|
||||
int64_t indexUid;
|
||||
tb_uid_t tableUid; // super/child/common table uid
|
||||
int64_t interval;
|
||||
int64_t offset;
|
||||
int64_t offset; // use unit by precision of DB
|
||||
int64_t sliding;
|
||||
char* expr; // sma expression
|
||||
char* tagsFilter;
|
||||
|
@ -1961,7 +1946,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
int64_t indexUid;
|
||||
TSKEY skey; // startTS of one interval/sliding
|
||||
TSKEY skey; // startKey of one interval/sliding window
|
||||
int64_t interval;
|
||||
int32_t dataLen; // not including head
|
||||
int8_t intervalUnit;
|
||||
|
|
|
@ -354,6 +354,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TDB_IVLD_TAG_VAL TAOS_DEF_ERROR_CODE(0, 0x0615)
|
||||
#define TSDB_CODE_TDB_NO_CACHE_LAST_ROW TAOS_DEF_ERROR_CODE(0, 0x0616)
|
||||
#define TSDB_CODE_TDB_NO_SMA_INDEX_IN_META TAOS_DEF_ERROR_CODE(0, 0x0617)
|
||||
#define TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR TAOS_DEF_ERROR_CODE(0, 0x0618)
|
||||
|
||||
// query
|
||||
#define TSDB_CODE_QRY_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0700)
|
||||
|
|
|
@ -60,10 +60,11 @@ struct STsdb {
|
|||
SSmaEnv * pRSmaEnv;
|
||||
};
|
||||
|
||||
#define REPO_ID(r) ((r)->vgId)
|
||||
#define REPO_CFG(r) (&(r)->config)
|
||||
#define REPO_FS(r) (r)->fs
|
||||
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||
#define REPO_ID(r) ((r)->vgId)
|
||||
#define REPO_CFG(r) (&(r)->config)
|
||||
#define REPO_FS(r) (r)->fs
|
||||
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||
#define REPO_SMA_ENV(r, t) ((TSDB_SMA_TYPE_ROLLUP == (t)) ? (r)->pRSmaEnv : (r)->pTSmaEnv)
|
||||
|
||||
int tsdbLockRepo(STsdb *pTsdb);
|
||||
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||
|
|
|
@ -68,8 +68,8 @@ int32_t tsdbOpenBDBEnv(DB_ENV **ppEnv, const char *path) {
|
|||
|
||||
ret = pEnv->open(pEnv, path, DB_CREATE | DB_INIT_CDB | DB_INIT_MPOOL, 0);
|
||||
if (ret != 0) {
|
||||
// BDB_PERR("Failed to open tsdb env", ret);
|
||||
tsdbWarn("Failed to open tsdb env for path %s since %d", path ? path : "NULL", ret);
|
||||
terrno = TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR;
|
||||
tsdbWarn("Failed to open tsdb env for path %s since ret %d != 0", path ? path : "NULL", ret);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -23,8 +23,8 @@ static const char *TSDB_FNAME_SUFFIX[] = {
|
|||
"smal", // TSDB_FILE_SMAL
|
||||
"", // TSDB_FILE_MAX
|
||||
"meta", // TSDB_FILE_META
|
||||
"sma", // TSDB_FILE_TSMA(directory name)
|
||||
"sma", // TSDB_FILE_RSMA(directory name)
|
||||
"tsma", // TSDB_FILE_TSMA
|
||||
"rsma", // TSDB_FILE_RSMA
|
||||
};
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
|
||||
|
|
|
@ -15,9 +15,15 @@
|
|||
|
||||
#include "tsdbDef.h"
|
||||
|
||||
static const char *TSDB_SMA_DNAME[] = {
|
||||
"", // TSDB_SMA_TYPE_BLOCK
|
||||
"tsma", // TSDB_SMA_TYPE_TIME_RANGE
|
||||
"rsma", // TSDB_SMA_TYPE_ROLLUP
|
||||
};
|
||||
|
||||
#undef SMA_PRINT_DEBUG_LOG
|
||||
#define SMA_STORAGE_TSDB_DAYS 30
|
||||
#define SMA_STORAGE_TSDB_TIMES 30
|
||||
#define SMA_STORAGE_TSDB_TIMES 10
|
||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
||||
#define SMA_KEY_LEN 18 // tableUid_colId_TSKEY 8+2+8
|
||||
|
||||
|
@ -93,6 +99,11 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
|
|||
static int32_t tsdbSetTSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData, int32_t storageLevel, int32_t fid);
|
||||
static int32_t tsdbInitTSmaFile(STSmaReadH *pSmaH, TSKEY skey);
|
||||
static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey);
|
||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]);
|
||||
|
||||
static void tsdbGetSmaDir(int32_t vgId, ETsdbSmaType smaType, char dirName[]) {
|
||||
snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vgId, TSDB_SMA_DNAME[smaType]);
|
||||
}
|
||||
|
||||
static SSmaEnv *tsdbNewSmaEnv(const STsdb *pTsdb, const char *path) {
|
||||
SSmaEnv *pEnv = NULL;
|
||||
|
@ -136,7 +147,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (pEnv && *pEnv) {
|
||||
if (*pEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -144,7 +155,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (*pEnv == NULL) {
|
||||
if (*pEnv == NULL) { // 2nd phase check
|
||||
if ((*pEnv = tsdbNewSmaEnv(pTsdb, path)) == NULL) {
|
||||
tsdbUnlockRepo(pTsdb);
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -152,7 +163,7 @@ static int32_t tsdbInitSmaEnv(STsdb *pTsdb, const char *path, SSmaEnv **pEnv) {
|
|||
}
|
||||
|
||||
if (tsdbUnlockRepo(pTsdb) != 0) {
|
||||
tsdbFreeSmaEnv(*pEnv);
|
||||
*pEnv = tsdbFreeSmaEnv(*pEnv);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -244,6 +255,39 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbCheckAndInitSmaEnv(STsdb *pTsdb, int8_t smaType) {
|
||||
switch (smaType) {
|
||||
case TSDB_SMA_TYPE_TIME_RANGE:
|
||||
if (pTsdb->pTSmaEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
break;
|
||||
case TSDB_SMA_TYPE_ROLLUP:
|
||||
if (pTsdb->pRSmaEnv) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
// SDiskID did = {0};
|
||||
SSmaEnv *pEnv = NULL;
|
||||
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
|
||||
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
pTsdb->pTSmaEnv = pEnv;
|
||||
} else {
|
||||
pTsdb->pRSmaEnv = pEnv;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Update expired window according to msg from stream computing module.
|
||||
*
|
||||
|
@ -253,26 +297,17 @@ int32_t tsdbDestroySmaState(SSmaStat *pSmaStat) {
|
|||
* @return int32_t
|
||||
*/
|
||||
int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
SSmaEnv * pEnv = NULL;
|
||||
|
||||
if (!msg || !pTsdb->pMeta) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char smaPath[TSDB_FILENAME_LEN] = "/proj/.sma/";
|
||||
if (tsdbInitSmaEnv(pTsdb, smaPath, &pEnv) != TSDB_CODE_SUCCESS) {
|
||||
if (tsdbCheckAndInitSmaEnv(pTsdb, smaType) != TSDB_CODE_SUCCESS) {
|
||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (smaType == TSDB_SMA_TYPE_TIME_RANGE) {
|
||||
pTsdb->pTSmaEnv = pEnv;
|
||||
} else if (smaType == TSDB_SMA_TYPE_ROLLUP) {
|
||||
pTsdb->pRSmaEnv = pEnv;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
SSmaEnv *pEnv = REPO_SMA_ENV(pTsdb, smaType);
|
||||
|
||||
// TODO: decode the msg => start
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
|
@ -308,7 +343,6 @@ int32_t tsdbUpdateExpiredWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
|||
}
|
||||
pItem->pSma = pSma;
|
||||
|
||||
// TODO: change indexName to indexUid
|
||||
if (taosHashPut(pItemsHash, &indexUid, sizeof(indexUid), &pItem, sizeof(pItem)) != 0) {
|
||||
// If error occurs during put smaStatItem, free the resources of pItem
|
||||
taosHashCleanup(pItem->expiredWindows);
|
||||
|
@ -378,32 +412,32 @@ static int32_t tsdbResetExpiredWindow(SSmaStat *pStat, int64_t indexUid, TSKEY s
|
|||
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
|
||||
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
|
||||
switch (intervalUnit) {
|
||||
case TD_TIME_UNIT_HOUR:
|
||||
case TIME_UNIT_HOUR:
|
||||
if (interval < SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
break;
|
||||
case TD_TIME_UNIT_MINUTE:
|
||||
case TIME_UNIT_MINUTE:
|
||||
if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
break;
|
||||
case TD_TIME_UNIT_SEC:
|
||||
case TIME_UNIT_SECOND:
|
||||
if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
break;
|
||||
case TD_TIME_UNIT_MILLISEC:
|
||||
case TIME_UNIT_MILLISECOND:
|
||||
if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
break;
|
||||
case TD_TIME_UNIT_MICROSEC:
|
||||
case TIME_UNIT_MICROSECOND:
|
||||
if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
break;
|
||||
case TD_TIME_UNIT_NANOSEC:
|
||||
case TIME_UNIT_NANOSECOND:
|
||||
if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
|
||||
return SMA_STORAGE_LEVEL_DFILESET;
|
||||
}
|
||||
|
@ -429,8 +463,8 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
|
|||
|
||||
// TODO: insert sma data blocks into B+Tree
|
||||
tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", dataLen %d",
|
||||
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
||||
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
|
||||
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
||||
*(int64_t *)POINTER_SHIFT(smaKey, 10), dataLen);
|
||||
|
||||
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
@ -447,66 +481,73 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, uint32_t k
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Approximate value for week/month/year.
|
||||
*
|
||||
* @param interval
|
||||
* @param intervalUnit
|
||||
* @param precision
|
||||
* @return int64_t
|
||||
*/
|
||||
static int64_t tsdbGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision) {
|
||||
if (intervalUnit < TD_TIME_UNIT_MILLISEC) {
|
||||
switch (intervalUnit) {
|
||||
case TD_TIME_UNIT_YEAR:
|
||||
case TD_TIME_UNIT_SEASON:
|
||||
case TD_TIME_UNIT_MONTH:
|
||||
case TD_TIME_UNIT_WEEK:
|
||||
// illegal time unit
|
||||
tsdbError("invalid interval unit: %d\n", intervalUnit);
|
||||
TASSERT(0);
|
||||
break;
|
||||
case TD_TIME_UNIT_DAY: // the interval for tSma calculation must <= day
|
||||
interval *= 86400 * 1e3;
|
||||
break;
|
||||
case TD_TIME_UNIT_HOUR:
|
||||
interval *= 3600 * 1e3;
|
||||
break;
|
||||
case TD_TIME_UNIT_MINUTE:
|
||||
interval *= 60 * 1e3;
|
||||
break;
|
||||
case TD_TIME_UNIT_SEC:
|
||||
interval *= 1e3;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
switch (intervalUnit) {
|
||||
case TIME_UNIT_YEAR: // approximate value
|
||||
interval *= 365 * 86400 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_MONTH: // approximate value
|
||||
interval *= 30 * 86400 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_WEEK: // approximate value
|
||||
interval *= 7 * 86400 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_DAY: // the interval for tSma calculation must <= day
|
||||
interval *= 86400 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_HOUR:
|
||||
interval *= 3600 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_MINUTE:
|
||||
interval *= 60 * 1e3;
|
||||
break;
|
||||
case TIME_UNIT_SECOND:
|
||||
interval *= 1e3;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
switch (precision) {
|
||||
case TSDB_TIME_PRECISION_MILLI:
|
||||
if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
|
||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
||||
return interval / 1e3;
|
||||
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
|
||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
|
||||
return interval / 1e6;
|
||||
} else {
|
||||
return interval;
|
||||
}
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_MICRO:
|
||||
if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
|
||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
||||
return interval;
|
||||
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
|
||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
|
||||
return interval / 1e3;
|
||||
} else {
|
||||
return interval * 1e3;
|
||||
}
|
||||
break;
|
||||
case TSDB_TIME_PRECISION_NANO:
|
||||
if (TD_TIME_UNIT_MICROSEC == intervalUnit) {
|
||||
if (TIME_UNIT_MICROSECOND == intervalUnit) {
|
||||
return interval * 1e3;
|
||||
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
|
||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
|
||||
return interval;
|
||||
} else {
|
||||
return interval * 1e6;
|
||||
}
|
||||
break;
|
||||
default: // ms
|
||||
if (TD_TIME_UNIT_MICROSEC == intervalUnit) { // us
|
||||
if (TIME_UNIT_MICROSECOND == intervalUnit) { // us
|
||||
return interval / 1e3;
|
||||
} else if (TD_TIME_UNIT_NANOSEC == intervalUnit) { // nano second
|
||||
} else if (TIME_UNIT_NANOSECOND == intervalUnit) { // nano second
|
||||
return interval / 1e6;
|
||||
} else {
|
||||
return interval;
|
||||
|
@ -800,10 +841,19 @@ static bool tsdbSetAndOpenTSmaFile(STSmaReadH *pReadH, TSKEY *queryKey) {
|
|||
static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_t indexUid, int64_t interval,
|
||||
int8_t intervalUnit, tb_uid_t tableUid, col_id_t colId, TSKEY querySkey,
|
||||
int32_t nMaxResult) {
|
||||
if (!pTsdb->pTSmaEnv) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d getTSmaDataImpl failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaStatItem *pItem = (SSmaStatItem *)taosHashGet(SMA_ENV_STAT_ITEMS(pTsdb->pTSmaEnv), &indexUid, sizeof(indexUid));
|
||||
if (pItem == NULL) {
|
||||
// mark all window as expired and notify query module to query raw TS data.
|
||||
return TSDB_CODE_SUCCESS;
|
||||
// Normally pItem should not be NULL, mark all windows as expired and notify query module to fetch raw TS data if
|
||||
// it's NULL.
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
tsdbWarn("vgId:%d getTSmaDataImpl failed since no index %" PRIi64 " in local cache", REPO_ID(pTsdb), indexUid);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -815,6 +865,7 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
if (taosHashGet(pItem->expiredWindows, &querySkey, sizeof(TSKEY)) != NULL) {
|
||||
// TODO: mark this window as expired.
|
||||
|
@ -835,8 +886,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
tsdbEncodeTSmaKey(tableUid, colId, querySkey, (void **)&pSmaKey);
|
||||
|
||||
tsdbDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIu16 "-%" PRIx64 ", keyLen %d", REPO_ID(pTsdb),
|
||||
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
||||
*(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
|
||||
tReadH.dFile.path, *(tb_uid_t *)smaKey, *(uint16_t *)POINTER_SHIFT(smaKey, 8),
|
||||
*(int64_t *)POINTER_SHIFT(smaKey, 10), SMA_KEY_LEN);
|
||||
|
||||
void * result = NULL;
|
||||
uint32_t valueSize = 0;
|
||||
|
@ -947,7 +998,6 @@ int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, int8_t smaType, char *msg) {
|
|||
* @brief Insert Time-range-wise Rollup Sma(RSma) data
|
||||
*
|
||||
* @param pTsdb
|
||||
* @param param
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
|
|
|
@ -37,9 +37,9 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
|||
// encode
|
||||
STSma tSma = {0};
|
||||
tSma.version = 0;
|
||||
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||
tSma.intervalUnit = TIME_UNIT_DAY;
|
||||
tSma.interval = 1;
|
||||
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
|
||||
tSma.slidingUnit = TIME_UNIT_HOUR;
|
||||
tSma.sliding = 0;
|
||||
tstrncpy(tSma.indexName, "sma_index_test", TSDB_INDEX_NAME_LEN);
|
||||
tstrncpy(tSma.timezone, "Asia/Shanghai", TD_TIMEZONE_LEN);
|
||||
|
@ -50,37 +50,37 @@ TEST(testCase, tSma_Meta_Encode_Decode_Test) {
|
|||
uint32_t bufLen = tEncodeTSmaWrapper(NULL, &tSmaWrapper);
|
||||
|
||||
void *buf = calloc(bufLen, 1);
|
||||
assert(buf != NULL);
|
||||
ASSERT_NE(buf, nullptr);
|
||||
|
||||
STSmaWrapper *pSW = (STSmaWrapper *)buf;
|
||||
uint32_t len = tEncodeTSmaWrapper(&buf, &tSmaWrapper);
|
||||
|
||||
EXPECT_EQ(len, bufLen);
|
||||
ASSERT_EQ(len, bufLen);
|
||||
|
||||
// decode
|
||||
STSmaWrapper dstTSmaWrapper = {0};
|
||||
void * result = tDecodeTSmaWrapper(pSW, &dstTSmaWrapper);
|
||||
assert(result != NULL);
|
||||
ASSERT_NE(result, nullptr);
|
||||
|
||||
EXPECT_EQ(tSmaWrapper.number, dstTSmaWrapper.number);
|
||||
ASSERT_EQ(tSmaWrapper.number, dstTSmaWrapper.number);
|
||||
|
||||
for (int i = 0; i < tSmaWrapper.number; ++i) {
|
||||
STSma *pSma = tSmaWrapper.tSma + i;
|
||||
STSma *qSma = dstTSmaWrapper.tSma + i;
|
||||
|
||||
EXPECT_EQ(pSma->version, qSma->version);
|
||||
EXPECT_EQ(pSma->intervalUnit, qSma->intervalUnit);
|
||||
EXPECT_EQ(pSma->slidingUnit, qSma->slidingUnit);
|
||||
EXPECT_STRCASEEQ(pSma->indexName, qSma->indexName);
|
||||
EXPECT_STRCASEEQ(pSma->timezone, qSma->timezone);
|
||||
EXPECT_EQ(pSma->indexUid, qSma->indexUid);
|
||||
EXPECT_EQ(pSma->tableUid, qSma->tableUid);
|
||||
EXPECT_EQ(pSma->interval, qSma->interval);
|
||||
EXPECT_EQ(pSma->sliding, qSma->sliding);
|
||||
EXPECT_EQ(pSma->exprLen, qSma->exprLen);
|
||||
EXPECT_STRCASEEQ(pSma->expr, qSma->expr);
|
||||
EXPECT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen);
|
||||
EXPECT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter);
|
||||
ASSERT_EQ(pSma->version, qSma->version);
|
||||
ASSERT_EQ(pSma->intervalUnit, qSma->intervalUnit);
|
||||
ASSERT_EQ(pSma->slidingUnit, qSma->slidingUnit);
|
||||
ASSERT_STRCASEEQ(pSma->indexName, qSma->indexName);
|
||||
ASSERT_STRCASEEQ(pSma->timezone, qSma->timezone);
|
||||
ASSERT_EQ(pSma->indexUid, qSma->indexUid);
|
||||
ASSERT_EQ(pSma->tableUid, qSma->tableUid);
|
||||
ASSERT_EQ(pSma->interval, qSma->interval);
|
||||
ASSERT_EQ(pSma->sliding, qSma->sliding);
|
||||
ASSERT_EQ(pSma->exprLen, qSma->exprLen);
|
||||
ASSERT_STRCASEEQ(pSma->expr, qSma->expr);
|
||||
ASSERT_EQ(pSma->tagsFilterLen, qSma->tagsFilterLen);
|
||||
ASSERT_STRCASEEQ(pSma->tagsFilter, qSma->tagsFilter);
|
||||
}
|
||||
|
||||
// resource release
|
||||
|
@ -103,9 +103,9 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
// encode
|
||||
STSma tSma = {0};
|
||||
tSma.version = 0;
|
||||
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||
tSma.intervalUnit = TIME_UNIT_DAY;
|
||||
tSma.interval = 1;
|
||||
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
|
||||
tSma.slidingUnit = TIME_UNIT_HOUR;
|
||||
tSma.sliding = 0;
|
||||
tSma.indexUid = indexUid1;
|
||||
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
|
||||
|
@ -114,10 +114,12 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
|
||||
tSma.exprLen = strlen(expr);
|
||||
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
|
||||
ASSERT_NE(tSma.expr, nullptr);
|
||||
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
|
||||
|
||||
tSma.tagsFilterLen = strlen(tagsFilter);
|
||||
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
|
||||
ASSERT_NE(tSma.tagsFilter, nullptr);
|
||||
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
|
||||
|
||||
SMeta * pMeta = NULL;
|
||||
|
@ -129,18 +131,18 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
|
||||
assert(pMeta != NULL);
|
||||
// save index 1
|
||||
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
|
||||
pSmaCfg->indexUid = indexUid2;
|
||||
tstrncpy(pSmaCfg->indexName, smaIndexName2, TSDB_INDEX_NAME_LEN);
|
||||
pSmaCfg->version = 1;
|
||||
pSmaCfg->intervalUnit = TD_TIME_UNIT_HOUR;
|
||||
pSmaCfg->intervalUnit = TIME_UNIT_HOUR;
|
||||
pSmaCfg->interval = 1;
|
||||
pSmaCfg->slidingUnit = TD_TIME_UNIT_MINUTE;
|
||||
pSmaCfg->slidingUnit = TIME_UNIT_MINUTE;
|
||||
pSmaCfg->sliding = 5;
|
||||
|
||||
// save index 2
|
||||
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
|
||||
// get value by indexName
|
||||
STSma *qSmaCfg = NULL;
|
||||
|
@ -150,8 +152,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("timezone1 = %s\n", qSmaCfg->timezone);
|
||||
printf("expr1 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
|
||||
printf("tagsFilter1 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
||||
EXPECT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
||||
ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName1);
|
||||
ASSERT_EQ(qSmaCfg->tableUid, tSma.tableUid);
|
||||
tdDestroyTSma(qSmaCfg);
|
||||
tfree(qSmaCfg);
|
||||
|
||||
|
@ -161,8 +163,8 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("timezone2 = %s\n", qSmaCfg->timezone);
|
||||
printf("expr2 = %s\n", qSmaCfg->expr != NULL ? qSmaCfg->expr : "");
|
||||
printf("tagsFilter2 = %s\n", qSmaCfg->tagsFilter != NULL ? qSmaCfg->tagsFilter : "");
|
||||
EXPECT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
||||
EXPECT_EQ(qSmaCfg->interval, tSma.interval);
|
||||
ASSERT_STRCASEEQ(qSmaCfg->indexName, smaIndexName2);
|
||||
ASSERT_EQ(qSmaCfg->interval, tSma.interval);
|
||||
tdDestroyTSma(qSmaCfg);
|
||||
tfree(qSmaCfg);
|
||||
|
||||
|
@ -178,25 +180,25 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("indexName = %s\n", indexName);
|
||||
++indexCnt;
|
||||
}
|
||||
EXPECT_EQ(indexCnt, nCntTSma);
|
||||
ASSERT_EQ(indexCnt, nCntTSma);
|
||||
metaCloseSmaCurosr(pSmaCur);
|
||||
|
||||
// get wrapper by table uid
|
||||
STSmaWrapper *pSW = metaGetSmaInfoByTable(pMeta, tbUid);
|
||||
assert(pSW != NULL);
|
||||
EXPECT_EQ(pSW->number, nCntTSma);
|
||||
EXPECT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
|
||||
EXPECT_STRCASEEQ(pSW->tSma->timezone, timezone);
|
||||
EXPECT_STRCASEEQ(pSW->tSma->expr, expr);
|
||||
EXPECT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter);
|
||||
EXPECT_EQ(pSW->tSma->indexUid, indexUid1);
|
||||
EXPECT_EQ(pSW->tSma->tableUid, tbUid);
|
||||
EXPECT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
|
||||
EXPECT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone);
|
||||
EXPECT_STRCASEEQ((pSW->tSma + 1)->expr, expr);
|
||||
EXPECT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter);
|
||||
EXPECT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
|
||||
EXPECT_EQ((pSW->tSma + 1)->tableUid, tbUid);
|
||||
ASSERT_EQ(pSW->number, nCntTSma);
|
||||
ASSERT_STRCASEEQ(pSW->tSma->indexName, smaIndexName1);
|
||||
ASSERT_STRCASEEQ(pSW->tSma->timezone, timezone);
|
||||
ASSERT_STRCASEEQ(pSW->tSma->expr, expr);
|
||||
ASSERT_STRCASEEQ(pSW->tSma->tagsFilter, tagsFilter);
|
||||
ASSERT_EQ(pSW->tSma->indexUid, indexUid1);
|
||||
ASSERT_EQ(pSW->tSma->tableUid, tbUid);
|
||||
ASSERT_STRCASEEQ((pSW->tSma + 1)->indexName, smaIndexName2);
|
||||
ASSERT_STRCASEEQ((pSW->tSma + 1)->timezone, timezone);
|
||||
ASSERT_STRCASEEQ((pSW->tSma + 1)->expr, expr);
|
||||
ASSERT_STRCASEEQ((pSW->tSma + 1)->tagsFilter, tagsFilter);
|
||||
ASSERT_EQ((pSW->tSma + 1)->indexUid, indexUid2);
|
||||
ASSERT_EQ((pSW->tSma + 1)->tableUid, tbUid);
|
||||
|
||||
tdDestroyTSmaWrapper(pSW);
|
||||
tfree(pSW);
|
||||
|
@ -208,7 +210,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
printf("metaGetSmaTbUids: uid[%" PRIu32 "] = %" PRIi64 "\n", i, *(tb_uid_t *)taosArrayGet(pUids, i));
|
||||
// printf("metaGetSmaTbUids: index[%" PRIu32 "] = %s", i, (char *)taosArrayGet(pUids, i));
|
||||
}
|
||||
EXPECT_EQ(taosArrayGetSize(pUids), 1);
|
||||
ASSERT_EQ(taosArrayGetSize(pUids), 1);
|
||||
taosArrayDestroy(pUids);
|
||||
|
||||
// resource release
|
||||
|
@ -231,7 +233,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
const tb_uid_t tbUid = 1234567890;
|
||||
const int64_t indexUid1 = 2000000001;
|
||||
const int64_t interval1 = 1;
|
||||
const int8_t intervalUnit1 = TD_TIME_UNIT_DAY;
|
||||
const int8_t intervalUnit1 = TIME_UNIT_DAY;
|
||||
const uint32_t nCntTSma = 2;
|
||||
TSKEY skey1 = 1646987196;
|
||||
const int64_t testSmaData1 = 100;
|
||||
|
@ -239,9 +241,9 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
// encode
|
||||
STSma tSma = {0};
|
||||
tSma.version = 0;
|
||||
tSma.intervalUnit = TD_TIME_UNIT_DAY;
|
||||
tSma.intervalUnit = TIME_UNIT_DAY;
|
||||
tSma.interval = 1;
|
||||
tSma.slidingUnit = TD_TIME_UNIT_HOUR;
|
||||
tSma.slidingUnit = TIME_UNIT_HOUR;
|
||||
tSma.sliding = 0;
|
||||
tSma.indexUid = indexUid1;
|
||||
tstrncpy(tSma.indexName, smaIndexName1, TSDB_INDEX_NAME_LEN);
|
||||
|
@ -250,10 +252,12 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
|
||||
tSma.exprLen = strlen(expr);
|
||||
tSma.expr = (char *)calloc(tSma.exprLen + 1, 1);
|
||||
ASSERT_NE(tSma.expr, nullptr);
|
||||
tstrncpy(tSma.expr, expr, tSma.exprLen + 1);
|
||||
|
||||
tSma.tagsFilterLen = strlen(tagsFilter);
|
||||
tSma.tagsFilter = (char *)calloc(tSma.tagsFilterLen + 1, 1);
|
||||
ASSERT_NE(tSma.tagsFilter, nullptr);
|
||||
tstrncpy(tSma.tagsFilter, tagsFilter, tSma.tagsFilterLen + 1);
|
||||
|
||||
SMeta * pMeta = NULL;
|
||||
|
@ -265,7 +269,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
pMeta = metaOpen(smaTestDir, pMetaCfg, NULL);
|
||||
assert(pMeta != NULL);
|
||||
// save index 1
|
||||
EXPECT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
ASSERT_EQ(metaSaveSmaToDB(pMeta, pSmaCfg), 0);
|
||||
|
||||
// step 2: insert data
|
||||
STSmaDataWrapper *pSmaData = NULL;
|
||||
|
@ -298,18 +302,19 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
}
|
||||
|
||||
char *msg = (char *)calloc(100, 1);
|
||||
EXPECT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||
assert(msg != NULL);
|
||||
ASSERT_EQ(tsdbUpdateSmaWindow(&tsdb, TSDB_SMA_TYPE_TIME_RANGE, msg), 0);
|
||||
|
||||
// init
|
||||
int32_t allocCnt = 0;
|
||||
int32_t allocStep = 40960;
|
||||
int32_t buffer = 4096;
|
||||
int32_t allocStep = 16384;
|
||||
int32_t buffer = 1024;
|
||||
void * buf = NULL;
|
||||
EXPECT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
|
||||
ASSERT_EQ(tsdbMakeRoom(&buf, allocStep), 0);
|
||||
int32_t bufSize = taosTSizeof(buf);
|
||||
int32_t numOfTables = 10;
|
||||
col_id_t numOfCols = 4096;
|
||||
EXPECT_GT(numOfCols, 0);
|
||||
ASSERT_GT(numOfCols, 0);
|
||||
|
||||
pSmaData = (STSmaDataWrapper *)buf;
|
||||
printf(">> allocate [%d] time to %d and addr is %p\n", ++allocCnt, bufSize, pSmaData);
|
||||
|
@ -326,7 +331,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
int32_t tableDataLen = sizeof(STSmaTbData);
|
||||
for (col_id_t c = 0; c < numOfCols; ++c) {
|
||||
if (bufSize - len - tableDataLen < buffer) {
|
||||
EXPECT_EQ(tsdbMakeRoom(&buf, bufSize + allocStep), 0);
|
||||
ASSERT_EQ(tsdbMakeRoom(&buf, bufSize + allocStep), 0);
|
||||
pSmaData = (STSmaDataWrapper *)buf;
|
||||
pTbData = (STSmaTbData *)POINTER_SHIFT(pSmaData, len);
|
||||
bufSize = taosTSizeof(buf);
|
||||
|
@ -353,22 +358,22 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
|
|||
}
|
||||
pSmaData->dataLen = (len - sizeof(STSmaDataWrapper));
|
||||
|
||||
EXPECT_GE(bufSize, pSmaData->dataLen);
|
||||
ASSERT_GE(bufSize, pSmaData->dataLen);
|
||||
|
||||
// execute
|
||||
EXPECT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||
ASSERT_EQ(tsdbInsertTSmaData(&tsdb, (char *)pSmaData), TSDB_CODE_SUCCESS);
|
||||
|
||||
// step 3: query
|
||||
uint32_t checkDataCnt = 0;
|
||||
for (int32_t t = 0; t < numOfTables; ++t) {
|
||||
for (col_id_t c = 0; c < numOfCols; ++c) {
|
||||
EXPECT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
|
||||
ASSERT_EQ(tsdbGetTSmaData(&tsdb, NULL, indexUid1, interval1, intervalUnit1, tbUid + t,
|
||||
c + PRIMARYKEY_TIMESTAMP_COL_ID, skey1, 1),
|
||||
TSDB_CODE_SUCCESS);
|
||||
++checkDataCnt;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
printf("%s:%d The sma data check count for insert and query is %" PRIu32 "\n", __FILE__, __LINE__, checkDataCnt);
|
||||
|
||||
// release data
|
||||
|
|
|
@ -350,6 +350,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_MESSED_MSG, "TSDB messed message")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVLD_TAG_VAL, "TSDB invalid tag value")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_CACHE_LAST_ROW, "TSDB no cache last row data")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_SMA_INDEX_IN_META, "No sma index in meta")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TDB_ENV_OPEN_ERROR, "TDB env open error")
|
||||
|
||||
// query
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, "Invalid handle")
|
||||
|
|
Loading…
Reference in New Issue