Merge branch '3.0' into enh/TS-5297-3.0

This commit is contained in:
kailixu 2024-08-30 13:45:01 +08:00
commit 242a417cb2
38 changed files with 354 additions and 292 deletions

View File

@ -210,6 +210,7 @@ DLL_EXPORT int taos_select_db(TAOS *taos, const char *db);
DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields); DLL_EXPORT int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields);
DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT void taos_stop_query(TAOS_RES *res);
DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows);
DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows);

View File

@ -113,10 +113,8 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
if (pColAgg != NULL && pColAgg->colId != -1) { if (pColAgg != NULL && pColAgg->colId != -1) {
if (pColAgg->numOfNull == totalRows) { if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true; return true;
} else if (pColAgg->numOfNull == 0) { } else if (pColAgg->numOfNull == 0) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return false; return false;
} }
} }
@ -159,40 +157,32 @@ static FORCE_INLINE void colDataSetNNULL(SColumnInfoData* pColumnInfoData, uint3
} }
static FORCE_INLINE void colDataSetInt8(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int8_t* v) { static FORCE_INLINE void colDataSetInt8(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int8_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(int8_t*)p = *(int8_t*)v; *(int8_t*)p = *(int8_t*)v;
} }
static FORCE_INLINE void colDataSetInt16(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int16_t* v) { static FORCE_INLINE void colDataSetInt16(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(int16_t*)p = *(int16_t*)v; *(int16_t*)p = *(int16_t*)v;
} }
static FORCE_INLINE void colDataSetInt32(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int32_t* v) { static FORCE_INLINE void colDataSetInt32(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int32_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(int32_t*)p = *(int32_t*)v; *(int32_t*)p = *(int32_t*)v;
} }
static FORCE_INLINE void colDataSetInt64(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int64_t* v) { static FORCE_INLINE void colDataSetInt64(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, int64_t* v) {
int32_t type = pColumnInfoData->info.type; int32_t type = pColumnInfoData->info.type;
ASSERT(type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_UBIGINT || type == TSDB_DATA_TYPE_TIMESTAMP);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(int64_t*)p = *(int64_t*)v; *(int64_t*)p = *(int64_t*)v;
} }
static FORCE_INLINE void colDataSetFloat(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, float* v) { static FORCE_INLINE void colDataSetFloat(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, float* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(float*)p = *(float*)v; *(float*)p = *(float*)v;
} }
static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, double* v) { static FORCE_INLINE void colDataSetDouble(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, double* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex; char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex;
*(double*)p = *(double*)v; *(double*)p = *(double*)v;
} }

View File

