diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 41bf823095..beb4303156 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -567,7 +567,6 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead pMTree->pLoadInfo = pBlockLoadInfo; pMTree->destroyLoadInfo = destroyLoadInfo; - ASSERT(pMTree->pLoadInfo != NULL); for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file struct SLDataIter *pIter = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 050d03cf73..346ad854df 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -243,7 +243,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC return TSDB_CODE_SUCCESS; } -static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { +static int32_t updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) { int32_t i = 0, j = 0; while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) { @@ -251,7 +251,7 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) if (pTCol->colId == pSupInfo->colId[j]) { if (!IS_BSMA_ON(pTCol)) { pSupInfo->smaValid = false; - return; + return TSDB_CODE_SUCCESS; } i += 1; @@ -260,9 +260,11 @@ static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) // do nothing i += 1; } else { - ASSERT(0); + return TSDB_CODE_INVALID_PARA; } } + + return TSDB_CODE_SUCCESS; } static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) { @@ -579,7 +581,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd } if (VND_IS_TSMA(pVnode)) { - tsdbDebug("vgId:%d, tsma is selected to query", TD_VID(pVnode)); + tsdbDebug("vgId:%d, tsma is selected to query, %s", TD_VID(pVnode), idstr); } initReaderStatus(&pReader->status); @@ -594,7 +596,6 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pReader->pTsdb, &pCond->twindows); pReader->blockInfoBuf.numPerBucket = 1000; // 1000 tables per bucket - ASSERT(pCond->numOfCols > 0); if (pReader->pResBlock == NULL) { pReader->freeBlock = true; @@ -605,6 +606,12 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd } } + if (pCond->numOfCols <= 0) { + tsdbError("vgId:%d, invalid column number %d in query cond, %s", TD_VID(pVnode), pCond->numOfCols, idstr); + code = TSDB_CODE_INVALID_PARA; + goto _end; + } + // todo refactor. limitOutputBufferSize(pCond, &pReader->capacity); @@ -794,8 +801,9 @@ static void doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_ } static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { - if (taosArrayGetSize(pBlockIter->blockList) == 0) { - ASSERT(pBlockIter->numOfBlocks == taosArrayGetSize(pBlockIter->blockList)); + size_t num = taosArrayGetSize(pBlockIter->blockList); + if (num == 0) { + ASSERT(pBlockIter->numOfBlocks == num); return NULL; } @@ -805,73 +813,6 @@ static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { static SDataBlk* getCurrentBlock(SDataBlockIter* pBlockIter) { return &pBlockIter->block; } -int32_t binarySearchForTs(char* pValue, int num, TSKEY key, int order) { - int32_t midPos = -1; - int32_t numOfRows; - - ASSERT(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC); - - TSKEY* keyList = (TSKEY*)pValue; - int32_t firstPos = 0; - int32_t lastPos = num - 1; - - if (order == TSDB_ORDER_DESC) { - // find the first position which is smaller than the key - while (1) { - if (key >= keyList[firstPos]) return firstPos; - if (key == keyList[lastPos]) return lastPos; - - if (key < keyList[lastPos]) { - lastPos += 1; - if (lastPos >= num) { - return -1; - } else { - return lastPos; - } - } - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1) + firstPos; - - if (key < keyList[midPos]) { - firstPos = midPos + 1; - } else if (key > keyList[midPos]) { - lastPos = midPos - 1; - } else { - break; - } - } - - } else { - // find the first position which is bigger than the key - while (1) { - if (key <= keyList[firstPos]) return firstPos; - if (key == keyList[lastPos]) return lastPos; - - if (key > keyList[lastPos]) { - lastPos = lastPos + 1; - if (lastPos >= num) - return -1; - else - return lastPos; - } - - numOfRows = lastPos - firstPos + 1; - midPos = (numOfRows >> 1u) + firstPos; - - if (key < keyList[midPos]) { - lastPos = midPos - 1; - } else if (key > keyList[midPos]) { - firstPos = midPos + 1; - } else { - break; - } - } - } - - return midPos; -} - static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) { // start end position int s, e; @@ -972,8 +913,8 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo int32_t step = asc? 1:-1; - // make sure it is aligned to 8bit - ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); + // make sure it is aligned to 8bit, the allocated memory address is aligned to 256bit +// ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // 1. copy data in a batch model memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); @@ -1183,7 +1124,6 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - ASSERT(pBlockInfo != NULL); SDataBlk* pBlock = getCurrentBlock(pBlockIter); code = tsdbReadDataBlock(pReader->pFileReader, pBlock, pBlockData); @@ -1221,8 +1161,6 @@ static void cleanupBlockOrderSupporter(SBlockOrderSupporter* pSup) { } static int32_t initBlockOrderSupporter(SBlockOrderSupporter* pSup, int32_t numOfTables) { - ASSERT(numOfTables >= 1); - pSup->numOfBlocksPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); pSup->indexPerTable = taosMemoryCalloc(1, sizeof(int32_t) * numOfTables); pSup->pDataBlockInfo = taosMemoryCalloc(1, POINTER_BYTES * numOfTables); @@ -1329,7 +1267,10 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte sup.numOfTables += 1; } - ASSERT(numOfBlocks == cnt); + if (numOfBlocks != cnt && sup.numOfTables != numOfTables) { + cleanupBlockOrderSupporter(&sup); + return TSDB_CODE_INVALID_PARA; + } // since there is only one table qualified, blocks are not sorted if (sup.numOfTables == 1) { @@ -1351,10 +1292,9 @@ static int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIte tsdbDebug("%p create data blocks info struct completed, %d blocks in %d tables %s", pReader, cnt, sup.numOfTables, pReader->idStr); - ASSERT(cnt <= numOfBlocks && sup.numOfTables <= numOfTables); - SMultiwayMergeTreeInfo* pTree = NULL; - uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); + + uint8_t ret = tMergeTreeCreate(&pTree, sup.numOfTables, &sup, fileDataBlockOrderCompar); if (ret != TSDB_CODE_SUCCESS) { cleanupBlockOrderSupporter(&sup); return TSDB_CODE_OUT_OF_MEMORY; @@ -1432,8 +1372,6 @@ static bool getNeighborBlockOfSameTable(SFileDataBlockInfo* pBlockInfo, STableBl } static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pFBlockInfo) { - ASSERT(pBlockIter != NULL && pFBlockInfo != NULL); - int32_t step = ASCENDING_TRAVERSE(pBlockIter->order) ? 1 : -1; int32_t index = pBlockIter->index; @@ -1924,7 +1862,6 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, } doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLastBlock, &merge, &pReader->verRange); - ASSERT(mergeBlockData); // merge with block data if ts == key if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { @@ -1990,7 +1927,6 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader tRowMergerClear(&merge); return code; } else { - ASSERT(0); return TSDB_CODE_SUCCESS; } } else { // desc order @@ -2011,7 +1947,6 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pDelList, pReader); TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); - ASSERT(pRow != NULL && piRow != NULL); int64_t tsLast = INT64_MIN; if (hasDataInLastBlock(pLastBlockReader)) { @@ -2235,7 +2170,6 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea if (pReader->pReadSnap->pMem != NULL) { d = tsdbGetTbDataFromMemTable(pReader->pReadSnap->pMem, pReader->suid, pBlockScanInfo->uid); if (d != NULL) { - ASSERT(pBlockScanInfo->iter.iter == NULL); code = tsdbTbDataIterCreate(d, &startKey, backward, &pBlockScanInfo->iter.iter); if (code == TSDB_CODE_SUCCESS) { pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL); @@ -2349,10 +2283,9 @@ static int64_t getCurrentKeyInLastBlock(SLastBlockReader* pLastBlockReader) { static bool hasDataInLastBlock(SLastBlockReader* pLastBlockReader) { return pLastBlockReader->mergeTree.pIter != NULL; } bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { - if (pBlockData->nRow > 0) { - ASSERT(pBlockData->nRow == pDumpInfo->totalRows); + if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { + return false; // this is an invalid result. } - return pBlockData->nRow > 0 && (!pDumpInfo->allDumped); } @@ -2583,7 +2516,6 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* int32_t code = 0; SArray* pDelData = taosArrayInit(4, sizeof(SDelData)); - ASSERT(pReader->pReadSnap != NULL); SDelFile* pDelFile = pReader->pReadSnap->fs.pDelFile; if (pDelFile && taosArrayGetSize(pReader->pDelIdx) > 0) { @@ -2868,7 +2800,6 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); if (pBlockInfo == NULL) { // build data block from last data file - ASSERT(pBlockIter->numOfBlocks == 0); code = buildComposedDataBlock(pReader); } else if (fileBlockShouldLoad(pReader, pBlockInfo, pBlock, pScanInfo, keyInBuf, pLastBlockReader)) { code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pScanInfo->uid); @@ -3837,7 +3768,10 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } if (pReader->pSchema != NULL) { - updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); + code = updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } } STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader; @@ -4113,25 +4047,27 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave) { + SColumnDataAgg*** pBlockSMA = &pDataBlock->pBlockAgg; + int32_t code = 0; - SColumnDataAgg ***pBlockSMA = &pDataBlock->pBlockAgg; *allHave = false; + *pBlockSMA = NULL; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { - *pBlockSMA = NULL; return TSDB_CODE_SUCCESS; } // there is no statistics data for composed block if (pReader->status.composedDataBlock || (!pReader->suppInfo.smaValid)) { - *pBlockSMA = NULL; return TSDB_CODE_SUCCESS; } SFileDataBlockInfo* pFBlock = getCurrentBlockInfo(&pReader->status.blockIter); SBlockLoadSuppInfo* pSup = &pReader->suppInfo; - ASSERT(pReader->pResBlock->info.id.uid == pFBlock->uid); + if (pReader->pResBlock->info.id.uid != pFBlock->uid) { + return TSDB_CODE_SUCCESS; + } SDataBlk* pBlock = getCurrentBlock(&pReader->status.blockIter); if (tDataBlkHasSma(pBlock)) { @@ -4187,7 +4123,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SSDataBlock* pDataBlock, } else if (pAgg->colId < pSup->colId[j]) { i += 1; } else if (pSup->colId[j] < pAgg->colId) { - ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); + // ASSERT(pSup->colId[j] == PRIMARYKEY_TIMESTAMP_COL_ID); pResBlock->pBlockAgg[pSup->slotId[j]] = &pSup->tsColAgg; j += 1; } @@ -4418,9 +4354,12 @@ int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int6 return terrno; } sversion = mr.me.stbEntry.schemaRow.version; - } else { - ASSERT(mr.me.type == TSDB_NORMAL_TABLE); + } else if (mr.me.type == TSDB_NORMAL_TABLE) { sversion = mr.me.ntbEntry.schemaRow.version; + } else { + terrno = TSDB_CODE_INVALID_PARA; + metaReaderClear(&mr); + return terrno; } metaReaderClear(&mr); diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index eff7a5ef93..a8051ea7c3 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -62,8 +62,8 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock); pEntry->dataLen = sizeof(SDeleterRes); - ASSERT(1 == pEntry->numOfRows); - ASSERT(3 == pEntry->numOfCols); +// ASSERT(1 == pEntry->numOfRows); +// ASSERT(3 == pEntry->numOfCols); pBuf->useSize = sizeof(SDataCacheEntry); @@ -167,7 +167,6 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataDeleterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - ASSERT(NULL != pBuf); memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); taosFreeQitem(pBuf); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index c2fa438c80..a603bffba5 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -77,8 +77,8 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn pBuf->useSize = sizeof(SDataCacheEntry); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); - ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); - ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); +// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); +// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); pBuf->useSize += pEntry->dataLen; @@ -162,15 +162,14 @@ static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryE SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); - ASSERT(NULL != pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); SDataCacheEntry* pEntry = (SDataCacheEntry*)pDispatcher->nextOutput.pData; *pLen = pEntry->dataLen; - ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); - ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); +// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); +// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); *pQueryEnd = pDispatcher->queryEnd; qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, @@ -193,8 +192,8 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { pOutput->numOfCols = pEntry->numOfCols; pOutput->compressed = pEntry->compressed; - ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); - ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); +// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); +// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); atomic_sub_fetch_64(&pDispatcher->cachedSize, pEntry->dataLen); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 4103ca82dc..9873c52006 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -373,7 +373,6 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->useconds = htobe64(pRsp->useconds); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); - ASSERT(pRsp != NULL); qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo); } else { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 668a93740d..fde13498ea 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -104,8 +104,6 @@ static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, void setOperatorCompleted(SOperatorInfo* pOperator) { pOperator->status = OP_EXEC_DONE; - ASSERT(pOperator->pTaskInfo != NULL); - pOperator->cost.totalCost = (taosGetTimestampUs() - pOperator->pTaskInfo->cost.start) / 1000.0; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); } @@ -524,7 +522,7 @@ bool functionNeedToExecute(SqlFunctionCtx* pCtx) { return true; } -static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, +static int32_t doCreateConstantValColumnSMAInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t type, int32_t paramIndex, int32_t numOfRows) { if (pInput->pData[paramIndex] == NULL) { pInput->pData[paramIndex] = taosMemoryCalloc(1, sizeof(SColumnInfoData)); @@ -548,8 +546,6 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF da = pInput->pColumnDataAgg[paramIndex]; } - ASSERT(!IS_VAR_DATA_TYPE(type)); - if (type == TSDB_DATA_TYPE_BIGINT) { int64_t v = pFuncParam->param.i; *da = (SColumnDataAgg){.numOfNull = 0, .min = v, .max = v, .sum = v * numOfRows}; @@ -570,7 +566,7 @@ static int32_t doCreateConstantValColumnAggInfo(SInputColumnInfoData* pInput, SF } else if (type == TSDB_DATA_TYPE_TIMESTAMP) { // do nothing } else { - ASSERT(0); + qError("invalid constant type for sma info"); } return TSDB_CODE_SUCCESS; @@ -600,7 +596,7 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB // the data in the corresponding SColumnInfoData will not be used. pInput->pData[j] = taosArrayGet(pBlock->pDataBlock, slotId); } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { - doCreateConstantValColumnAggInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); + doCreateConstantValColumnSMAInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); } } } else { @@ -2217,13 +2213,11 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { *ppNode = (STableScanPhysiNode*)pNode; return 0; } else { - ASSERT(0); terrno = TSDB_CODE_APP_ERROR; return -1; } } else { if (LIST_LENGTH(pNode->pChildren) != 1) { - ASSERT(0); terrno = TSDB_CODE_APP_ERROR; return -1; } @@ -2233,32 +2227,6 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) { return -1; } -#if 0 -int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) { - STableScanInfo* pTableScanInfo = NULL; - if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) { - return -1; - } - - STableScanPhysiNode* pNode = NULL; - if (extractTableScanNode(plan->pNode, &pNode) < 0) { - ASSERT(0); - } - - tsdbReaderClose(pTableScanInfo->dataReader); - - STableListInfo info = {0}; - pTableScanInfo->dataReader = doCreateDataReader(pNode, pHandle, &info, NULL); - if (pTableScanInfo->dataReader == NULL) { - ASSERT(0); - qError("failed to create data reader"); - return TSDB_CODE_APP_ERROR; - } - // TODO: set uid and ts to data reader - return 0; -} -#endif - int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 8a097a23ce..88ed9eccb3 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -42,38 +42,40 @@ typedef struct SJoinOperatorInfo { static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param); -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode); +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, + SSortMergeJoinPhysiNode* pJoinNode, const char* idStr); -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode) { +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t num, + SSortMergeJoinPhysiNode* pJoinNode, const char* idStr) { SNode* pMergeCondition = pJoinNode->pMergeCondition; - if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { - SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; - SColumnNode* col1 = (SColumnNode*)pNode->pLeft; - SColumnNode* col2 = (SColumnNode*)pNode->pRight; - SColumnNode* leftTsCol = NULL; - SColumnNode* rightTsCol = NULL; - if (col1->dataBlockId == col2->dataBlockId ) { + if (nodeType(pMergeCondition) != QUERY_NODE_OPERATOR) { + qError("not support this in join operator, %s", idStr); + return; // do not handle this + } + + SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; + SColumnNode* col1 = (SColumnNode*)pNode->pLeft; + SColumnNode* col2 = (SColumnNode*)pNode->pRight; + SColumnNode* leftTsCol = NULL; + SColumnNode* rightTsCol = NULL; + if (col1->dataBlockId == col2->dataBlockId) { + leftTsCol = col1; + rightTsCol = col2; + } else { + if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { + ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); leftTsCol = col1; rightTsCol = col2; } else { - if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { - ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); - leftTsCol = col1; - rightTsCol = col2; - } else { - ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId); - ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId); - leftTsCol = col2; - rightTsCol = col1; - } + ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId); + ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId); + leftTsCol = col2; + rightTsCol = col1; } - setJoinColumnInfo(&pInfo->leftCol, leftTsCol); - setJoinColumnInfo(&pInfo->rightCol, rightTsCol); - } else { - ASSERT(false); - }} + } + setJoinColumnInfo(&pInfo->leftCol, leftTsCol); + setJoinColumnInfo(&pInfo->rightCol, rightTsCol); +} SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -97,7 +99,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.numOfExprs = numOfCols; - extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); + extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode, GET_TASKID(pTaskInfo)); if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); @@ -364,8 +366,6 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs char* pRightVal = colDataGetData(pRightCol, pJoinInfo->rightPos); *pRightTs = *(int64_t*)pRightVal; - ASSERT(pLeftCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); - ASSERT(pRightCol->info.type == TSDB_DATA_TYPE_TIMESTAMP); return true; } diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 7ac007b7cb..ee7f88e813 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -139,7 +139,6 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); - // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -272,7 +271,6 @@ void destroySortOperatorInfo(void* param) { } int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - ASSERT(pOptr != NULL); SSortExecInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SSortOperatorInfo* pOperatorInfo = (SSortOperatorInfo*)pOptr->info; @@ -329,7 +327,6 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo int32_t numOfCols = taosArrayGetSize(pColMatchInfo); for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pmInfo = taosArrayGet(pColMatchInfo, i); - // ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID); SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId); SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->dstSlotId); @@ -746,7 +743,6 @@ void destroyMultiwayMergeOperatorInfo(void* param) { } int32_t getMultiwayMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len) { - ASSERT(pOptr != NULL); SSortExecInfo* pSortExecInfo = taosMemoryCalloc(1, sizeof(SSortExecInfo)); SMultiwayMergeOperatorInfo* pInfo = (SMultiwayMergeOperatorInfo*)pOptr->info; diff --git a/source/libs/executor/src/tsimplehash.c b/source/libs/executor/src/tsimplehash.c index 484d917069..fd6215e3a1 100644 --- a/source/libs/executor/src/tsimplehash.c +++ b/source/libs/executor/src/tsimplehash.c @@ -49,7 +49,9 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { } SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { - ASSERT(fn != NULL); + if (fn == NULL) { + return NULL; + } if (capacity == 0) { capacity = 4; @@ -66,7 +68,6 @@ SSHashObj *tSimpleHashInit(size_t capacity, _hash_fn_t fn) { pHashObj->equalFp = memcmp; pHashObj->hashFp = fn; - ASSERT((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); pHashObj->hashList = (SHNode **)taosMemoryCalloc(pHashObj->capacity, sizeof(void *)); if (!pHashObj->hashList) { diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 30911887bb..fa0cdb3943 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -800,6 +800,7 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) { } } + // all sources are completed. if (pHandle->cmpParam.numOfSources == pHandle->numOfCompletedSources) { return NULL; }