fix(stream): check return value.

This commit is contained in:
Haojun Liao 2024-07-23 17:31:41 +08:00
parent 3bddd051ca
commit eefee54c6a
9 changed files with 143 additions and 58 deletions

View File

@ -99,7 +99,8 @@ struct SExecTaskInfo {
};
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI,
SExecTaskInfo** pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
bool isTaskKilled(void* pTaskInfo);
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
@ -107,7 +108,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model);
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo);
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList);
#ifdef __cplusplus
}

View File

@ -288,8 +288,10 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
uint64_t id) {
if (msg == NULL) { // create raw scan
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api);
if (NULL == pTaskInfo) {
SExecTaskInfo* pTaskInfo = NULL;
int32_t code = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, &pReaderHandle->api, &pTaskInfo);
if (NULL == pTaskInfo || code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -1450,7 +1452,13 @@ SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SExecTaskInfo* pTaskInfo = tinfo;
SArray* plist = getTableListInfo(pTaskInfo);
SArray* plist = NULL;
code = getTableListInfo(pTaskInfo, &plist);
if (code || plist == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// only extract table in the first elements
STableListInfo* pTableListInfo = taosArrayGetP(plist, 0);
@ -1502,11 +1510,21 @@ _end:
}
}
SArray* getTableListInfo(const SExecTaskInfo* pTaskInfo) {
int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
if (pList == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SArray* pArray = taosArrayInit(0, POINTER_BYTES);
if (pArray == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SOperatorInfo* pOperator = pTaskInfo->pRoot;
extractTableList(pArray, pOperator);
return pArray;
*pList = pArray;
return TSDB_CODE_SUCCESS;
}
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {

View File

@ -1068,7 +1068,12 @@ int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, SExecTaskInfo*
return TSDB_CODE_OUT_OF_MEMORY;
}
SArray* pInfoList = getTableListInfo(pTask);
SArray* pInfoList = NULL;
int32_t code = getTableListInfo(pTask, &pInfoList);
if (code || pInfoList == NULL) {
return code;
}
STableListInfo* pTableListInfo = taosArrayGetP(pInfoList, 0);
taosArrayDestroy(pInfoList);

View File

@ -168,8 +168,10 @@ SSDataBlock* doSortMerge(SOperatorInfo* pOperator) {
blockDataCleanup(pDataBlock);
if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = tsortGetSortedDataBlock(pHandle);
if (pSortMergeInfo->pIntermediateBlock == NULL) {
pSortMergeInfo->pIntermediateBlock = NULL;
int32_t code = tsortGetSortedDataBlock(pHandle, &pSortMergeInfo->pIntermediateBlock);
if (pSortMergeInfo->pIntermediateBlock == NULL || code != 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}

View File

@ -35,38 +35,51 @@
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI) {
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
int32_t doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI,
SExecTaskInfo** pTaskInfo) {
if (pTaskInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
return TSDB_CODE_SUCCESS;
}
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTaskInfo->cost.created = taosGetTimestampUs();
SExecTaskInfo* p = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pTaskInfo->execModel = model;
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
pTaskInfo->storageAPI = *pAPI;
setTaskStatus(p, TASK_NOT_COMPLETED);
p->cost.created = taosGetTimestampUs();
taosInitRWLatch(&pTaskInfo->lock);
p->execModel = model;
p->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
p->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
if (p->stopInfo.pStopInfo == NULL || p->pResultBlockList == NULL) {
doDestroyTask(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
pTaskInfo->id.vgId = vgId;
pTaskInfo->id.queryId = queryId;
pTaskInfo->id.taskId = taskId;
pTaskInfo->id.str = taosMemoryMalloc(64);
buildTaskId(taskId, queryId, pTaskInfo->id.str);
pTaskInfo->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
p->storageAPI = *pAPI;
taosInitRWLatch(&p->lock);
return pTaskInfo;
p->id.vgId = vgId;
p->id.queryId = queryId;
p->id.taskId = taskId;
p->id.str = taosMemoryMalloc(64);
buildTaskId(taskId, queryId, p->id.str);
p->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
if (p->id.str == NULL || p->schemaInfos == NULL) {
doDestroyTask(p);
return TSDB_CODE_OUT_OF_MEMORY;
}
*pTaskInfo = p;
return TSDB_CODE_SUCCESS;
}
bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); }
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
pTaskInfo->code = rspCode;
stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI);
(void) stopTableScanOperator(pTaskInfo->pRoot, pTaskInfo->id.str, &pTaskInfo->storageAPI);
}
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
@ -81,10 +94,10 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api);
if (*pTaskInfo == NULL) {
int32_t code = doCreateTask(pPlan->id.queryId, taskId, vgId, model, &pHandle->api, pTaskInfo);
if (*pTaskInfo == NULL || code != 0) {
taosMemoryFree(sql);
return terrno;
return code;
}
if (pHandle) {
@ -165,12 +178,10 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
}
pAPI->metaReaderFn.clearReader(&mr);
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return TSDB_CODE_SUCCESS;
void* p = taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return (p != NULL)? TSDB_CODE_SUCCESS:TSDB_CODE_OUT_OF_MEMORY;
}
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {

View File

@ -166,7 +166,11 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
}
pBlock->info.dataLoad = 1;
pBlock->info.scanFlag = ((SDataBlockInfo*)tsortGetBlockInfo(pTupleHandle))->scanFlag;
SDataBlockInfo info = {0};
tsortGetBlockInfo(pTupleHandle, &info);
pBlock->info.scanFlag = info.scanFlag;
pBlock->info.rows += 1;
}
@ -224,9 +228,9 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
SSortOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);
int32_t code = 0;
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
if (p == NULL) {
SSDataBlock* p = NULL;
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return NULL;
}
@ -443,8 +447,9 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
blockDataCleanup(pDataBlock);
blockDataEnsureCapacity(pDataBlock, capacity);
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
if (p == NULL) {
SSDataBlock* p = NULL;
int32_t code = tsortGetSortedDataBlock(pHandle, &p);
if (p == NULL || (code != 0)) {
return NULL;
}
@ -452,7 +457,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
while (1) {
STupleHandle* pTupleHandle = NULL;
int32_t code = tsortNextTuple(pHandle, &pTupleHandle);
code = tsortNextTuple(pHandle, &pTupleHandle);
if (pTupleHandle == NULL || code != 0) {
break;
}

View File

@ -418,13 +418,14 @@ void findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId, SDownstreamStatus
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
int32_t* pNotReady, const char* id) {
streamMutexLock(&pInfo->checkInfoLock);
SDownstreamStatusInfo* p = NULL;
streamMutexLock(&pInfo->checkInfoLock);
findCheckRspStatus(pInfo, taskId, &p);
if (p != NULL) {
if (reqId != p->reqId) {
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64 " expired check-rsp recv from downstream task:0x%x, discarded",
stError("s-task:%s reqId:0x%" PRIx64 " expected:0x%" PRIx64
" expired check-rsp recv from downstream task:0x%x, discarded",
id, reqId, p->reqId, taskId);
streamMutexUnlock(&pInfo->checkInfoLock);
return TSDB_CODE_FAILED;

View File

@ -176,6 +176,10 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int64_t checkpointId = pDataBlock->info.version;
int32_t transId = pDataBlock->info.window.skey;
const char* id = pTask->id.idStr;
@ -248,6 +252,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (p->upstreamTaskId == pBlock->srcTaskId) {
ASSERT(p->checkpointId == checkpointId);
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
@ -381,6 +389,10 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
for (int32_t i = 0; i < size; ++i) {
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
if (p == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (p->downstreamTaskId == downstreamTaskId) {
received = true;
break;
@ -420,6 +432,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
streamMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
pReadyInfo->sendCompleted = 1;
stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
@ -430,6 +446,10 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pReadyInfo->sendCompleted == 1) {
numOfConfirmed += 1;
}
@ -819,6 +839,10 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
bool recved = false;
for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
if (pReady == NULL) {
continue;
}
if (pInfo->nodeId == pReady->upstreamNodeId) {
recved = true;
break;
@ -867,6 +891,9 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
for (int32_t i = 0; i < size; i++) {
SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
if (pUpstreamTask == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
if (pReq == NULL) {
@ -917,6 +944,10 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo == NULL) {
return TSDB_CODE_INVALID_PARA;
}
if (pSendInfo->nodeId != downstreamNodeId) {
continue;
}
@ -974,6 +1005,9 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
} else {
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
if (pVgInfo == NULL) {
continue;
}
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
@ -993,6 +1027,10 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
streamMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p == NULL) {
return num;
}
if (p->recved) {
num++;
}
@ -1009,6 +1047,10 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p == NULL) {
continue;
}
if (p->nodeId == vgId) {
ASSERT(p->recved == false);