diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a92e8189a1..17f30b5194 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -184,9 +184,9 @@ static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pR static void setComposedBlockFlag(STsdbReader* pReader, bool composed); static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order); -static void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, +static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow); -static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, +static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, STsdbReader* pReader); @@ -1395,7 +1395,11 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -1422,7 +1426,11 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -1456,12 +1464,16 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return code; } else { ASSERT(0); return TSDB_CODE_SUCCESS; @@ -1618,12 +1630,16 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* } } - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); tRowMergerClear(&merge); - return TSDB_CODE_SUCCESS; + return code; } #if 0 @@ -1907,7 +1923,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); - tRowMergerGetRow(&merge, &pTSRow); + int32_t code = tRowMergerGetRow(&merge, &pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + doAppendRowFromTSRow(pReader->pResBlock, pReader, pTSRow, pBlockScanInfo->uid); taosMemoryFree(pTSRow); @@ -3026,8 +3046,8 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc return TSDB_CODE_SUCCESS; } -void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, - STsdbReader* pReader, bool* freeTSRow) { +int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow, + STsdbReader* pReader, bool* freeTSRow) { TSDBROW* pNextRow = NULL; TSDBROW current = *pRow; @@ -3037,19 +3057,19 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr if (!pIter->hasVal) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } else { // has next point in mem/imem pNextRow = getValidMemRow(pIter, pDelList, pReader); if (pNextRow == NULL) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } if (current.pTSRow->ts != pNextRow->pTSRow->ts) { *pTSRow = current.pTSRow; *freeTSRow = false; - return; + return TSDB_CODE_SUCCESS; } } } @@ -3069,13 +3089,17 @@ void doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SAr tRowMergerAdd(&merge, pNextRow, pTSchema1); doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader); - tRowMergerGetRow(&merge, pTSRow); - tRowMergerClear(&merge); + int32_t code = tRowMergerGetRow(&merge, pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + tRowMergerClear(&merge); *freeTSRow = true; + return TSDB_CODE_SUCCESS; } -void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, +int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow) { SRowMerger merge = {0}; @@ -3100,7 +3124,8 @@ void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlo doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader); } - tRowMergerGetRow(&merge, pTSRow); + int32_t code = tRowMergerGetRow(&merge, pTSRow); + return code; } int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey, @@ -3130,28 +3155,30 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR TSDBKEY k = TSDBROW_KEY(pRow); TSDBKEY ik = TSDBROW_KEY(piRow); + int32_t code = TSDB_CODE_SUCCESS; if (ik.ts != k.ts) { if (((ik.ts < k.ts) && asc) || ((ik.ts > k.ts) && (!asc))) { // ik.ts < k.ts - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } else if (((k.ts < ik.ts) && asc) || ((k.ts > ik.ts) && (!asc))) { - doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); + code = doMergeMemTableMultiRows(pRow, uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } } else { // ik.ts == k.ts - doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); *freeTSRow = true; + code = doMergeMemIMemRows(pRow, piRow, pBlockScanInfo, pReader, pTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - return TSDB_CODE_SUCCESS; + return code; } if (pBlockScanInfo->iter.hasVal && pRow != NULL) { - doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); - return TSDB_CODE_SUCCESS; + return doMergeMemTableMultiRows(pRow, pBlockScanInfo->uid, &pBlockScanInfo->iter, pDelList, pTSRow, pReader, freeTSRow); } if (pBlockScanInfo->iiter.hasVal && piRow != NULL) { - doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); - return TSDB_CODE_SUCCESS; + return doMergeMemTableMultiRows(piRow, uid, &pBlockScanInfo->iiter, pDelList, pTSRow, pReader, freeTSRow); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 70180d6dc0..715eb4780c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -938,15 +938,17 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod for (int32_t i = 0; i < numOfCols; ++i) { STargetNode* pNode = (STargetNode*)nodesListGetNode(pNodeList, i); - SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; + if (nodeType(pNode->pExpr) == QUERY_NODE_COLUMN) { + SColumnNode* pColNode = (SColumnNode*)pNode->pExpr; - SColMatchInfo c = {0}; - c.output = true; - c.colId = pColNode->colId; - c.srcSlotId = pColNode->slotId; - c.matchType = type; - c.targetSlotId = pNode->slotId; - taosArrayPush(pList, &c); + SColMatchInfo c = {0}; + c.output = true; + c.colId = pColNode->colId; + c.srcSlotId = pColNode->slotId; + c.matchType = type; + c.targetSlotId = pNode->slotId; + taosArrayPush(pList, &c); + } } *numOfOutputCols = 0; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d8e5c0abe5..e17b994f0e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -263,8 +263,6 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ASSERT(pSup->resultRowSize > 0); pResult = getNewResultRow(pResultBuf, &pSup->currentPageId, pSup->resultRowSize); - initResultRow(pResult); - // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; tSimpleHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, @@ -817,13 +815,6 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } else { pInput->colDataAggIsSet = false; } - - // set the statistics data for primary time stamp column - // if (pCtx->functionId == FUNCTION_SPREAD && pColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { - // pCtx->isAggSet = true; - // pCtx->agg.min = pBlock->info.window.skey; - // pCtx->agg.max = pBlock->info.window.ekey; - // } } bool isTaskKilled(SExecTaskInfo* pTaskInfo) { @@ -860,146 +851,6 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int return win; } -#if 0 -static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { - - bool hasFirstLastFunc = false; - bool hasOtherFunc = false; - - if (status == BLK_DATA_DATA_LOAD || status == BLK_DATA_FILTEROUT) { - return status; - } - - for (int32_t i = 0; i < pQuery->numOfOutput; ++i) { - int32_t functionId = getExprFunctionId(&pQuery->pExpr1[i]); - - if (functionId == FUNCTION_TS || functionId == FUNCTION_TS_DUMMY || functionId == FUNCTION_TAG || - functionId == FUNCTION_TAG_DUMMY) { - continue; - } - - if (functionId == FUNCTION_FIRST_DST || functionId == FUNCTION_LAST_DST) { - hasFirstLastFunc = true; - } else { - hasOtherFunc = true; - } - - } - - if (hasFirstLastFunc && status == BLK_DATA_NOT_LOAD) { - if (!hasOtherFunc) { - return BLK_DATA_FILTEROUT; - } else { - return BLK_DATA_DATA_LOAD; - } - } - - return status; -} - -#endif - -// static void updateDataCheckOrder(SQInfo *pQInfo, SQueryTableReq* pQueryMsg, bool stableQuery) { -// STaskAttr* pQueryAttr = pQInfo->runtimeEnv.pQueryAttr; -// -// // in case of point-interpolation query, use asc order scan -// char msg[] = "QInfo:0x%"PRIx64" scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" -// PRId64 -// "-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64; -// -// // todo handle the case the the order irrelevant query type mixed up with order critical query type -// // descending order query for last_row query -// if (isFirstLastRowQuery(pQueryAttr)) { -// //qDebug("QInfo:0x%"PRIx64" scan order changed for last_row query, old:%d, new:%d", pQInfo->qId, -// pQueryAttr->order.order, TSDB_ORDER_ASC); -// -// pQueryAttr->order.order = TSDB_ORDER_ASC; -// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// } -// -// pQueryAttr->needReverseScan = false; -// return; -// } -// -// if (pQueryAttr->groupbyColumn && pQueryAttr->order.order == TSDB_ORDER_DESC) { -// pQueryAttr->order.order = TSDB_ORDER_ASC; -// if (pQueryAttr->window.skey > pQueryAttr->window.ekey) { -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// } -// -// pQueryAttr->needReverseScan = false; -// doUpdateLastKey(pQueryAttr); -// return; -// } -// -// if (pQueryAttr->pointInterpQuery && pQueryAttr->interval.interval == 0) { -// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { -// //qDebug(msg, pQInfo->qId, "interp", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, -// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); TSWAP(pQueryAttr->window.skey, -// pQueryAttr->window.ekey, TSKEY); -// } -// -// pQueryAttr->order.order = TSDB_ORDER_ASC; -// return; -// } -// -// if (pQueryAttr->interval.interval == 0) { -// if (onlyFirstQuery(pQueryAttr)) { -// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { -// //qDebug(msg, pQInfo->qId, "only-first", pQueryAttr->order.order, TSDB_ORDER_ASC, pQueryAttr->window.skey, -//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); -// -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// doUpdateLastKey(pQueryAttr); -// } -// -// pQueryAttr->order.order = TSDB_ORDER_ASC; -// pQueryAttr->needReverseScan = false; -// } else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) { -// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { -// //qDebug(msg, pQInfo->qId, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey, -//// pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey); -// -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// doUpdateLastKey(pQueryAttr); -// } -// -// pQueryAttr->order.order = TSDB_ORDER_DESC; -// pQueryAttr->needReverseScan = false; -// } -// -// } else { // interval query -// if (stableQuery) { -// if (onlyFirstQuery(pQueryAttr)) { -// if (!QUERY_IS_ASC_QUERY(pQueryAttr)) { -// //qDebug(msg, pQInfo->qId, "only-first stable", pQueryAttr->order.order, TSDB_ORDER_ASC, -//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, -/// pQueryAttr->window.skey); -// -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// doUpdateLastKey(pQueryAttr); -// } -// -// pQueryAttr->order.order = TSDB_ORDER_ASC; -// pQueryAttr->needReverseScan = false; -// } else if (onlyLastQuery(pQueryAttr)) { -// if (QUERY_IS_ASC_QUERY(pQueryAttr)) { -// //qDebug(msg, pQInfo->qId, "only-last stable", pQueryAttr->order.order, TSDB_ORDER_DESC, -//// pQueryAttr->window.skey, pQueryAttr->window.ekey, pQueryAttr->window.ekey, -/// pQueryAttr->window.skey); -// -// TSWAP(pQueryAttr->window.skey, pQueryAttr->window.ekey); -// doUpdateLastKey(pQueryAttr); -// } -// -// pQueryAttr->order.order = TSDB_ORDER_DESC; -// pQueryAttr->needReverseScan = false; -// } -// } -// } -//} - #if 0 static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockInfo) { STimeWindow w = {0}; @@ -1225,24 +1076,6 @@ static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) if (pTableQueryInfo == NULL) { return; } - - // TSWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey); - // pTableQueryInfo->lastKey = pTableQueryInfo->win.skey; - - // SWITCH_ORDER(pTableQueryInfo->cur.order); - // pTableQueryInfo->cur.vgroupIndex = -1; - - // set the index to be the end slot of result rows array - // SResultRowInfo* pResultRowInfo = &pTableQueryInfo->resInfo; - // if (pResultRowInfo->size > 0) { - // pResultRowInfo->curPos = pResultRowInfo->size - 1; - // } else { - // pResultRowInfo->curPos = -1; - // } -} - -void initResultRow(SResultRow* pResultRow) { - // pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); } void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { @@ -1255,15 +1088,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } -void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { - if (pTableQueryInfo == NULL) { - return; - } - - // taosVariantDestroy(&pTableQueryInfo->tag); - // cleanupResultRowInfo(&pTableQueryInfo->resInfo); -} - void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset) { bool init = false; for (int32_t i = 0; i < numOfOutput; ++i) { @@ -3022,7 +2846,6 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { resultRow->offset = pOffset; offset += valueLen; - initResultRow(resultRow); pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; // releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId)); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ab304b8d2d..26667cd64f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3873,7 +3873,6 @@ static int32_t setWindowOutputBuf(SResultWindowInfo* pWinInfo, SResultRow** pRes if (*pResult == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } - initResultRow(*pResult); // add a new result set for a new group pWinInfo->pos.pageId = (*pResult)->pageId;