diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ed56b7e6b2..ae26d5f2ae 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -74,7 +74,7 @@ typedef enum { * @param vgId * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); +int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); /** * Create the exec task for queue mode @@ -93,7 +93,7 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList * @param taskId * @param queryId */ -void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); +int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 61ae034450..4fc7e5eac5 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -173,7 +173,7 @@ typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables); void (*tsdReaderClose)(); - void (*tsdSetReaderTaskId)(void *pReader, const char *pId); + int32_t (*tsdSetReaderTaskId)(void *pReader, const char *pId); int32_t (*tsdSetQueryTableList)(); int32_t (*tsdNextDataBlock)(); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2f56aac7d6..827f7e2044 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -164,7 +164,7 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables); int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); -void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr); +int32_t tsdbReaderSetId(void *pReader, const char *idstr); void tsdbReaderClose2(STsdbReader *pReader); int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 19c5b5d481..69819c87dc 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -314,8 +314,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); - if (!pRSmaInfo->taskInfo[idx]) { + + code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0); + if (!pRSmaInfo->taskInfo[idx] || (code != 0)) { TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index ff9f1e524e..495fcd771a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -145,12 +145,13 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, terrno = 0; SMqDataRsp dataRsp = {0}; - int code = tqInitDataRsp(&dataRsp.common, *pOffset); + + int code = tqInitDataRsp(&dataRsp.common, *pOffset); if (code != 0) { goto end; } - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); + code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c31b9b598c..3f4329f22b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -36,6 +36,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); int64_t streamId = 0; int32_t taskId = 0; + int32_t code = 0; tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); @@ -52,7 +53,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId); if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); - return -1; + return terrno; } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } @@ -75,18 +76,23 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { initStorageAPI(&handle.api); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return terrno; + code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (code) { + tqError("s-task:%s failed to expand task, code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + if (code) { + + return code; } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el); - return TSDB_CODE_SUCCESS; + return code; } void tqSetRestoreVersionInfo(SStreamTask* pTask) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e39ac2e45d..731b733b52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5986,13 +5986,18 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact } // if failed, do nothing -void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { +int32_t tsdbReaderSetId(void* p, const char* idstr) { + STsdbReader* pReader = (STsdbReader*) p; taosMemoryFreeClear(pReader->idStr); + pReader->idStr = taosStrdup(idstr); if (pReader->idStr == NULL) { - // no need to do anything + tsdbError("%s failed to build reader id, code:%s", idstr, tstrerror(terrno)); + return terrno; } + pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr; + return 0; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 9be84b99f4..59a129cac8 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -59,7 +59,7 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable2; // todo this function should be moved away pReader->tsdSetQueryTableList = tsdbSetTableList2; - pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + pReader->tsdSetReaderTaskId = tsdbReaderSetId; pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited; pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ebec9f9373..0033e14a2d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -204,28 +204,34 @@ _end: return code; } -void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) { +int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pStreamScanInfo = pOperator->info; if (pStreamScanInfo->pTableScanOp != NULL) { STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; if (pScanInfo->base.dataReader != NULL) { - pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str); + int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str); + if (code) { + qError("failed to set reader id for executor, code:%s", tstrerror(code)); + return code; + } } } } else { - doSetTaskId(pOperator->pDownstream[0], pAPI); + return doSetTaskId(pOperator->pDownstream[0], pAPI); } + + return 0; } -void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { +int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { SExecTaskInfo* pTaskInfo = tinfo; pTaskInfo->id.queryId = queryId; buildTaskId(taskId, queryId, pTaskInfo->id.str); // set the idstr for tsdbReader - doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); + return doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); } int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { @@ -337,33 +343,31 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) { +int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) { if (msg == NULL) { - return NULL; + return TSDB_CODE_INVALID_PARA; } + *pTaskInfo = NULL; + SSubplan* pPlan = NULL; int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; + return code; } - qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, taskId, pPlan, pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - qDestroyTask(pTaskInfo); - terrno = code; - return NULL; + qDestroyTask(*pTaskInfo); + return code; } code = qStreamInfoResetTimewindowFilter(pTaskInfo); if (code != TSDB_CODE_SUCCESS) { - qDestroyTask(pTaskInfo); - terrno = code; - return NULL; + qDestroyTask(*pTaskInfo); } - return pTaskInfo; + + return code; } static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr, diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index e6754d7bfd..0e2d31cc8f 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -101,8 +101,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); + if (pState == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; QUERY_CHECK_CODE(code, lino, _end); @@ -138,7 +140,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code)); } return NULL; }