@ -860,6 +860,44 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
return pResInfo->pCol[columnIndex].offset; return pResInfo->pCol[columnIndex].offset;
} }
int taos_is_null_by_column(TAOS_RES *res, int columnIndex, bool result[], int *rows){
if (res == NULL || result == NULL || rows == NULL || *rows <= 0 ||
columnIndex < 0 || TD_RES_TMQ_META(res) || TD_RES_TMQ_BATCH_META(res)) {
return TSDB_CODE_INVALID_PARA;
}
int32_t numOfFields = taos_num_fields(res);
if (columnIndex >= numOfFields || numOfFields == 0) {
return TSDB_CODE_INVALID_PARA;
}
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
SResultColumn *pCol = &pResInfo->pCol[columnIndex];
if (*rows > pResInfo->numOfRows){
*rows = pResInfo->numOfRows;
}
if (IS_VAR_DATA_TYPE(pField->type)) {
for(int i = 0; i < *rows; i++){
if(pCol->offset[i] == -1){
result[i] = true;
}else{
result[i] = false;
}
}
}else{
for(int i = 0; i < *rows; i++){
if (colDataIsNull_f(pCol->nullbitmap, i)){
result[i] = true;
}else{
result[i] = false;
}
}
}
return 0;
}
int taos_validate_sql(TAOS *taos, const char *sql) { int taos_validate_sql(TAOS *taos, const char *sql) {
TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP); TAOS_RES *pObj = taosQueryImpl(taos, sql, true, TD_REQ_FROM_APP);

View File

@ -81,7 +81,7 @@ int32_t getJsonValueLen(const char* data) {
} else if (tTagIsJson(data)) { // json string } else if (tTagIsJson(data)) { // json string
dataLen = ((STag*)(data))->len; dataLen = ((STag*)(data))->len;
} else { } else {
ASSERT(0); uError("Invalid data type:%d in Json", *data);
} }
return dataLen; return dataLen;
} }
@ -801,7 +801,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
size_t rowSize = blockDataGetRowSize(pBlock); size_t rowSize = blockDataGetRowSize(pBlock);
int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize); int32_t capacity = blockDataGetCapacityInRow(pBlock, pageSize, headerSize + colHeaderSize);
if (capacity <= 0) { if (capacity <= 0) {
return TSDB_CODE_FAILED; return terrno;
} }
*stopIndex = startIndex + capacity - 1; *stopIndex = startIndex + capacity - 1;
@ -835,7 +835,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
if (size > pageSize) { // pageSize must be able to hold one row if (size > pageSize) { // pageSize must be able to hold one row
*stopIndex = j - 1; *stopIndex = j - 1;
if (*stopIndex < startIndex) { if (*stopIndex < startIndex) {
return TSDB_CODE_FAILED; return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -2060,7 +2060,7 @@ int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoDat
} }
// todo disable it temporarily // todo disable it temporarily
// ASSERT(pColInfoData->info.type != 0); // A S S E R T(pColInfoData->info.type != 0);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
pBlock->info.hasVarCol = true; pBlock->info.hasVarCol = true;
} }
@ -2100,14 +2100,18 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int
int32_t payloadSize = pageSize - extraSize; int32_t payloadSize = pageSize - extraSize;
int32_t rowSize = pBlock->info.rowSize; int32_t rowSize = pBlock->info.rowSize;
int32_t nRows = payloadSize / rowSize; int32_t nRows = payloadSize / rowSize;
ASSERT(nRows >= 1); if (nRows < 1) {
uError("rows %d in page is too small, payloadSize:%d, rowSize:%d", nRows, payloadSize, rowSize);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return -1;
}
int32_t numVarCols = 0; int32_t numVarCols = 0;
int32_t numFixCols = 0; int32_t numFixCols = 0;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i);
if (pCol == NULL) { if (pCol == NULL) {
return terrno; return -1;
} }
if (IS_VAR_DATA_TYPE(pCol->info.type)) { if (IS_VAR_DATA_TYPE(pCol->info.type)) {
@ -2135,7 +2139,11 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize, int
int32_t newRows = (result != -1) ? result - 1 : nRows; int32_t newRows = (result != -1) ? result - 1 : nRows;
// the true value must be less than the value of nRows // the true value must be less than the value of nRows
ASSERT(newRows <= nRows && newRows >= 1); if (newRows > nRows || newRows < 1) {
uError("invalid newRows:%d, nRows:%d", newRows, nRows);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return -1;
}
return newRows; return newRows;
} }
@ -2616,7 +2624,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
// the rsma result should has the same column number with schema. // the rsma result should has the same column number with schema.
ASSERT(colNum == pTSchema->numOfCols); if (colNum != pTSchema->numOfCols) {
uError("colNum %d is not equal to numOfCols %d", colNum, pTSchema->numOfCols);
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _end;
}
SSubmitTbData tbData = {0}; SSubmitTbData tbData = {0};
@ -2652,10 +2664,18 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
switch (pColInfoData->info.type) { switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
ASSERT(pColInfoData->info.type == pCol->type); if (pColInfoData->info.type != pCol->type) {
uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
}
if (!isStartKey) { if (!isStartKey) {
isStartKey = true; isStartKey = true;
ASSERT(PRIMARYKEY_TIMESTAMP_COL_ID == pCol->colId); if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) {
uError("the first timestamp colId %d is not primary colId", pCol->colId);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
}
SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var})); SColVal cv = COL_VAL_VALUE(pCol->colId, ((SValue){.type = pCol->type, .val = *(TSKEY*)var}));
void* px = taosArrayPush(pVals, &cv); void* px = taosArrayPush(pVals, &cv);
if (px == NULL) { if (px == NULL) {
@ -2679,7 +2699,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_VARBINARY: case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
ASSERT(pColInfoData->info.type == pCol->type); if (pColInfoData->info.type != pCol->type) {
uError("colType:%d mismatch with sechma colType:%d", pColInfoData->info.type, pCol->type);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
}
if (colDataIsNull_s(pColInfoData, j)) { if (colDataIsNull_s(pColInfoData, j)) {
SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type); SColVal cv = COL_VAL_NULL(pCol->colId, pCol->type);
void* px = taosArrayPush(pVals, &cv); void* px = taosArrayPush(pVals, &cv);
@ -2704,7 +2728,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB: case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
break; break;
default: default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
@ -2752,7 +2777,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
} }
} else { } else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
ASSERT(0); terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
} }
break; break;
} }
@ -2763,7 +2789,6 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq2** ppReq, const SSDataBlock* pDat
goto _end; goto _end;
} }
ASSERT(pRow);
void* px = taosArrayPush(tbData.aRowP, &pRow); void* px = taosArrayPush(tbData.aRowP, &pRow);
if (px == NULL) { if (px == NULL) {
code = terrno; code = terrno;
@ -2902,7 +2927,11 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
int32_t* rows = (int32_t*)data; int32_t* rows = (int32_t*)data;
*rows = pBlock->info.rows; *rows = pBlock->info.rows;
data += sizeof(int32_t); data += sizeof(int32_t);
ASSERT(*rows > 0); if (*rows <= 0) {
uError("Invalid rows %d in block", *rows);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return -1;
}
int32_t* cols = (int32_t*)data; int32_t* cols = (int32_t*)data;
*cols = numOfCols; *cols = numOfCols;
@ -3055,7 +3084,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0); if (colLen[i] < 0) {
uError("block decode colLen:%d error, colIdx:%d", colLen[i], i);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
}
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (pColInfoData == NULL) { if (pColInfoData == NULL) {
@ -3099,7 +3132,11 @@ int32_t blockDecode(SSDataBlock* pBlock, const char* pData, const char** pEndPos
pBlock->info.dataLoad = 1; pBlock->info.dataLoad = 1;
pBlock->info.rows = numOfRows; pBlock->info.rows = numOfRows;
pBlock->info.blankFill = blankFill; pBlock->info.blankFill = blankFill;
ASSERT(pStart - pData == dataLen); if (pStart - pData != dataLen) {
uError("block decode msg len error, pStart:%p, pData:%p, dataLen:%d", pStart, pData, dataLen);
terrno = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
return terrno;
}
*pEndPos = pStart; *pEndPos = pStart;
return code; return code;

View File

@ -32,7 +32,6 @@
#define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \ #define SET_RES_WINDOW_KEY(_k, _ori, _len, _uid) \
do { \ do { \
assert(sizeof(_uid) == sizeof(uint64_t)); \
*(uint64_t*)(_k) = (_uid); \ *(uint64_t*)(_k) = (_uid); \
(void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ (void)memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \
} while (0) } while (0)

View File

@ -357,7 +357,6 @@ typedef struct SMJoinOperatorInfo {
#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \ #define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \
do { \ do { \
ASSERT(taosArrayGetSize((_cache)->grps) <= 1); \
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \ SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayReserve((_cache)->grps, 1); \
(_cache)->rowNum += (_blk)->info.rows; \ (_cache)->rowNum += (_blk)->info.rows; \
pGrp->blk = (_blk); \ pGrp->blk = (_blk); \
@ -381,7 +380,6 @@ typedef struct SMJoinOperatorInfo {
do { \ do { \
SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \ SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \
if (NULL != pGrp) { \ if (NULL != pGrp) { \
ASSERT(pGrp->blk == (_tb)->blk); \
pGrp->beginIdx = (_tb)->blkRowIdx; \ pGrp->beginIdx = (_tb)->blkRowIdx; \
pGrp->readIdx = pGrp->beginIdx; \ pGrp->readIdx = pGrp->beginIdx; \
} \ } \

View File

@ -182,6 +182,7 @@ int16_t getOperatorResultBlockId(struct SOperatorInfo* pOperator, int32_t
int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond,
SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo); SNode* pTagIndexCond, const char* pUser, const char* dbname, SOperatorInfo** pOptrInfo);
void destroyOperator(SOperatorInfo* pOperator); void destroyOperator(SOperatorInfo* pOperator);
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** stream, int32_t num);
int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo); int32_t extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, const char* id, SOperatorInfo** pOptrInfo);
int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder);

View File

@ -137,22 +137,14 @@ int32_t createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pA
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) { if (pInfo != NULL) {
destroyAggOperatorInfo(pInfo); destroyAggOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -378,20 +378,14 @@ int32_t createCountwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyCountWindowOperatorInfo(pInfo); destroyCountWindowOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -1006,14 +1006,14 @@ int32_t createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numO
NULL, optrDefaultGetNextExtFn, NULL); NULL, optrDefaultGetNextExtFn, NULL);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyDynQueryCtrlOperator(pInfo); destroyDynQueryCtrlOperator(pInfo);
} }
taosMemoryFree(pOperator); destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -140,20 +140,14 @@ int32_t createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNode* phy
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyEWindowOperatorInfo(pInfo); destroyEWindowOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -444,7 +444,7 @@ int32_t createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNo
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo, pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteDataNext, NULL, destroyExchangeOperatorInfo,
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {

View File

@ -511,26 +511,21 @@ int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFi
optrDefaultGetNextExtFn, NULL); optrDefaultGetNextExtFn, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pInfo != NULL) { if (pInfo != NULL) {
destroyFillOperatorInfo(pInfo); destroyFillOperatorInfo(pInfo);
} }
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code; pTaskInfo->code = code;
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
return code; return code;
} }

