SSDatablock refactor
This commit is contained in:
parent
afeea2ff34
commit
ded8843367
|
@ -38,9 +38,10 @@ typedef enum {
|
|||
} ESmaStorageLevel;
|
||||
|
||||
typedef struct {
|
||||
STsdb *pTsdb;
|
||||
SDBFile dFile;
|
||||
int32_t interval; // interval with the precision of DB
|
||||
STsdb *pTsdb;
|
||||
SDBFile dFile;
|
||||
SSDataBlock *pData; // sma data
|
||||
int32_t interval; // interval with the precision of DB
|
||||
} STSmaWriteH;
|
||||
|
||||
typedef struct {
|
||||
|
@ -98,7 +99,8 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, STSmaDataWrapper *pData, int64_
|
|||
int32_t nMaxResult);
|
||||
|
||||
// insert data
|
||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData);
|
||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval,
|
||||
int8_t intervalUnit);
|
||||
static void tsdbDestroyTSmaWriteH(STSmaWriteH *pSmaH);
|
||||
static int32_t tsdbInitTSmaReadH(STSmaReadH *pSmaH, STsdb *pTsdb, int64_t interval, int8_t intervalUnit);
|
||||
static int32_t tsdbGetSmaStorageLevel(int64_t interval, int8_t intervalUnit);
|
||||
|
@ -800,9 +802,10 @@ static int32_t tsdbInsertTSmaDataSection(STSmaWriteH *pSmaH, STSmaDataWrapper *p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, STSmaDataWrapper *pData) {
|
||||
static int32_t tsdbInitTSmaWriteH(STSmaWriteH *pSmaH, STsdb *pTsdb, SSDataBlock *pData, int64_t interval, int8_t intervalUnit) {
|
||||
pSmaH->pTsdb = pTsdb;
|
||||
pSmaH->interval = tsdbGetIntervalByPrecision(pData->interval, pData->intervalUnit, REPO_CFG(pTsdb)->precision);
|
||||
pSmaH->interval = tsdbGetIntervalByPrecision(interval, intervalUnit, REPO_CFG(pTsdb)->precision);
|
||||
pSmaH->pData = pData;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -857,10 +860,10 @@ static int32_t tsdbGetTSmaDays(STsdb *pTsdb, int64_t interval, int32_t storageLe
|
|||
* @return int32_t
|
||||
*/
|
||||
static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
SSDataBlock *pData = (SSDataBlock *)msg;
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pTSmaEnv);
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
|
@ -868,15 +871,15 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
if (pData->dataLen <= 0) {
|
||||
TASSERT(0);
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return TSDB_CODE_FAILED;
|
||||
if (pData == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert tSma data failed since pData is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STSmaWriteH tSmaH = {0};
|
||||
|
||||
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
|
||||
if (taosArrayGetSize(pData->pDataBlock) <= 0) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
tsdbWarn("vgId:%d insert tSma data failed since pDataBlock is empty", REPO_ID(pTsdb));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
@ -895,6 +898,14 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
STSma *pSma = pItem->pSma;
|
||||
|
||||
STSmaWriteH tSmaH = {0};
|
||||
|
||||
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
char rPath[TSDB_FILENAME_LEN] = {0};
|
||||
char aPath[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
||||
|
@ -907,8 +918,11 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
}
|
||||
|
||||
// Step 1: Judge the storage level and days
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
|
||||
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
|
||||
|
||||
|
||||
#if 0
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
||||
|
||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||
|
@ -933,7 +947,7 @@ static int32_t tsdbInsertTSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
|
||||
// Step 3: reset the SSmaStat
|
||||
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
||||
|
||||
#endif
|
||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -999,29 +1013,58 @@ static int32_t tsdbSetRSmaDataFile(STSmaWriteH *pSmaH, STSmaDataWrapper *pData,
|
|||
}
|
||||
|
||||
static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
STSmaDataWrapper *pData = (STSmaDataWrapper *)msg;
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
|
||||
STsdbCfg *pCfg = REPO_CFG(pTsdb);
|
||||
SSDataBlock *pData = (SSDataBlock *)msg;
|
||||
SSmaEnv *pEnv = atomic_load_ptr(&pTsdb->pRSmaEnv);
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert tSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pData->dataLen <= 0) {
|
||||
TASSERT(0);
|
||||
if (pEnv == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pTSmaEnv is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pData == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PTR;
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pData is NULL", REPO_ID(pTsdb));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pData->pDataBlock) <= 0) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
tsdbWarn("vgId:%d insert rSma data failed since pDataBlock is empty", REPO_ID(pTsdb));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaStat *pStat = SMA_ENV_STAT(pTsdb->pTSmaEnv);
|
||||
SSmaStatItem *pItem = NULL;
|
||||
|
||||
tsdbRefSmaStat(pTsdb, pStat);
|
||||
|
||||
if (pStat && pStat->smaStatItems) {
|
||||
pItem = taosHashGet(pStat->smaStatItems, &indexUid, sizeof(indexUid));
|
||||
}
|
||||
|
||||
if ((pItem == NULL) || ((pItem = *(SSmaStatItem **)pItem) == NULL) || tsdbSmaStatIsDropped(pItem)) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_SMA_STAT;
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
STSma *pSma = pItem->pSma;
|
||||
|
||||
STSmaWriteH tSmaH = {0};
|
||||
|
||||
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData) != 0) {
|
||||
if (tsdbInitTSmaWriteH(&tSmaH, pTsdb, pData, pSma->interval, pSma->intervalUnit) != 0) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int64_t indexUid = SMA_TEST_INDEX_UID;
|
||||
char rPath[TSDB_FILENAME_LEN] = {0};
|
||||
char aPath[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(rPath, TSDB_FILENAME_LEN, "%s%s%" PRIi64, SMA_ENV_PATH(pEnv), TD_DIRSEP, indexUid);
|
||||
|
@ -1033,8 +1076,9 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
}
|
||||
|
||||
// Step 1: Judge the storage level and days
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pData->interval, pData->intervalUnit);
|
||||
int32_t storageLevel = tsdbGetSmaStorageLevel(pSma->interval, pSma->intervalUnit);
|
||||
int32_t daysPerFile = tsdbGetTSmaDays(pTsdb, tSmaH.interval, storageLevel);
|
||||
#if 0
|
||||
int32_t fid = (int32_t)(TSDB_KEY_FID(pData->skey, daysPerFile, pCfg->precision));
|
||||
|
||||
// Step 2: Set the DFile for storage of SMA index, and iterate/split the TSma data and store to B+Tree index file
|
||||
|
@ -1057,8 +1101,10 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, char *msg) {
|
|||
|
||||
// Step 3: reset the SSmaStat
|
||||
tsdbResetExpiredWindow(pTsdb, SMA_ENV_STAT(pTsdb->pTSmaEnv), pData->indexUid, pData->skey);
|
||||
#endif
|
||||
|
||||
tsdbDestroyTSmaWriteH(&tSmaH);
|
||||
tsdbUnRefSmaStat(pTsdb, pStat);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -280,7 +280,7 @@ TEST(testCase, tSma_metaDB_Put_Get_Del_Test) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
#if 0
|
||||
TEST(testCase, tSma_Data_Insert_Query_Test) {
|
||||
// step 1: prepare meta
|
||||
const char *smaIndexName1 = "sma_index_test_1";
|
||||
|
|
Loading…
Reference in New Issue