Merge pull request #26991 from taosdata/fix/syntax

fix(stream): check return value.
This commit is contained in:
Haojun Liao 2024-08-06 09:02:59 +08:00 committed by GitHub
commit 14556de514
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 109 additions and 41 deletions

View File

@ -1620,6 +1620,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) {
if (pBlock == NULL){ if (pBlock == NULL){
return; return;
} }
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);

View File

@ -26,7 +26,7 @@
#define T_LONG_JMP(_obj, _c) \ #define T_LONG_JMP(_obj, _c) \
do { \ do { \
ASSERT((_c) != 1); \ ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \ longjmp((_obj), (_c)); \
} while (0) } while (0)

View File

@ -1434,7 +1434,12 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList
pNode.suid = suid; pNode.suid = suid;
pNode.uid = suid; pNode.uid = suid;
pNode.tableType = TSDB_SUPER_TABLE; pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
return terrno;
}
uint8_t digest[17] = {0}; uint8_t digest[17] = {0};
int code = int code =
getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL, getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
@ -1692,8 +1697,11 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
SDataType* pType = &pColNode->node.resType; SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = pExp->base.resSchema =
createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName); createResSchema(pType->type, pType->bytes, slotId, pType->scale, pType->precision, pColNode->colName);
pExp->base.pParam[0].pCol = pExp->base.pParam[0].pCol =
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType); createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
QUERY_CHECK_NULL(pExp->base.pParam[0].pCol, code, lino, _end, terrno);
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
} else if (type == QUERY_NODE_VALUE) { } else if (type == QUERY_NODE_VALUE) {
pExp->pExpr->nodeType = QUERY_NODE_VALUE; pExp->pExpr->nodeType = QUERY_NODE_VALUE;
@ -1761,6 +1769,7 @@ int32_t createExprFromOneNode(SExprInfo* pExp, SNode* pNode, int16_t slotId) {
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
pExp->base.pParam[j].pCol = pExp->base.pParam[j].pCol =
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType); createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
QUERY_CHECK_NULL(pExp->base.pParam[j].pCol, code, lino, _end, terrno);
} else if (p1->type == QUERY_NODE_VALUE) { } else if (p1->type == QUERY_NODE_VALUE) {
SValueNode* pvn = (SValueNode*)p1; SValueNode* pvn = (SValueNode*)p1;
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE; pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
@ -2324,7 +2333,9 @@ void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psu
uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) { uint64_t tableListGetTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid)); int32_t* slot = taosHashGet(pTableList->map, &tableUid, sizeof(tableUid));
ASSERT(pTableList->map != NULL && slot != NULL); if (slot == NULL) {
return -1;
}
STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot); STableKeyInfo* pKeyInfo = taosArrayGet(pTableList->pTableList, *slot);
ASSERT(pKeyInfo->uid == tableUid); ASSERT(pKeyInfo->uid == tableUid);
@ -2407,11 +2418,10 @@ bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList-
STableListInfo* tableListCreate() { STableListInfo* tableListCreate() {
STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo)); STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
if (pListInfo == NULL) { if (pListInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pListInfo->remainGroups = NULL;
pListInfo->remainGroups = NULL;
pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) { if (pListInfo->pTableList == NULL) {
goto _error; goto _error;
@ -2427,7 +2437,6 @@ STableListInfo* tableListCreate() {
_error: _error:
tableListDestroy(pListInfo); tableListDestroy(pListInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -2437,7 +2446,6 @@ void tableListDestroy(STableListInfo* pTableListInfo) {
} }
taosArrayDestroy(pTableListInfo->pTableList); taosArrayDestroy(pTableListInfo->pTableList);
pTableListInfo->pTableList = NULL;
taosMemoryFreeClear(pTableListInfo->groupOffset); taosMemoryFreeClear(pTableListInfo->groupOffset);
taosHashCleanup(pTableListInfo->map); taosHashCleanup(pTableListInfo->map);

View File

@ -1139,8 +1139,8 @@ int32_t createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNo
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = NULL; SExprInfo* pExprInfo = NULL;
code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols); code = createExprInfo(pPartNode->pTargets, NULL, &pExprInfo, &numOfCols);
QUERY_CHECK_CODE(code, lino, _error);
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys); pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);

View File

@ -594,9 +594,11 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo; SSortMergeInfo* pSortMergeInfo = &pInfo->sortMergeInfo;
initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo);
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc);
TSDB_CHECK_NULL(pInputBlock, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
@ -620,6 +622,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_NON_SORT: { case MERGE_TYPE_NON_SORT: {
SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo; SNonSortMergeInfo* pNonSortMerge = &pInfo->nsortMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1024); initResultSizeInfo(&pOperator->resultInfo, 1024);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);
@ -629,6 +633,8 @@ int32_t createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size_t numS
case MERGE_TYPE_COLUMNS: { case MERGE_TYPE_COLUMNS: {
SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo; SColsMergeInfo* pColsMerge = &pInfo->colsMergeInfo;
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
TSDB_CHECK_NULL(pInfo->binfo.pRes, code, lino, _error, terrno);
initResultSizeInfo(&pOperator->resultInfo, 1); initResultSizeInfo(&pOperator->resultInfo, 1);
code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
TSDB_CHECK_CODE(code, lino, _error); TSDB_CHECK_CODE(code, lino, _error);

View File

@ -295,6 +295,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} }
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo); code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) { if (code) {
@ -362,6 +366,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL){
pTaskInfo->code = terrno;
return terrno;
}
if (pHandle->vnode) { if (pHandle->vnode) {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort, code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
@ -385,6 +393,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode; STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
if (!pTagScanPhyNode->onlyMetaCtbIdx) { if (!pTagScanPhyNode->onlyMetaCtbIdx) {
code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond, code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo); pTagIndexCond, pTaskInfo);
@ -398,6 +411,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
if (pBlockNode->tableType == TSDB_SUPER_TABLE) { if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(uint64_t)); SArray* pList = taosArrayInit(4, sizeof(uint64_t));
@ -436,6 +453,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate(); STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo); pTagCond, pTagIndexCond, pTaskInfo);

View File

@ -64,6 +64,10 @@ int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC
p->id.queryId = queryId; p->id.queryId = queryId;
p->id.taskId = taskId; p->id.taskId = taskId;
p->id.str = taosMemoryMalloc(64); p->id.str = taosMemoryMalloc(64);
if (p->id.str == NULL) {
return terrno;
}
buildTaskId(taskId, queryId, p->id.str); buildTaskId(taskId, queryId, p->id.str);
p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
if (p->id.str == NULL || p->schemaInfos == NULL) { if (p->id.str == NULL || p->schemaInfos == NULL) {
@ -174,8 +178,15 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
} }
if (schemaInfo.sw == NULL) {
return terrno;
}
pAPI->metaReaderFn.clearReader(&mr); pAPI->metaReaderFn.clearReader(&mr);
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
if (schemaInfo.qsw == NULL) {
return terrno;
}
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY; return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
@ -186,7 +197,14 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols); int32_t numOfTags = LIST_LENGTH(pScanNode->pScanPseudoCols);
SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); SSchemaWrapper* pqSw = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (pqSw == NULL) {
return NULL;
}
pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema)); pqSw->pSchema = taosMemoryCalloc(numOfCols + numOfTags, sizeof(SSchema));
if (pqSw->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i); STargetNode* pNode = (STargetNode*)nodesListGetNode(pScanNode->pScanCols, i);