View File

@ -1510,20 +1510,14 @@ int32_t createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfD
qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch); qTrace("new group cache operator, maxCacheSize:%" PRId64 ", globalGrp:%d, batchFetch:%d", pInfo->maxCacheSize, pInfo->globalGrp, pInfo->batchFetch);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyGroupCacheOperator(pInfo); destroyGroupCacheOperator(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && pDownstream != NULL && (*pDownstream) != NULL) {
destroyOperator(*pDownstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -600,22 +600,12 @@ int32_t createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNo
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyGroupOperatorInfo(pInfo);
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pTaskInfo->code = code; pTaskInfo->code = code;
if (pInfo != NULL) {
destroyGroupOperatorInfo(pInfo);
}
if (pOperator) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
return code; return code;
} }
@ -1209,6 +1199,11 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
pInfo->rowCapacity = pInfo->rowCapacity =
blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf), blockDataGetCapacityInRow(pInfo->binfo.pRes, getBufPageSize(pInfo->pBuf),
blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock))); blockDataGetSerialMetaSize(taosArrayGetSize(pInfo->binfo.pRes->pDataBlock)));
if (pInfo->rowCapacity < 0) {
code = terrno;
goto _error;
}
pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity); pInfo->columnOffset = setupColumnOffset(pInfo->binfo.pRes, pInfo->rowCapacity);
QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno); QUERY_CHECK_NULL(pInfo->columnOffset, code, lino, _error, terrno);
@ -1229,20 +1224,14 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyPartitionOperatorInfo(pInfo); destroyPartitionOperatorInfo(pInfo);
} }
pTaskInfo->code = code; pTaskInfo->code = code;
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -1778,18 +1767,12 @@ int32_t createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPart
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamPartitionOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code; return code;
} }

View File

@ -1228,15 +1228,14 @@ int32_t createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDow
qDebug("create hash Join operator done"); qDebug("create hash Join operator done");
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_return: _return:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyHashJoinOperator(pInfo); destroyHashJoinOperator(pInfo);
} }
destroyOperatorAndDownstreams(pOperator, pDownstream, numOfDownstream);
taosMemoryFree(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -1869,6 +1869,7 @@ int32_t createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDo
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) { SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
QRY_OPTR_CHECK(pOptrInfo); QRY_OPTR_CHECK(pOptrInfo);
int32_t oldNum = numOfDownstream;
bool newDownstreams = false; bool newDownstreams = false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SOperatorInfo* pOperator = NULL; SOperatorInfo* pOperator = NULL;
@ -1921,8 +1922,7 @@ _return:
if (newDownstreams) { if (newDownstreams) {
taosMemoryFree(pDownstream); taosMemoryFree(pDownstream);
} }
destroyOperatorAndDownstreams(pOperator, pDownstream, oldNum);
taosMemoryFree(pOperator);
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;

View File

@ -649,14 +649,13 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyMultiwayMergeOperatorInfo(pInfo); destroyMultiwayMergeOperatorInfo(pInfo);
} }
pTaskInfo->code = code; pTaskInfo->code = code;
taosMemoryFree(pOperator); destroyOperatorAndDownstreams(pOperator, downStreams, numStreams);
return code; return code;
} }

View File

