TD-100
This commit is contained in:
parent
7e8fc3b2ae
commit
e165343c01
|
@ -460,7 +460,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
|
|||
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
|
||||
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target);
|
||||
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds);
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target);
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target);
|
||||
|
||||
// --------- For write operations
|
||||
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
|
|
|
@ -363,7 +363,7 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
|||
ASSERT(pCompBlock->last);
|
||||
|
||||
if (pCompBlock->numOfSubBlocks > 1) {
|
||||
if (tsdbLoadBlockData(pHelper, pIdx->numOfSuperBlocks - 1, NULL) < 0) return -1;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, pIdx->numOfSuperBlocks - 1), NULL) < 0) return -1;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfPoints > 0 &&
|
||||
pHelper->pDataCols[0]->numOfPoints < pHelper->config.minRowsPerFileBlock);
|
||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0],
|
||||
|
@ -607,8 +607,8 @@ _err:
|
|||
}
|
||||
|
||||
// Load the whole block data
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, int blkIdx, SDataCols *target) {
|
||||
SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target) {
|
||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||
|
||||
int numOfSubBlock = pCompBlock->numOfSubBlocks;
|
||||
if (numOfSubBlock > 1) pCompBlock = (SCompBlock *)((char *)pHelper->pCompInfo + pCompBlock->offset);
|
||||
|
@ -797,7 +797,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else {
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
ASSERT(pHelper->pDataCols[0]->numOfPoints == blockAtIdx(pHelper, blkIdx)->numOfPoints);
|
||||
// Merge
|
||||
if (tdMergeDataCols(pHelper->pDataCols[0], pDataCols, rowsWritten) < 0) goto _err;
|
||||
|
@ -852,7 +852,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
|||
if (tsdbAddSubBlock(pHelper, &compBlock, blkIdx, rowsWritten) < 0) goto _err;
|
||||
} else { // Load-Merge-Write
|
||||
// Load
|
||||
if (tsdbLoadBlockData(pHelper, blkIdx, NULL) < 0) goto _err;
|
||||
if (tsdbLoadBlockData(pHelper, blockAtIdx(pHelper, blkIdx), NULL) < 0) goto _err;
|
||||
if (blockAtIdx(pHelper, blkIdx)->last) pHelper->hasOldLastBlock = false;
|
||||
|
||||
rowsWritten = rows3;
|
||||
|
|
|
@ -128,6 +128,7 @@ typedef struct STsdbQueryHandle {
|
|||
SFileGroup* pFileGroup;
|
||||
SFileGroupIter fileIter;
|
||||
SCompIdx* compIndex;
|
||||
SRWHelper rhelper;
|
||||
} STsdbQueryHandle;
|
||||
|
||||
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
|
||||
|
@ -150,7 +151,8 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
|
|||
pQueryHandle->order = pCond->order;
|
||||
pQueryHandle->window = pCond->twindow;
|
||||
pQueryHandle->pTsdb = tsdb;
|
||||
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)),
|
||||
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx));
|
||||
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
|
||||
|
||||
pQueryHandle->loadDataAfterSeek = false;
|
||||
pQueryHandle->isFirstSlot = true;
|
||||
|
@ -299,11 +301,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
|
||||
|
||||
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
|
||||
if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) {
|
||||
fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY);
|
||||
} else {
|
||||
assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd));
|
||||
}
|
||||
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
|
||||
|
||||
// load all the comp offset value for all tables in this file
|
||||
// tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
|
||||
|
@ -314,7 +312,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||
|
||||
SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid];
|
||||
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
|
||||
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
|
||||
continue;//no data blocks in the file belongs to pCheckInfo->pTable
|
||||
} else {
|
||||
|
@ -329,7 +327,12 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
|||
}
|
||||
|
||||
// tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
|
||||
|
||||
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->tableId.uid);
|
||||
assert(pTable != NULL);
|
||||
|
||||
tsdbSetHelperTable(&pQueryHandle->rhelper, pTable, pQueryHandle->pTsdb);
|
||||
|
||||
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
|
||||
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
|
||||
|
||||
TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
|
||||
|
@ -420,26 +423,26 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
|||
|
||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
|
||||
|
||||
SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
||||
if (pFile->fd == FD_INITIALIZER) {
|
||||
pFile->fd = open(pFile->fname, O_RDONLY);
|
||||
}
|
||||
|
||||
// if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
|
||||
// SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||
|
||||
// pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
|
||||
// pBlockLoadInfo->slot = pQueryHandle->cur.slot;
|
||||
// pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
|
||||
|
||||
// blockLoaded = true;
|
||||
// SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
|
||||
// if (pFile->fd == FD_INITIALIZER) {
|
||||
// pFile->fd = open(pFile->fname, O_RDONLY);
|
||||
// }
|
||||
|
||||
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
|
||||
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
|
||||
|
||||
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
|
||||
pBlockLoadInfo->slot = pQueryHandle->cur.slot;
|
||||
pBlockLoadInfo->sid = pCheckInfo->pTableObj->tableId.tid;
|
||||
|
||||
blockLoaded = true;
|
||||
}
|
||||
|
||||
taosArrayDestroy(sa);
|
||||
tfree(data);
|
||||
|
||||
TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
|
||||
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
|
||||
// TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
|
||||
// assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
|
||||
|
||||
return blockLoaded;
|
||||
}
|
||||
|
@ -595,7 +598,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
|||
}
|
||||
}
|
||||
|
||||
int32_t start = MIN(cur->pos, endPos);
|
||||
// int32_t start = MIN(cur->pos, endPos);
|
||||
|
||||
// move the data block in the front to data block if needed
|
||||
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
||||
|
@ -607,9 +610,10 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
|||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
|
||||
|
||||
if (pCol->info.colId == colId) {
|
||||
SDataCol* pDataCol = &pCols->cols[i];
|
||||
memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start,
|
||||
pQueryHandle->realNumOfRows * pCol->info.bytes);
|
||||
// SDataCol* pDataCol = &pCols->cols[i];
|
||||
pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData;
|
||||
// memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start,
|
||||
// pQueryHandle->realNumOfRows * pCol->info.bytes);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1530,7 +1534,7 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
|||
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||
for (int32_t i = 0; i < cols; ++i) {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||
tfree(pColInfo->pData);
|
||||
// tfree(pColInfo->pData);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pQueryHandle->pColumns);
|
||||
|
|
|
@ -202,7 +202,7 @@ TEST(TsdbTest, createRepo) {
|
|||
tsdbSetHelperTable(&rhelper, pTable, repo);
|
||||
|
||||
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
|
||||
ASSERT_EQ(tsdbLoadBlockData(&rhelper, 0, NULL), 0);
|
||||
ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0);
|
||||
|
||||
int k = 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue