TD-353
This commit is contained in:
parent
6c311f0d11
commit
63124ada40
|
@ -70,6 +70,8 @@ typedef struct {
|
||||||
SList* superList;
|
SList* superList;
|
||||||
SHashObj* uidMap;
|
SHashObj* uidMap;
|
||||||
SKVStore* pStore;
|
SKVStore* pStore;
|
||||||
|
int maxRowBytes;
|
||||||
|
int maxCols;
|
||||||
} STsdbMeta;
|
} STsdbMeta;
|
||||||
|
|
||||||
// ------------------ tsdbBuffer.c
|
// ------------------ tsdbBuffer.c
|
||||||
|
@ -106,8 +108,6 @@ typedef struct {
|
||||||
STableData** tData;
|
STableData** tData;
|
||||||
SList* actList;
|
SList* actList;
|
||||||
SList* bufBlockList;
|
SList* bufBlockList;
|
||||||
int maxCols;
|
|
||||||
int maxRowBytes;
|
|
||||||
} SMemTable;
|
} SMemTable;
|
||||||
|
|
||||||
// ------------------ tsdbFile.c
|
// ------------------ tsdbFile.c
|
||||||
|
|
|
@ -108,9 +108,6 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||||
pTableData->numOfRows++;
|
pTableData->numOfRows++;
|
||||||
|
|
||||||
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
|
ASSERT(pTableData->numOfRows == tSkipListGetSize(pTableData->pData));
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
|
||||||
if (schemaNCols(pSchema) > pMemTable->maxCols) pMemTable->maxCols = schemaNCols(pSchema);
|
|
||||||
if (schemaTLen(pSchema) > pMemTable->maxRowBytes) pMemTable->maxRowBytes = schemaTLen(pSchema);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
|
tsdbTrace("vgId:%d a row is inserted to table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
|
||||||
|
@ -360,6 +357,7 @@ static void *tsdbCommitData(void *arg) {
|
||||||
SMemTable * pMem = pRepo->imem;
|
SMemTable * pMem = pRepo->imem;
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
SDataCols * pDataCols = NULL;
|
SDataCols * pDataCols = NULL;
|
||||||
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
SCommitIter *iters = NULL;
|
SCommitIter *iters = NULL;
|
||||||
SRWHelper whelper = {0};
|
SRWHelper whelper = {0};
|
||||||
ASSERT(pRepo->commit == 1);
|
ASSERT(pRepo->commit == 1);
|
||||||
|
@ -380,10 +378,10 @@ static void *tsdbCommitData(void *arg) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pDataCols = tdNewDataCols(pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
|
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
|
tsdbError("vgId:%d failed to init data cols with maxRowBytes %d maxCols %d maxRowsPerFileBlock %d since %s",
|
||||||
REPO_ID(pRepo), pMem->maxRowBytes, pMem->maxCols, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
|
REPO_ID(pRepo), pMeta->maxCols, pMeta->maxRowBytes, pCfg->maxRowsPerFileBlock, tstrerror(terrno));
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -737,6 +737,12 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
|
||||||
|
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
|
||||||
|
}
|
||||||
|
|
||||||
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||||
|
|
||||||
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
@ -754,6 +760,11 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
SListIter lIter = {0};
|
SListIter lIter = {0};
|
||||||
SListNode *pNode = NULL;
|
SListNode *pNode = NULL;
|
||||||
STable * tTable = NULL;
|
STable * tTable = NULL;
|
||||||
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
|
|
||||||
|
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
int maxCols = schemaNCols(pSchema);
|
||||||
|
int maxRowBytes = schemaTLen(pSchema);
|
||||||
|
|
||||||
if (rmFromIdx) tsdbWLockRepoMeta(pRepo);
|
if (rmFromIdx) tsdbWLockRepoMeta(pRepo);
|
||||||
|
|
||||||
|
@ -779,6 +790,19 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
|
|
||||||
taosHashRemove(pMeta->uidMap, (char *)(&(TABLE_UID(pTable))), sizeof(TABLE_UID(pTable)));
|
taosHashRemove(pMeta->uidMap, (char *)(&(TABLE_UID(pTable))), sizeof(TABLE_UID(pTable)));
|
||||||
|
|
||||||
|
if (maxCols == pMeta->maxCols || maxRowBytes == pMeta->maxRowBytes) {
|
||||||
|
maxCols = 0;
|
||||||
|
maxRowBytes = 0;
|
||||||
|
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||||
|
STable *pTable = pMeta->tables[i];
|
||||||
|
if (pTable != NULL) {
|
||||||
|
pSchema = tsdbGetTableSchema(pTable);
|
||||||
|
maxCols = MAX(maxCols, schemaNCols(pSchema));
|
||||||
|
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (rmFromIdx) tsdbUnlockRepoMeta(pRepo);
|
if (rmFromIdx) tsdbUnlockRepoMeta(pRepo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1096,11 +1096,12 @@ static void tsdbResetHelperBlock(SRWHelper *pHelper) {
|
||||||
|
|
||||||
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
|
static int tsdbInitHelperBlock(SRWHelper *pHelper) {
|
||||||
STsdbRepo *pRepo = helperRepo(pHelper);
|
STsdbRepo *pRepo = helperRepo(pHelper);
|
||||||
|
STsdbMeta *pMeta = pHelper->pRepo->tsdbMeta;
|
||||||
|
|
||||||
pHelper->pDataCols[0] =
|
pHelper->pDataCols[0] =
|
||||||
tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
|
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
pHelper->pDataCols[1] =
|
pHelper->pDataCols[1] =
|
||||||
tdNewDataCols(pRepo->imem->maxRowBytes, pRepo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
|
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
|
if (pHelper->pDataCols[0] == NULL || pHelper->pDataCols[1] == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1120,6 +1121,7 @@ static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
|
||||||
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
|
static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t type) {
|
||||||
STsdbCfg *pCfg = &pRepo->config;
|
STsdbCfg *pCfg = &pRepo->config;
|
||||||
memset((void *)pHelper, 0, sizeof(*pHelper));
|
memset((void *)pHelper, 0, sizeof(*pHelper));
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
|
||||||
helperType(pHelper) = type;
|
helperType(pHelper) = type;
|
||||||
helperRepo(pHelper) = pRepo;
|
helperRepo(pHelper) = pRepo;
|
||||||
|
@ -1135,8 +1137,8 @@ static int tsdbInitHelper(SRWHelper *pHelper, STsdbRepo *pRepo, tsdb_rw_helper_t
|
||||||
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
|
if (tsdbInitHelperBlock(pHelper) < 0) goto _err;
|
||||||
|
|
||||||
pHelper->pBuffer =
|
pHelper->pBuffer =
|
||||||
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pRepo->imem->maxCols +
|
tmalloc(sizeof(SCompData) + (sizeof(SCompCol) + sizeof(TSCKSUM) + COMP_OVERFLOW_BYTES) * pMeta->maxCols +
|
||||||
pRepo->imem->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM));
|
pMeta->maxRowBytes * pCfg->maxRowsPerFileBlock + sizeof(TSCKSUM));
|
||||||
if (pHelper->pBuffer == NULL) {
|
if (pHelper->pBuffer == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
|
|
@ -589,10 +589,10 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
if (pCheckInfo->pDataCols == NULL) {
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
// STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
||||||
// TODO
|
// TODO
|
||||||
pCheckInfo->pDataCols =
|
pCheckInfo->pDataCols =
|
||||||
tdNewDataCols(pCheckInfo->imem->maxRowBytes, pCheckInfo->imem->maxCols, pRepo->config.maxRowsPerFileBlock);
|
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
|
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
|
||||||
|
|
Loading…
Reference in New Issue