@ -633,7 +633,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else { } else {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
pTaskInfo->code = code; pTaskInfo->code = code;
for (int32_t i = 0; i < size; ++i) {
destroyOperator(ops[i]);
}
taosMemoryFree(ops); taosMemoryFree(ops);
qError("invalid operator type %d", type);
return code; return code;
} }
@ -672,6 +676,23 @@ void destroyOperator(SOperatorInfo* pOperator) {
taosMemoryFreeClear(pOperator); taosMemoryFreeClear(pOperator);
} }
void destroyOperatorAndDownstreams(SOperatorInfo* pOperator, SOperatorInfo** downstreams, int32_t num) {
if (downstreams != NULL) {
for (int i = 0; i < num; i++) {
destroyOperator(downstreams[i]);
}
}
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream != NULL) {
taosMemoryFreeClear(pOperator->pDownstream);
pOperator->pDownstream = NULL;
}
destroyOperator(pOperator);
}
}
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) { int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList) {
SExplainExecInfo execInfo = {0}; SExplainExecInfo execInfo = {0};
SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo); SExplainExecInfo* pExplainInfo = taosArrayPush(pExecInfoList, &execInfo);

View File

@ -177,17 +177,11 @@ int32_t createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode*
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyProjectOperatorInfo(pInfo); if (pInfo != NULL) destroyProjectOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -525,17 +519,11 @@ int32_t createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo); if (pInfo != NULL) destroyIndefinitOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -1528,7 +1528,7 @@ int32_t createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHa
// for non-blocking operator, the open cost is always 0 // for non-blocking operator, the open cost is always 0
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {

View File

@ -155,20 +155,13 @@ int32_t createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortN
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroySortOperatorInfo(pInfo); destroySortOperatorInfo(pInfo);
} }
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -841,19 +834,13 @@ int32_t createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNo
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
pTaskInfo->code = code; pTaskInfo->code = code;
if (pInfo != NULL) { if (pInfo != NULL) {
destroyGroupSortOperatorInfo(pInfo); destroyGroupSortOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
return code; return code;
} }

View File

@ -920,20 +920,14 @@ int32_t createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyStreamCountAggOperatorInfo(pInfo); destroyStreamCountAggOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code; return code;

View File

@ -982,17 +982,11 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamEventOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code; return code;

View File

@ -1451,20 +1451,13 @@ int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysi
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -2041,17 +2041,11 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -3875,20 +3869,13 @@ int32_t createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyStreamSessionAggOperatorInfo(pInfo); destroyStreamSessionAggOperatorInfo(pInfo);
} }
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code; return code;
@ -4091,6 +4078,7 @@ int32_t createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhys
SOperatorInfo* pOperator = NULL; SOperatorInfo* pOperator = NULL;
code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator); code = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo, pHandle, &pOperator);
if (pOperator == NULL || code != 0) { if (pOperator == NULL || code != 0) {
downstream = NULL;
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
} }
@ -4152,9 +4140,6 @@ _error:
} }
if (pOperator != NULL) { if (pOperator != NULL) {
pOperator->info = NULL; pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator); destroyOperator(pOperator);
} }
pTaskInfo->code = code; pTaskInfo->code = code;
@ -5052,17 +5037,11 @@ int32_t createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamStateOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
return code; return code;
@ -5398,17 +5377,11 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo); if (pInfo != NULL) destroyStreamFinalIntervalOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -1192,22 +1192,17 @@ int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyN
// int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp); // int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
} }
if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo); if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -1428,19 +1428,13 @@ int32_t createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyIntervalOperatorInfo(pInfo); destroyIntervalOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -1703,20 +1697,14 @@ int32_t createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhy
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) { if (pInfo != NULL) {
destroyStateWindowOperatorInfo(pInfo); destroyStateWindowOperatorInfo(pInfo);
} }
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -1805,17 +1793,11 @@ int32_t createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPh
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pInfo != NULL) destroySWindowOperatorInfo(pInfo); if (pInfo != NULL) destroySWindowOperatorInfo(pInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -2122,17 +2104,11 @@ int32_t createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
QUERY_CHECK_CODE(code, lino, _error); QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo); if (miaInfo != NULL) destroyMAIOperatorInfo(miaInfo);
if (pOperator != NULL) { destroyOperatorAndDownstreams(pOperator, &downstream, 1);
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }
@ -2462,19 +2438,12 @@ int32_t createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeInterva
} }
*pOptrInfo = pOperator; *pOptrInfo = pOperator;
return code; return TSDB_CODE_SUCCESS;
_error: _error:
if (pMergeIntervalInfo != NULL) { if (pMergeIntervalInfo != NULL) {
destroyMergeIntervalOperatorInfo(pMergeIntervalInfo); destroyMergeIntervalOperatorInfo(pMergeIntervalInfo);
} }
destroyOperatorAndDownstreams(pOperator, &downstream, 1);
if (pOperator != NULL) {
pOperator->info = NULL;
if (pOperator->pDownstream == NULL && downstream != NULL) {
destroyOperator(downstream);
}
destroyOperator(pOperator);
}
pTaskInfo->code = code; pTaskInfo->code = code;
return code; return code;
} }

