feat: tsma logic optimization
This commit is contained in:
parent
b74c0cf1fe
commit
156b0cfebf
|
@ -403,6 +403,19 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
|
||||||
|
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -70,7 +70,7 @@ struct SSmaStatItem {
|
||||||
* N.B. only applicable to tsma
|
* N.B. only applicable to tsma
|
||||||
*/
|
*/
|
||||||
int8_t state; // ETsdbSmaStat
|
int8_t state; // ETsdbSmaStat
|
||||||
SHashObj *expiredWindows; // key: skey of time window, value: N/A
|
SHashObj *expiredWindows; // key: skey of time window, value: version
|
||||||
STSma *pTSma; // cache schema
|
STSma *pTSma; // cache schema
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -60,7 +60,7 @@ typedef struct {
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
} SRtn;
|
} SRtn;
|
||||||
|
|
||||||
#define TSDB_DATA_DIR_LEN 6
|
#define TSDB_DATA_DIR_LEN 6 // adapt accordingly
|
||||||
struct STsdb {
|
struct STsdb {
|
||||||
char *path;
|
char *path;
|
||||||
SVnode *pVnode;
|
SVnode *pVnode;
|
||||||
|
|
|
@ -56,8 +56,8 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
if (tDecodeCStr(pCoder, &pME->name) < 0) return -1;
|
||||||
|
|
||||||
if (pME->type == TSDB_SUPER_TABLE) {
|
if (pME->type == TSDB_SUPER_TABLE) {
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schema) < 0) return -1;
|
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schema) < 0) return -1;
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
if (tDecodeSSchemaWrapperEx(pCoder, &pME->stbEntry.schemaTag) < 0) return -1;
|
||||||
} else if (pME->type == TSDB_CHILD_TABLE) {
|
} else if (pME->type == TSDB_CHILD_TABLE) {
|
||||||
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
|
||||||
|
@ -67,9 +67,9 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
|
||||||
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
|
||||||
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
|
||||||
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1;
|
||||||
if (tDecodeSSchemaWrapper(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schema) < 0) return -1;
|
||||||
} else if (pME->type == TSDB_TSMA_TABLE) {
|
} else if (pME->type == TSDB_TSMA_TABLE) {
|
||||||
pME->smaEntry.tsma = taosMemoryCalloc(1, sizeof(STSma));
|
pME->smaEntry.tsma = tDecoderMalloc(pCoder, sizeof(STSma));
|
||||||
if(!pME->smaEntry.tsma) {
|
if(!pME->smaEntry.tsma) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -394,11 +394,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMSmaCursor *pCur = metaOpenSmaCursor(pMeta, uid);
|
|
||||||
if (pCur == NULL) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pMeta, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
int64_t smaId;
|
int64_t smaId;
|
||||||
|
@ -442,12 +437,10 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid, bool deepCopy) {
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosArrayDestroy(pSmaIds);
|
taosArrayDestroy(pSmaIds);
|
||||||
metaCloseSmaCursor(pCur);
|
|
||||||
return pSW;
|
return pSW;
|
||||||
_err:
|
_err:
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
taosArrayDestroy(pSmaIds);
|
taosArrayDestroy(pSmaIds);
|
||||||
metaCloseSmaCursor(pCur);
|
|
||||||
tdFreeTSmaWrapper(pSW, deepCopy);
|
tdFreeTSmaWrapper(pSW, deepCopy);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,12 +55,13 @@ static inline int tdSmaKeyCmpr(const void *arg1, int len1, const void *arg2, int
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
|
static int32_t smaOpenDBDb(TDB **ppDB, TENV *pEnv, const char *pFName) {
|
||||||
int ret;
|
|
||||||
tdb_cmpr_fn_t compFunc;
|
tdb_cmpr_fn_t compFunc;
|
||||||
|
|
||||||
// Create a database
|
// Create a database
|
||||||
compFunc = tdSmaKeyCmpr;
|
compFunc = tdSmaKeyCmpr;
|
||||||
ret = tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB);
|
if(tdbDbOpen(pFName, -1, -1, compFunc, pEnv, ppDB) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -76,7 +77,7 @@ int32_t smaOpenDBF(TENV *pEnv, SDBFile *pDBF) {
|
||||||
|
|
||||||
// Open DBF
|
// Open DBF
|
||||||
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
|
if (smaOpenDBDb(&(pDBF->pDB), pEnv, pDBF->path) < 0) {
|
||||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
smaError("failed to open DBF: %s", pDBF->path);
|
||||||
smaCloseDBDb(pDBF->pDB);
|
smaCloseDBDb(pDBF->pDB);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -97,9 +98,10 @@ int32_t smaCloseDBF(SDBFile *pDBF) {
|
||||||
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
|
int32_t smaSaveSmaToDB(SDBFile *pDBF, void *pKey, int32_t keyLen, void *pVal, int32_t valLen, TXN *txn) {
|
||||||
int32_t ret;
|
int32_t ret;
|
||||||
|
|
||||||
ret = tdbDbInsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
|
printf("save tsma data into %s, keyLen:%d valLen:%d txn:%p\n", pDBF->path, keyLen, valLen, txn);
|
||||||
|
ret = tdbDbUpsert(pDBF->pDB, pKey, keyLen, pVal, valLen, txn);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
smaError("failed to create insert sma data into db, ret = %d", ret);
|
smaError("failed to upsert tsma data into db, ret = %d", ret);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +115,7 @@ void *smaGetSmaDataByKey(SDBFile *pDBF, const void *pKey, int32_t keyLen, int32_
|
||||||
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
|
ret = tdbDbGet(pDBF->pDB, pKey, keyLen, &pVal, valLen);
|
||||||
|
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
smaError("failed to get sma data from db, ret = %d", ret);
|
smaError("failed to get tsma data from db, ret = %d", ret);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,21 +16,22 @@
|
||||||
#include "sma.h"
|
#include "sma.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
typedef STsdbCfg STSmaKeepCfg;
|
||||||
|
|
||||||
#undef _TEST_SMA_PRINT_DEBUG_LOG_
|
#undef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
#define SMA_STORAGE_TSDB_DAYS 30
|
#define SMA_STORAGE_TSDB_MINUTES 86400
|
||||||
#define SMA_STORAGE_TSDB_TIMES 10
|
#define SMA_STORAGE_TSDB_TIMES 10
|
||||||
#define SMA_STORAGE_SPLIT_HOURS 24
|
#define SMA_STORAGE_SPLIT_FACTOR 144 // least records in tsma file
|
||||||
#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8
|
#define SMA_KEY_LEN 16 // TSKEY+groupId 8+8
|
||||||
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
|
#define SMA_DROP_EXPIRED_TIME 10 // default is 10 seconds
|
||||||
|
|
||||||
#define SMA_STATE_ITEM_HASH_SLOT 32
|
#define SMA_STATE_ITEM_HASH_SLOT 32
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SSma *pSma;
|
SSma *pSma;
|
||||||
SDBFile dFile;
|
SDBFile dFile;
|
||||||
const SArray *pDataBlocks; // sma data
|
const SArray *pDataBlocks; // sma data
|
||||||
int32_t interval; // interval with the precision of DB
|
int64_t interval; // interval with the precision of DB
|
||||||
} STSmaWriteH;
|
} STSmaWriteH;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -42,10 +43,10 @@ typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
SSma *pSma;
|
SSma *pSma;
|
||||||
SDBFile dFile;
|
SDBFile dFile;
|
||||||
int32_t interval; // interval with the precision of DB
|
int64_t interval; // interval with the precision of DB
|
||||||
int32_t blockSize; // size of SMA block item
|
int32_t blockSize; // size of SMA block item
|
||||||
|
int32_t days;
|
||||||
int8_t storageLevel;
|
int8_t storageLevel;
|
||||||
int8_t days;
|
|
||||||
SmaFsIter smaFsIter;
|
SmaFsIter smaFsIter;
|
||||||
} STSmaReadH;
|
} STSmaReadH;
|
||||||
|
|
||||||
|
@ -58,7 +59,7 @@ typedef enum {
|
||||||
// static func
|
// static func
|
||||||
|
|
||||||
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
|
static int64_t tdGetIntervalByPrecision(int64_t interval, uint8_t intervalUnit, int8_t precision, bool adjusted);
|
||||||
static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
|
static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval);
|
||||||
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
|
static int32_t tdInitTSmaWriteH(STSmaWriteH *pSmaH, SSma *pSma, const SArray *pDataBlocks, int64_t interval,
|
||||||
int8_t intervalUnit);
|
int8_t intervalUnit);
|
||||||
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit);
|
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit);
|
||||||
|
@ -92,9 +93,10 @@ static int32_t tdDropTSmaDataImpl(SSma *pSma, int64_t indexUid);
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) {
|
static int32_t tdInitTSmaReadH(STSmaReadH *pSmaH, SSma *pSma, int64_t interval, int8_t intervalUnit) {
|
||||||
|
STSmaKeepCfg *pCfg = SMA_TSDB_CFG(pSma);
|
||||||
pSmaH->pSma = pSma;
|
pSmaH->pSma = pSma;
|
||||||
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
|
pSmaH->interval = tdGetIntervalByPrecision(interval, intervalUnit, SMA_TSDB_CFG(pSma)->precision, true);
|
||||||
pSmaH->storageLevel = tdGetSmaStorageLevel(interval, intervalUnit);
|
pSmaH->storageLevel = tdGetSmaStorageLevel(pCfg, interval);
|
||||||
pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel);
|
pSmaH->days = tdGetTSmaDays(pSma, pSmaH->interval, pSmaH->storageLevel);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -275,11 +277,13 @@ static int32_t tdSetTSmaDataFile(STSmaWriteH *pSmaH, int64_t indexUid, int32_t f
|
||||||
*/
|
*/
|
||||||
static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) {
|
static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel) {
|
||||||
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
||||||
int32_t daysPerFile = pCfg->days;
|
int32_t daysPerFile = pCfg->days; // unit is minute
|
||||||
|
|
||||||
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
|
if (storageLevel == SMA_STORAGE_LEVEL_TSDB) {
|
||||||
int32_t days = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]);
|
int32_t minutes = SMA_STORAGE_TSDB_TIMES * (interval / tsTickPerMin[pCfg->precision]);
|
||||||
daysPerFile = days > SMA_STORAGE_TSDB_DAYS ? days : SMA_STORAGE_TSDB_DAYS;
|
if (minutes > SMA_STORAGE_TSDB_MINUTES) {
|
||||||
|
daysPerFile = SMA_STORAGE_TSDB_MINUTES;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return daysPerFile;
|
return daysPerFile;
|
||||||
|
@ -288,46 +292,15 @@ static int32_t tdGetTSmaDays(SSma *pSma, int64_t interval, int32_t storageLevel)
|
||||||
/**
|
/**
|
||||||
* @brief Judge the tSma storage level
|
* @brief Judge the tSma storage level
|
||||||
*
|
*
|
||||||
|
* @param pCfg
|
||||||
* @param interval
|
* @param interval
|
||||||
* @param intervalUnit
|
|
||||||
* @return int32_t
|
* @return int32_t
|
||||||
*/
|
*/
|
||||||
static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
|
static int32_t tdGetSmaStorageLevel(STSmaKeepCfg *pCfg, int64_t interval) {
|
||||||
// TODO: configurable for SMA_STORAGE_SPLIT_HOURS?
|
int64_t mInterval = convertTimeFromPrecisionToUnit(interval, pCfg->precision, TIME_UNIT_MINUTE);
|
||||||
switch (intervalUnit) {
|
if (pCfg->days / mInterval >= SMA_STORAGE_SPLIT_FACTOR) {
|
||||||
case TIME_UNIT_HOUR:
|
|
||||||
if (interval < SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
return SMA_STORAGE_LEVEL_DFILESET;
|
||||||
}
|
}
|
||||||
break;
|
|
||||||
case TIME_UNIT_MINUTE:
|
|
||||||
if (interval < 60 * SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_SECOND:
|
|
||||||
if (interval < 3600 * SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_MILLISECOND:
|
|
||||||
if (interval < 3600 * 1e3 * SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_MICROSECOND:
|
|
||||||
if (interval < 3600 * 1e6 * SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TIME_UNIT_NANOSECOND:
|
|
||||||
if (interval < 3600 * 1e9 * SMA_STORAGE_SPLIT_HOURS) {
|
|
||||||
return SMA_STORAGE_LEVEL_DFILESET;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
return SMA_STORAGE_LEVEL_TSDB;
|
return SMA_STORAGE_LEVEL_TSDB;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -346,6 +319,7 @@ static int32_t tdGetSmaStorageLevel(int64_t interval, int8_t intervalUnit) {
|
||||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
STsdbCfg *pCfg = SMA_TSDB_CFG(pSma);
|
||||||
const SArray *pDataBlocks = (const SArray *)msg;
|
const SArray *pDataBlocks = (const SArray *)msg;
|
||||||
|
int64_t testSkey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
// TODO: destroy SSDataBlocks(msg)
|
// TODO: destroy SSDataBlocks(msg)
|
||||||
|
|
||||||
|
@ -403,8 +377,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step 1: Judge the storage level and days
|
// Step 1: Judge the storage level and days
|
||||||
int32_t storageLevel = tdGetSmaStorageLevel(pTSma->interval, pTSma->intervalUnit);
|
int32_t storageLevel = tdGetSmaStorageLevel(pCfg, tSmaH.interval);
|
||||||
int32_t daysPerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel);
|
int32_t minutePerFile = tdGetTSmaDays(pSma, tSmaH.interval, storageLevel);
|
||||||
|
|
||||||
char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId
|
char smaKey[SMA_KEY_LEN] = {0}; // key: skey + groupId
|
||||||
char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer?
|
char dataBuf[512] = {0}; // val: aggr data // TODO: handle 512 buffer?
|
||||||
|
@ -432,6 +406,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
if (!isStartKey) {
|
if (!isStartKey) {
|
||||||
isStartKey = true;
|
isStartKey = true;
|
||||||
skey = *(TSKEY *)var;
|
skey = *(TSKEY *)var;
|
||||||
|
testSkey = skey;
|
||||||
printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
|
printf("= skey %" PRIi64 " groupId = %" PRIi64 "|", skey, groupId);
|
||||||
tdEncodeTSmaKey(groupId, skey, &pSmaKey);
|
tdEncodeTSmaKey(groupId, skey, &pSmaKey);
|
||||||
} else {
|
} else {
|
||||||
|
@ -503,9 +478,10 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
printf("\n");
|
||||||
// if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
|
// if ((tlen > 0) && (skey != TSKEY_INITIAL_VAL)) {
|
||||||
if (tlen > 0) {
|
if (tlen > 0) {
|
||||||
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, daysPerFile, pCfg->precision));
|
int32_t fid = (int32_t)(TSDB_KEY_FID(skey, minutePerFile, pCfg->precision));
|
||||||
|
|
||||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
|
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index
|
||||||
// file
|
// file
|
||||||
|
@ -517,6 +493,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
smaCloseDBF(&tSmaH.dFile);
|
smaCloseDBF(&tSmaH.dFile);
|
||||||
}
|
}
|
||||||
tdSetTSmaDataFile(&tSmaH, indexUid, fid);
|
tdSetTSmaDataFile(&tSmaH, indexUid, fid);
|
||||||
|
smaDebug("@@@ vgId:%d write to DBF %s, days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi32 " queryKey:%" PRIi64,
|
||||||
|
SMA_VID(pSma), tSmaH.dFile.path, minutePerFile, tSmaH.interval, storageLevel, testSkey);
|
||||||
if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
|
if (smaOpenDBF(pEnv->dbEnv, &tSmaH.dFile) != 0) {
|
||||||
smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma),
|
smaWarn("vgId:%d open DB file %s failed since %s", SMA_VID(pSma),
|
||||||
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
tSmaH.dFile.path ? tSmaH.dFile.path : "path is NULL", tstrerror(terrno));
|
||||||
|
@ -528,7 +506,7 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
|
if (tdInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
|
||||||
smaWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
|
smaWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
|
||||||
" since %s",
|
" since %s",
|
||||||
SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno));
|
SMA_VID(pSma), indexUid, skey, groupId, tstrerror(terrno));
|
||||||
tdSmaEndCommit(pEnv);
|
tdSmaEndCommit(pEnv);
|
||||||
|
@ -536,7 +514,8 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
tdUnRefSmaStat(pSma, pStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
|
|
||||||
|
smaDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
|
||||||
SMA_VID(pSma), indexUid, skey, groupId);
|
SMA_VID(pSma), indexUid, skey, groupId);
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
|
@ -547,7 +526,6 @@ int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||||
SMA_VID(pSma), skey, tlen, indexUid);
|
SMA_VID(pSma), skey, tlen, indexUid);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("\n");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tdSmaEndCommit(pEnv); // TODO: not commit for every insert
|
tdSmaEndCommit(pEnv); // TODO: not commit for every insert
|
||||||
|
@ -579,13 +557,13 @@ static int32_t tdInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t keyL
|
||||||
TXN *txn) {
|
TXN *txn) {
|
||||||
SDBFile *pDBFile = &pSmaH->dFile;
|
SDBFile *pDBFile = &pSmaH->dFile;
|
||||||
|
|
||||||
// TODO: insert sma data blocks into B+Tree(TDB)
|
// TODO: insert tsma data blocks into B+Tree(TDB)
|
||||||
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
|
if (smaSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
|
||||||
smaWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
|
smaWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
|
||||||
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
|
smaDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
|
||||||
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
SMA_VID(pSmaH->pSma), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
||||||
|
|
||||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
|
@ -776,6 +754,8 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
|
||||||
tdUnRefSmaStat(pSma, pStat);
|
tdUnRefSmaStat(pSma, pStat);
|
||||||
|
|
||||||
tdInitTSmaFile(&tReadH, indexUid, querySKey);
|
tdInitTSmaFile(&tReadH, indexUid, querySKey);
|
||||||
|
smaDebug("### vgId:%d read from DBF %s days:%d, interval:%" PRIi64 ", storageLevel:%" PRIi8 " queryKey:%" PRIi64,
|
||||||
|
SMA_VID(pSma), tReadH.dFile.path, tReadH.days, tReadH.interval, tReadH.storageLevel, querySKey);
|
||||||
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
|
if (smaOpenDBF(pEnv->dbEnv, &tReadH.dFile) != 0) {
|
||||||
smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
|
smaWarn("vgId:%d open DBF %s failed since %s", SMA_VID(pSma), tReadH.dFile.path, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
@ -783,7 +763,7 @@ int32_t tdGetTSmaDataImpl(SSma *pSma, char *pData, int64_t indexUid, TSKEY query
|
||||||
|
|
||||||
char smaKey[SMA_KEY_LEN] = {0};
|
char smaKey[SMA_KEY_LEN] = {0};
|
||||||
void *pSmaKey = &smaKey;
|
void *pSmaKey = &smaKey;
|
||||||
int64_t queryGroupId = 1;
|
int64_t queryGroupId = 0;
|
||||||
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
|
tdEncodeTSmaKey(queryGroupId, querySKey, (void **)&pSmaKey);
|
||||||
|
|
||||||
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma),
|
smaDebug("vgId:%d get sma data from %s: smaKey %" PRIx64 "-%" PRIx64 ", keyLen %d", SMA_VID(pSma),
|
||||||
|
@ -915,8 +895,8 @@ static int32_t tdSetExpiredWindow(SSma *pSma, SHashObj *pItemsHash, int64_t inde
|
||||||
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
|
terrno = TSDB_CODE_TDB_NO_SMA_INDEX_IN_META;
|
||||||
taosHashCleanup(pItem->expiredWindows);
|
taosHashCleanup(pItem->expiredWindows);
|
||||||
taosMemoryFree(pItem);
|
taosMemoryFree(pItem);
|
||||||
smaWarn("vgId:%d update expired window failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid,
|
smaWarn("vgId:%d set expire window, get tsma meta failed for smaIndex %" PRIi64 " since %s", SMA_VID(pSma),
|
||||||
tstrerror(terrno));
|
indexUid, tstrerror(terrno));
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
pItem->pTSma = pTSma;
|
pItem->pTSma = pTSma;
|
||||||
|
@ -1021,7 +1001,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
|
||||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!pSW || (pTSma->tableUid != msgIter.suid)) {
|
if (!pSW || (pTSma && (pTSma->tableUid != msgIter.suid))) {
|
||||||
if (pSW) {
|
if (pSW) {
|
||||||
pSW = tdFreeTSmaWrapper(pSW, false);
|
pSW = tdFreeTSmaWrapper(pSW, false);
|
||||||
}
|
}
|
||||||
|
@ -1043,6 +1023,7 @@ int32_t tdUpdateExpiredWindowImpl(SSma *pSma, SSubmitReq *pMsg, int64_t version)
|
||||||
interval.slidingUnit = pTSma->slidingUnit;
|
interval.slidingUnit = pTSma->slidingUnit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: process multiple tsma for one table uid
|
||||||
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
|
TSKEY winSKey = taosTimeTruncate(TD_ROW_KEY(row), &interval, interval.precision);
|
||||||
|
|
||||||
if (lastWinSKey != winSKey) {
|
if (lastWinSKey != winSKey) {
|
||||||
|
|
|
@ -42,7 +42,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
int slen = 0;
|
int slen = 0;
|
||||||
|
|
||||||
*ppTsdb = NULL;
|
*ppTsdb = NULL;
|
||||||
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + strlen(dir) + TSDB_DATA_DIR_LEN + 3;
|
slen = strlen(tfsGetPrimaryPath(pVnode->pTfs)) + strlen(pVnode->path) + 3;
|
||||||
|
|
||||||
// create handle
|
// create handle
|
||||||
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
|
pTsdb = (STsdb *)taosMemoryCalloc(1, sizeof(*pTsdb) + slen);
|
||||||
|
@ -73,7 +73,8 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId:%d tsdb is opened for %s", TD_VID(pVnode), pTsdb->path);
|
tsdbDebug("vgId:%d tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
||||||
|
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
||||||
|
|
||||||
*ppTsdb = pTsdb;
|
*ppTsdb = pTsdb;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -372,13 +372,13 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (level == TSDB_RETENTION_L0) {
|
if (level == TSDB_RETENTION_L0) {
|
||||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L0);
|
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L0);
|
||||||
return VND_RSMA0(pVnode);
|
return VND_RSMA0(pVnode);
|
||||||
} else if (level == TSDB_RETENTION_L1) {
|
} else if (level == TSDB_RETENTION_L1) {
|
||||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L1);
|
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L1);
|
||||||
return VND_RSMA1(pVnode);
|
return VND_RSMA1(pVnode);
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("%p rsma level %d is selected to query", pReadHandle, TSDB_RETENTION_L2);
|
tsdbDebug("vgId:%d read handle %p rsma level %d is selected to query", TD_VID(pVnode), pReadHandle, TSDB_RETENTION_L2);
|
||||||
return VND_RSMA2(pVnode);
|
return VND_RSMA2(pVnode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -876,13 +876,13 @@ static int32_t tsdbInsertTSmaBlocks(STSmaWriteH *pSmaH, void *smaKey, int32_t ke
|
||||||
TXN *txn) {
|
TXN *txn) {
|
||||||
SDBFile *pDBFile = &pSmaH->dFile;
|
SDBFile *pDBFile = &pSmaH->dFile;
|
||||||
|
|
||||||
// TODO: insert sma data blocks into B+Tree(TDB)
|
// TODO: insert tsma data blocks into B+Tree(TDB)
|
||||||
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
|
if (tsdbSaveSmaToDB(pDBFile, smaKey, keyLen, pData, dataLen, txn) != 0) {
|
||||||
tsdbWarn("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
|
tsdbWarn("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " fail",
|
||||||
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
tsdbDebug("vgId:%d insert sma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
|
tsdbDebug("vgId:%d insert tsma data blocks into %s: smaKey %" PRIx64 "-%" PRIx64 ", dataLen %" PRIu32 " succeed",
|
||||||
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
REPO_ID(pSmaH->pTsdb), pDBFile->path, *(int64_t *)smaKey, *(int64_t *)POINTER_SHIFT(smaKey, 8), dataLen);
|
||||||
|
|
||||||
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
#ifdef _TEST_SMA_PRINT_DEBUG_LOG_
|
||||||
|
@ -1245,7 +1245,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
|
if (tsdbInsertTSmaBlocks(&tSmaH, &smaKey, SMA_KEY_LEN, dataBuf, tlen, &pEnv->txn) != 0) {
|
||||||
tsdbWarn("vgId:%d insert tSma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
|
tsdbWarn("vgId:%d insert tsma data blocks fail for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64
|
||||||
" since %s",
|
" since %s",
|
||||||
REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
|
REPO_ID(pTsdb), indexUid, skey, groupId, tstrerror(terrno));
|
||||||
tsdbSmaEndCommit(pEnv);
|
tsdbSmaEndCommit(pEnv);
|
||||||
|
@ -1253,7 +1253,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, int64_t indexUid, const char
|
||||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
tsdbDebug("vgId:%d insert tSma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
|
tsdbDebug("vgId:%d insert tsma data blocks success for index %" PRIi64 ", skey %" PRIi64 ", groupId %" PRIi64,
|
||||||
REPO_ID(pTsdb), indexUid, skey, groupId);
|
REPO_ID(pTsdb), indexUid, skey, groupId);
|
||||||
// TODO:tsdbEndTSmaCommit();
|
// TODO:tsdbEndTSmaCommit();
|
||||||
|
|
||||||
|
|
|
@ -103,7 +103,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
|
|
||||||
// open sma
|
// open sma
|
||||||
if (smaOpen(pVnode)) {
|
if (smaOpen(pVnode)) {
|
||||||
vError("vgId:%d failed to open vnode tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d failed to open vnode sma since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue