refactor: check return value.

This commit is contained in:
Haojun Liao 2024-09-13 23:04:41 +08:00
parent 99dbb78992
commit 1a90e9612d
10 changed files with 56 additions and 37 deletions

View File

@ -74,7 +74,7 @@ typedef enum {
* @param vgId
* @return
*/
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId);
/**
* Create the exec task for queue mode
@ -93,7 +93,7 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList
* @param taskId
* @param queryId
*/
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);

View File

@ -173,7 +173,7 @@ typedef struct TsdReader {
int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables,
SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables);
void (*tsdReaderClose)();
void (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetReaderTaskId)(void *pReader, const char *pId);
int32_t (*tsdSetQueryTableList)();
int32_t (*tsdNextDataBlock)();

View File

@ -164,7 +164,7 @@ typedef struct STsdbReader STsdbReader;
int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables,
SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables);
int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num);
void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr);
int32_t tsdbReaderSetId(void *pReader, const char *idstr);
void tsdbReaderClose2(STsdbReader *pReader);
int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext);
int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA);

View File

@ -314,8 +314,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState};
initStorageAPI(&handle.api);
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) {
code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx] || (code != 0)) {
TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE);
}

View File

@ -145,12 +145,13 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
terrno = 0;
SMqDataRsp dataRsp = {0};
int code = tqInitDataRsp(&dataRsp.common, *pOffset);
int code = tqInitDataRsp(&dataRsp.common, *pOffset);
if (code != 0) {
goto end;
}
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest);
if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) {
goto end;

View File

@ -36,6 +36,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
int64_t st = taosGetTimestampMs();
int64_t streamId = 0;
int32_t taskId = 0;
int32_t code = 0;
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
@ -52,7 +53,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId);
if (pTask->pState == NULL) {
tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId);
return -1;
return terrno;
} else {
tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState);
}
@ -75,18 +76,23 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
initStorageAPI(&handle.api);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (pTask->exec.pExecutor == NULL) {
tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr);
return terrno;
code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId);
if (code) {
tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
if (code) {
return code;
}
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);
return TSDB_CODE_SUCCESS;
return code;
}
void tqSetRestoreVersionInfo(SStreamTask* pTask) {

View File

@ -5986,13 +5986,18 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
}
// if failed, do nothing
void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
int32_t tsdbReaderSetId(void* p, const char* idstr) {
STsdbReader* pReader = (STsdbReader*) p;
taosMemoryFreeClear(pReader->idStr);
pReader->idStr = taosStrdup(idstr);
if (pReader->idStr == NULL) {
// no need to do anything
tsdbError("%s failed to build reader id, code:%s", idstr, tstrerror(terrno));
return terrno;
}
pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr;
return 0;
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }

View File

@ -59,7 +59,7 @@ void initTsdbReaderAPI(TsdReader* pReader) {
pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable2; // todo this function should be moved away
pReader->tsdSetQueryTableList = tsdbSetTableList2;
pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2;
pReader->tsdSetReaderTaskId = tsdbReaderSetId;
pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited;
pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb;

View File

@ -204,28 +204,34 @@ _end:
return code;
}
void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
SStreamScanInfo* pStreamScanInfo = pOperator->info;
if (pStreamScanInfo->pTableScanOp != NULL) {
STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info;
if (pScanInfo->base.dataReader != NULL) {
pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str);
if (code) {
qError("failed to set reader id for executor, code:%s", tstrerror(code));
return code;
}
}
}
} else {
doSetTaskId(pOperator->pDownstream[0], pAPI);
return doSetTaskId(pOperator->pDownstream[0], pAPI);
}
return 0;
}
void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
SExecTaskInfo* pTaskInfo = tinfo;
pTaskInfo->id.queryId = queryId;
buildTaskId(taskId, queryId, pTaskInfo->id.str);
// set the idstr for tsdbReader
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
return doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
@ -337,33 +343,31 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3
return pTaskInfo;
}
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) {
if (msg == NULL) {
return NULL;
return TSDB_CODE_INVALID_PARA;
}
*pTaskInfo = NULL;
SSubplan* pPlan = NULL;
int32_t code = qStringToSubplan(msg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
return code;
}
qTaskInfo_t pTaskInfo = NULL;
code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
code = qCreateExecTask(readers, vgId, taskId, pPlan, pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM);
if (code != TSDB_CODE_SUCCESS) {
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
qDestroyTask(*pTaskInfo);
return code;
}
code = qStreamInfoResetTimewindowFilter(pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
qDestroyTask(pTaskInfo);
terrno = code;
return NULL;
qDestroyTask(*pTaskInfo);
}
return pTaskInfo;
return code;
}
static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr,

View File

@ -101,8 +101,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
QUERY_CHECK_CODE(code, lino, _end);
@ -138,7 +140,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code));
}
return NULL;
}