View File

@ -183,7 +183,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) {
setBufPageDirty(pFirst, true); setBufPageDirty(pFirst, true);
setBufPageDirty(pLast, true); setBufPageDirty(pLast, true);
// ASSERT(pLast->num >= nodeSize + sizeof(SFilePage)); // A S S E R T(pLast->num >= nodeSize + sizeof(SFilePage));
pFirst->num += nodeSize; pFirst->num += nodeSize;
pLast->num -= nodeSize; pLast->num -= nodeSize;

View File

@ -970,6 +970,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize,
blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock))); blockDataGetSerialMetaSize(taosArrayGetSize(pHandle->pDataBlock->pDataBlock)));
if (numOfRows < 0) {
return terrno;
}
int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows); int32_t code = blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
if (code) { if (code) {
return code; return code;
@ -1999,6 +2003,9 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock); int32_t pageHeaderSize = sizeof(int32_t) + sizeof(int32_t) * blockDataGetNumOfCols(pHandle->pDataBlock);
int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize); int32_t rowCap = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize, pageHeaderSize);
if (rowCap < 0) {
return terrno;
}
code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap); code = blockDataEnsureCapacity(pHandle->pDataBlock, rowCap);
if (code) { if (code) {

View File

@ -1089,8 +1089,6 @@ void nodesDestroyNode(SNode* pNode) {
if (pStmt->destroyParseFileCxt) { if (pStmt->destroyParseFileCxt) {
pStmt->destroyParseFileCxt(&pStmt->pParFileCxt); pStmt->destroyParseFileCxt(&pStmt->pParFileCxt);
} }
assert(TSDB_CODE_SUCCESS == taosCloseFile(&pStmt->fp));
break; break;
} }
case QUERY_NODE_CREATE_DATABASE_STMT: case QUERY_NODE_CREATE_DATABASE_STMT:

View File

@ -6578,7 +6578,6 @@ static bool tsmaOptMayBeOptimized(SLogicNode* pNode, void* pCtx) {
return false; return false;
} }
assert(pFuncs);
FOREACH(pTmpNode, pFuncs) { FOREACH(pTmpNode, pFuncs) {
SFunctionNode* pFunc = (SFunctionNode*)pTmpNode; SFunctionNode* pFunc = (SFunctionNode*)pTmpNode;
if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) && if (!fmIsTSMASupportedFunc(pFunc->funcId) && !fmIsPseudoColumnFunc(pFunc->funcId) &&
@ -7271,7 +7270,6 @@ static int32_t tsmaOptRewriteParent(STSMAOptCtx* pTsmaOptCtx, SLogicNode* pParen
if (code == TSDB_CODE_SUCCESS && pWindow) { if (code == TSDB_CODE_SUCCESS && pWindow) {
SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode; SColumnNode* pCol = (SColumnNode*)pScan->pScanCols->pTail->pNode;
assert(pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
nodesDestroyNode(pWindow->pTspk); nodesDestroyNode(pWindow->pTspk);
pWindow->pTspk = NULL; pWindow->pTspk = NULL;
code = nodesCloneNode((SNode*)pCol, &pWindow->pTspk); code = nodesCloneNode((SNode*)pCol, &pWindow->pTspk);

View File

@ -5221,7 +5221,7 @@ int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pInfo, uint32_t options)
FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat)); FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat));
} }
return code; return TSDB_CODE_SUCCESS;
_return: _return:

View File

