refactor: do some internal refactor.
This commit is contained in:
parent
1470e51dc1
commit
b39198a53a
|
@ -664,8 +664,6 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) {
|
|||
return pBlockInfo;
|
||||
}
|
||||
|
||||
static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; }
|
||||
|
||||
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
|
||||
// start end position
|
||||
int s, e;
|
||||
|
@ -2353,7 +2351,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
|||
|
||||
// currently loaded file data block is consumed
|
||||
if ((pBlockData->nRow > 0) && (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0)) {
|
||||
// pBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
|
||||
break;
|
||||
}
|
||||
|
@ -3361,7 +3358,6 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc
|
|||
CHECK_FILEBLOCK_STATE st;
|
||||
|
||||
SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
|
||||
// SDataBlk* pCurrentBlock = getCurrentBlock(&pReader->status.blockIter);
|
||||
if (pFileBlockInfo == NULL) {
|
||||
st = CHECK_FILEBLOCK_QUIT;
|
||||
break;
|
||||
|
|
|
@ -0,0 +1,680 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "osDef.h"
|
||||
#include "tsdb.h"
|
||||
#include "tsdbDataFileRW.h"
|
||||
#include "tsdbFS2.h"
|
||||
#include "tsdbMerge.h"
|
||||
#include "tsdbUtil2.h"
|
||||
#include "tsimplehash.h"
|
||||
#include "tsdbReadUtil.h"
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||
int32_t num = numOfTables / pBuf->numPerBucket;
|
||||
int32_t remainder = numOfTables % pBuf->numPerBucket;
|
||||
if (pBuf->pData == NULL) {
|
||||
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
if (remainder > 0) {
|
||||
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
pBuf->numOfTables = numOfTables;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||
if (numOfTables <= pBuf->numOfTables) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pBuf->numOfTables > 0) {
|
||||
STableBlockScanInfo** p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
|
||||
taosMemoryFree(*p);
|
||||
pBuf->numOfTables /= pBuf->numPerBucket;
|
||||
}
|
||||
|
||||
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
|
||||
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
|
||||
if (pBuf->pData == NULL) {
|
||||
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
if (remainder > 0) {
|
||||
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
|
||||
if (p == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
taosArrayPush(pBuf->pData, &p);
|
||||
}
|
||||
|
||||
pBuf->numOfTables = numOfTables;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
|
||||
size_t num = taosArrayGetSize(pBuf->pData);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
char** p = taosArrayGet(pBuf->pData, i);
|
||||
taosMemoryFree(*p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pBuf->pData);
|
||||
}
|
||||
|
||||
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
|
||||
int32_t bucketIndex = index / pBuf->numPerBucket;
|
||||
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
|
||||
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
|
||||
}
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) {
|
||||
STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid));
|
||||
if (p == NULL || *p == NULL) {
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
int32_t size = tSimpleHashGetSize(pTableMap);
|
||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return *p;
|
||||
}
|
||||
|
||||
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
|
||||
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables) {
|
||||
// allocate buffer in order to load data blocks from file
|
||||
// todo use simple hash instead, optimize the memory consumption
|
||||
SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT));
|
||||
if (pTableMap == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
initBlockScanInfoBuf(pBuf, numOfTables);
|
||||
|
||||
pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||
if (pUidList->tableUidList == NULL) {
|
||||
tSimpleHashCleanup(pTableMap);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pUidList->currentIndex = 0;
|
||||
|
||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
|
||||
|
||||
pScanInfo->uid = idList[j].uid;
|
||||
pUidList->tableUidList[j] = idList[j].uid;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->info.order)) {
|
||||
int64_t skey = pTsdbReader->info.window.skey;
|
||||
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
pScanInfo->lastKeyInStt = skey;
|
||||
} else {
|
||||
int64_t ekey = pTsdbReader->info.window.ekey;
|
||||
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||
pScanInfo->lastKeyInStt = ekey;
|
||||
}
|
||||
|
||||
tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid,
|
||||
pScanInfo->lastKey, pTsdbReader->idStr);
|
||||
}
|
||||
|
||||
taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
||||
pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
|
||||
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
|
||||
pTsdbReader->idStr);
|
||||
|
||||
return pTableMap;
|
||||
}
|
||||
|
||||
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) {
|
||||
void* p = NULL;
|
||||
int32_t iter = 0;
|
||||
|
||||
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
|
||||
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
|
||||
|
||||
pInfo->iterInit = false;
|
||||
pInfo->iter.hasVal = false;
|
||||
pInfo->iiter.hasVal = false;
|
||||
|
||||
if (pInfo->iter.iter != NULL) {
|
||||
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
|
||||
}
|
||||
|
||||
if (pInfo->iiter.iter != NULL) {
|
||||
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
|
||||
}
|
||||
|
||||
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||
pInfo->lastKey = ts;
|
||||
pInfo->lastKeyInStt = ts + step;
|
||||
}
|
||||
}
|
||||
|
||||
void clearBlockScanInfo(STableBlockScanInfo* p) {
|
||||
p->iterInit = false;
|
||||
p->iter.hasVal = false;
|
||||
p->iiter.hasVal = false;
|
||||
|
||||
if (p->iter.iter != NULL) {
|
||||
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
||||
}
|
||||
|
||||
if (p->iiter.iter != NULL) {
|
||||
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
|
||||
}
|
||||
|
||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||
p->pMemDelData = taosArrayDestroy(p->pMemDelData);
|
||||
p->pfileDelData = taosArrayDestroy(p->pfileDelData);
|
||||
}
|
||||
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap) {
|
||||
void* p = NULL;
|
||||
int32_t iter = 0;
|
||||
|
||||
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
|
||||
clearBlockScanInfo(*(STableBlockScanInfo**)p);
|
||||
}
|
||||
|
||||
tSimpleHashCleanup(pTableMap);
|
||||
}
|
||||
|
||||
static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
||||
// reset the index in last block when handing a new file
|
||||
taosArrayClear(pScanInfo->pBlockList);
|
||||
taosArrayClear(pScanInfo->pfileDelData); // del data from each file set
|
||||
}
|
||||
|
||||
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap) {
|
||||
STableBlockScanInfo** p = NULL;
|
||||
|
||||
int32_t iter = 0;
|
||||
while ((p = tSimpleHashIterate(pTableMap, p, &iter)) != NULL) {
|
||||
doCleanupInfoForNextFileset(*p);
|
||||
}
|
||||
}
|
||||
|
||||
// brin records iterator
|
||||
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList) {
|
||||
memset(&pIter->block, 0, sizeof(SBrinBlock));
|
||||
memset(&pIter->record, 0, sizeof(SBrinRecord));
|
||||
pIter->blockIndex = -1;
|
||||
pIter->recordIndex = -1;
|
||||
|
||||
pIter->pReader = pReader;
|
||||
pIter->pBrinBlockList = pList;
|
||||
}
|
||||
|
||||
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
|
||||
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) {
|
||||
pIter->blockIndex += 1;
|
||||
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex);
|
||||
|
||||
tBrinBlockClear(&pIter->block);
|
||||
tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block);
|
||||
pIter->recordIndex = -1;
|
||||
}
|
||||
|
||||
pIter->recordIndex += 1;
|
||||
tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record);
|
||||
return &pIter->record;
|
||||
}
|
||||
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter) { tBrinBlockDestroy(&pIter->block); }
|
||||
|
||||
// initialize the file block access order
|
||||
// sort the file blocks according to the offset of each data block in the files
|
||||
static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) {
|
||||
taosMemoryFreeClear(pSup->numOfBlocksPerTable);
|
||||
taosMemoryFreeClear(pSup->indexPerTable);
|
||||
|
||||
for (int32_t i = 0; i < pSup->numOfTables; ++i) {
|
||||
SBlockOrderWrapper* pBlockInfo = pSup->pDataBlockInfo[i];
|
||||
taosMemoryFreeClear(pBlockInfo);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pSup->pDataBlockInfo);
|
||||
}
|
||||
|
||||
static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) {
|
||||
pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
|
||||
pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables);
|
||||
pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables);
|
||||
|
||||
if (pSup->numOfBlocksPerTable == NULL || pSup->indexPerTable == NULL || pSup->pDataBlockInfo == NULL) {
|
||||
cleanupBlockOrderSupporter(pSup);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, void* param) {
|
||||
int32_t leftIndex = *(int32_t*)pLeft;
|
||||
int32_t rightIndex = *(int32_t*)pRight;
|
||||
|
||||
SBlockOrderSupporter* pSupporter = (SBlockOrderSupporter*)param;
|
||||
|
||||
int32_t leftTableBlockIndex = pSupporter->indexPerTable[leftIndex];
|
||||
int32_t rightTableBlockIndex = pSupporter->indexPerTable[rightIndex];
|
||||
|
||||
if (leftTableBlockIndex > pSupporter->numOfBlocksPerTable[leftIndex]) {
|
||||
/* left block is empty */
|
||||
return 1;
|
||||
} else if (rightTableBlockIndex > pSupporter->numOfBlocksPerTable[rightIndex]) {
|
||||
/* right block is empty */
|
||||
return -1;
|
||||
}
|
||||
|
||||
SBlockOrderWrapper* pLeftBlock = &pSupporter->pDataBlockInfo[leftIndex][leftTableBlockIndex];
|
||||
SBlockOrderWrapper* pRightBlock = &pSupporter->pDataBlockInfo[rightIndex][rightTableBlockIndex];
|
||||
|
||||
return pLeftBlock->offset > pRightBlock->offset ? 1 : -1;
|
||||
}
|
||||
|
||||
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList) {
|
||||
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||
|
||||
SBlockOrderSupporter sup = {0};
|
||||
pBlockIter->numOfBlocks = numOfBlocks;
|
||||
taosArrayClear(pBlockIter->blockList);
|
||||
|
||||
pBlockIter->pTableMap = pReader->status.pTableMap;
|
||||
|
||||
// access data blocks according to the offset of each block in asc/desc order.
|
||||
int32_t numOfTables = taosArrayGetSize(pTableList);
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
int32_t code = initBlockOrderSupporter(&sup, numOfTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t cnt = 0;
|
||||
|
||||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
STableBlockScanInfo* pTableScanInfo = taosArrayGetP(pTableList, i);
|
||||
// ASSERT(pTableScanInfo->pBlockList != NULL && taosArrayGetSize(pTableScanInfo->pBlockList) > 0);
|
||||
|
||||
size_t num = taosArrayGetSize(pTableScanInfo->pBlockList);
|
||||
sup.numOfBlocksPerTable[sup.numOfTables] = num;
|
||||
|
||||
char* buf = taosMemoryMalloc(sizeof(SBlockOrderWrapper) * num);
|
||||
if (buf == NULL) {
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
sup.pDataBlockInfo[sup.numOfTables] = (SBlockOrderWrapper*)buf;
|
||||
|
||||
for (int32_t k = 0; k < num; ++k) {
|
||||
SBrinRecord* pRecord = taosArrayGet(pTableScanInfo->pBlockList, k);
|
||||
sup.pDataBlockInfo[sup.numOfTables][k] =
|
||||
(SBlockOrderWrapper){.uid = pTableScanInfo->uid, .offset = pRecord->blockOffset, .pInfo = pTableScanInfo};
|
||||
cnt++;
|
||||
}
|
||||
|
||||
sup.numOfTables += 1;
|
||||
}
|
||||
|
||||
if (numOfBlocks != cnt && sup.numOfTables != numOfTables) {
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
// since there is only one table qualified, blocks are not sorted
|
||||
if (sup.numOfTables == 1) {
|
||||
for (int32_t i = 0; i < numOfBlocks; ++i) {
|
||||
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[0][i].uid, .tbBlockIdx = i};
|
||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[0][i].pInfo->pBlockList, i);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
tsdbDebug("%p create blocks info struct completed for one table, %d blocks not sorted, elapsed time:%.2f ms %s",
|
||||
pReader, numOfBlocks, (et - st) / 1000.0, pReader->idStr);
|
||||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables,
|
||||
pReader->idStr);
|
||||
|
||||
SMultiwayMergeTreeInfo* pTree = NULL;
|
||||
|
||||
uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t numOfTotal = 0;
|
||||
while (numOfTotal < cnt) {
|
||||
int32_t pos = tMergeTreeGetChosenIndex(pTree);
|
||||
int32_t index = sup.indexPerTable[pos]++;
|
||||
|
||||
SFileDataBlockInfo blockInfo = {.uid = sup.pDataBlockInfo[pos][index].uid, .tbBlockIdx = index};
|
||||
blockInfo.record = *(SBrinRecord*)taosArrayGet(sup.pDataBlockInfo[pos][index].pInfo->pBlockList, index);
|
||||
|
||||
taosArrayPush(pBlockIter->blockList, &blockInfo);
|
||||
|
||||
// set data block index overflow, in order to disable the offset comparator
|
||||
if (sup.indexPerTable[pos] >= sup.numOfBlocksPerTable[pos]) {
|
||||
sup.indexPerTable[pos] = sup.numOfBlocksPerTable[pos] + 1;
|
||||
}
|
||||
|
||||
numOfTotal += 1;
|
||||
tMergeTreeAdjust(pTree, tMergeTreeGetAdjustIndex(pTree));
|
||||
}
|
||||
|
||||
int64_t et = taosGetTimestampUs();
|
||||
tsdbDebug("%p %d data blocks access order completed, elapsed time:%.2f ms %s", pReader, numOfBlocks,
|
||||
(et - st) / 1000.0, pReader->idStr);
|
||||
cleanupBlockOrderSupporter(&sup);
|
||||
taosMemoryFree(pTree);
|
||||
|
||||
pBlockIter->index = asc ? 0 : (numOfBlocks - 1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr) {
|
||||
bool asc = ASCENDING_TRAVERSE(pBlockIter->order);
|
||||
|
||||
int32_t step = asc ? 1 : -1;
|
||||
if ((pBlockIter->index >= pBlockIter->numOfBlocks - 1 && asc) || (pBlockIter->index <= 0 && (!asc))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
pBlockIter->index += step;
|
||||
return true;
|
||||
}
|
||||
|
||||
// load tomb data API
|
||||
//static int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer);
|
||||
|
||||
//int32_t loadSttTombData(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pScanInfo, uint64_t maxVer) {
|
||||
// int32_t size = taosArrayGetSize(pLDataIterList);
|
||||
// if (size <= 0) {
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
// }
|
||||
//
|
||||
// uint64_t uid = pScanInfo->uid;
|
||||
// if (pScanInfo->pfileDelData == NULL) {
|
||||
// pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
// }
|
||||
//
|
||||
// for (int32_t i = 0; i < size; ++i) {
|
||||
// SArray* pLeveledLDataIter = taosArrayGetP(pLDataIterList, i);
|
||||
//
|
||||
// int32_t numOfIter = taosArrayGetSize(pLeveledLDataIter);
|
||||
// if (numOfIter == 0) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// for (int32_t f = 0; f < numOfIter; ++f) {
|
||||
// SLDataIter* pIter = taosArrayGetP(pLeveledLDataIter, f);
|
||||
//
|
||||
// SArray* pTombBlockArray = pIter->pBlockLoadInfo->pTombBlockArray;
|
||||
// int32_t numOfBlocks = taosArrayGetSize(pTombBlockArray);
|
||||
// for (int32_t k = 0; k < numOfBlocks; ++k) {
|
||||
// STombBlock* pBlock = taosArrayGetP(pTombBlockArray, k);
|
||||
//
|
||||
// int32_t code = checkTombBlockRecords(pScanInfo->pfileDelData, pBlock, suid, uid, maxVer);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// return code;
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
|
||||
int32_t doLoadTombDataFromTombBlk(const TTombBlkArray* pTombBlkArray, STsdbReader* pReader,
|
||||
SDataFileReader* pFileReader, SSttFileReader* pSttReader, bool isFile) {
|
||||
int32_t code = 0;
|
||||
|
||||
STableUidList* pList = &pReader->status.uidList;
|
||||
int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap);
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while (i < pTombBlkArray->size && j < numOfTables) {
|
||||
STombBlk* pTombBlk = &pTombBlkArray->data[i];
|
||||
if (pTombBlk->maxTbid.suid < pReader->info.suid) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTombBlk->minTbid.suid > pReader->info.suid) {
|
||||
break;
|
||||
}
|
||||
|
||||
ASSERT(pTombBlk->minTbid.suid <= pReader->info.suid && pTombBlk->maxTbid.suid >= pReader->info.suid);
|
||||
if (pTombBlk->maxTbid.suid == pReader->info.suid && pTombBlk->maxTbid.uid < pList->tableUidList[0]) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pTombBlk->minTbid.suid == pReader->info.suid && pTombBlk->minTbid.uid > pList->tableUidList[numOfTables - 1]) {
|
||||
break;
|
||||
}
|
||||
|
||||
STombBlock block = {0};
|
||||
code = isFile ? tsdbDataFileReadTombBlock(pFileReader, &pTombBlkArray->data[i], &block)
|
||||
: tsdbSttFileReadTombBlock(pSttReader, &pTombBlkArray->data[i], &block);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
uint64_t uid = pReader->status.uidList.tableUidList[j];
|
||||
|
||||
STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (pScanInfo->pfileDelData == NULL) {
|
||||
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
STombRecord record = {0};
|
||||
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
|
||||
code = tTombBlockGet(&block, k, &record);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (record.suid < pReader->info.suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (record.suid > pReader->info.suid) {
|
||||
tTombBlockDestroy(&block);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
bool newTable = false;
|
||||
if (uid < record.uid) {
|
||||
while (pReader->status.uidList.tableUidList[j] < record.uid && j < numOfTables) {
|
||||
j += 1;
|
||||
newTable = true;
|
||||
}
|
||||
|
||||
if (j >= numOfTables) {
|
||||
tTombBlockDestroy(&block);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
uid = pReader->status.uidList.tableUidList[j];
|
||||
}
|
||||
|
||||
if (record.uid < uid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
ASSERT(record.suid == pReader->info.suid && uid == record.uid);
|
||||
|
||||
if (newTable) {
|
||||
pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr);
|
||||
if (pScanInfo->pfileDelData == NULL) {
|
||||
pScanInfo->pfileDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
}
|
||||
|
||||
if (record.version <= pReader->info.verRange.maxVer) {
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush(pScanInfo->pfileDelData, &delData);
|
||||
}
|
||||
}
|
||||
|
||||
i += 1;
|
||||
tTombBlockDestroy(&block);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader) {
|
||||
if (pReader->status.pCurrentFileset == NULL || pReader->status.pCurrentFileset->farr[3] == NULL) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
const TTombBlkArray* pBlkArray = NULL;
|
||||
|
||||
int32_t code = tsdbDataFileReadTombBlk(pReader->pFileReader, &pBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return doLoadTombDataFromTombBlk(pBlkArray, pReader, pReader->pFileReader, NULL, true);
|
||||
}
|
||||
|
||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo) {
|
||||
if (pLoadInfo->pTombBlockArray == NULL) {
|
||||
pLoadInfo->pTombBlockArray = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
||||
const TTombBlkArray* pBlkArray = NULL;
|
||||
int32_t code = tsdbSttFileReadTombBlk(pSttFileReader, &pBlkArray);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
return doLoadTombDataFromTombBlk(pBlkArray, pReader, NULL, pSttFileReader, false);
|
||||
}
|
||||
|
||||
void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) {
|
||||
if (pScanInfo->pMemDelData == NULL) {
|
||||
pScanInfo->pMemDelData = taosArrayInit(4, sizeof(SDelData));
|
||||
}
|
||||
|
||||
SDelData* p = NULL;
|
||||
if (pMemTbData != NULL) {
|
||||
p = pMemTbData->pHead;
|
||||
while (p) {
|
||||
if (p->version <= ver) {
|
||||
taosArrayPush(pScanInfo->pMemDelData, p);
|
||||
}
|
||||
|
||||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
|
||||
if (piMemTbData != NULL) {
|
||||
p = piMemTbData->pHead;
|
||||
while (p) {
|
||||
if (p->version <= ver) {
|
||||
taosArrayPush(pScanInfo->pMemDelData, p);
|
||||
}
|
||||
p = p->pNext;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t checkTombBlockRecords(SArray* pData, STombBlock* pBlock, uint64_t suid, uint64_t uid, int64_t maxVer) {
|
||||
STombRecord record = {0};
|
||||
for (int32_t j = 0; j < pBlock->suid->size; ++j) {
|
||||
int32_t code = tTombBlockGet(pBlock, j, &record);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (record.suid < suid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (record.suid > suid || (record.suid == suid && record.uid > uid)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (record.uid < uid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (record.version <= maxVer) {
|
||||
SDelData delData = {.version = record.version, .sKey = record.skey, .eKey = record.ekey};
|
||||
taosArrayPush(pData, &delData);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
|
@ -0,0 +1,265 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef TDENGINE_TSDBREADUTIL_H
|
||||
#define TDENGINE_TSDBREADUTIL_H
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include "tsdbUtil2.h"
|
||||
#include "tsdbDataFileRW.h"
|
||||
|
||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||
|
||||
typedef enum {
|
||||
READER_STATUS_SUSPEND = 0x1,
|
||||
READER_STATUS_NORMAL = 0x2,
|
||||
} EReaderStatus;
|
||||
|
||||
typedef enum {
|
||||
READ_MODE_COUNT_ONLY = 0x1,
|
||||
READ_MODE_ALL,
|
||||
} EReadMode;
|
||||
|
||||
typedef enum {
|
||||
EXTERNAL_ROWS_PREV = 0x1,
|
||||
EXTERNAL_ROWS_MAIN = 0x2,
|
||||
EXTERNAL_ROWS_NEXT = 0x3,
|
||||
} EContentData;
|
||||
|
||||
typedef struct SBlockInfoBuf {
|
||||
int32_t currentIndex;
|
||||
SArray* pData;
|
||||
int32_t numPerBucket;
|
||||
int32_t numOfTables;
|
||||
} SBlockInfoBuf;
|
||||
|
||||
typedef struct {
|
||||
STbDataIter* iter;
|
||||
int32_t index;
|
||||
bool hasVal;
|
||||
} SIterInfo;
|
||||
|
||||
typedef struct STableBlockScanInfo {
|
||||
uint64_t uid;
|
||||
TSKEY lastKey;
|
||||
TSKEY lastKeyInStt; // last accessed key in stt
|
||||
SArray* pBlockList; // block data index list, SArray<SBrinRecord>
|
||||
SArray* pMemDelData; // SArray<SDelData>
|
||||
SArray* pfileDelData; // SArray<SDelData> from each file set
|
||||
SIterInfo iter; // mem buffer skip list iterator
|
||||
SIterInfo iiter; // imem buffer skip list iterator
|
||||
SArray* delSkyline; // delete info for this table
|
||||
int32_t fileDelIndex; // file block delete index
|
||||
int32_t lastBlockDelIndex; // delete index for last block
|
||||
bool iterInit; // whether to initialize the in-memory skip list iterator or not
|
||||
} STableBlockScanInfo;
|
||||
|
||||
typedef struct STsdbReaderInfo {
|
||||
uint64_t suid;
|
||||
STSchema* pSchema;
|
||||
EReadMode readMode;
|
||||
uint64_t rowsNum;
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int16_t order;
|
||||
} STsdbReaderInfo;
|
||||
|
||||
typedef struct SResultBlockInfo {
|
||||
SSDataBlock* pResBlock;
|
||||
bool freeBlock;
|
||||
int64_t capacity;
|
||||
} SResultBlockInfo;
|
||||
|
||||
typedef struct SCostSummary {
|
||||
int64_t numOfBlocks;
|
||||
double blockLoadTime;
|
||||
double buildmemBlock;
|
||||
int64_t headFileLoad;
|
||||
double headFileLoadTime;
|
||||
int64_t smaDataLoad;
|
||||
double smaLoadTime;
|
||||
int64_t lastBlockLoad;
|
||||
double lastBlockLoadTime;
|
||||
int64_t composedBlocks;
|
||||
double buildComposedBlockTime;
|
||||
double createScanInfoList;
|
||||
double createSkylineIterTime;
|
||||
double initLastBlockReader;
|
||||
} SCostSummary;
|
||||
|
||||
typedef struct STableUidList {
|
||||
uint64_t* tableUidList; // access table uid list in uid ascending order list
|
||||
int32_t currentIndex; // index in table uid list
|
||||
} STableUidList;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfBlocks;
|
||||
int32_t numOfLastFiles;
|
||||
} SBlockNumber;
|
||||
|
||||
typedef struct SBlockIndex {
|
||||
int32_t ordinalIndex;
|
||||
int64_t inFileOffset;
|
||||
STimeWindow window; // todo replace it with overlap flag.
|
||||
} SBlockIndex;
|
||||
|
||||
typedef struct SBlockOrderWrapper {
|
||||
int64_t uid;
|
||||
int64_t offset;
|
||||
STableBlockScanInfo* pInfo;
|
||||
} SBlockOrderWrapper;
|
||||
|
||||
typedef struct SBlockOrderSupporter {
|
||||
SBlockOrderWrapper** pDataBlockInfo;
|
||||
int32_t* indexPerTable;
|
||||
int32_t* numOfBlocksPerTable;
|
||||
int32_t numOfTables;
|
||||
} SBlockOrderSupporter;
|
||||
|
||||
typedef struct SBlockLoadSuppInfo {
|
||||
TColumnDataAggArray colAggArray;
|
||||
SColumnDataAgg tsColAgg;
|
||||
int16_t* colId;
|
||||
int16_t* slotId;
|
||||
int32_t numOfCols;
|
||||
char** buildBuf; // build string tmp buffer, todo remove it later after all string format being updated.
|
||||
bool smaValid; // the sma on all queried columns are activated
|
||||
} SBlockLoadSuppInfo;
|
||||
|
||||
typedef struct SLastBlockReader {
|
||||
STimeWindow window;
|
||||
SVersionRange verRange;
|
||||
int32_t order;
|
||||
uint64_t uid;
|
||||
SMergeTree mergeTree;
|
||||
SSttBlockLoadInfo* pInfo;
|
||||
int64_t currentKey;
|
||||
} SLastBlockReader;
|
||||
|
||||
typedef struct SFilesetIter {
|
||||
int32_t numOfFiles; // number of total files
|
||||
int32_t index; // current accessed index in the list
|
||||
TFileSetArray* pFilesetList; // data file set list
|
||||
int32_t order;
|
||||
SLastBlockReader* pLastBlockReader; // last file block reader
|
||||
} SFilesetIter;
|
||||
|
||||
typedef struct SFileDataBlockInfo {
|
||||
// index position in STableBlockScanInfo in order to check whether neighbor block overlaps with it
|
||||
uint64_t uid;
|
||||
int32_t tbBlockIdx;
|
||||
SBrinRecord record;
|
||||
} SFileDataBlockInfo;
|
||||
|
||||
typedef struct SDataBlockIter {
|
||||
int32_t numOfBlocks;
|
||||
int32_t index;
|
||||
SArray* blockList; // SArray<SFileDataBlockInfo>
|
||||
int32_t order;
|
||||
SDataBlk block; // current SDataBlk data
|
||||
SSHashObj* pTableMap;
|
||||
} SDataBlockIter;
|
||||
|
||||
typedef struct SFileBlockDumpInfo {
|
||||
int32_t totalRows;
|
||||
int32_t rowIndex;
|
||||
int64_t lastKey;
|
||||
bool allDumped;
|
||||
} SFileBlockDumpInfo;
|
||||
|
||||
typedef struct SReaderStatus {
|
||||
bool loadFromFile; // check file stage
|
||||
bool composedDataBlock; // the returned data block is a composed block or not
|
||||
SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||
STableUidList uidList; // check tables in uid order, to avoid the repeatly load of blocks in STT.
|
||||
SFileBlockDumpInfo fBlockDumpInfo;
|
||||
STFileSet* pCurrentFileset; // current opened file set
|
||||
SBlockData fileBlockData;
|
||||
SFilesetIter fileIter;
|
||||
SDataBlockIter blockIter;
|
||||
SArray* pLDataIterArray;
|
||||
SRowMerger merger;
|
||||
SColumnInfoData* pPrimaryTsCol; // primary time stamp output col info data
|
||||
TFileSetArray* pfSetArray;
|
||||
} SReaderStatus;
|
||||
|
||||
struct STsdbReader {
|
||||
STsdb* pTsdb;
|
||||
STsdbReaderInfo info;
|
||||
TdThreadMutex readerMutex;
|
||||
EReaderStatus flag;
|
||||
int32_t code;
|
||||
uint64_t rowsNum;
|
||||
SResultBlockInfo resBlockInfo;
|
||||
SReaderStatus status;
|
||||
char* idStr; // query info handle, for debug purpose
|
||||
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
|
||||
SBlockLoadSuppInfo suppInfo;
|
||||
STsdbReadSnap* pReadSnap;
|
||||
SCostSummary cost;
|
||||
SHashObj** pIgnoreTables;
|
||||
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
|
||||
SDataFileReader* pFileReader; // the file reader
|
||||
SBlockInfoBuf blockInfoBuf;
|
||||
EContentData step;
|
||||
STsdbReader* innerReader[2];
|
||||
};
|
||||
|
||||
typedef struct SBrinRecordIter {
|
||||
SArray* pBrinBlockList;
|
||||
SBrinBlk* pCurrentBlk;
|
||||
int32_t blockIndex;
|
||||
int32_t recordIndex;
|
||||
SDataFileReader* pReader;
|
||||
SBrinBlock block;
|
||||
SBrinRecord record;
|
||||
} SBrinRecordIter;
|
||||
|
||||
STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id);
|
||||
|
||||
SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
|
||||
STableUidList* pUidList, int32_t numOfTables);
|
||||
void clearBlockScanInfo(STableBlockScanInfo* p);
|
||||
void destroyAllBlockScanInfo(SSHashObj* pTableMap);
|
||||
void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step);
|
||||
void cleanupInfoFoxNextFileset(SSHashObj* pTableMap);
|
||||
int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables);
|
||||
void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf);
|
||||
void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index);
|
||||
|
||||
// brin records iterator
|
||||
void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList);
|
||||
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter);
|
||||
void clearBrinBlockIter(SBrinRecordIter* pIter);
|
||||
|
||||
// initialize block iterator API
|
||||
int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList);
|
||||
bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr);
|
||||
|
||||
// load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time)
|
||||
//int32_t loadSttTombData(SArray* pLDataIterList, uint64_t suid, STableBlockScanInfo* pScanInfo, uint64_t maxVer);
|
||||
void loadMemTombData(STableBlockScanInfo* pScanInfo, STbData* pMemTbData, STbData* piMemTbData, int64_t ver);
|
||||
int32_t loadDataFileTombDataForAll(STsdbReader* pReader);
|
||||
int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif // TDENGINE_TSDBREADUTIL_H
|
Loading…
Reference in New Issue