fix(tsdb/read): new tsdbReleaseDataBlock api from release reader's lock
This commit is contained in:
parent
d3c97947f3
commit
e4e72dd9e9
|
@ -176,6 +176,7 @@ void tsdbReaderClose(STsdbReader *pReader);
|
||||||
bool tsdbNextDataBlock(STsdbReader *pReader);
|
bool tsdbNextDataBlock(STsdbReader *pReader);
|
||||||
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
void tsdbRetrieveDataBlockInfo(const STsdbReader *pReader, int32_t *rows, uint64_t *uid, STimeWindow *pWindow);
|
||||||
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave);
|
||||||
|
void tsdbReleaseDataBlock(STsdbReader *pReader);
|
||||||
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
SSDataBlock *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
|
|
|
@ -4514,13 +4514,18 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
int64_t rows = 0;
|
int64_t rows = 0;
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
taosThreadMutexLock(&pReader->readerMutex);
|
||||||
|
if (pReader->suspended) {
|
||||||
|
tsdbReaderResume(pReader);
|
||||||
|
}
|
||||||
|
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL);
|
||||||
|
|
||||||
while (pStatus->pTableIter != NULL) {
|
while (pStatus->pTableIter != NULL) {
|
||||||
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
|
||||||
|
|
||||||
STbData* d = NULL;
|
STbData* d = NULL;
|
||||||
if (pReader->pTsdb->mem != NULL) {
|
if (pReader->pReadSnap->pMem != NULL) {
|
||||||
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
|
d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid);
|
||||||
if (d != NULL) {
|
if (d != NULL) {
|
||||||
rows += tsdbGetNRowsInTbData(d);
|
rows += tsdbGetNRowsInTbData(d);
|
||||||
|
@ -4528,7 +4533,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* di = NULL;
|
STbData* di = NULL;
|
||||||
if (pReader->pTsdb->imem != NULL) {
|
if (pReader->pReadSnap->pIMem != NULL) {
|
||||||
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
|
di = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pIMem, pReader->suid, pBlockScanInfo->uid);
|
||||||
if (di != NULL) {
|
if (di != NULL) {
|
||||||
rows += tsdbGetNRowsInTbData(di);
|
rows += tsdbGetNRowsInTbData(di);
|
||||||
|
@ -4538,7 +4543,7 @@ int64_t tsdbGetNumOfRowsInMemTable(STsdbReader* pReader) {
|
||||||
// current table is exhausted, let's try the next table
|
// current table is exhausted, let's try the next table
|
||||||
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter);
|
||||||
}
|
}
|
||||||
|
tsdbReleaseDataBlock(pReader);
|
||||||
return rows;
|
return rows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -302,12 +302,14 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
pCost->filterOutBlocks += 1;
|
pCost->filterOutBlocks += 1;
|
||||||
pCost->totalRows += pBlock->info.rows;
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo, 1);
|
||||||
pCost->skipBlocks += 1;
|
pCost->skipBlocks += 1;
|
||||||
|
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_SMA_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
|
@ -352,7 +354,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca
|
||||||
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block skipped due to dynamic prune, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
pCost->skipBlocks += 1;
|
pCost->skipBlocks += 1;
|
||||||
|
tsdbReleaseDataBlock(pTableScanInfo->dataReader);
|
||||||
*status = FUNC_DATA_REQUIRED_FILTEROUT;
|
*status = FUNC_DATA_REQUIRED_FILTEROUT;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue