fix(query): optimize the performance of tsdbread.
This commit is contained in:
parent
cff5d9295e
commit
68db9b1cc9
|
@ -132,7 +132,7 @@ typedef struct SReaderStatus {
|
||||||
bool loadFromFile; // check file stage
|
bool loadFromFile; // check file stage
|
||||||
bool composedDataBlock; // the returned data block is a composed block or not
|
bool composedDataBlock; // the returned data block is a composed block or not
|
||||||
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
SHashObj* pTableMap; // SHash<STableBlockScanInfo>
|
||||||
STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks.
|
STableBlockScanInfo**pTableIter; // table iterator used in building in-memory buffer data blocks.
|
||||||
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
|
SUidOrderCheckInfo uidCheckInfo; // check all table in uid order
|
||||||
SFileBlockDumpInfo fBlockDumpInfo;
|
SFileBlockDumpInfo fBlockDumpInfo;
|
||||||
SDFileSet* pCurrentFileset; // current opened file set
|
SDFileSet* pCurrentFileset; // current opened file set
|
||||||
|
@ -141,6 +141,12 @@ typedef struct SReaderStatus {
|
||||||
SDataBlockIter blockIter;
|
SDataBlockIter blockIter;
|
||||||
} SReaderStatus;
|
} SReaderStatus;
|
||||||
|
|
||||||
|
typedef struct SBlockInfoBuf {
|
||||||
|
int32_t currentIndex;
|
||||||
|
SArray* pData;
|
||||||
|
int32_t numPerBucket;
|
||||||
|
} SBlockInfoBuf;
|
||||||
|
|
||||||
struct STsdbReader {
|
struct STsdbReader {
|
||||||
STsdb* pTsdb;
|
STsdb* pTsdb;
|
||||||
uint64_t suid;
|
uint64_t suid;
|
||||||
|
@ -158,9 +164,9 @@ struct STsdbReader {
|
||||||
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
STSchema* pMemSchema; // the previous schema for in-memory data, to avoid load schema too many times
|
||||||
SDataFReader* pFileReader;
|
SDataFReader* pFileReader;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
|
SBlockInfoBuf blockInfoBuf;
|
||||||
int32_t step;
|
int32_t step;
|
||||||
STsdbReader* innerReader[2];
|
STsdbReader* innerReader[2];
|
||||||
};
|
};
|
||||||
|
|
||||||
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter);
|
||||||
|
@ -226,6 +232,50 @@ static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||||
|
int32_t num = numOfTables / pBuf->numPerBucket;
|
||||||
|
int32_t remainder = numOfTables % pBuf->numPerBucket;
|
||||||
|
if (pBuf->pData == NULL) {
|
||||||
|
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
|
||||||
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pBuf->pData, &p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remainder > 0) {
|
||||||
|
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
|
||||||
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
taosArrayPush(pBuf->pData, &p);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) {
|
||||||
|
size_t num = taosArrayGetSize(pBuf->pData);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
char** p = taosArrayGet(pBuf->pData, i);
|
||||||
|
taosMemoryFree(*p);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pBuf->pData);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
|
||||||
|
int32_t bucketIndex = index / pBuf->numPerBucket;
|
||||||
|
char** pBucket = taosArrayGet(pBuf->pData, bucketIndex);
|
||||||
|
return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
|
||||||
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
|
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) {
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
// todo use simple hash instead, optimize the memory consumption
|
// todo use simple hash instead, optimize the memory consumption
|
||||||
|
@ -236,9 +286,23 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables);
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfTables; ++j) {
|
for (int32_t j = 0; j < numOfTables; ++j) {
|
||||||
STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j);
|
||||||
|
pScanInfo->uid = idList[j].uid;
|
||||||
|
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||||
|
int64_t skey = pTsdbReader->window.skey;
|
||||||
|
pScanInfo->lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||||
|
} else {
|
||||||
|
int64_t ekey = pTsdbReader->window.ekey;
|
||||||
|
pScanInfo->lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid};
|
||||||
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
if (ASCENDING_TRAVERSE(pTsdbReader->order)) {
|
||||||
int64_t skey = pTsdbReader->window.skey;
|
int64_t skey = pTsdbReader->window.skey;
|
||||||
info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey;
|
||||||
|
@ -248,7 +312,9 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info));
|
||||||
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, info.uid, info.lastKey,
|
#endif
|
||||||
|
|
||||||
|
tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReader, pScanInfo->uid, pScanInfo->lastKey,
|
||||||
pTsdbReader->idStr);
|
pTsdbReader->idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -260,18 +326,20 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK
|
||||||
return pTableMap;
|
return pTableMap;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
static void resetAllDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) {
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
|
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
p->iterInit = false;
|
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**) p;
|
||||||
p->iiter.hasVal = false;
|
|
||||||
if (p->iter.iter != NULL) {
|
pInfo->iterInit = false;
|
||||||
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
pInfo->iiter.hasVal = false;
|
||||||
|
if (pInfo->iter.iter != NULL) {
|
||||||
|
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
|
||||||
p->lastKey = ts;
|
pInfo->lastKey = ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -292,10 +360,10 @@ static void clearBlockScanInfo(STableBlockScanInfo* p) {
|
||||||
tMapDataClear(&p->mapData);
|
tMapDataClear(&p->mapData);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
static void destroyAllBlockScanInfo(SHashObj* pTableMap) {
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo* p = NULL;
|
||||||
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pTableMap, p)) != NULL) {
|
||||||
clearBlockScanInfo(p);
|
clearBlockScanInfo(*(STableBlockScanInfo**)p);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pTableMap);
|
taosHashCleanup(pTableMap);
|
||||||
|
@ -500,7 +568,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd
|
||||||
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
pReader->verRange = getQueryVerRange(pVnode, pCond, level);
|
||||||
pReader->type = pCond->type;
|
pReader->type = pCond->type;
|
||||||
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows);
|
||||||
|
pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket
|
||||||
ASSERT(pCond->numOfCols > 0);
|
ASSERT(pCond->numOfCols > 0);
|
||||||
|
|
||||||
limitOutputBufferSize(pCond, &pReader->capacity);
|
limitOutputBufferSize(pCond, &pReader->capacity);
|
||||||
|
@ -566,7 +634,7 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFReader* pFileReader,
|
||||||
}
|
}
|
||||||
|
|
||||||
// this block belongs to a table that is not queried.
|
// this block belongs to a table that is not queried.
|
||||||
void* p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
|
void* p = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(uint64_t));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -591,7 +659,7 @@ _end:
|
||||||
}
|
}
|
||||||
|
|
||||||
static void cleanupTableScanInfo(SHashObj* pTableMap) {
|
static void cleanupTableScanInfo(SHashObj* pTableMap) {
|
||||||
STableBlockScanInfo* px = NULL;
|
STableBlockScanInfo** px = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
px = taosHashIterate(pTableMap, px);
|
px = taosHashIterate(pTableMap, px);
|
||||||
if (px == NULL) {
|
if (px == NULL) {
|
||||||
|
@ -599,8 +667,8 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset the index in last block when handing a new file
|
// reset the index in last block when handing a new file
|
||||||
tMapDataClear(&px->mapData);
|
tMapDataClear(&(*px)->mapData);
|
||||||
taosArrayClear(px->pBlockList);
|
taosArrayClear((*px)->pBlockList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1101,7 +1169,7 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableBlockScanInfo* pTableScanInfo = (STableBlockScanInfo*)ptr;
|
STableBlockScanInfo* pTableScanInfo = *(STableBlockScanInfo**)ptr;
|
||||||
if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
|
if (pTableScanInfo->pBlockList == NULL || taosArrayGetSize(pTableScanInfo->pBlockList) == 0) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2233,7 +2301,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
|
|
||||||
STableBlockScanInfo* pBlockScanInfo = NULL;
|
STableBlockScanInfo* pBlockScanInfo = NULL;
|
||||||
if (pBlockInfo != NULL) {
|
if (pBlockInfo != NULL) {
|
||||||
pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
if (pBlockScanInfo == NULL) {
|
if (pBlockScanInfo == NULL) {
|
||||||
code = TSDB_CODE_INVALID_PARA;
|
code = TSDB_CODE_INVALID_PARA;
|
||||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||||
|
@ -2253,7 +2321,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // file blocks not exist
|
} else { // file blocks not exist
|
||||||
pBlockScanInfo = pReader->status.pTableIter;
|
pBlockScanInfo = *pReader->status.pTableIter;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
@ -2478,7 +2546,7 @@ static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SRea
|
||||||
|
|
||||||
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
void* p = taosHashIterate(pStatus->pTableMap, NULL);
|
||||||
while (p != NULL) {
|
while (p != NULL) {
|
||||||
STableBlockScanInfo* pScanInfo = p;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) p;
|
||||||
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
|
pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid;
|
||||||
p = taosHashIterate(pStatus->pTableMap, p);
|
p = taosHashIterate(pStatus->pTableMap, p);
|
||||||
}
|
}
|
||||||
|
@ -2552,7 +2620,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// load the last data block of current table
|
// load the last data block of current table
|
||||||
STableBlockScanInfo* pScanInfo = pStatus->pTableIter;
|
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**) pStatus->pTableIter;
|
||||||
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus);
|
||||||
|
@ -2590,9 +2658,9 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||||
|
|
||||||
if (pBlockInfo != NULL) {
|
if (pBlockInfo != NULL) {
|
||||||
pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
pScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
} else {
|
} else {
|
||||||
pScanInfo = pReader->status.pTableIter;
|
pScanInfo = *pReader->status.pTableIter;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pScanInfo == NULL) {
|
if (pScanInfo == NULL) {
|
||||||
|
@ -2657,11 +2725,11 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
|
STableBlockScanInfo** pBlockScanInfo = pStatus->pTableIter;
|
||||||
initMemDataIterator(pBlockScanInfo, pReader);
|
initMemDataIterator(*pBlockScanInfo, pReader);
|
||||||
|
|
||||||
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
|
int64_t endKey = (ASCENDING_TRAVERSE(pReader->order)) ? INT64_MAX : INT64_MIN;
|
||||||
int32_t code = buildDataBlockFromBuf(pReader, pBlockScanInfo, endKey);
|
int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3464,9 +3532,9 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
|
||||||
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
|
int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t num) {
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
|
|
||||||
STableBlockScanInfo* p = NULL;
|
STableBlockScanInfo** p = NULL;
|
||||||
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
|
while ((p = taosHashIterate(pReader->status.pTableMap, p)) != NULL) {
|
||||||
clearBlockScanInfo(p);
|
clearBlockScanInfo(*p);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
|
@ -3501,7 +3569,6 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
|
|
||||||
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader);
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
// resetDataBlockScanInfo(pReader->status.pTableMap, pReader->window.skey);
|
|
||||||
|
|
||||||
// no data in files, let's try buffer in memory
|
// no data in files, let's try buffer in memory
|
||||||
if (pReader->status.fileIter.numOfFiles == 0) {
|
if (pReader->status.fileIter.numOfFiles == 0) {
|
||||||
|
@ -3679,8 +3746,9 @@ void tsdbReaderClose(STsdbReader* pReader) {
|
||||||
cleanupDataBlockIterator(&pReader->status.blockIter);
|
cleanupDataBlockIterator(&pReader->status.blockIter);
|
||||||
|
|
||||||
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
|
||||||
destroyBlockScanInfo(pReader->status.pTableMap);
|
destroyAllBlockScanInfo(pReader->status.pTableMap);
|
||||||
blockDataDestroy(pReader->pResBlock);
|
blockDataDestroy(pReader->pResBlock);
|
||||||
|
clearBlockScanInfoBuf(&pReader->blockInfoBuf);
|
||||||
|
|
||||||
if (pReader->pFileReader != NULL) {
|
if (pReader->pFileReader != NULL) {
|
||||||
tsdbDataFReaderClose(&pReader->pFileReader);
|
tsdbDataFReaderClose(&pReader->pFileReader);
|
||||||
|
@ -3764,7 +3832,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
if (pReader->step == EXTERNAL_ROWS_PREV) {
|
||||||
// prepare for the main scan
|
// prepare for the main scan
|
||||||
int32_t code = doOpenReaderImpl(pReader);
|
int32_t code = doOpenReaderImpl(pReader);
|
||||||
resetDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
resetAllDataBlockScanInfo(pReader->status.pTableMap, pReader->innerReader[0]->window.ekey);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -3781,7 +3849,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
|
if (pReader->innerReader[1] != NULL && pReader->step == EXTERNAL_ROWS_MAIN) {
|
||||||
// prepare for the next row scan
|
// prepare for the next row scan
|
||||||
int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
|
int32_t code = doOpenReaderImpl(pReader->innerReader[1]);
|
||||||
resetDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
|
resetAllDataBlockScanInfo(pReader->innerReader[1]->status.pTableMap, pReader->window.ekey);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3797,7 +3865,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
|
bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) {
|
||||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
|
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid));
|
||||||
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
|
if (pBlockScanInfo == NULL) { // no data block for the table of given uid
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3912,7 +3980,7 @@ static SArray* doRetrieveDataBlock(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter);
|
||||||
STableBlockScanInfo* pBlockScanInfo = taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)taosHashGet(pStatus->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
|
||||||
if (pBlockScanInfo == NULL) {
|
if (pBlockScanInfo == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
|
||||||
|
@ -3967,7 +4035,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
resetDataBlockIterator(&pReader->status.blockIter, pReader->order);
|
||||||
|
|
||||||
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1;
|
||||||
resetDataBlockScanInfo(pReader->status.pTableMap, ts);
|
resetAllDataBlockScanInfo(pReader->status.pTableMap, ts);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
SDataBlockIter* pBlockIter = &pReader->status.blockIter;
|
||||||
|
@ -4072,7 +4140,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
||||||
|
|
||||||
while (pStatus->pTableIter != NULL) {
|
while (pStatus->pTableIter != NULL) {
|
||||||
STableBlockScanInfo* pBlockScanInfo = pStatus->pTableIter;
|
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
|
|
||||||
STbData* d = NULL;
|
STbData* d = NULL;
|
||||||
if (pReader->pTsdb->mem != NULL) {
|
if (pReader->pTsdb->mem != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue