fix(tsdb/read): use int32_t instead of size_t for taosHashGetSize

This commit is contained in:
Minglei Jin 2022-12-29 18:21:38 +08:00
parent 2ed9c19da4
commit d3c97947f3
1 changed files with 20 additions and 15 deletions

View File

@ -2902,14 +2902,15 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
// set the correct start position in case of the first/last file block, according to the query time window // set the correct start position in case of the first/last file block, according to the query time window
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
SDataBlk* pBlock = getCurrentBlock(pBlockIter); int64_t lastKey = ASCENDING_TRAVERSE(pReader->order) ? INT64_MIN : INT64_MAX;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SDataBlk* pBlock = getCurrentBlock(pBlockIter);
STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid)); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
if (pScanInfo == NULL) { if (pBlockInfo) {
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list", pBlockInfo->uid); STableBlockScanInfo* pScanInfo = taosHashGet(pBlockIter->pTableMap, &pBlockInfo->uid, sizeof(pBlockInfo->uid));
return; if (pScanInfo) {
lastKey = pScanInfo->lastKey;
}
} }
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo;
@ -2917,7 +2918,7 @@ static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter)
pDumpInfo->totalRows = pBlock->nRow; pDumpInfo->totalRows = pBlock->nRow;
pDumpInfo->allDumped = false; pDumpInfo->allDumped = false;
pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->order) ? 0 : pBlock->nRow - 1;
pDumpInfo->lastKey = pScanInfo->lastKey; pDumpInfo->lastKey = lastKey;
} }
static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) { static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBlockIter) {
@ -4058,11 +4059,11 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) {
int32_t tsdbReaderResume(STsdbReader* pReader) { int32_t tsdbReaderResume(STsdbReader* pReader) {
int32_t code = 0; int32_t code = 0;
STableBlockScanInfo* pBlockScanInfo = *pReader->status.pTableIter; STableBlockScanInfo** pBlockScanInfo = pReader->status.pTableIter;
// restore reader's state // restore reader's state
// task snapshot // task snapshot
size_t numOfTables = taosHashGetSize(pReader->status.pTableMap); int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap);
if (numOfTables > 0) { if (numOfTables > 0) {
code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); code = tsdbTakeReadSnap(pReader, tsdbSetQueryReseek, &pReader->pReadSnap);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -4100,8 +4101,8 @@ int32_t tsdbReaderResume(STsdbReader* pReader) {
pReader->suspended = false; pReader->suspended = false;
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRIu64 ", in this query %s", pReader, tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
pBlockScanInfo ? pBlockScanInfo->uid : 0, numOfTables, pReader->idStr); pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
return code; return code;
_err: _err:
@ -4201,9 +4202,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) {
} }
} }
if (pStatus->composedDataBlock) { taosThreadMutexUnlock(&pReader->readerMutex);
taosThreadMutexUnlock(&pReader->readerMutex);
}
return false; return false;
} }
@ -4335,6 +4334,8 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock,
*pBlockSMA = pResBlock->pBlockAgg; *pBlockSMA = pResBlock->pBlockAgg;
pReader->cost.smaDataLoad += 1; pReader->cost.smaDataLoad += 1;
taosThreadMutexUnlock(&pReader->readerMutex);
tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr); tsdbDebug("vgId:%d, succeed to load block SMA for uid %" PRIu64 ", %s", 0, pFBlock->uid, pReader->idStr);
return code; return code;
} }
@ -4353,6 +4354,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid, tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", pBlockInfo->uid,
taosHashGetSize(pReader->status.pTableMap), pReader->idStr); taosHashGetSize(pReader->status.pTableMap), pReader->idStr);
taosThreadMutexUnlock(&pReader->readerMutex);
return NULL; return NULL;
} }
@ -4360,6 +4362,7 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tBlockDataDestroy(&pStatus->fileBlockData); tBlockDataDestroy(&pStatus->fileBlockData);
terrno = code; terrno = code;
taosThreadMutexUnlock(&pReader->readerMutex);
return NULL; return NULL;
} }
@ -4369,6 +4372,8 @@ static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) {
return pReader->pResBlock; return pReader->pResBlock;
} }
void tsdbReleaseDataBlock(STsdbReader* pReader) { taosThreadMutexUnlock(&pReader->readerMutex); }
SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) { SSDataBlock* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) {
if (pReader->step == EXTERNAL_ROWS_PREV) { if (pReader->step == EXTERNAL_ROWS_PREV) {