@ -310,6 +310,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
} }
void streamFreeTaskState(SStreamTask* pTask, int8_t remove) { void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
stDebug("s-task:0x%x start to free task state/backend", pTask->id.taskId);
if (pTask->pState != NULL) { if (pTask->pState != NULL) {
stDebug("s-task:0x%x start to free task state", pTask->id.taskId); stDebug("s-task:0x%x start to free task state", pTask->id.taskId);
streamStateClose(pTask->pState, remove); streamStateClose(pTask->pState, remove);
@ -319,8 +320,11 @@ void streamFreeTaskState(SStreamTask* pTask, int8_t remove) {
pTask->pBackend = NULL; pTask->pBackend = NULL;
pTask->pState = NULL; pTask->pState = NULL;
} else { } else {
stDebug("s-task:0x%x task state is NULL, may del backend:%s", pTask->id.taskId,
pTask->backendPath ? pTask->backendPath : "NULL");
if (remove) { if (remove) {
if (pTask->backendPath != NULL) { if (pTask->backendPath != NULL) {
stDebug("s-task:0x%x task state is NULL, do del backend:%s", pTask->id.taskId, pTask->backendPath);
taosRemoveDir(pTask->backendPath); taosRemoveDir(pTask->backendPath);
} }
} }
@ -373,11 +377,11 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
int32_t taskId = 0; int32_t taskId = 0;
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
streamId = pTask->hTaskInfo.id.taskId; streamId = pTask->streamTaskId.streamId;
taskId = pTask->hTaskInfo.id.taskId;
} else {
streamId = pTask->streamTaskId.taskId;
taskId = pTask->streamTaskId.taskId; taskId = pTask->streamTaskId.taskId;
} else {
streamId = pTask->id.streamId;
taskId = pTask->id.taskId;
} }
char id[128] = {0}; char id[128] = {0};
@ -393,6 +397,7 @@ int32_t streamTaskSetBackendPath(SStreamTask* pTask) {
} }
(void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id); (void)sprintf(pTask->backendPath, "%s%s%s", pTask->pMeta->path, TD_DIRSEP, id);
stDebug("s-task:%s set backend path:%s", pTask->id.idStr, pTask->backendPath);
return 0; return 0;
} }

View File

@ -253,20 +253,20 @@ static void cliWalkCb(uv_handle_t* handle, void* arg);
#define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para)) #define CONN_PERSIST_TIME(para) ((para) <= 90000 ? 90000 : (para))
#define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label) #define CONN_GET_INST_LABEL(conn) (((STrans*)(((SCliThrd*)(conn)->hostThrd)->pTransInst))->label)
#define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \ #define CONN_GET_MSGCTX_BY_AHANDLE(conn, ahandle) \
do { \ do { \
int i = 0, sz = transQueueSize(&conn->cliMsgs); \ int i = 0, sz = transQueueSize(&conn->cliMsgs); \
for (; i < sz; i++) { \ for (; i < sz; i++) { \
pMsg = transQueueGet(&conn->cliMsgs, i); \ pMsg = transQueueGet(&conn->cliMsgs, i); \
if (pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \ if (pMsg->msg.msgType != TDMT_SCH_DROP_TASK && pMsg->ctx != NULL && (uint64_t)pMsg->ctx->ahandle == ahandle) { \
break; \ break; \
} \ } \
} \ } \
if (i == sz) { \ if (i == sz) { \
pMsg = NULL; \ pMsg = NULL; \
} else { \ } else { \
pMsg = transQueueRm(&conn->cliMsgs, i); \ pMsg = transQueueRm(&conn->cliMsgs, i); \
} \ } \
} while (0) } while (0)
#define CONN_GET_NEXT_SENDMSG(conn) \ #define CONN_GET_NEXT_SENDMSG(conn) \

View File

