refactor: opt query perf.
This commit is contained in:
parent
fd6ea6ba2f
commit
edca89b731
|
@ -130,17 +130,17 @@ typedef struct SFileBlockDumpInfo {
|
|||
bool allDumped;
|
||||
} SFileBlockDumpInfo;
|
||||
|
||||
typedef struct SUidOrderCheckInfo {
|
||||
typedef struct SUidOrderedList {
|
||||
uint64_t* tableUidList; // access table uid list in uid ascending order list
|
||||
int32_t currentIndex; // index in table uid list
|
||||
} SUidOrderCheckInfo;
|
||||
} SUidOrderedList;
|
||||
|
||||
typedef struct SReaderStatus {
|
||||
bool loadFromFile; // check file stage
|
||||
bool composedDataBlock; // the returned data block is a composed block or not
|
||||
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||
STableBlockScanInfo** pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
|
||||
SUidOrderedList uidCheckInfo; // check all table in uid order
|
||||
SFileBlockDumpInfo fBlockDumpInfo;
|
||||
SDFileSet* pCurrentFileset; // current opened file set
|
||||
SBlockData fileBlockData;
|
||||
|
@ -311,6 +311,16 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
|
|||
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
|
||||
}
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
|
||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||
// allocate buffer in order to load data blocks from file
|
||||
|
@ -324,9 +334,20 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
|
|||
int64_t st = taosGetTimestampUs();
|
||||
initBlockScanInfoBuf(pBuf, numOfTables);
|
||||
|
||||
SUidOrderedList* pOrderedCheckInfo = &pTsdbReader->status.uidCheckInfo;
|
||||
|
||||
pOrderedCheckInfo->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t));
|
||||
if (pOrderedCheckInfo->tableUidList == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
pOrderedCheckInfo->currentIndex = 0;
|
||||
|
||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j);
|
||||
|
||||
pScanInfo->uid = idList[j].uid;
|
||||
pOrderedCheckInfo->tableUidList[j] = idList[j].uid;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||
int64_t skey = pTsdbReader->window.skey;
|
||||
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||
|
@ -340,6 +361,8 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf
|
|||
pScanInfo->lastKey, pTsdbReader->idStr);
|
||||
}
|
||||
|
||||
taosSort(pOrderedCheckInfo->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc);
|
||||
|
||||
pTsdbReader->cost.createScanInfoList = (taosGetTimestampUs() - st) / 1000.0;
|
||||
tsdbDebug("%p create %d tables scan-info, size:%.2f Kb, elapsed time:%.2f ms, %s", pTsdbReader, numOfTables,
|
||||
(sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList,
|
||||
|
@ -663,28 +686,42 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
|||
int64_t et1 = taosGetTimestampUs();
|
||||
|
||||
SBlockIdx* pBlockIdx = NULL;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SUidOrderedList* pList = &pReader->status.uidCheckInfo;
|
||||
|
||||
int32_t i = 0, j = 0;
|
||||
while(i < num && j < numOfTables) {
|
||||
pBlockIdx = (SBlockIdx*)taosArrayGet(aBlockIdx, i);
|
||||
|
||||
// uid check
|
||||
if (pBlockIdx->suid != pReader->suid) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// this block belongs to a table that is not queried.
|
||||
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
if (pBlockIdx->uid < pList->tableUidList[j]) {
|
||||
i += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
|
||||
if (pBlockIdx->uid == pList->tableUidList[j]) {
|
||||
i += 1;
|
||||
j += 1;
|
||||
|
||||
// this block belongs to a table that is not queried.
|
||||
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
tsdbError("failed to locate the tableBlockScan Info in hashmap, uid:%"PRIu64", %s", pBlockIdx->uid, pReader->idStr);
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
}
|
||||
|
||||
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)p;
|
||||
if (pScanInfo->pBlockList == NULL) {
|
||||
pScanInfo->pBlockList = taosArrayInit(4, sizeof(SBlockIndex));
|
||||
}
|
||||
|
||||
taosArrayPush(pIndexList, pBlockIdx);
|
||||
}
|
||||
|
||||
taosArrayPush(pIndexList, pBlockIdx);
|
||||
if (taosArrayGetSize(pIndexList) == numOfTables) {
|
||||
break;
|
||||
if (pBlockIdx->uid > pList->tableUidList[j]) {
|
||||
j += 1;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2682,17 +2719,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t uidComparFunc(const void* p1, const void* p2) {
|
||||
uint64_t pu1 = *(uint64_t*)p1;
|
||||
uint64_t pu2 = *(uint64_t*)p2;
|
||||
if (pu1 == pu2) {
|
||||
return 0;
|
||||
} else {
|
||||
return (pu1 < pu2) ? -1 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
|
||||
static void extractOrderedTableUidList(SUidOrderedList* pOrderCheckInfo, SReaderStatus* pStatus, int32_t order) {
|
||||
int32_t index = 0;
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
|
||||
|
@ -2718,7 +2745,7 @@ static void resetScanBlockLastBlockDelIndex(SReaderStatus* pStatus, int32_t orde
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbReader* pReader) {
|
||||
static int32_t initOrderCheckInfo(SUidOrderedList* pOrderCheckInfo, STsdbReader* pReader) {
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
int32_t total = taosHashGetSize(pStatus->pTableMap);
|
||||
|
@ -2742,7 +2769,7 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbRead
|
|||
uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
|
||||
// the tableMap has already updated
|
||||
// the tableMap has already updated, let's also update the order list
|
||||
if (pStatus->pTableIter == NULL) {
|
||||
void* p = taosMemoryRealloc(pOrderCheckInfo->tableUidList, total * sizeof(uint64_t));
|
||||
if (p == NULL) {
|
||||
|
@ -2761,7 +2788,15 @@ static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, STsdbRead
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) {
|
||||
static void resetTableListIndex(SReaderStatus *pStatus) {
|
||||
SUidOrderedList* pList = &pStatus->uidCheckInfo;
|
||||
|
||||
pList->currentIndex = 0;
|
||||
uint64_t uid = pList->tableUidList[0];
|
||||
pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid));
|
||||
}
|
||||
|
||||
static bool moveToNextTable(SUidOrderedList* pOrderedCheckInfo, SReaderStatus* pStatus) {
|
||||
pOrderedCheckInfo->currentIndex += 1;
|
||||
if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) {
|
||||
pStatus->pTableIter = NULL;
|
||||
|
@ -2777,10 +2812,9 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
SReaderStatus* pStatus = &pReader->status;
|
||||
SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader;
|
||||
|
||||
SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
||||
int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pReader);
|
||||
if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) {
|
||||
return code;
|
||||
SUidOrderedList* pOrderedCheckInfo = &pStatus->uidCheckInfo;
|
||||
if (taosHashGetSize(pStatus->pTableMap) == 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SSDataBlock* pResBlock = pReader->pResBlock;
|
||||
|
@ -3028,6 +3062,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl
|
|||
} else { // no block data, only last block exists
|
||||
tBlockDataReset(&pReader->status.fileBlockData);
|
||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||
resetTableListIndex(&pReader->status);
|
||||
}
|
||||
|
||||
// set the correct start position according to the query time window
|
||||
|
@ -3069,6 +3104,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
// this file does not have data files, let's start check the last block file if exists
|
||||
if (pBlockIter->numOfBlocks == 0) {
|
||||
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
|
||||
resetTableListIndex(&pReader->status);
|
||||
goto _begin;
|
||||
}
|
||||
}
|
||||
|
@ -3101,6 +3137,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
tBlockDataReset(&pReader->status.fileBlockData);
|
||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
|
||||
resetTableListIndex(&pReader->status);
|
||||
goto _begin;
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
|
@ -3113,6 +3150,7 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) {
|
|||
// this file does not have blocks, let's start check the last block file
|
||||
if (pBlockIter->numOfBlocks == 0) {
|
||||
resetScanBlockLastBlockDelIndex(&pReader->status, pReader->order);
|
||||
resetTableListIndex(&pReader->status);
|
||||
goto _begin;
|
||||
}
|
||||
}
|
||||
|
@ -3774,11 +3812,15 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
|||
ASSERT(size >= num);
|
||||
|
||||
taosHashClear(pReader->status.pTableMap);
|
||||
SUidOrderedList* pUidList = &pReader->status.uidCheckInfo;
|
||||
pUidList->currentIndex = 0;
|
||||
|
||||
STableKeyInfo* pList = (STableKeyInfo*)pTableList;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i);
|
||||
pInfo->uid = pList[i].uid;
|
||||
pUidList->tableUidList[i] = pList[i].uid;
|
||||
|
||||
taosHashPut(pReader->status.pTableMap, &pInfo->uid, sizeof(uint64_t), &pInfo, POINTER_BYTES);
|
||||
}
|
||||
|
||||
|
@ -3825,13 +3867,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
|||
pCond->twindows.ekey -= 1;
|
||||
}
|
||||
|
||||
int32_t capacity = 0;
|
||||
if (pResBlock == NULL) {
|
||||
capacity = 4096;
|
||||
} else {
|
||||
capacity = pResBlock->info.capacity;
|
||||
}
|
||||
|
||||
int32_t capacity = (pResBlock == NULL)? 4096:pResBlock->info.capacity;
|
||||
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _err;
|
||||
|
@ -4304,12 +4340,14 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
pReader->order = pCond->order;
|
||||
pReader->type = TIMEWINDOW_RANGE_CONTAINED;
|
||||
pReader->status.loadFromFile = true;
|
||||
pReader->status.pTableIter = NULL;
|
||||
pStatus->loadFromFile = true;
|
||||
pStatus->pTableIter = NULL;
|
||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||
|
||||
// allocate buffer in order to load data blocks from file
|
||||
|
@ -4318,19 +4356,20 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
|||
pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||
|
||||
int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||
int32_t numOfTables = taosHashGetSize(pStatus->pTableMap);
|
||||
|
||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||
initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||
resetDataBlockIterator(pBlockIter, pReader->order);
|
||||
resetTableListIndex(&pReader->status);
|
||||
|
||||
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
||||
resetAllDataBlockScanInfo(pReader->status.pTableMap, ts);
|
||||
resetAllDataBlockScanInfo(pStatus->pTableMap, ts);
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
// no data in files, let's try buffer in memory
|
||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
||||
pReader->status.loadFromFile = false;
|
||||
if (pStatus->fileIter.numOfFiles == 0) {
|
||||
pStatus->loadFromFile = false;
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -4408,7 +4447,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader* pReader, STableBlockDistInfo* pTa
|
|||
hasNext = blockIteratorNext(&pStatus->blockIter, pReader->idStr);
|
||||
} else {
|
||||
code = initForFirstBlockInFile(pReader, pBlockIter);
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) {
|
||||
if ((code != TSDB_CODE_SUCCESS) || (pStatus->loadFromFile == false)) {
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue