fix: check return values.

This commit is contained in:
Haojun Liao 2024-08-05 18:34:56 +08:00
parent 3772f25840
commit 6297d238e0
7 changed files with 54 additions and 26 deletions

View File

@ -26,7 +26,7 @@
#define T_LONG_JMP(_obj, _c) \
do { \
ASSERT((_c) != 1); \
ASSERT((_c) != -1); \
longjmp((_obj), (_c)); \
} while (0)

View File

@ -1434,7 +1434,12 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray** tableList
pNode.suid = suid;
pNode.uid = suid;
pNode.tableType = TSDB_SUPER_TABLE;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
return terrno;
}
uint8_t digest[17] = {0};
int code =
getTableList(pVnode, &pNode, pSubplan ? pSubplan->pTagCond : NULL, pSubplan ? pSubplan->pTagIndexCond : NULL,
@ -2411,11 +2416,10 @@ bool oneTableForEachGroup(const STableListInfo* pTableList) { return pTableList-
STableListInfo* tableListCreate() {
STableListInfo* pListInfo = taosMemoryCalloc(1, sizeof(STableListInfo));
if (pListInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pListInfo->remainGroups = NULL;
pListInfo->remainGroups = NULL;
pListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
if (pListInfo->pTableList == NULL) {
goto _error;
@ -2431,7 +2435,6 @@ STableListInfo* tableListCreate() {
_error:
tableListDestroy(pListInfo);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}

View File

@ -295,6 +295,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
code = initQueriedTableSchemaInfo(pHandle, &pTableScanNode->scan, dbname, pTaskInfo);
if (code) {
@ -362,6 +366,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL){
pTaskInfo->code = terrno;
return terrno;
}
if (pHandle->vnode) {
code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags, pTableScanNode->groupSort,
@ -385,6 +393,11 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
@ -398,6 +411,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
if (pBlockNode->tableType == TSDB_SUPER_TABLE) {
SArray* pList = taosArrayInit(4, sizeof(uint64_t));
@ -436,6 +453,10 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
if (pTableListInfo == NULL) {
pTaskInfo->code = terrno;
return terrno;
}
code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
pTagCond, pTagIndexCond, pTaskInfo);

View File

@ -2452,7 +2452,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}
static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -2479,7 +2479,6 @@ static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
}
code = tMergeTreeCreate(&pHandle->pMergeTree, pHandle->cmpParam.numOfSources, &pHandle->cmpParam, pHandle->comparFn);
ASSERT(code != 1);
return code;
}

View File

@ -74,13 +74,6 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
}
if (pInfo->stage != stage) {
streamMutexLock(&pTask->lock);
ETaskStatus status = streamTaskGetStatus(pTask).state;
if (status == TASK_STATUS__CK) {
streamTaskSetFailedCheckpointId(pTask);
}
streamMutexUnlock(&pTask->lock);
return TASK_UPSTREAM_NEW_STAGE;
} else if (pTask->status.downstreamReady != 1) {
stDebug("s-task:%s vgId:%d leader:%d, downstream not ready", id, vgId, (pTask->pMeta->role == NODE_ROLE_LEADER));

View File

@ -453,8 +453,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA;
stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i);
continue;
}
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
@ -468,8 +468,8 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA;
stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue", i);
continue;
}
if (pReadyInfo->sendCompleted == 1) {
@ -601,9 +601,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
}
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
pTask->chkInfo.pActiveInfo->failedId = pTask->chkInfo.pActiveInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.pActiveInfo->activeId, pTask->chkInfo.pActiveInfo->transId);
struct SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
if (pInfo->activeId <= 0) {
stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
} else {
pInfo->failedId = pInfo->activeId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr, pInfo->activeId,
pInfo->transId);
}
}
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
@ -960,6 +966,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
bool alreadySend = false;
if (pStatus.state != TASK_STATUS__CK) {
return false;
@ -971,11 +978,12 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
return false;
}
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList);
for (int32_t i = 0; i < num; ++i) {
STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (pSendInfo == NULL) {
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_INVALID_PARA;
stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num);
continue;
}
if (pSendInfo->nodeId != downstreamNodeId) {

View File

@ -352,9 +352,8 @@ static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, const char* id,
const char* dir) {
*pBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pPBuf = *pBuf;
*pBuf = NULL;
SDiskbasedBuf* pPBuf = taosMemoryCalloc(1, sizeof(SDiskbasedBuf));
if (pPBuf == NULL) {
goto _error;
}
@ -394,11 +393,16 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->prefix = (char*)dir;
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
if (pPBuf->emptyDummyIdList == NULL) {
goto _error;
}
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
*pBuf = pPBuf;
return TSDB_CODE_SUCCESS;
_error:
destroyDiskbasedBuf(pPBuf);
return TSDB_CODE_OUT_OF_MEMORY;