From 6a978be730f04f86614aa83397e725e2212766f2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 27 Oct 2022 20:38:57 +0800 Subject: [PATCH 1/7] opt mem --- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 32 +- source/libs/executor/src/scanoperator.c | 376 ++++++++++++++++++++++-- 4 files changed, 376 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index d200ed56a5..7996498534 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -157,6 +157,7 @@ int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTab const char *idstr); void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); +bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); void tsdbRetrieveDataBlockInfo(STsdbReader *pReader, SDataBlockInfo *pDataBlockInfo); int32_t tsdbRetrieveDatablockSMA(STsdbReader *pReader, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 98de107ad8..1906e59755 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3775,7 +3775,7 @@ bool tsdbNextDataBlock(STsdbReader* pReader) { return false; } -bool tsdbTableNextDataBlock(STsdbReader* pReader, uint64_t uid) { +bool tsdbTableNextDataBlock(STsdbReader* pReader, int64_t uid) { STableBlockScanInfo* pBlockScanInfo = taosHashGet(pReader->status.pTableMap, &uid, sizeof(uid)); if (pBlockScanInfo == NULL) { // no data block for the table of given uid return false; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1d9f26b7ed..8769e8ac2f 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -341,21 +341,23 @@ typedef struct STableScanInfo { } STableScanInfo; typedef struct STableMergeScanInfo { - STableListInfo* tableListInfo; - int32_t tableStartIndex; - int32_t tableEndIndex; - bool hasGroupId; - uint64_t groupId; - SArray* dataReaders; // array of tsdbReaderT* - SReadHandle readHandle; - int32_t bufPageSize; - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray* pSortInfo; - SSortHandle* pSortHandle; - SSDataBlock* pSortInputBlock; - int64_t startTs; // sort start time - SArray* sortSourceParams; - SLimitInfo limitInfo; + STableListInfo* tableListInfo; + int32_t tableStartIndex; + int32_t tableEndIndex; + bool hasGroupId; + uint64_t groupId; + SArray* dataReaders; // array of tsdbReaderT* + SArray* queryConds; // array of queryTableDataCond + STsdbReader* pReader; + SReadHandle readHandle; + int32_t bufPageSize; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* pSortInfo; + SSortHandle* pSortHandle; + SSDataBlock* pSortInputBlock; + int64_t startTs; // sort start time + SArray* sortSourceParams; + SLimitInfo limitInfo; SFileBlockLoadRecorder readRecorder; int64_t numOfRows; SScanInfo scanInfo; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 76035a65ae..c0db9b1fa3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -42,7 +42,7 @@ static int32_t buildDbTableInfoBlock(bool sysInfo, const SSDataBlock* p, const S static char* SYSTABLE_IDX_COLUMN[] = {"table_name", "db_name", "create_time", "columns", "ttl", "stable_name", "vgroup_id', 'uid", "type"}; -static char* SYSTABLE_IDX_EXCEPT[] = {"db_name", "vgroup_id"}; +static char* SYSTABLE_SPECIAL_COL[] = {"db_name", "vgroup_id"}; typedef int32_t (*__sys_filte)(void* pMeta, SNode* cond, SArray* result); typedef int32_t (*__sys_check)(SNode* cond); @@ -363,7 +363,8 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo if (pLimitInfo->remainOffset >= pBlock->info.rows) { pLimitInfo->remainOffset -= pBlock->info.rows; pBlock->info.rows = 0; - qDebug("current block ignore due to offset, current:%"PRId64", %s", pLimitInfo->remainOffset, GET_TASKID(pTaskInfo)); + qDebug("current block ignore due to offset, current:%" PRId64 ", %s", pLimitInfo->remainOffset, + GET_TASKID(pTaskInfo)); } else { blockDataTrimFirstNRows(pBlock, pLimitInfo->remainOffset); pLimitInfo->remainOffset = 0; @@ -376,9 +377,9 @@ void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo int32_t keep = pBlock->info.rows - overflowRows; blockDataKeepFirstNRows(pBlock, keep); - qDebug("output limit %"PRId64" has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); + qDebug("output limit %" PRId64 " has reached, %s", pLimit->limit, GET_TASKID(pTaskInfo)); -// setTaskStatus(pTaskInfo, TASK_COMPLETED); + // setTaskStatus(pTaskInfo, TASK_COMPLETED); pOperator->status = OP_EXEC_DONE; } } @@ -3248,18 +3249,80 @@ static int tableUidCompare(const void* a, const void* b) { } return u1 < u2 ? -1 : 1; } + +typedef struct MergeIndex { + int idx; + int len; +} MergeIndex; + +static FORCE_INLINE int optSysBinarySearch(SArray* arr, int s, int e, uint64_t k) { + uint64_t v; + int32_t m; + while (s <= e) { + m = s + (e - s) / 2; + v = *(uint64_t*)taosArrayGet(arr, m); + if (v >= k) { + e = m - 1; + } else { + s = m + 1; + } + } + return s; +} + +void optSysIntersection(SArray* in, SArray* out) { + int32_t sz = (int32_t)taosArrayGetSize(in); + if (sz <= 0) { + return; + } + MergeIndex* mi = taosMemoryCalloc(sz, sizeof(MergeIndex)); + for (int i = 0; i < sz; i++) { + SArray* t = taosArrayGetP(in, i); + mi[i].len = (int32_t)taosArrayGetSize(t); + mi[i].idx = 0; + } + + SArray* base = taosArrayGetP(in, 0); + for (int i = 0; i < taosArrayGetSize(base); i++) { + uint64_t tgt = *(uint64_t*)taosArrayGet(base, i); + bool has = true; + for (int j = 1; j < taosArrayGetSize(in); j++) { + SArray* oth = taosArrayGetP(in, j); + int mid = optSysBinarySearch(oth, mi[j].idx, mi[j].len - 1, tgt); + if (mid >= 0 && mid < mi[j].len) { + uint64_t val = *(uint64_t*)taosArrayGet(oth, mid); + has = (val == tgt ? true : false); + mi[j].idx = mid; + } else { + has = false; + } + } + if (has == true) { + taosArrayPush(out, &tgt); + } + } + taosMemoryFreeClear(mi); +} + static int32_t optSysMergeRslt(SArray* mRslt, SArray* rslt) { // TODO, find comm mem from mRslt for (int i = 0; i < taosArrayGetSize(mRslt); i++) { - SArray* aRslt = taosArrayGetP(mRslt, i); - taosArrayAddAll(rslt, aRslt); + SArray* arslt = taosArrayGetP(mRslt, i); + taosArraySort(arslt, tableUidCompare); + } + optSysIntersection(mRslt, rslt); + return 0; +} +static int32_t optSysSpecialColumn(SNode* cond) { + SOperatorNode* pOper = (SOperatorNode*)cond; + SColumnNode* pCol = (SColumnNode*)pOper->pLeft; + for (int i = 0; i < sizeof(SYSTABLE_SPECIAL_COL) / sizeof(SYSTABLE_SPECIAL_COL[0]); i++) { + if (0 == strcmp(pCol->colName, SYSTABLE_SPECIAL_COL[i])) { + return 1; + } } - taosArraySort(rslt, tableUidCompare); - taosArrayRemoveDuplicate(rslt, tableUidCompare, NULL); - return 0; } - static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { int ret = -1; if (nodeType(cond) == QUERY_NODE_OPERATOR) { @@ -3283,7 +3346,6 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { SNodeList* pList = (SNodeList*)pNode->pParameterList; int32_t len = LIST_LENGTH(pList); - if (len <= 0) return ret; bool hasIdx = false; bool hasRslt = true; @@ -3294,12 +3356,16 @@ static int32_t optSysTabFilte(void* arg, SNode* cond, SArray* result) { if (cell == NULL) break; SArray* aRslt = taosArrayInit(16, sizeof(int64_t)); - ret = optSysTabFilteImpl(arg, cell->pNode, aRslt); if (ret == 0) { // has index hasIdx = true; - taosArrayPush(mRslt, &aRslt); + if (optSysSpecialColumn(cell->pNode) == 0) { + taosArrayPush(mRslt, &aRslt); + } else { + // db_name/vgroup not result + taosArrayDestroy(aRslt); + } } else if (ret == -2) { // current vg hasIdx = true; @@ -4169,6 +4235,131 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* return TSDB_CODE_SUCCESS; } +int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, + STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, + STsdbReader** ppReader, const char* idstr) { + STsdbReader* pReader = NULL; + SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo)); + for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) { + taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i)); + } + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, &pReader, idstr); + if (code != 0) { + taosArrayDestroy(subTableList); + return code; + } + *ppReader = pReader; + return TSDB_CODE_SUCCESS; +} + +static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + uint64_t uid = pBlock->info.uid; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + STsdbReader* reader = pTableScanInfo->pReader; + tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + STsdbReader* reader = pTableScanInfo->pReader; + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + + // currently only the tbname pseudo column + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + } + + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); + + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } + } + return TSDB_CODE_SUCCESS; +} + // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { @@ -4291,9 +4482,137 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc typedef struct STableMergeScanSortSourceParam { SOperatorInfo* pOperator; int32_t readerIdx; + int64_t uid; SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; +static SSDataBlock* getTableDataBlockTemp(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t readIdx = source->readerIdx; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); + + blockDataCleanup(pBlock); + + int64_t st = taosGetTimestampUs(); + + SArray* subTable = taosArrayInit(1, sizeof(STableKeyInfo)); + taosArrayPush(subTable, taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex)); + SReadHandle* pHandle = &pInfo->readHandle; + tsdbReaderOpen(pHandle->vnode, pQueryCond, subTable, &pInfo->pReader, GET_TASKID(pTaskInfo)); + taosArrayDestroy(subTable); + + STsdbReader* reader = pInfo->pReader; + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + tsdbReaderClose(pInfo->pReader); + return pBlock; + } + tsdbReaderClose(pInfo->pReader); + return NULL; +} +static SSDataBlock* getTableDataBlock2(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int64_t uid = source->uid; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pBlock); + + STsdbReader* reader = pTableScanInfo->pReader; + while (tsdbTableNextDataBlock(reader, uid)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + return pBlock; + } + return NULL; +} + static SSDataBlock* getTableDataBlock(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator; @@ -4372,6 +4691,14 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { return pList; } +int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { + memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); + dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); + for (int i = 0; i < src->numOfCols; i++) { + dst->colList[i] = src->colList[i]; + } + return 0; +} int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4392,9 +4719,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { int32_t tableEndIdx = pInfo->tableEndIndex; STableListInfo* tableListInfo = pInfo->tableListInfo; + pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); - createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx, - pInfo->dataReaders, GET_TASKID(pTaskInfo)); + pInfo->pReader = NULL; // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result @@ -4403,18 +4730,27 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, pInfo->pSortInputBlock, pTaskInfo->id.str); - tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlock, NULL, NULL); + tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockTemp, NULL, NULL); + + // one table has one data block + int32_t numOfTable = tableEndIdx - tableStartIdx + 1; + pInfo->queryConds = taosArrayInit(numOfTable, sizeof(SQueryTableDataCond)); + + for (int32_t i = 0; i < numOfTable; ++i) { + STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i + tableStartIdx); - size_t numReaders = taosArrayGetSize(pInfo->dataReaders); - for (int32_t i = 0; i < numReaders; ++i) { STableMergeScanSortSourceParam param = {0}; param.readerIdx = i; param.pOperator = pOperator; param.inputBlock = createOneDataBlock(pInfo->pResBlock, false); taosArrayPush(pInfo->sortSourceParams, ¶m); + + SQueryTableDataCond cond; + dumpSQueryTableCond(&pInfo->cond, &cond); + taosArrayPush(pInfo->queryConds, &cond); } - for (int32_t i = 0; i < numReaders; ++i) { + for (int32_t i = 0; i < numOfTable; ++i) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); ps->param = param; @@ -4548,6 +4884,8 @@ void destroyTableMergeScanOperatorInfo(void* param) { } taosArrayDestroy(pTableScanInfo->dataReaders); + tsdbReaderClose(pTableScanInfo->pReader); + if (pTableScanInfo->matchInfo.pList != NULL) { taosArrayDestroy(pTableScanInfo->matchInfo.pList); } From 9de0afc8ea91d328e8c7fb9ef8ec2e98627815df Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 27 Oct 2022 21:22:36 +0800 Subject: [PATCH 2/7] opt mem --- source/dnode/vnode/inc/vnode.h | 8 +++++--- source/dnode/vnode/src/tsdb/tsdbRead.c | 1 + source/libs/executor/src/scanoperator.c | 11 ++++++++--- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 7996498534..bed6e93e5a 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -152,9 +152,10 @@ typedef struct STsdbReader STsdbReader; #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 -int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); -int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, - const char *idstr); +int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); +int32_t tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *pTableList, STsdbReader **ppReader, + const char *idstr); + void tsdbReaderClose(STsdbReader *pReader); bool tsdbNextDataBlock(STsdbReader *pReader); bool tsdbTableNextDataBlock(STsdbReader *pReader, int64_t uid); @@ -164,6 +165,7 @@ SArray *tsdbRetrieveDataBlock(STsdbReader *pTsdbReadHandle, SArray *pColumnIdLi int32_t tsdbReaderReset(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle); +bool tsdbIsAscendingOrder(STsdbReader *pReader); void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t getReaderMaxVersion(STsdbReader *pReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1906e59755..5a495f263e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -4174,3 +4174,4 @@ void tsdbUntakeReadSnap(STsdb* pTsdb, STsdbReadSnap* pSnap, const char* idStr) { } tsdbTrace("vgId:%d, untake read snapshot, %s", TD_VID(pTsdb->pVnode), idStr); } +bool tsdbIsAscendingOrder(STsdbReader* pReader) { return ASCENDING_TRAVERSE(pReader->order); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c0db9b1fa3..0123eb2575 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4443,7 +4443,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); + STsdbReader* reader = pTableScanInfo->pReader; // taosArrayGetP(pTableScanInfo->dataReaders, readerIdx); SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); if (pCols == NULL) { return terrno; @@ -4529,7 +4529,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { pBlock->info.window = binfo.window; pBlock->info.rows = binfo.rows; - pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; + if (tsdbIsAscendingOrder(pInfo->pReader)) { + pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; + } else { + pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; + } uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); @@ -4551,9 +4555,11 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; return pBlock; } tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; return NULL; } static SSDataBlock* getTableDataBlock2(void* param) { @@ -4648,7 +4654,6 @@ static SSDataBlock* getTableDataBlock(void* param) { uint32_t status = 0; int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readerIdx, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pOperator->pTaskInfo->env, code); } From 07d8ece3703b29436305d7ee67e967bab79c4c1d Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 27 Oct 2022 22:19:47 +0800 Subject: [PATCH 3/7] opt mem --- source/libs/executor/src/scanoperator.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0123eb2575..f0577358b1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4891,6 +4891,12 @@ void destroyTableMergeScanOperatorInfo(void* param) { tsdbReaderClose(pTableScanInfo->pReader); + for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { + SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i); + taosMemoryFree(pCond->colList); + } + taosArrayDestroy(pTableScanInfo->queryConds); + if (pTableScanInfo->matchInfo.pList != NULL) { taosArrayDestroy(pTableScanInfo->matchInfo.pList); } From 29dedb5c8cecf4b4467188873b84e7d43ef9c81f Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Thu, 27 Oct 2022 23:18:04 +0800 Subject: [PATCH 4/7] opt mem --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f0577358b1..f751e1a989 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4551,7 +4551,7 @@ static SSDataBlock* getTableDataBlockTemp(void* param) { pBlock->info.groupId = *groupId; } - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; tsdbReaderClose(pInfo->pReader); From f8e0ff86d33db06877089b727c2653146ae7bfef Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 08:48:52 +0800 Subject: [PATCH 5/7] opt mem --- source/libs/executor/src/scanoperator.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f751e1a989..66301fa10b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4796,6 +4796,13 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); tsdbReaderClose(reader); } + for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) { + SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i); + taosMemoryFree(cond->colList); + } + taosArrayDestroy(pInfo->queryConds); + pInfo->queryConds = NULL; + taosArrayDestroy(pInfo->dataReaders); pInfo->dataReaders = NULL; return TSDB_CODE_SUCCESS; From 63827f7551d75763e1b3ad8b03c1c8eeda9ea2b9 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 13:56:55 +0800 Subject: [PATCH 6/7] use sys mem --- source/libs/executor/src/sortoperator.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 16d853ac7e..26f1932b12 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -38,7 +38,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; - int32_t code = extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + int32_t code = + extractColMatchInfo(pSortNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); @@ -62,8 +63,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16; // TODO dynamic set the available sort buffer - pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, - getExplainExecInfo); + pOperator->fpSet = + createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, getExplainExecInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -126,7 +127,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -316,7 +317,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -592,7 +593,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData blockDataEnsureCapacity(p, capacity); - _retry: +_retry: while (1) { STupleHandle* pTupleHandle = NULL; if (pInfo->groupSort) { @@ -647,13 +648,13 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); -// ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); + // ASSERT(pColMatchInfo-> == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); colDataAssign(pDst, pSrc, p->info.rows, &pDataBlock->info); } - + pInfo->limitInfo.numOfOutputRows += p->info.rows; pDataBlock->info.rows = p->info.rows; pDataBlock->info.groupId = pInfo->groupId; } @@ -734,7 +735,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size SArray* pSortInfo = createSortInfo(pMergePhyNode->pMergeKeys); int32_t numOfOutputCols = 0; - code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo); + code = extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, COL_MATCH_FROM_SLOT_ID, + &pInfo->matchInfo); if (code != TSDB_CODE_SUCCESS) { goto _error; } From 419e5010020ab494fa614c77e41fe3d71d619911 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 28 Oct 2022 14:21:00 +0800 Subject: [PATCH 7/7] refactor reader --- source/libs/executor/src/scanoperator.c | 27 ++++++++++--------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 66301fa10b..7e1ff3d6d7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -4725,9 +4725,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableListInfo* tableListInfo = pInfo->tableListInfo; - pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); + // pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES); pInfo->pReader = NULL; - // todo the total available buffer should be determined by total capacity of buffer of this task. // the additional one is reserved for merge result pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1); @@ -4775,7 +4774,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { STableMergeScanInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - size_t numReaders = taosArrayGetSize(pInfo->dataReaders); + int32_t numOfTable = taosArrayGetSize(pInfo->queryConds); SSortExecInfo sortExecInfo = tsortGetSortExecInfo(pInfo->pSortHandle); pInfo->sortExecInfo.sortMethod = sortExecInfo.sortMethod; @@ -4784,7 +4783,7 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortExecInfo.readBytes += sortExecInfo.readBytes; pInfo->sortExecInfo.writeBytes += sortExecInfo.writeBytes; - for (int32_t i = 0; i < numReaders; ++i) { + for (int32_t i = 0; i < numOfTable; ++i) { STableMergeScanSortSourceParam* param = taosArrayGet(pInfo->sortSourceParams, i); blockDataDestroy(param->inputBlock); } @@ -4792,10 +4791,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { tsortDestroySortHandle(pInfo->pSortHandle); - for (int32_t i = 0; i < numReaders; ++i) { - STsdbReader* reader = taosArrayGetP(pInfo->dataReaders, i); - tsdbReaderClose(reader); - } for (int32_t i = 0; i < taosArrayGetSize(pInfo->queryConds); i++) { SQueryTableDataCond* cond = taosArrayGet(pInfo->queryConds, i); taosMemoryFree(cond->colList); @@ -4803,8 +4798,6 @@ int32_t stopGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayDestroy(pInfo->queryConds); pInfo->queryConds = NULL; - taosArrayDestroy(pInfo->dataReaders); - pInfo->dataReaders = NULL; return TSDB_CODE_SUCCESS; } @@ -4888,15 +4881,17 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { void destroyTableMergeScanOperatorInfo(void* param) { STableMergeScanInfo* pTableScanInfo = (STableMergeScanInfo*)param; cleanupQueryTableDataCond(&pTableScanInfo->cond); + + int32_t numOfTable = taosArrayGetSize(pTableScanInfo->queryConds); + + for (int32_t i = 0; i < numOfTable; i++) { + STableMergeScanSortSourceParam* param = taosArrayGet(pTableScanInfo->sortSourceParams, i); + blockDataDestroy(param->inputBlock); + } taosArrayDestroy(pTableScanInfo->sortSourceParams); - for (int32_t i = 0; i < taosArrayGetSize(pTableScanInfo->dataReaders); ++i) { - STsdbReader* reader = taosArrayGetP(pTableScanInfo->dataReaders, i); - tsdbReaderClose(reader); - } - taosArrayDestroy(pTableScanInfo->dataReaders); - tsdbReaderClose(pTableScanInfo->pReader); + pTableScanInfo->pReader = NULL; for (int i = 0; i < taosArrayGetSize(pTableScanInfo->queryConds); i++) { SQueryTableDataCond* pCond = taosArrayGet(pTableScanInfo->queryConds, i);