View File

@ -182,8 +182,8 @@ static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
} }
int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) { int32_t tsortGetSortedDataBlock(const SSortHandle* pSortHandle, SSDataBlock** pBlock) {
if (pSortHandle->pDataBlock == NULL) {
*pBlock = NULL; *pBlock = NULL;
if (pSortHandle->pDataBlock == NULL) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock); return createOneDataBlock(pSortHandle->pDataBlock, false, pBlock);
@ -2452,7 +2452,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code; return code;
} }
static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t code = createInitialSources(pHandle); int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
@ -2478,7 +2478,8 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
return code; return code;
} }
return tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn); code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
return code;
} }
void tsortClose(SSortHandle* pHandle) { void tsortClose(SSortHandle* pHandle) {
@ -2808,19 +2809,24 @@ static int32_t tsortSingleTableMergeNextTuple(SSortHandle* pHandle, STupleHandle
} }
int32_t tsortOpen(SSortHandle* pHandle) { int32_t tsortOpen(SSortHandle* pHandle) {
int32_t code = 0;
if (pHandle->opened) { if (pHandle->opened) {
return 0; return code;
} }
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { if (pHandle == NULL || pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
return code;
} }
pHandle->opened = true; pHandle->opened = true;
if (tsortIsPQSortApplicable(pHandle)) if (tsortIsPQSortApplicable(pHandle)) {
return tsortOpenForPQSort(pHandle); code = tsortOpenForPQSort(pHandle);
else } else {
return tsortOpenForBufMergeSort(pHandle); code = tsortOpenForBufMergeSort(pHandle);
}
return code;
} }
int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) { int32_t tsortNextTuple(SSortHandle* pHandle, STupleHandle** pTupleHandle) {

View File

@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
} }
if (pInfo->stage != stage) { if (pInfo->stage != stage) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
streamMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE; return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) { } else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER)); stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));

