refactor: check return value.

This commit is contained in:
Haojun Liao 2024-09-14 13:39:23 +08:00
parent 31354d4767
commit b494163f28
8 changed files with 55 additions and 21 deletions

View File

@ -362,7 +362,7 @@ int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle
return code;
}
code = qStreamInfoResetTimewindowFilter(pTaskInfo);
code = qStreamInfoResetTimewindowFilter(*pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
qDestroyTask(*pTaskInfo);
}
@ -631,9 +631,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
// pSinkParam has been freed during create sinker.
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
if (code) {
qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code));
}
}
qDebug("subplan task create completed, TID:0x%" PRIx64 "QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId,
tstrerror(code));
_error:
// if failed to add ref for all tables in this query, abort current query

View File

@ -394,7 +394,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand
}
}
//pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator);
if (code) {
pTaskInfo->code = code;

View File

@ -3894,8 +3894,8 @@ static void destroyStreamScanOperatorInfo(void* param) {
if (param == NULL) {
return;
}
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
destroyOperator(pStreamScan->pTableScanOp);
}
@ -3914,7 +3914,10 @@ static void destroyStreamScanOperatorInfo(void* param) {
cleanupExprSupp(&pStreamScan->tbnameCalSup);
cleanupExprSupp(&pStreamScan->tagCalSup);
if (pStreamScan->stateStore.updateInfoDestroy) {
pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo);
}
blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes);
blockDataDestroy(pStreamScan->pDeleteDataRes);
@ -4127,16 +4130,13 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
}
pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData));
if (pInfo->pBlockLists == NULL) {
code = terrno;
goto _error;
}
TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno);
if (pHandle->vnode) {
SOperatorInfo* pTableScanOp = NULL;
code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pTableScanOp);
if (pTableScanOp == NULL || code != 0) {
qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code);
qError("createTableScanOperatorInfo error, code:%d", pTaskInfo->code);
goto _error;
}
@ -4180,6 +4180,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
// set the extract column id to streamHandle
pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds);
SArray* tableIdList = NULL;
code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList);
QUERY_CHECK_CODE(code, lino, _error);
@ -4189,9 +4190,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode*
} else {
taosArrayDestroy(pColIds);
tableListDestroy(pTableListInfo);
pColIds = NULL;
}
// clear the local variable to avoid repeatly free
pColIds = NULL;
// create the pseduo columns info
if (pTableScanNode->scan.pScanPseudoCols != NULL) {
code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr);
@ -4268,6 +4271,10 @@ _error:
}
if (pInfo != NULL) {
STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info;
if (p != NULL) {
p->base.pTableListInfo = NULL;
}
destroyStreamScanOperatorInfo(pInfo);
}

View File

@ -238,7 +238,7 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t up
int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
int64_t checkpointId, SRpcMsg* pMsg);
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock);
#ifdef __cplusplus

View File

@ -2576,8 +2576,12 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
char* err = NULL;
char** cfNames = NULL;
size_t nCf = 0;
int32_t code = 0;
int32_t lino = 0;
STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper));
TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno);
pTaskDb->idstr = key ? taosStrdup(key) : NULL;
pTaskDb->path = statePath ? taosStrdup(statePath) : NULL;
@ -2592,6 +2596,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) {
stError("%s open state-backend failed, reason:%s", key, err);
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
goto _EXIT;
}
@ -2608,11 +2613,12 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (err != NULL) {
stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err);
code = TSDB_CODE_STREAM_INTERNAL_ERROR;
goto _EXIT;
}
}
if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) {
if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) {
goto _EXIT;
}
@ -2625,6 +2631,8 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
return pTaskDb;
_EXIT:
stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code));
taskDbDestroy(pTaskDb, false);
if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);

View File

@ -258,6 +258,11 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) {
stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code));
return terrno;
}
((SMsgHead*)buf)->vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
@ -268,7 +273,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
code = (code >= 0)? 0:code;
code = TMIN(code, 0);
return code;
}

View File

@ -365,7 +365,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// The transfer of state may generate new data that need to dispatch to downstream tasks,
// Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
// before the next checkpoint.
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) {
return code;
}
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
@ -398,7 +401,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) {
return code;
}
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
// this task already. And then, dispatch check point msg to all downstream tasks

View File

@ -676,7 +676,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock
return code;
}
void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) {
const char* id = pTask->id.idStr;
// 1. transfer the ownership of executor state
@ -717,7 +717,12 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
}
// 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V.
doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1);
if(code) {
stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code));
}
return code;
}
/**