diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 47bd0d0b02..c0ea5e79c7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2677,15 +2677,6 @@ typedef struct { int32_t tSerializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); int32_t tDeserializeSMDropSmaReq(void* buf, int32_t bufLen, SMDropSmaReq* pReq); -typedef struct { - int32_t vgId; - SEpSet epSet; -} SVgEpSet; - -typedef struct { - int32_t padding; -} SRSmaExecMsg; - typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 2fb934aaad..8638cc5118 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -38,7 +38,6 @@ static SSdbRow *mndSmaActionDecode(SSdbRaw *pRaw); static int32_t mndSmaActionInsert(SSdb *pSdb, SSmaObj *pSma); static int32_t mndSmaActionDelete(SSdb *pSdb, SSmaObj *pSpSmatb); static int32_t mndSmaActionUpdate(SSdb *pSdb, SSmaObj *pOld, SSmaObj *pNew); -static int32_t mndSmaGetVgEpSet(SMnode *pMnode, SDbObj *pDb, SVgEpSet **ppVgEpSet, int32_t *numOfVgroups); static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq); static int32_t mndProcessDropSmaReq(SRpcMsg *pReq); static int32_t mndProcessGetSmaReq(SRpcMsg *pReq); @@ -841,6 +840,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p _OVER: mndTransDrop(pTrans); + mndReleaseStream(pMnode, pStream); mndReleaseVgroup(pMnode, pVgroup); mndReleaseStb(pMnode, pStb); return code; @@ -961,6 +961,7 @@ _OVER: mError("sma:%s, failed to drop since %s", dropReq.name, terrstr()); } + mndReleaseSma(pMnode, pSma); mndReleaseDb(pMnode, pDb); return code; } diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 352b6be321..fef0ff49ac 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -532,6 +532,9 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { SDecoder dc = {0}; rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData); + if (rc < 0) { + return -1; + } int64_t version = ((SUidIdxVal *)pData)[0].version; tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 6e04b67d65..17c766a854 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -129,11 +129,17 @@ typedef struct SFileBlockDumpInfo { bool allDumped; } SFileBlockDumpInfo; +typedef struct SUidOrderCheckInfo { + uint64_t* tableUidList; // access table uid list in uid ascending order list + int32_t currentIndex; // index in table uid list +} SUidOrderCheckInfo; + typedef struct SReaderStatus { bool loadFromFile; // check file stage bool composedDataBlock; // the returned data block is a composed block or not SHashObj* pTableMap; // SHash STableBlockScanInfo* pTableIter; // table iterator used in building in-memory buffer data blocks. + SUidOrderCheckInfo uidCheckInfo; // check all table in uid order SFileBlockDumpInfo fBlockDumpInfo; SDFileSet* pCurrentFileset; // current opened file set SBlockData fileBlockData; @@ -576,7 +582,7 @@ static void cleanupTableScanInfo(SHashObj* pTableMap) { } // reset the index in last block when handing a new file - px->indexInBlockL = -1; + px->indexInBlockL = DEFAULT_ROW_INDEX_VAL; tMapDataClear(&px->mapData); taosArrayClear(px->pBlockList); } @@ -655,7 +661,8 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SArray* double el = (taosGetTimestampUs() - st) / 1000.0; tsdbDebug( - "load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, size:%.2f Kb, elapsed time:%.2f ms %s", + "load block of %d tables completed, blocks:%d in %d tables, lastBlock:%d, block-info-size:%.2f Kb, elapsed " + "time:%.2f ms %s", numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastBlocks, sizeInDisk / 1000.0, el, pReader->idStr); @@ -1814,7 +1821,8 @@ static void setAllRowsChecked(SLastBlockReader* pLastBlockReader) { } static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { - int32_t step = (pLastBlockReader->order == TSDB_ORDER_ASC) ? 1 : -1; + bool asc = ASCENDING_TRAVERSE(pLastBlockReader->order); + int32_t step = (asc) ? 1 : -1; if (*pLastBlockReader->rowIndex == ALL_ROWS_CHECKED_INDEX) { return false; } @@ -1823,8 +1831,20 @@ static bool nextRowInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc SBlockData* pBlockData = &pLastBlockReader->lastBlockData; for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { - if (pBlockData->aUid != NULL && pBlockData->aUid[i] != pLastBlockReader->uid) { - continue; + if (pBlockData->aUid != NULL) { + if (asc) { + if (pBlockData->aUid[i] < pLastBlockReader->uid) { + continue; + } else if (pBlockData->aUid[i] > pLastBlockReader->uid) { + break; + } + } else { + if (pBlockData->aUid[i] > pLastBlockReader->uid) { + continue; + } else if (pBlockData->aUid[i] < pLastBlockReader->uid) { + break; + } + } } int64_t ts = pBlockData->aTSKEY[i]; @@ -1992,10 +2012,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { setComposedBlockFlag(pReader, true); int64_t et = taosGetTimestampUs(); - tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 - " rows:%d, elapsed time:%.2f ms %s", - pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, - pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); + if (pResBlock->info.rows > 0) { + tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 + " rows:%d, elapsed time:%.2f ms %s", + pReader, pBlockScanInfo->uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, + pResBlock->info.rows, (et - st) / 1000.0, pReader->idStr); + } return TSDB_CODE_SUCCESS; } @@ -2191,7 +2213,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return code; } - code = tsdbReadBlockL(pReader->pFileReader, 0, pLastBlocks); + code = tsdbReadBlockL(pReader->pFileReader, pLastBlocks); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); return code; @@ -2266,7 +2288,7 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable return code; } - code = tsdbReadLastBlock(pReader->pFileReader, 0, pBlock, &pLastBlockReader->lastBlockData); + code = tsdbReadLastBlock(pReader->pFileReader, pBlock, &pLastBlockReader->lastBlockData); double el = (taosGetTimestampUs() - st) / 1000.0; if (code != TSDB_CODE_SUCCESS) { @@ -2287,22 +2309,99 @@ static int32_t doLoadRelatedLastBlock(SLastBlockReader* pLastBlockReader, STable return TSDB_CODE_SUCCESS; } +static int32_t uidComparFunc(const void* p1, const void* p2) { + uint64_t pu1 = *(uint64_t*)p1; + uint64_t pu2 = *(uint64_t*)p2; + if (pu1 == pu2) { + return 0; + } else { + return (pu1 < pu2) ? -1 : 1; + } +} + +static void extractOrderedTableUidList(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) { + int32_t index = 0; + int32_t total = taosHashGetSize(pStatus->pTableMap); + + void* p = taosHashIterate(pStatus->pTableMap, NULL); + while (p != NULL) { + STableBlockScanInfo* pScanInfo = p; + pOrderCheckInfo->tableUidList[index++] = pScanInfo->uid; + p = taosHashIterate(pStatus->pTableMap, p); + } + + taosSort(pOrderCheckInfo->tableUidList, total, sizeof(uint64_t), uidComparFunc); +} + +static int32_t initOrderCheckInfo(SUidOrderCheckInfo* pOrderCheckInfo, SReaderStatus* pStatus) { + if (pOrderCheckInfo->tableUidList == NULL) { + int32_t total = taosHashGetSize(pStatus->pTableMap); + + pOrderCheckInfo->currentIndex = 0; + pOrderCheckInfo->tableUidList = taosMemoryMalloc(total * sizeof(uint64_t)); + if (pOrderCheckInfo->tableUidList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + extractOrderedTableUidList(pOrderCheckInfo, pStatus); + + uint64_t uid = pOrderCheckInfo->tableUidList[0]; + pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + } else { + if (pStatus->pTableIter == NULL) { // it is the last block of a new file + // ASSERT(pOrderCheckInfo->currentIndex == taosHashGetSize(pStatus->pTableMap)); + + pOrderCheckInfo->currentIndex = 0; + uint64_t uid = pOrderCheckInfo->tableUidList[pOrderCheckInfo->currentIndex]; + pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + + // the tableMap has already updated + if (pStatus->pTableIter == NULL) { + void* p = + taosMemoryRealloc(pOrderCheckInfo->tableUidList, taosHashGetSize(pStatus->pTableMap) * sizeof(uint64_t)); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pOrderCheckInfo->tableUidList = p; + extractOrderedTableUidList(pOrderCheckInfo, pStatus); + + uid = pOrderCheckInfo->tableUidList[0]; + pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + } + } + } + + return TSDB_CODE_SUCCESS; +} + +static bool moveToNextTable(SUidOrderCheckInfo* pOrderedCheckInfo, SReaderStatus* pStatus) { + pOrderedCheckInfo->currentIndex += 1; + if (pOrderedCheckInfo->currentIndex >= taosHashGetSize(pStatus->pTableMap)) { + pStatus->pTableIter = NULL; + return false; + } + + uint64_t uid = pOrderedCheckInfo->tableUidList[pOrderedCheckInfo->currentIndex]; + pStatus->pTableIter = taosHashGet(pStatus->pTableMap, &uid, sizeof(uid)); + ASSERT(pStatus->pTableIter != NULL); + return true; +} + static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SLastBlockReader* pLastBlockReader = pStatus->fileIter.pLastBlockReader; - while (1) { - if (pStatus->pTableIter == NULL) { - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, NULL); - if (pStatus->pTableIter == NULL) { - return TSDB_CODE_SUCCESS; - } - } + SUidOrderCheckInfo* pOrderedCheckInfo = &pStatus->uidCheckInfo; + int32_t code = initOrderCheckInfo(pOrderedCheckInfo, pStatus); + if (code != TSDB_CODE_SUCCESS || (taosHashGetSize(pStatus->pTableMap) == 0)) { + return code; + } + while (1) { // load the last data block of current table - // todo opt perf by avoiding load last block repeatly STableBlockScanInfo* pScanInfo = pStatus->pTableIter; - int32_t code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); + code = doLoadRelatedLastBlock(pLastBlockReader, pScanInfo, pReader); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -2310,19 +2409,20 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { if (pLastBlockReader->currentBlockIndex != -1) { initLastBlockReader(pLastBlockReader, pScanInfo->uid, &pScanInfo->indexInBlockL); int32_t index = pScanInfo->indexInBlockL; + if (index == DEFAULT_ROW_INDEX_VAL || index == pLastBlockReader->lastBlockData.nRow) { bool hasData = nextRowInLastBlock(pLastBlockReader, pScanInfo); if (!hasData) { // current table does not have rows in last block, try next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); - if (pStatus->pTableIter == NULL) { + bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); + if (!hasNexTable) { return TSDB_CODE_SUCCESS; } continue; } } } else { // no data in last block, try next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); - if (pStatus->pTableIter == NULL) { + bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); + if (!hasNexTable) { return TSDB_CODE_SUCCESS; } continue; @@ -2338,8 +2438,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { } // current table is exhausted, let's try next table - pStatus->pTableIter = taosHashIterate(pStatus->pTableMap, pStatus->pTableIter); - if (pStatus->pTableIter == NULL) { + bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); + if (!hasNexTable) { return TSDB_CODE_SUCCESS; } } @@ -3354,6 +3454,8 @@ void tsdbReaderClose(STsdbReader* pReader) { tsdbDataFReaderClose(&pReader->pFileReader); } + taosMemoryFree(pReader->status.uidCheckInfo.tableUidList); + SFilesetIter* pFilesetIter = &pReader->status.fileIter; if (pFilesetIter->pLastBlockReader != NULL) { tBlockDataDestroy(&pFilesetIter->pLastBlockReader->lastBlockData, true); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1b3650ad77..f059a90c9c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -207,6 +207,7 @@ typedef struct SExprSupp { typedef struct SOperatorInfo { uint16_t operatorType; + int16_t resultDataBlockId; bool blocking; // block operator or not uint8_t status; // denote if current operator is completed char* name; // name, for debug purpose @@ -218,7 +219,6 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator SOperatorFpSet fpSet; - int16_t resultDataBlockId; } SOperatorInfo; typedef enum { @@ -909,21 +909,15 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createLastrowScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, STimeWindowAggSupp* pTwAggSupp, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); - -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - bool mergeResultBlock, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - SNode* pCondition, bool mergeResultBlocks, SExecTaskInfo* pTaskInfo); - +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, + SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, + SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, @@ -931,22 +925,16 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition, SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, + SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); - -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, - SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo); - +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); - SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); - SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); @@ -957,10 +945,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); -#if 0 -SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); -#endif - int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 893acf1bbc..6f4c84f9c0 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -600,7 +600,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId); if (pResult->info.rows > 0 && !createNewColModel) { - colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], + colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows); } else { colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info); @@ -638,7 +638,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; ASSERT(pResult->info.capacity > 0); - colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); colDataDestroy(&idata); numOfRows = dest.numOfRows; @@ -703,7 +703,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc int32_t startOffset = createNewColModel ? 0 : pResult->info.rows; ASSERT(pResult->info.capacity > 0); - colDataMergeCol(pResColData, startOffset, &pResult->info.capacity, &idata, dest.numOfRows); + colDataMergeCol(pResColData, startOffset, (int32_t*)&pResult->info.capacity, &idata, dest.numOfRows); colDataDestroy(&idata); numOfRows = dest.numOfRows; @@ -4024,7 +4024,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo)); if (code) { pTaskInfo->code = code; - qError("failed to createScanTableListInfo, code: %s", tstrerror(code)); + qError("failed to createScanTableListInfo, code:%s, %s", tstrerror(code), GET_TASKID(pTaskInfo)); return NULL; } @@ -4162,9 +4162,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo if (ops[i] == NULL) { taosMemoryFree(ops); return NULL; - } else { - ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } + + ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } SOperatorInfo* pOptr = NULL; @@ -4216,37 +4216,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == type) { SMergeAlignedIntervalPhysiNode* pIntervalPhyNode = (SMergeAlignedIntervalPhysiNode*)pPhyNode; - - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - - SInterval interval = {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; - - int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, - pPhyNode->pConditions, pIntervalPhyNode->window.mergeDataBlock, - pTaskInfo); + pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; - - SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - - SInterval interval = {.interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; - - int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; - pOptr = createMergeIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, - pIntervalPhyNode->window.mergeDataBlock, pTaskInfo); + pOptr = createMergeIntervalOperatorInfo(ops[0], pIntervalPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL == type) { int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); @@ -4275,17 +4248,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { SStateWinodwPhysiNode* pStateNode = (SStateWinodwPhysiNode*)pPhyNode; - - STimeWindowAggSupp as = {.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType}; - - SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; - - SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; - SColumn col = extractColumnFromColumnNode(pColNode); - pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, - pTaskInfo); + pOptr = createStatewindowOperatorInfo(ops[0], pStateNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { @@ -4299,8 +4262,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else { ASSERT(0); } + taosMemoryFree(ops); - if (pOptr) pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + if (pOptr) { + pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + } + return pOptr; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 53709c7dcc..9d7e833b19 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -702,8 +702,8 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { } SArray* groupArray = taosArrayInit(taosHashGetSize(pInfo->pGroupSet), sizeof(SDataGroupInfo)); - void* pGroupIter = NULL; - pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL); + + void* pGroupIter = taosHashIterate(pInfo->pGroupSet, NULL); while (pGroupIter != NULL) { SDataGroupInfo* pGroupInfo = pGroupIter; taosArrayPush(groupArray, pGroupInfo); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index cadaf4a9d5..ebafa91046 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1900,8 +1900,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, @@ -2700,20 +2698,26 @@ _error: return NULL; } -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, - SSDataBlock* pResBlock, STimeWindowAggSupp* pTwAggSup, int32_t tsSlotId, - SColumn* pStateKeyCol, SNode* pCondition, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, + SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } - pInfo->stateCol = *pStateKeyCol; + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); + int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; + + SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; + + pInfo->stateCol = extractColumnFromColumnNode(pColNode); pInfo->stateKey.type = pInfo->stateCol.type; pInfo->stateKey.bytes = pInfo->stateCol.bytes; pInfo->stateKey.pData = taosMemoryCalloc(1, pInfo->stateCol.bytes); - pInfo->pCondition = pCondition; + pInfo->pCondition = pStateNode->window.node.pConditions; if (pInfo->stateKey.pData == NULL) { goto _error; } @@ -2721,16 +2725,15 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExpr, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } initBasicInfo(&pInfo->binfo, pResBlock); - initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->twAggSup = *pTwAggSup; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pStateNode->window.watermark, .calTrigger = pStateNode->window.triggerType};; initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->tsSlotId = tsSlotId; @@ -2738,8 +2741,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExpr; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; @@ -2813,8 +2814,6 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, @@ -3449,8 +3448,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = @@ -3636,8 +3633,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->exprSupp.numOfExprs = numOfCols; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo, @@ -4898,8 +4893,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->pTaskInfo = pTaskInfo; pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL, @@ -5115,9 +5108,7 @@ static SSDataBlock* mergeAlignedIntervalAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pRes; } -SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, - int32_t primaryTsSlotId, SNode* pCondition, bool mergeResultBlock, +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo) { SMergeAlignedIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeAlignedIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -5130,24 +5121,33 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pNode->window.node.pOutputDataBlockDesc); + + SInterval interval = {.interval = pNode->interval, + .sliding = pNode->sliding, + .intervalUnit = pNode->intervalUnit, + .slidingUnit = pNode->slidingUnit, + .offset = pNode->offset, + .precision = ((SColumnNode*)pNode->window.pTspk)->node.resType.precision}; + SIntervalAggOperatorInfo* iaInfo = miaInfo->intervalAggOperatorInfo; SExprSupp* pSup = &pOperator->exprSupp; - miaInfo->pCondition = pCondition; - miaInfo->curTs = INT64_MIN; - - iaInfo->win = pTaskInfo->window; - iaInfo->inputOrder = TSDB_ORDER_ASC; - iaInfo->interval = *pInterval; - iaInfo->execModel = pTaskInfo->execModel; - iaInfo->primaryTsIndex = primaryTsSlotId; - iaInfo->binfo.mergeResultBlock = mergeResultBlock; + miaInfo->pCondition = pNode->window.node.pConditions; + miaInfo->curTs = INT64_MIN; + iaInfo->win = pTaskInfo->window; + iaInfo->inputOrder = TSDB_ORDER_ASC; + iaInfo->interval = interval; + iaInfo->execModel = pTaskInfo->execModel; + iaInfo->primaryTsIndex = ((SColumnNode*)pNode->window.pTspk)->slotId; + iaInfo->binfo.mergeResultBlock = pNode->window.mergeDataBlock; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = - initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -5155,7 +5155,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); - iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, numOfCols, iaInfo); + iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pSup->pCtx, num, iaInfo); if (iaInfo->timeWindowInterpo) { iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); } @@ -5163,14 +5163,12 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, initResultRowInfo(&iaInfo->binfo.resultRowInfo); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); - pOperator->name = "TimeMergeAlignedIntervalAggOperator"; + pOperator->name = "TimeMergeAlignedIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = miaInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = miaInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL, destroyMergeAlignedIntervalOperatorInfo, NULL, NULL, NULL); @@ -5427,57 +5425,64 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { return (rows == 0) ? NULL : pRes; } -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, - bool mergeBlock, SExecTaskInfo* pTaskInfo) { - SMergeIntervalAggOperatorInfo* miaInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, + SExecTaskInfo* pTaskInfo) { + SMergeIntervalAggOperatorInfo* pMergeIntervalInfo = taosMemoryCalloc(1, sizeof(SMergeIntervalAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (miaInfo == NULL || pOperator == NULL) { + if (pMergeIntervalInfo == NULL || pOperator == NULL) { goto _error; } - miaInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); + int32_t num = 0; + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); + SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc); - SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; - iaInfo->win = pTaskInfo->window; - iaInfo->inputOrder = TSDB_ORDER_ASC; - iaInfo->interval = *pInterval; - iaInfo->execModel = pTaskInfo->execModel; - iaInfo->binfo.mergeResultBlock = mergeBlock; + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; - iaInfo->primaryTsIndex = primaryTsSlotId; + pMergeIntervalInfo->groupIntervals = tdListNew(sizeof(SGroupTimeWindow)); + + SIntervalAggOperatorInfo* pIntervalInfo = &pMergeIntervalInfo->intervalAggOperatorInfo; + pIntervalInfo->win = pTaskInfo->window; + pIntervalInfo->inputOrder = TSDB_ORDER_ASC; + pIntervalInfo->interval = interval; + pIntervalInfo->execModel = pTaskInfo->execModel; + pIntervalInfo->binfo.mergeResultBlock = pIntervalPhyNode->window.mergeDataBlock; + pIntervalInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; SExprSupp* pExprSupp = &pOperator->exprSupp; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(pExprSupp, &iaInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } - initBasicInfo(&iaInfo->binfo, pResBlock); - initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); + initBasicInfo(&pIntervalInfo->binfo, pResBlock); + initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); - iaInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, numOfCols, iaInfo); - if (iaInfo->timeWindowInterpo) { - iaInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); - if (iaInfo->binfo.resultRowInfo.openWindow == NULL) { + pIntervalInfo->timeWindowInterpo = timeWindowinterpNeeded(pExprSupp->pCtx, num, pIntervalInfo); + if (pIntervalInfo->timeWindowInterpo) { + pIntervalInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); + if (pIntervalInfo->binfo.resultRowInfo.openWindow == NULL) { goto _error; } } - initResultRowInfo(&iaInfo->binfo.resultRowInfo); + initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); - pOperator->name = "TimeMergeIntervalAggOperator"; + pOperator->name = "TimeMergeIntervalAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL; - pOperator->blocking = false; - pOperator->status = OP_NOT_OPENED; - pOperator->exprSupp.pExprInfo = pExprInfo; - pOperator->pTaskInfo = pTaskInfo; - pOperator->exprSupp.numOfExprs = numOfCols; - pOperator->info = miaInfo; + pOperator->blocking = false; + pOperator->status = OP_NOT_OPENED; + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pMergeIntervalInfo; pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL, destroyMergeIntervalOperatorInfo, NULL, NULL, NULL); @@ -5490,7 +5495,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI return pOperator; _error: - destroyMergeIntervalOperatorInfo(miaInfo); + destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; return NULL; diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index fe3065b2b7..7d7a14483a 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -186,15 +186,16 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) { int32_t len1 = varDataLen(pLeft); int32_t len2 = varDataLen(pRight); - if (len1 != len2) { - return len1 > len2 ? 1 : -1; - } else { - int32_t ret = strncmp(varDataVal(pLeft), varDataVal(pRight), len1); - if (ret == 0) { + int32_t minLen = TMIN(len1, len2); + int32_t ret = strncmp(varDataVal(pLeft), varDataVal(pRight), minLen); + if (ret == 0) { + if (len1 == len2) { return 0; } else { - return ret > 0 ? 1 : -1; + return len1 > len2 ? 1 : -1; } + } else { + return ret > 0 ? 1 : -1; } } diff --git a/tests/script/tsim/parser/groupby.sim b/tests/script/tsim/parser/groupby.sim index 12a698b1cc..4ee9c530a7 100644 --- a/tests/script/tsim/parser/groupby.sim +++ b/tests/script/tsim/parser/groupby.sim @@ -557,7 +557,7 @@ if $data10 != @{slop:0.000000, intercept:1.000000}@ then return -1 endi -if $data90 != @{slop:0.000000, intercept:9.000000}@ then +if $data90 != @{slop:0.000000, intercept:17.000000}@ then return -1 endi diff --git a/tests/system-test/2-query/json_tag.py b/tests/system-test/2-query/json_tag.py index 856d764747..d9715579ae 100644 --- a/tests/system-test/2-query/json_tag.py +++ b/tests/system-test/2-query/json_tag.py @@ -338,7 +338,7 @@ class TDTestCase: tdSql.query(f"select * from {dbname}.jsons1 where jtag->'tag1' between 1 and 30") tdSql.checkRows(3) tdSql.query(f"select * from {dbname}.jsons1 where jtag->'tag1' between 'femail' and 'beijing'") - tdSql.checkRows(2) + tdSql.checkRows(0) # test with tbname/normal column tdSql.query(f"select * from {dbname}.jsons1 where tbname = 'jsons1_1'")