View File

@ -453,8 +453,9 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) { if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock); stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
return TSDB_CODE_INVALID_PARA; pTask->id.idStr, i);
continue;
} }
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) { if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
@ -468,8 +469,9 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i); STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) { if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock); stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
return TSDB_CODE_INVALID_PARA; pTask->id.idStr, i);
continue;
} }
if (pReadyInfo->sendCompleted == 1) { if (pReadyInfo->sendCompleted == 1) {
@ -601,9 +603,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
} }
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) { void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId; struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId); if (pInfo->activeId <= 0) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
}
} }
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) { static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
@ -960,6 +968,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState pStatus = streamTaskGetStatus(pTask); SStreamTaskState pStatus = streamTaskGetStatus(pTask);
bool alreadySend = false;
if (pStatus.state != TASK_STATUS__CK) { if (pStatus.state != TASK_STATUS__CK) {
return false; return false;
@ -971,11 +980,12 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
return false; return false;
} }
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList);
for (int32_t i = 0; i < num; ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo == NULL) { if (pSendInfo == NULL) {
streamMutexUnlock(&pInfo->lock); stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num);
return TSDB_CODE_INVALID_PARA; continue;
} }
if (pSendInfo->nodeId != downstreamNodeId) { if (pSendInfo->nodeId != downstreamNodeId) {

View File

@ -95,11 +95,12 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
tsize = (newSize == tsize) ? (tsize + 2) : newSize; tsize = (newSize == tsize) ? (tsize + 2) : newSize;
} }
pArray->pData = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize); char* p = taosMemoryRealloc(pArray->pData, tsize * pArray->elemSize);
if (pArray->pData == NULL) { if (p == NULL) {
return terrno; return terrno;
} }
pArray->pData = p;
pArray->capacity = tsize; pArray->capacity = tsize;
} }
return 0; return 0;

View File

@ -39,7 +39,7 @@ int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo** pTree, uint32_t numOfSources,
(SMultiwayMergeTreeInfo*)taosMemoryCalloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries); (SMultiwayMergeTreeInfo*)taosMemoryCalloc(1, sizeof(SMultiwayMergeTreeInfo) + sizeof(STreeNode) * totalEntries);
if (pTreeInfo == NULL) { if (pTreeInfo == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return TAOS_SYSTEM_ERROR(errno); return terrno;
} }
pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo)); pTreeInfo->pNode = (STreeNode*)(((char*)pTreeInfo) + sizeof(SMultiwayMergeTreeInfo));

View File

@ -352,9 +352,8 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id, int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
const char* dir) { const char* dir) {
*pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf)); *pBuf = NULL;
SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pPBuf = *pBuf;
if (pPBuf == NULL) { if (pPBuf == NULL) {
goto _error; goto _error;
} }
@ -394,11 +393,16 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->prefix = (char*)dir; pPBuf->prefix = (char*)dir;
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
if (pPBuf->emptyDummyIdList == NULL) {
goto _error;
}
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
*pBuf = pPBuf;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
destroyDiskbasedBuf(pPBuf); destroyDiskbasedBuf(pPBuf);
*pBuf = NULL; *pBuf = NULL;