@ -374,6 +374,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
LRUStatus status = TAOS_LRU_STATUS_OK; LRUStatus status = TAOS_LRU_STATUS_OK;
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES); SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
if (!lastReferenceList) { if (!lastReferenceList) {
taosLRUEntryFree(e);
return TAOS_LRU_STATUS_FAIL; return TAOS_LRU_STATUS_FAIL;
} }
@ -385,13 +386,12 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false); TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
if (handle == NULL) { if (handle == NULL) {
if (!taosArrayPush(lastReferenceList, &e)) { if (!taosArrayPush(lastReferenceList, &e)) {
(void)taosThreadMutexUnlock(&shard->mutex);
taosLRUEntryFree(e); taosLRUEntryFree(e);
return status; goto _exit;
} }
} else { } else {
if (freeOnFail) { if (freeOnFail) {
taosMemoryFree(e); taosLRUEntryFree(e);
*handle = NULL; *handle = NULL;
} }
@ -410,9 +410,9 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
shard->usage -= old->totalCharge; shard->usage -= old->totalCharge;
if (!taosArrayPush(lastReferenceList, &old)) { if (!taosArrayPush(lastReferenceList, &old)) {
(void)taosThreadMutexUnlock(&shard->mutex); taosLRUEntryFree(e);
taosLRUEntryFree(old); taosLRUEntryFree(old);
return status; goto _exit;
} }
} }
} }
@ -427,6 +427,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
} }
} }
_exit:
(void)taosThreadMutexUnlock(&shard->mutex); (void)taosThreadMutexUnlock(&shard->mutex);
for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) { for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {

View File

@ -7,6 +7,7 @@ add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c) add_executable(sml_test sml_test.c)
add_executable(get_db_name_test get_db_name_test.c) add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c) add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_multi_thread_test tmq_multi_thread_test.c)
add_executable(tmq_offset_test tmq_offset_test.c) add_executable(tmq_offset_test tmq_offset_test.c)
add_executable(varbinary_test varbinary_test.c) add_executable(varbinary_test varbinary_test.c)
add_executable(replay_test replay_test.c) add_executable(replay_test replay_test.c)
@ -22,6 +23,15 @@ target_link_libraries(
PUBLIC common PUBLIC common
PUBLIC os PUBLIC os
) )
target_link_libraries(
tmq_multi_thread_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries( target_link_libraries(
create_table create_table
PUBLIC taos PUBLIC taos

View File

@ -0,0 +1,115 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "taos.h"
#include "types.h"
void* consumeThreadFunc(void* param) {
int32_t* index = (int32_t*) param;
tmq_conf_t* conf = tmq_conf_new();
char groupId[64] = {0};
int64_t t = taosGetTimestampMs();
sprintf(groupId, "group_%"PRId64"_%d", t, *index);
tmq_conf_set(conf, "enable.auto.commit", "false");
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
tmq_conf_set(conf, "group.id", groupId);
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.with.table.name", "false");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "select_d4");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
int32_t timeout = 200;
int32_t totalRows = 0;
while (1) {
printf("start to poll\n");
TAOS_RES *pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
int32_t rows = 0;
void* data = NULL;
taos_fetch_raw_block(pRes, &rows, &data);
totalRows+=rows;
int cols = taos_num_fields(pRes);
for(int32_t i = 0; i < cols; ++i) {
int64_t start = taosGetTimestampUs();
for (int32_t j = 0; j < rows; ++j) {
//int64_t t1 = taosGetTimestampUs();
taos_is_null(pRes, j, i);
//int64_t t2 = taosGetTimestampUs();
//printf("taos_is_null gourp:%s cost %"PRId64" us\n", groupId, t2 - t1);
}
int64_t end = taosGetTimestampUs();
bool* isNULL = taosMemoryCalloc(rows, sizeof(bool));
int code = taos_is_null_by_column(pRes, i, isNULL, &rows);
printf("taos_fetch_raw_block gourp:%s total rows:%d cost %"PRId64" us, code:%d\n", groupId, totalRows, end - start, code);
}
taos_free_result(pRes);
} else {
printf("no data\n");
break;
}
}
tmq_consumer_close(tmq);
return NULL;
}
int main(int argc, char* argv[]) {
if (argc != 2) {
printf("Usage: %s <num_of_thread>\n", argv[0]);
return 0;
}
int32_t numOfThread = atoi(argv[1]);
TdThread *thread = taosMemoryCalloc(numOfThread, sizeof(TdThread));
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
int64_t t1 = taosGetTimestampUs();
// pthread_create one thread to consume
int32_t* paras = taosMemoryCalloc(numOfThread, sizeof(int32_t));
for (int32_t i = 0; i < numOfThread; ++i) {
paras[i] = i;
taosThreadCreate(&(thread[i]), &thattr, consumeThreadFunc, (void*)(&paras[i]));
}
for (int32_t i = 0; i < numOfThread; i++) {
taosThreadJoin(thread[i], NULL);
taosThreadClear(&thread[i]);
}
int64_t t2 = taosGetTimestampUs();
printf("total cost %"PRId64" us\n", t2 - t1);
taosMemoryFree(paras);
return 0;
}