TD-100
This commit is contained in:
parent
366b07fb96
commit
7e8fc3b2ae
|
@ -443,13 +443,16 @@ typedef struct {
|
||||||
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
|
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
|
||||||
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
|
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
|
||||||
|
|
||||||
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
|
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
||||||
|
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
|
||||||
|
// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
|
||||||
void tsdbDestroyHelper(SRWHelper *pHelper);
|
void tsdbDestroyHelper(SRWHelper *pHelper);
|
||||||
void tsdbResetHelper(SRWHelper *pHelper);
|
void tsdbResetHelper(SRWHelper *pHelper);
|
||||||
|
|
||||||
// --------- For set operations
|
// --------- For set operations
|
||||||
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
|
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
|
||||||
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
|
// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
|
||||||
|
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
|
||||||
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
|
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
|
||||||
|
|
||||||
// --------- For read operations
|
// --------- For read operations
|
||||||
|
|
|
@ -400,6 +400,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t ti
|
||||||
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||||
config->tableId.uid = uid;
|
config->tableId.uid = uid;
|
||||||
config->tableId.tid = tid;
|
config->tableId.tid = tid;
|
||||||
|
config->name = strdup("test1");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -873,16 +874,7 @@ static void *tsdbCommitData(void *arg) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a write helper to commit data
|
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) goto _exit;
|
||||||
SHelperCfg hcfg = {.type = TSDB_WRITE_HELPER,
|
|
||||||
.maxTables = pCfg->maxTables,
|
|
||||||
.maxRowSize = pMeta->maxRowBytes,
|
|
||||||
.maxRows = pCfg->maxRowsPerFileBlock,
|
|
||||||
.maxCols = pMeta->maxCols,
|
|
||||||
.minRowsPerFileBlock = pCfg->minRowsPerFileBlock,
|
|
||||||
.maxRowsPerFileBlock = pCfg->maxRowsPerFileBlock,
|
|
||||||
.compress = pCfg->compression};
|
|
||||||
if (tsdbInitHelper(&whelper, &hcfg) < 0) goto _exit;
|
|
||||||
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit;
|
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit;
|
||||||
|
|
||||||
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
|
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
|
||||||
|
@ -898,7 +890,6 @@ static void *tsdbCommitData(void *arg) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
tdFreeDataCols(pDataCols);
|
tdFreeDataCols(pDataCols);
|
||||||
tsdbDestroyHelper(&whelper);
|
|
||||||
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
tsdbDestroyTableIters(iters, pCfg->maxTables);
|
||||||
|
|
||||||
tsdbLockRepo(arg);
|
tsdbLockRepo(arg);
|
||||||
|
@ -948,8 +939,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
||||||
SSkipListIterator *pIter = iters[tid];
|
SSkipListIterator *pIter = iters[tid];
|
||||||
|
|
||||||
// Set the helper and the buffer dataCols object to help to write this table
|
// Set the helper and the buffer dataCols object to help to write this table
|
||||||
SHelperTable hTable = {.uid = pTable->tableId.uid, .tid = pTable->tableId.tid, .sversion = pTable->sversion};
|
tsdbSetHelperTable(pHelper, pTable, pRepo);
|
||||||
tsdbSetHelperTable(pHelper, &hTable, tsdbGetTableSchema(pMeta, pTable));
|
|
||||||
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
|
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
|
||||||
|
|
||||||
// Loop to write the data in the cache to files. If no data to write, just break the loop
|
// Loop to write the data in the cache to files. If no data to write, just break the loop
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
#include "talgo.h"
|
#include "talgo.h"
|
||||||
|
|
||||||
// Local function definitions
|
// Local function definitions
|
||||||
static int tsdbCheckHelperCfg(SHelperCfg *pCfg);
|
// static int tsdbCheckHelperCfg(SHelperCfg *pCfg);
|
||||||
static int tsdbInitHelperFile(SRWHelper *pHelper);
|
static int tsdbInitHelperFile(SRWHelper *pHelper);
|
||||||
// static void tsdbClearHelperFile(SHelperFile *pHFile);
|
// static void tsdbClearHelperFile(SHelperFile *pHFile);
|
||||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper);
|
||||||
|
@ -102,14 +102,21 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
|
||||||
tdFreeDataCols(pHelper->pDataCols[1]);
|
tdFreeDataCols(pHelper->pDataCols[1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------------------------------ OPERATIONS FOR OUTSIDE ------------------------------------------
|
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
|
||||||
int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg) {
|
if (pHelper == NULL || pRepo == NULL) return -1;
|
||||||
if (pHelper == NULL || pCfg == NULL || tsdbCheckHelperCfg(pCfg) < 0) return -1;
|
|
||||||
|
|
||||||
memset((void *)pHelper, 0, sizeof(*pHelper));
|
memset((void *)pHelper, 0, sizeof(*pHelper));
|
||||||
|
|
||||||
// Init global configuration
|
// Init global configuration
|
||||||
pHelper->config = *pCfg;
|
pHelper->config.type = type;
|
||||||
|
pHelper->config.maxTables = pRepo->config.maxTables;
|
||||||
|
pHelper->config.maxRowSize = pRepo->tsdbMeta->maxRowBytes;
|
||||||
|
pHelper->config.maxRows = pRepo->config.maxRowsPerFileBlock;
|
||||||
|
pHelper->config.maxCols = pRepo->tsdbMeta->maxCols;
|
||||||
|
pHelper->config.minRowsPerFileBlock = pRepo->config.minRowsPerFileBlock;
|
||||||
|
pHelper->config.maxRowsPerFileBlock = pRepo->config.maxRowsPerFileBlock;
|
||||||
|
pHelper->config.compress = pRepo->config.compression;
|
||||||
|
|
||||||
pHelper->state = TSDB_HELPER_CLEAR_STATE;
|
pHelper->state = TSDB_HELPER_CLEAR_STATE;
|
||||||
|
|
||||||
// Init file part
|
// Init file part
|
||||||
|
@ -128,6 +135,15 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------ OPERATIONS FOR OUTSIDE ------------------------------------------
|
||||||
|
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||||
|
return tsdbInitHelper(pHelper, pRepo, TSDB_READ_HELPER);
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo) {
|
||||||
|
return tsdbInitHelper(pHelper, pRepo, TSDB_WRITE_HELPER);
|
||||||
|
}
|
||||||
|
|
||||||
void tsdbDestroyHelper(SRWHelper *pHelper) {
|
void tsdbDestroyHelper(SRWHelper *pHelper) {
|
||||||
if (pHelper) {
|
if (pHelper) {
|
||||||
tsdbDestroyHelperFile(pHelper);
|
tsdbDestroyHelperFile(pHelper);
|
||||||
|
@ -243,18 +259,22 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema) {
|
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
||||||
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));
|
ASSERT(helperHasState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));
|
||||||
|
|
||||||
// Clear members and state used by previous table
|
// Clear members and state used by previous table
|
||||||
tsdbResetHelperTable(pHelper);
|
tsdbResetHelperTable(pHelper);
|
||||||
ASSERT(pHelper->state == (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));
|
ASSERT(pHelper->state == (TSDB_HELPER_FILE_SET_AND_OPEN | TSDB_HELPER_IDX_LOAD));
|
||||||
|
|
||||||
pHelper->tableInfo = *pHelperTable;
|
pHelper->tableInfo.tid = pTable->tableId.tid;
|
||||||
|
pHelper->tableInfo.uid = pTable->tableId.uid;
|
||||||
|
pHelper->tableInfo.sversion = pTable->sversion;
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pRepo->tsdbMeta, pTable);
|
||||||
|
|
||||||
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
||||||
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
tdInitDataCols(pHelper->pDataCols[1], pSchema);
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelperTable->tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pTable->tableId.tid;
|
||||||
if (pIdx->offset > 0 && pIdx->hasLast) {
|
if (pIdx->offset > 0 && pIdx->hasLast) {
|
||||||
pHelper->hasOldLastBlock = true;
|
pHelper->hasOldLastBlock = true;
|
||||||
}
|
}
|
||||||
|
@ -610,10 +630,10 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
|
// static int tsdbCheckHelperCfg(SHelperCfg *pCfg) {
|
||||||
// TODO
|
// // TODO
|
||||||
return 0;
|
// return 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
// static void tsdbClearHelperFile(SHelperFile *pHFile) {
|
// static void tsdbClearHelperFile(SHelperFile *pHFile) {
|
||||||
// pHFile->fid = -1;
|
// pHFile->fid = -1;
|
||||||
|
|
|
@ -191,29 +191,15 @@ TEST(TsdbTest, createRepo) {
|
||||||
|
|
||||||
// Read from file
|
// Read from file
|
||||||
SRWHelper rhelper;
|
SRWHelper rhelper;
|
||||||
SHelperCfg helperCfg = {
|
tsdbInitReadHelper(&rhelper, repo);
|
||||||
.type = TSDB_READ_HELPER,
|
|
||||||
.maxTables = repo->config.maxTables,
|
|
||||||
.maxRowSize = repo->tsdbMeta->maxRowBytes,
|
|
||||||
.maxRows = repo->config.maxRowsPerFileBlock,
|
|
||||||
.maxCols = repo->tsdbMeta->maxCols,
|
|
||||||
.minRowsPerFileBlock = repo->config.minRowsPerFileBlock,
|
|
||||||
.maxRowsPerFileBlock = repo->config.maxRowsPerFileBlock,
|
|
||||||
.compress = repo->config.compression,
|
|
||||||
|
|
||||||
};
|
|
||||||
tsdbInitHelper(&rhelper, &helperCfg);
|
|
||||||
|
|
||||||
SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
|
SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
|
||||||
ASSERT_NE(pFGroup, nullptr);
|
ASSERT_NE(pFGroup, nullptr);
|
||||||
ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
|
ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
|
||||||
|
|
||||||
SHelperTable htable = {
|
STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid);
|
||||||
.uid = tCfg.tableId.uid,
|
ASSERT_NE(pTable, nullptr);
|
||||||
.tid = tCfg.tableId.tid,
|
tsdbSetHelperTable(&rhelper, pTable, repo);
|
||||||
.sversion = tCfg.sversion
|
|
||||||
};
|
|
||||||
tsdbSetHelperTable(&rhelper, &htable, schema);
|
|
||||||
|
|
||||||
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
|
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
|
||||||
ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0);
|
ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0);
|
||||||
|
|
Loading…
Reference in New Issue