diff --git a/include/client/taos.h b/include/client/taos.h index 73ab52357a..55b24c8721 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -210,6 +210,7 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); +DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 080416ca2e..b4dd6d61e4 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -113,10 +113,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u if (pColAgg != NULL && pColAgg->colId != -1) { if (pColAgg->numOfNull == totalRows) { - ASSERT(pColumnInfoData->nullbitmap == NULL); return true; } else if (pColAgg->numOfNull == 0) { - ASSERT(pColumnInfoData->nullbitmap == NULL); return false; } } @@ -159,40 +157,32 @@ static FORCE_INLINE void colDataSetNNULL(SColumnInfoData* pColumnInfoData, uint3 } static FORCE_INLINE void colDataSetInt8(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int8_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT || - pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int8_t*)p = *(int8_t*)v; } static FORCE_INLINE void colDataSetInt16(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int16_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || - pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int16_t*)p = *(int16_t*)v; } static FORCE_INLINE void colDataSetInt32(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int32_t* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int32_t*)p = *(int32_t*)v; } static FORCE_INLINE void colDataSetInt64(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int64_t* v) { int32_t type = pColumnInfoData->info.type; - ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(int64_t*)p = *(int64_t*)v; } static FORCE_INLINE void colDataSetFloat(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, float* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(float*)p = *(float*)v; } static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, double* v) { - ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE); char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; *(double*)p = *(double*)v; } diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index edaa86befe..4a78ce957d 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -860,6 +860,44 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) { return pResInfo->pCol[columnIndex].offset; } +int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){ + if (res == NULL || result == NULL || rows == NULL || *rows <= 0 || + columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) { + return TSDB_CODE_INVALID_PARA; + } + + int32_t numOfFields = taos_num_fields(res); + if (columnIndex >= numOfFields || numOfFields == 0) { + return TSDB_CODE_INVALID_PARA; + } + + SReqResultInfo *pResInfo = tscGetCurResInfo(res); + TAOS_FIELD *pField = &pResInfo->userFields[columnIndex]; + SResultColumn *pCol = &pResInfo->pCol[columnIndex]; + + if (*rows > pResInfo->numOfRows){ + *rows = pResInfo->numOfRows; + } + if (IS_VAR_DATA_TYPE(pField->type)) { + for(int i = 0; i < *rows; i++){ + if(pCol->offset[i] == -1){ + result[i] = true; + }else{ + result[i] = false; + } + } + }else{ + for(int i = 0; i < *rows; i++){ + if (colDataIsNull_f(pCol->nullbitmap, i)){ + result[i] = true; + }else{ + result[i] = false; + } + } + } + return 0; +} + int taos_validate_sql(TAOS *taos, const char *sql) { TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 16710d9555..8e50c943b9 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -81,7 +81,7 @@ int32_t getJsonValueLen(const char* data) { } else if (tTagIsJson(data)) { // json string dataLen = ((STag*)(data))->len; } else { - ASSERT(0); + uError("Invalid data type:%d in Json", *data); } return dataLen; } @@ -801,7 +801,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd size_t rowSize = blockDataGetRowSize(pBlock); int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize); if (capacity <= 0) { - return TSDB_CODE_FAILED; + return terrno; } *stopIndex = startIndex + capacity - 1; @@ -835,7 +835,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd if (size > pageSize) { // pageSize must be able to hold one row *stopIndex = j - 1; if (*stopIndex < startIndex) { - return TSDB_CODE_FAILED; + return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; } return TSDB_CODE_SUCCESS; @@ -2060,7 +2060,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat } // todo disable it temporarily - // ASSERT(pColInfoData->info.type != 0); + // A S S E R T(pColInfoData->info.type != 0); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { pBlock->info.hasVarCol = true; } @@ -2100,14 +2100,18 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t payloadSize = pageSize - extraSize; int32_t rowSize = pBlock->info.rowSize; int32_t nRows = payloadSize / rowSize; - ASSERT(nRows >= 1); + if (nRows < 1) { + uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } int32_t numVarCols = 0; int32_t numFixCols = 0; for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); if (pCol == NULL) { - return terrno; + return -1; } if (IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -2135,7 +2139,11 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int int32_t newRows = (result != -1) ? result - 1 : nRows; // the true value must be less than the value of nRows - ASSERT(newRows <= nRows && newRows >= 1); + if (newRows > nRows || newRows < 1) { + uError("invalid newRows:%d, nRows:%d", newRows, nRows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } return newRows; } @@ -2616,7 +2624,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } // the rsma result should has the same column number with schema. - ASSERT(colNum == pTSchema->numOfCols); + if (colNum != pTSchema->numOfCols) { + uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols); + code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + goto _end; + } SSubmitTbData tbData = {0}; @@ -2652,10 +2664,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: - ASSERT(pColInfoData->info.type == pCol->type); + if (pColInfoData->info.type != pCol->type) { + uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } if (!isStartKey) { isStartKey = true; - ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); + if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) { + uError("the first timestamp colId %d is not primary colId", pCol->colId); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); void* px = taosArrayPush(pVals, &cv); if (px == NULL) { @@ -2679,7 +2699,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY - ASSERT(pColInfoData->info.type == pCol->type); + if (pColInfoData->info.type != pCol->type) { + uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } if (colDataIsNull_s(pColInfoData, j)) { SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); void* px = taosArrayPush(pVals, &cv); @@ -2704,7 +2728,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_MEDIUMBLOB: uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); - ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; break; default: if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { @@ -2752,7 +2777,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat } } else { uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); - ASSERT(0); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; } break; } @@ -2763,7 +2789,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat goto _end; } - ASSERT(pRow); void* px = taosArrayPush(tbData.aRowP, &pRow); if (px == NULL) { code = terrno; @@ -2902,7 +2927,11 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t* rows = (int32_t*)data; *rows = pBlock->info.rows; data += sizeof(int32_t); - ASSERT(*rows > 0); + if (*rows <= 0) { + uError("Invalid rows %d in block", *rows); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return -1; + } int32_t* cols = (int32_t*)data; *cols = numOfCols; @@ -3055,7 +3084,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos for (int32_t i = 0; i < numOfCols; ++i) { colLen[i] = htonl(colLen[i]); - ASSERT(colLen[i] >= 0); + if (colLen[i] < 0) { + uError("block decode colLen:%d error, colIdx:%d", colLen[i], i); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); if (pColInfoData == NULL) { @@ -3099,7 +3132,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos pBlock->info.dataLoad = 1; pBlock->info.rows = numOfRows; pBlock->info.blankFill = blankFill; - ASSERT(pStart - pData == dataLen); + if (pStart - pData != dataLen) { + uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen); + terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; + return terrno; + } *pEndPos = pStart; return code; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index e35b26627b..95035dd96f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -32,7 +32,6 @@ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ do { \ - assert(sizeof(_uid) == sizeof(uint64_t)); \ *(uint64_t*)(_k) = (_uid); \ (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 64db1a57a0..ceb0037b8d 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -357,7 +357,6 @@ typedef struct SMJoinOperatorInfo { #define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \ do { \ - ASSERT(taosArrayGetSize((_cache)->grps) <= 1); \ SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \ (_cache)->rowNum += (_blk)->info.rows; \ pGrp->blk = (_blk); \ @@ -381,7 +380,6 @@ typedef struct SMJoinOperatorInfo { do { \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ if (NULL != pGrp) { \ - ASSERT(pGrp->blk == (_tb)->blk); \ pGrp->beginIdx = (_tb)->blkRowIdx; \ pGrp->readIdx = pGrp->beginIdx; \ } \ diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 7d09be3300..0df676c6e2 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -182,6 +182,7 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo); void destroyOperator(SOperatorInfo* pOperator); +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** stream, int32_t num); int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index fe82e0eb62..f0e0f81cf5 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -137,22 +137,14 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); if (pInfo != NULL) { destroyAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/countwindowoperator.c b/source/libs/executor/src/countwindowoperator.c index 1d72b0bb58..28b2c22053 100644 --- a/source/libs/executor/src/countwindowoperator.c +++ b/source/libs/executor/src/countwindowoperator.c @@ -378,20 +378,14 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyCountWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index a75bfb8f4b..8058fa9afe 100644 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -1006,14 +1006,14 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyDynQueryCtrlOperator(pInfo); } - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 0b5fd074b0..591590a261 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -140,20 +140,14 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyEWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 4315624d97..120dcbc205 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -444,7 +444,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4ae3226f48..246a5e2a6d 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -511,26 +511,21 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); - } + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + if (pInfo != NULL) { destroyFillOperatorInfo(pInfo); } - + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } return code; } diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 9b213487ed..f0e0894bd2 100644 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -1510,20 +1510,14 @@ int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfD qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyGroupCacheOperator(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) { - destroyOperator(*pDownstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f0a4914c24..9b46db609f 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -600,22 +600,12 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: + if (pInfo != NULL) destroyGroupOperatorInfo(pInfo); + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; - if (pInfo != NULL) { - destroyGroupOperatorInfo(pInfo); - } - - if (pOperator) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } - return code; } @@ -1209,6 +1199,11 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo pInfo->rowCapacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); + if (pInfo->rowCapacity < 0) { + code = terrno; + goto _error; + } + pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); @@ -1229,20 +1224,14 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyPartitionOperatorInfo(pInfo); } pTaskInfo->code = code; - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); TAOS_RETURN(code); } @@ -1778,18 +1767,12 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 55620defba..f253aefe95 100644 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -1228,15 +1228,14 @@ int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDow qDebug("create hash Join operator done"); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _return: if (pInfo != NULL) { destroyHashJoinOperator(pInfo); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 808aac66c2..14f3a08e17 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -1869,6 +1869,7 @@ int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDo SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { QRY_OPTR_CHECK(pOptrInfo); + int32_t oldNum = numOfDownstream; bool newDownstreams = false; int32_t code = TSDB_CODE_SUCCESS; SOperatorInfo* pOperator = NULL; @@ -1921,8 +1922,7 @@ _return: if (newDownstreams) { taosMemoryFree(pDownstream); } - - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum); pTaskInfo->code = code; return code; diff --git a/source/libs/executor/src/mergeoperator.c b/source/libs/executor/src/mergeoperator.c index 9e0ad5f497..c12bfd8798 100644 --- a/source/libs/executor/src/mergeoperator.c +++ b/source/libs/executor/src/mergeoperator.c @@ -649,14 +649,13 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyMultiwayMergeOperatorInfo(pInfo); } - pTaskInfo->code = code; - taosMemoryFree(pOperator); + destroyOperatorAndDownstreams(pOperator, downStreams, numStreams); return code; } diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index fc52b97388..3f48d0f0a8 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -633,7 +633,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } else { code = TSDB_CODE_INVALID_PARA; pTaskInfo->code = code; + for (int32_t i = 0; i < size; ++i) { + destroyOperator(ops[i]); + } taosMemoryFree(ops); + qError("invalid operator type %d", type); return code; } @@ -672,6 +676,23 @@ void destroyOperator(SOperatorInfo* pOperator) { taosMemoryFreeClear(pOperator); } +void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) { + if (downstreams != NULL) { + for (int i = 0; i < num; i++) { + destroyOperator(downstreams[i]); + } + } + + if (pOperator != NULL) { + pOperator->info = NULL; + if (pOperator->pDownstream != NULL) { + taosMemoryFreeClear(pOperator->pDownstream); + pOperator->pDownstream = NULL; + } + destroyOperator(pOperator); + } +} + int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { SExplainExecInfo execInfo = {0}; SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 7e06c083ed..a9ba57e1d4 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -177,17 +177,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -525,17 +519,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 557794a062..b63fe1198d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1528,7 +1528,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index fb4b61c7a8..6083cbdcf8 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -155,20 +155,13 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroySortOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -841,19 +834,13 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: pTaskInfo->code = code; if (pInfo != NULL) { destroyGroupSortOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); return code; } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 54ad12cff0..4f11afd35a 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -920,20 +920,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamCountAggOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index a8e14bce68..d7519d90e9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -982,17 +982,11 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; diff --git a/source/libs/executor/src/streamfilloperator.c b/source/libs/executor/src/streamfilloperator.c index 3a6d0c709c..9a66f6d688 100644 --- a/source/libs/executor/src/streamfilloperator.c +++ b/source/libs/executor/src/streamfilloperator.c @@ -1451,20 +1451,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: - if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); - } + qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo)); + if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 6a1a5942d6..8d5aa7104f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2041,17 +2041,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -3875,20 +3869,13 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStreamSessionAggOperatorInfo(pInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -4091,6 +4078,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* pOperator = NULL; code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator); if (pOperator == NULL || code != 0) { + downstream = NULL; QUERY_CHECK_CODE(code, lino, _error); } @@ -4152,9 +4140,6 @@ _error: } if (pOperator != NULL) { pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } destroyOperator(pOperator); } pTaskInfo->code = code; @@ -5052,17 +5037,11 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return code; @@ -5398,17 +5377,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 1b8e6709d1..d57a8c7c5b 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -1192,22 +1192,17 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN // int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); code = appendDownstream(pOperator, &downstream, 1); + QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); } if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 024e0393f0..5499fa3026 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1428,19 +1428,13 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyIntervalOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1703,20 +1697,14 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) { destroyStateWindowOperatorInfo(pInfo); } - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -1805,17 +1793,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2122,17 +2104,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge QUERY_CHECK_CODE(code, lino, _error); *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } @@ -2462,19 +2438,12 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva } *pOptrInfo = pOperator; - return code; + return TSDB_CODE_SUCCESS; _error: if (pMergeIntervalInfo != NULL) { destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); } - - if (pOperator != NULL) { - pOperator->info = NULL; - if (pOperator->pDownstream == NULL && downstream != NULL) { - destroyOperator(downstream); - } - destroyOperator(pOperator); - } + destroyOperatorAndDownstreams(pOperator, &downstream, 1); pTaskInfo->code = code; return code; } diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index 55628c18f9..83716d72ad 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -183,7 +183,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { setBufPageDirty(pFirst, true); setBufPageDirty(pLast, true); - // ASSERT(pLast->num >= nodeSize + sizeof(SFilePage)); + // A S S E R T(pLast->num >= nodeSize + sizeof(SFilePage)); pFirst->num += nodeSize; pLast->num -= nodeSize; diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 621d643361..fa1ccc3100 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -970,6 +970,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); + if (numOfRows < 0) { + return terrno; + } + int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); if (code) { return code; @@ -1999,6 +2003,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray* int32_t code = TSDB_CODE_SUCCESS; int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); + if (rowCap < 0) { + return terrno; + } code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); if (code) { diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index c1afc4afb3..7bddc068e3 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1089,8 +1089,6 @@ void nodesDestroyNode(SNode* pNode) { if (pStmt->destroyParseFileCxt) { pStmt->destroyParseFileCxt(&pStmt->pParFileCxt); } - - assert(TSDB_CODE_SUCCESS == taosCloseFile(&pStmt->fp)); break; } case QUERY_NODE_CREATE_DATABASE_STMT: diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index d48871fd70..3aab190eda 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -6578,7 +6578,6 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode, void* pCtx) { return false; } - assert(pFuncs); FOREACH(pTmpNode, pFuncs) { SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) && @@ -7271,7 +7270,6 @@ static int32_t tsmaOptRewriteParent(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParen if (code == TSDB_CODE_SUCCESS && pWindow) { SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode; - assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID); nodesDestroyNode(pWindow->pTspk); pWindow->pTspk = NULL; code = nodesCloneNode((SNode*)pCol, &pWindow->pTspk); diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index cbc7ca77bb..f5712d135b 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -5221,7 +5221,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options) FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat)); } - return code; + return TSDB_CODE_SUCCESS; _return: diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index afe315bf58..3b7b999dcb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -310,6 +310,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { + stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId); if (pTask->pState != NULL) { stDebug("s-task:0x%x start to free task state", pTask->id.taskId); streamStateClose(pTask->pState, remove); @@ -319,8 +320,11 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { pTask->pBackend = NULL; pTask->pState = NULL; } else { + stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId, + pTask->backendPath ? pTask->backendPath : "NULL"); if (remove) { if (pTask->backendPath != NULL) { + stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath); taosRemoveDir(pTask->backendPath); } } @@ -373,11 +377,11 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { int32_t taskId = 0; if (pTask->info.fillHistory) { - streamId = pTask->hTaskInfo.id.taskId; - taskId = pTask->hTaskInfo.id.taskId; - } else { - streamId = pTask->streamTaskId.taskId; + streamId = pTask->streamTaskId.streamId; taskId = pTask->streamTaskId.taskId; + } else { + streamId = pTask->id.streamId; + taskId = pTask->id.taskId; } char id[128] = {0}; @@ -393,6 +397,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) { } (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); + stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath); return 0; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index b532372148..073d6c0f17 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -253,20 +253,20 @@ static void cliWalkCb(uv_handle_t* handle, void* arg); #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) -#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ - do { \ - int i = 0, sz = transQueueSize(&conn->cliMsgs); \ - for (; i < sz; i++) { \ - pMsg = transQueueGet(&conn->cliMsgs, i); \ - if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ - break; \ - } \ - } \ - if (i == sz) { \ - pMsg = NULL; \ - } else { \ - pMsg = transQueueRm(&conn->cliMsgs, i); \ - } \ +#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ + do { \ + int i = 0, sz = transQueueSize(&conn->cliMsgs); \ + for (; i < sz; i++) { \ + pMsg = transQueueGet(&conn->cliMsgs, i); \ + if (pMsg->msg.msgType != TDMT_SCH_DROP_TASK && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ + break; \ + } \ + } \ + if (i == sz) { \ + pMsg = NULL; \ + } else { \ + pMsg = transQueueRm(&conn->cliMsgs, i); \ + } \ } while (0) #define CONN_GET_NEXT_SENDMSG(conn) \ diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 33e0076c6e..24b60e8d13 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -374,6 +374,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * LRUStatus status = TAOS_LRU_STATUS_OK; SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); if (!lastReferenceList) { + taosLRUEntryFree(e); return TAOS_LRU_STATUS_FAIL; } @@ -385,13 +386,12 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); if (handle == NULL) { if (!taosArrayPush(lastReferenceList, &e)) { - (void)taosThreadMutexUnlock(&shard->mutex); taosLRUEntryFree(e); - return status; + goto _exit; } } else { if (freeOnFail) { - taosMemoryFree(e); + taosLRUEntryFree(e); *handle = NULL; } @@ -410,9 +410,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * shard->usage -= old->totalCharge; if (!taosArrayPush(lastReferenceList, &old)) { - (void)taosThreadMutexUnlock(&shard->mutex); + taosLRUEntryFree(e); taosLRUEntryFree(old); - return status; + goto _exit; } } } @@ -427,6 +427,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry * } } +_exit: (void)taosThreadMutexUnlock(&shard->mutex); for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index db5eb21ad8..e5902856e6 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) add_executable(get_db_name_test get_db_name_test.c) add_executable(tmq_offset tmqOffset.c) +add_executable(tmq_multi_thread_test tmq_multi_thread_test.c) add_executable(tmq_offset_test tmq_offset_test.c) add_executable(varbinary_test varbinary_test.c) add_executable(replay_test replay_test.c) @@ -22,6 +23,15 @@ target_link_libraries( PUBLIC common PUBLIC os ) + +target_link_libraries( + tmq_multi_thread_test + PUBLIC taos + PUBLIC util + PUBLIC common + PUBLIC os +) + target_link_libraries( create_table PUBLIC taos diff --git a/utils/test/c/tmq_multi_thread_test.c b/utils/test/c/tmq_multi_thread_test.c new file mode 100644 index 0000000000..0b6117c89b --- /dev/null +++ b/utils/test/c/tmq_multi_thread_test.c @@ -0,0 +1,115 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" +#include "types.h" + +void* consumeThreadFunc(void* param) { + int32_t* index = (int32_t*) param; + tmq_conf_t* conf = tmq_conf_new(); + char groupId[64] = {0}; + int64_t t = taosGetTimestampMs(); + sprintf(groupId, "group_%"PRId64"_%d", t, *index); + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", groupId); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "false"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "select_d4"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + int32_t timeout = 200; + int32_t totalRows = 0; + while (1) { + printf("start to poll\n"); + + TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + int32_t rows = 0; + void* data = NULL; + taos_fetch_raw_block(pRes, &rows, &data); + + totalRows+=rows; + int cols = taos_num_fields(pRes); + for(int32_t i = 0; i < cols; ++i) { + int64_t start = taosGetTimestampUs(); + for (int32_t j = 0; j < rows; ++j) { + //int64_t t1 = taosGetTimestampUs(); + taos_is_null(pRes, j, i); + //int64_t t2 = taosGetTimestampUs(); + //printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1); + } + int64_t end = taosGetTimestampUs(); + bool* isNULL = taosMemoryCalloc(rows, sizeof(bool)); + int code = taos_is_null_by_column(pRes, i, isNULL, &rows); + printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code); + } + + taos_free_result(pRes); + } else { + printf("no data\n"); + break; + } + } + tmq_consumer_close(tmq); + return NULL; +} + +int main(int argc, char* argv[]) { + if (argc != 2) { + printf("Usage: %s \n", argv[0]); + return 0; + } + + int32_t numOfThread = atoi(argv[1]); + TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread)); + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + int64_t t1 = taosGetTimestampUs(); + // pthread_create one thread to consume + int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t)); + for (int32_t i = 0; i < numOfThread; ++i) { + paras[i] = i; + taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(¶s[i])); + } + + for (int32_t i = 0; i < numOfThread; i++) { + taosThreadJoin(thread[i], NULL); + taosThreadClear(&thread[i]); + } + + int64_t t2 = taosGetTimestampUs(); + printf("total cost %"PRId64" us\n", t2 - t1); + taosMemoryFree(paras); + return 0; +}