fix: optimize performance
This commit is contained in:
parent
61571d9296
commit
a87d35be08
|
@ -178,7 +178,7 @@ typedef struct STsdbReader STsdbReader;
|
||||||
|
|
||||||
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
int32_t tsdbSetTableList(STsdbReader *pReader, const void *pTableList, int32_t num);
|
||||||
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
|
||||||
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr);
|
SSDataBlock *pResBlock, STsdbReader **ppReader, const char *idstr, bool countOnly);
|
||||||
|
|
||||||
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
void tsdbReaderSetId(STsdbReader* pReader, const char* idstr);
|
||||||
void tsdbReaderClose(STsdbReader *pReader);
|
void tsdbReaderClose(STsdbReader *pReader);
|
||||||
|
|
|
@ -224,6 +224,8 @@ int32_t tsdbTbDataIterCreate(STbData *pTbData, TSDBKEY *pFrom, int8_t backward,
|
||||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||||
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
void tsdbTbDataIterOpen(STbData *pTbData, TSDBKEY *pFrom, int8_t backward, STbDataIter *pIter);
|
||||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||||
|
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum);
|
||||||
|
|
||||||
// STbData
|
// STbData
|
||||||
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
int32_t tsdbGetNRowsInTbData(STbData *pTbData);
|
||||||
// tsdbFile.c ==============================================================================================
|
// tsdbFile.c ==============================================================================================
|
||||||
|
|
|
@ -282,6 +282,34 @@ bool tsdbTbDataIterNext(STbDataIter *pIter) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tsdbCountTbDataRows(STbData *pTbData) {
|
||||||
|
SMemSkipListNode *pNode = NULL;
|
||||||
|
int64_t rowsNum = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
pNode = SL_GET_NODE_FORWARD(pTbData->sl.pHead, 0);
|
||||||
|
if (pNode == pTbData->sl.pTail) {
|
||||||
|
return rowsNum;
|
||||||
|
}
|
||||||
|
|
||||||
|
rowsNum++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void tsdbMemTableCountRows(SMemTable *pMemTable, SHashObj* pTableMap, int64_t *rowsNum) {
|
||||||
|
taosRLockLatch(&pMemTable->latch);
|
||||||
|
for (int32_t i = 0; i < pMemTable->nBucket; ++i) {
|
||||||
|
STbData *pTbData = pMemTable->aBucket[i];
|
||||||
|
|
||||||
|
void* p = taosHashGet(pTableMap, &pTbData->uid, sizeof(pTbData->uid));
|
||||||
|
if (p == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
rowsNum += tsdbCountTbDataRows(pTbData);
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pMemTable->latch);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
static int32_t tsdbMemTableRehash(SMemTable *pMemTable) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -787,4 +815,4 @@ SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return aTbDataP;
|
return aTbDataP;
|
||||||
}
|
}
|
||||||
|
|
|
@ -3113,7 +3113,7 @@ static int32_t doSumBlockRows(STsdbReader* pReader, SDataFReader* pFileReader) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
|
STableBlockScanInfo** p = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(pBlockIdx->uid));
|
||||||
if (p == NULL || *p == NULL) {
|
if (p == NULL) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3189,6 +3189,10 @@ static int32_t readRowsCountFromStt(STsdbReader* pReader) {
|
||||||
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
taosArrayClear(pBlockLoadInfo->aSttBlk);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
|
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||||
|
pReader->rowsNum += p->nRow;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
SSttBlk *p = taosArrayGet(pBlockLoadInfo->aSttBlk, i);
|
||||||
|
@ -3210,6 +3214,22 @@ static int32_t readRowsCountFromStt(STsdbReader* pReader) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t readRowsCountFromMem(STsdbReader* pReader) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t memNum = 0, imemNum = 0;
|
||||||
|
if (pReader->pReadSnap->pMem != NULL) {
|
||||||
|
tsdbMemTableCountRows(pReader->pReadSnap->pMem, pReader->status.pTableMap, &memNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pReader->pReadSnap->pIMem != NULL) {
|
||||||
|
tsdbMemTableCountRows(pReader->pReadSnap->pIMem, pReader->status.pTableMap, &imemNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
pReader->rowsNum += memNum + imemNum;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
|
||||||
SReaderStatus* pStatus = &pReader->status;
|
SReaderStatus* pStatus = &pReader->status;
|
||||||
|
@ -4170,7 +4190,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
|
||||||
|
|
||||||
// ====================================== EXPOSED APIs ======================================
|
// ====================================== EXPOSED APIs ======================================
|
||||||
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
|
||||||
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr) {
|
SSDataBlock* pResBlock, STsdbReader** ppReader, const char* idstr, bool countOnly) {
|
||||||
STimeWindow window = pCond->twindows;
|
STimeWindow window = pCond->twindows;
|
||||||
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
if (pCond->type == TIMEWINDOW_RANGE_EXTERNAL) {
|
||||||
pCond->twindows.skey += 1;
|
pCond->twindows.skey += 1;
|
||||||
|
@ -4262,9 +4282,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
|
||||||
|
|
||||||
pReader->suspended = true;
|
pReader->suspended = true;
|
||||||
|
|
||||||
|
if (countOnly) {
|
||||||
|
pReader->readMode = READ_MODE_COUNT_ONLY;
|
||||||
|
}
|
||||||
|
|
||||||
pReader->readMode = READ_MODE_COUNT_ONLY;
|
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
tsdbDebug("%p total numOfTable:%d in this query %s", pReader, numOfTables, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
|
@ -4588,6 +4609,11 @@ static bool tsdbReadRowsCountOnly(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = readRowsCountFromMem(pReader);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
pBlock->info.rows = pReader->rowsNum;
|
pBlock->info.rows = pReader->rowsNum;
|
||||||
pBlock->info.id.uid = 0;
|
pBlock->info.id.uid = 0;
|
||||||
pBlock->info.dataLoad = 0;
|
pBlock->info.dataLoad = 0;
|
||||||
|
|
|
@ -339,6 +339,7 @@ typedef struct STableScanInfo {
|
||||||
int8_t scanMode;
|
int8_t scanMode;
|
||||||
int8_t assignBlockUid;
|
int8_t assignBlockUid;
|
||||||
bool hasGroupByTag;
|
bool hasGroupByTag;
|
||||||
|
bool countOnly;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
|
|
|
@ -1140,7 +1140,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
|
int32_t num = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||||
|
|
||||||
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
|
if (tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &pTableScanInfo->base.cond, pList, num,
|
||||||
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL) < 0 ||
|
pTableScanInfo->pResBlock, &pTableScanInfo->base.dataReader, NULL, false) < 0 ||
|
||||||
pTableScanInfo->base.dataReader == NULL) {
|
pTableScanInfo->base.dataReader == NULL) {
|
||||||
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
|
qError("tsdbReaderOpen failed. uid:%" PRIi64, pOffset->uid);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1192,7 +1192,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
|
STableKeyInfo* pList = tableListGetInfo(pTaskInfo->pTableInfoList, 0);
|
||||||
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
|
int32_t size = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||||
|
|
||||||
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL);
|
tsdbReaderOpen(pInfo->vnode, &pTaskInfo->streamInfo.tableCond, pList, size, NULL, &pInfo->dataReader, NULL, false);
|
||||||
|
|
||||||
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond);
|
||||||
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
strcpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName);
|
||||||
|
|
|
@ -31,6 +31,9 @@
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
|
|
||||||
|
int32_t scanDebug = 0;
|
||||||
|
|
||||||
|
|
||||||
#define MULTI_READER_MAX_TABLE_NUM 5000
|
#define MULTI_READER_MAX_TABLE_NUM 5000
|
||||||
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN)
|
||||||
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
|
||||||
|
@ -785,7 +788,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
ASSERT(pInfo->base.dataReader == NULL);
|
ASSERT(pInfo->base.dataReader == NULL);
|
||||||
|
|
||||||
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||||
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
@ -916,6 +919,10 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (scanDebug) {
|
||||||
|
pInfo->countOnly = true;
|
||||||
|
}
|
||||||
|
|
||||||
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
|
||||||
optrDefaultBufFn, getTableScannerExecInfo);
|
optrDefaultBufFn, getTableScannerExecInfo);
|
||||||
|
@ -1007,7 +1014,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
STsdbReader* pReader = NULL;
|
STsdbReader* pReader = NULL;
|
||||||
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
||||||
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
|
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo), false);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
@ -2601,7 +2608,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||||
|
|
||||||
if (NULL == source->dataReader || !source->multiReader) {
|
if (NULL == source->dataReader || !source->multiReader) {
|
||||||
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo));
|
code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &source->dataReader, GET_TASKID(pTaskInfo), false);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -2267,7 +2267,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
|
||||||
size_t num = tableListGetSize(pTableListInfo);
|
size_t num = tableListGetSize(pTableListInfo);
|
||||||
void* pList = tableListGetInfo(pTableListInfo, 0);
|
void* pList = tableListGetInfo(pTableListInfo, 0);
|
||||||
|
|
||||||
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str);
|
code = tsdbReaderOpen(readHandle->vnode, &cond, pList, num, pInfo->pResBlock, &pInfo->pHandle, pTaskInfo->id.str, false);
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
Loading…
Reference in New Issue