From 83d531284b53d83668c71a19591a636273872a9f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Sep 2024 19:02:56 +0800 Subject: [PATCH 1/7] other: merge 3.0 --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/streamExec.c | 70 ++++++++++++++++++----------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e71a6c4dce..b77c8535f1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -673,7 +673,7 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); int32_t streamExecTask(SStreamTask* pTask); -void streamResumeTask(SStreamTask* pTask); +int32_t streamResumeTask(SStreamTask* pTask); int32_t streamTrySchedExec(SStreamTask* pTask); int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType); void streamTaskResumeInFuture(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f3279a0f01..4fc00a6f59 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ #define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); -static void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState pState = streamTaskGetStatus(pTask); @@ -95,7 +95,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* return code; } -void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { +int32_t streamTaskExecImpl(SStreamTask* pTask, 
SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; int32_t size = 0; @@ -112,7 +112,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to if (streamTaskShouldStop(pTask) || (pRes == NULL)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return; + return code; } SSDataBlock* output = NULL; @@ -122,8 +122,13 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to resetTaskInfo(pExecutor); } - stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code)); - continue; + if (code == TSDB_CODE_OUT_OF_MEMORY || code == TSDB_CODE_INVALID_PARA || code == TSDB_CODE_FILE_CORRUPTED) { + stFatal("s-task:%s failed to continue execute since %s", pTask->id.idStr, tstrerror(code)); + return code; + } else { + qResetTaskCode(pExecutor); + continue; + } } if (output == NULL) { @@ -194,7 +199,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); // todo: here we need continue retry to put it into output buffer if (code != TSDB_CODE_SUCCESS) { - return; + return code; } pRes = NULL; @@ -208,6 +213,8 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } + + return code; } // todo contiuous try to create result blocks @@ -627,7 +634,7 @@ static void doRecordThroughput(STaskExecStatisInfo* pInfo, int64_t totalBlocks, } } -static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { +static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, int32_t num) { const char* id = pTask->id.idStr; int32_t blockSize = 0; int64_t st = taosGetTimestampMs(); @@ -635,23 +642,28 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i 
int64_t ver = pInfo->processedVer; int64_t totalSize = 0; int32_t totalBlocks = 0; + int32_t code = 0; stDebug("s-task:%s start to process batch blocks, num:%d, type:%s", id, num, streamQueueItemGetTypeStr(pBlock->type)); - int32_t code = doSetStreamInputBlock(pTask, pBlock, &ver, id); + code = doSetStreamInputBlock(pTask, pBlock, &ver, id); if (code) { stError("s-task:%s failed to set input block, not exec for these blocks", id); - return; + return code; + } + + code = streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); + if (code) { + return code; } - streamTaskExecImpl(pTask, pBlock, &totalSize, &totalBlocks); doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); // update the currentVer if processing the submit blocks. if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, pInfo->checkpointVer, pInfo->nextProcessVer, ver); - return; + return code; } if (ver != pInfo->processedVer) { @@ -660,6 +672,8 @@ static void doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock, i id, pInfo->processedVer, ver, pInfo->nextProcessVer, pInfo->checkpointVer); pInfo->processedVer = ver; } + + return code; } void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { @@ -712,6 +726,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB */ static int32_t doStreamExecTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; + int32_t code = 0; // merge multiple input data if possible in the input queue. 
stDebug("s-task:%s start to extract data block from inputQ", id); @@ -784,9 +799,9 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (type == STREAM_INPUT__DATA_BLOCK) { pTask->execInfo.sink.dataSize += blockSize; stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); - int32_t code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); + code = doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); if (code != TSDB_CODE_SUCCESS) { - // todo handle error. + return code; } double el = (taosGetTimestampMs() - st) / 1000.0; @@ -801,17 +816,19 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } if (type != STREAM_INPUT__CHECKPOINT) { - doStreamTaskExecImpl(pTask, pInput, numOfBlocks); + code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); + if (code) { + return code; + } } else { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. 
streamMutexLock(&pTask->lock); SStreamTaskState pState = streamTaskGetStatus(pTask); if (pState.state == TASK_STATUS__CK) { stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); - int32_t code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue + code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue } else { // todo refactor - int32_t code = 0; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { code = streamTaskSendCheckpointSourceRsp(pTask); } else { @@ -827,7 +844,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); streamFreeQitem(pInput); - return 0; + return code; } } } @@ -858,21 +875,21 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } -void streamResumeTask(SStreamTask* pTask) { +int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; int32_t code = 0; if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { - stError("s-task:%s invalid sched status:%d, not resume task", id, pTask->status.schedStatus); - return; + stError("s-task:%s invalid sched status:%d, not resume task", pTask->id.idStr, pTask->status.schedStatus); + return code; } while (1) { code = doStreamExecTask(pTask); if (code) { stError("s-task:%s failed to exec stream task, code:%s", id, tstrerror(code)); + return code; } - // check if continue streamMutexLock(&pTask->lock); @@ -888,7 +905,7 @@ void streamResumeTask(SStreamTask* pTask) { stDebug("s-task:%s exec completed, status:%s, sched-status:%d, lastExecTs:%" PRId64, id, p, pTask->status.schedStatus, pTask->status.lastExecTs); - return; + return code; } else { // check if this task needs to be idle for a while if (pTask->status.schedIdleTime > 0) { @@ -896,28 +913,31 @@ void streamResumeTask(SStreamTask* pTask) { streamMutexUnlock(&pTask->lock); setLastExecTs(pTask, taosGetTimestampMs()); - return; + return code; } } streamMutexUnlock(&pTask->lock); } + + return code; } 
int32_t streamExecTask(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. const char* id = pTask->id.idStr; + int32_t code = 0; int8_t schedStatus = streamTaskSetSchedStatusActive(pTask); if (schedStatus == TASK_SCHED_STATUS__WAITING) { - streamResumeTask(pTask); + code = streamResumeTask(pTask); } else { char* p = streamTaskGetStatus(pTask).name; stDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, p, pTask->status.schedStatus); } - return 0; + return code; } int32_t streamTaskReleaseState(SStreamTask* pTask) { From a33015e7120660022239758fa2bcc433fbb2a83a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Sep 2024 19:35:02 +0800 Subject: [PATCH 2/7] refactor: pass error code to invoker. --- include/util/taoserror.h | 1 + source/dnode/vnode/src/tq/tq.c | 6 +++- source/dnode/vnode/src/tq/tqRead.c | 5 +--- source/dnode/vnode/src/tq/tqStreamTask.c | 32 +++++++++++++++------- source/dnode/vnode/src/tqCommon/tqCommon.c | 6 ++-- source/libs/stream/src/streamQueue.c | 4 +-- source/util/src/terror.c | 1 + 7 files changed, 35 insertions(+), 20 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ba0930955c..9ea1b53564 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -976,6 +976,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) #define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) #define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) +#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3911822068..2d1d3ed357 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1007,9 +1007,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } 
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + if (code) { + tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; + } // let's continue scan data in the wal files - if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { + if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) { code = tqScanWalAsync(pTq, false); // it's ok to failed if (code) { tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e85c3a5f3a..eb5d6aafe7 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -351,11 +351,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con if (data == NULL) { // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then // retry - code = TSDB_CODE_OUT_OF_MEMORY; - terrno = code; - tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); - return code; + return terrno; } (void)memcpy(data, pBody, len); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6558012551..8f575b540f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); -static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static 
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. @@ -40,7 +40,7 @@ int32_t tqScanWal(STQ* pTq) { int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); if (code) { - tqError("vgId:%d failed to start all tasks, try next time", vgId); + tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); return code; } @@ -293,9 +293,11 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; + int32_t code = 0; + *pSucc = false; while (1) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { @@ -304,7 +306,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems } SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); @@ -327,10 +329,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems break; } } else { - tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); - code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); - if (code) { - tqError("s-task:%s failed to seek ver to:%"PRId64 " in wal", id, pTask->chkInfo.nextProcessVer); + if (code == TSDB_CODE_OUT_OF_MEMORY) { + tqError("s-task:%s failed to put data into inputQ, since 
out of memory", id); + } else { + tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id, + pTask->chkInfo.nextProcessVer); + code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + if (code) { + tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer); + } + + code = 0; // reset the error code } break; @@ -339,7 +348,8 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems } *numOfItems += numOfNewItems; - return numOfNewItems > 0; + *pSucc = (numOfNewItems > 0); + return code; } int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { @@ -358,6 +368,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); streamMetaWUnLock(pStreamMeta); if (pTaskList == NULL) { + tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno)); return terrno; } @@ -405,7 +416,8 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems); + bool hasNewData = false; + code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData); streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 83cbb4d3b9..c31b9b598c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -78,7 +78,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return -1; + return terrno; } qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } @@ -840,7 +840,7 @@ 
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t idle = taosGetTimestampMs() - execTs; tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); - streamResumeTask(pTask); + code = streamResumeTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, @@ -849,7 +849,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead streamMetaReleaseTask(pMeta, pTask); } - return 0; + return code; } SStreamTask* pTask = NULL; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index cfa49fd92e..6c3685bb0f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_STREAM_INPUTQ_FULL; } int32_t msgLen = px->submit.msgLen; @@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamFreeQitem(pItem); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_STREAM_INPUTQ_FULL; } int32_t code = taosWriteQitem(pQueue, pItem); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ccf30438bd..2c9de4526e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -816,6 +816,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, 
"Invalid task status TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags") From 99dbb78992f3be1477f0e0b91aade9d6977f45a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Sep 2024 22:43:55 +0800 Subject: [PATCH 3/7] refactor: check return value for stream. --- source/dnode/vnode/src/tq/tq.c | 16 +++-- source/libs/stream/src/streamCheckStatus.c | 34 ++++++----- source/libs/stream/src/streamMeta.c | 70 +++++++--------------- source/libs/stream/src/streamTask.c | 3 +- 4 files changed, 54 insertions(+), 69 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2d1d3ed357..a2c088de68 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -102,7 +102,6 @@ int32_t tqOpen(const char* path, SVnode* pVnode) { int32_t tqInitialize(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = streamMetaOpen(pTq->path, pTq, tqBuildStreamTask, tqExpandStreamTask, vgId, -1, tqStartTaskCompleteCallback, &pTq->pStreamMeta); if (code != TSDB_CODE_SUCCESS) { @@ -110,7 +109,6 @@ int32_t tqInitialize(STQ* pTq) { } streamMetaLoadAllTasks(pTq->pStreamMeta); - return tqMetaOpen(pTq); } @@ -713,8 +711,7 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { - STQ* pTq = (STQ*)pTqObj; - + STQ* pTq = (STQ*)pTqObj; int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); @@ -744,16 +741,25 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV SSchemaWrapper* pschemaWrapper = 
pOutputInfo->tbSink.pSchemaWrapper; pOutputInfo->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); if (pOutputInfo->tbSink.pTSchema == NULL) { - return -1; + return terrno; } pOutputInfo->tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pOutputInfo->tbSink.pTblInfo == NULL) { + tqError("vgId:%d failed init sink tableInfo, code:%s", vgId, tstrerror(terrno)); + return terrno; + } + tSimpleHashSetFreeFp(pOutputInfo->tbSink.pTblInfo, freePtr); } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); + if (pTask->exec.pWalReader == NULL) { + tqError("vgId:%d failed init wal reader, code:%s", vgId, tstrerror(terrno)); + return terrno; + } } streamTaskResetUpstreamStageInfo(pTask); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 540199bb90..2bfab82805 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -21,7 +21,7 @@ #define CHECK_NOT_RSP_DURATION 10 * 1000 // 10 sec static void processDownstreamReadyRsp(SStreamTask* pTask); -static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); +static int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId); static void rspMonitorFn(void* param, void* tmrId); static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); @@ -226,13 +226,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64 ", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart", id, 
pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); - addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); + code = addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId); } else { stError( "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "downstream again, nodeUpdate needed", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); - addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); + code = addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId); } streamMetaAddFailedTaskSelf(pTask, now); @@ -371,12 +371,14 @@ void processDownstreamReadyRsp(SStreamTask* pTask) { } } -void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { +int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t vgId = pTask->pMeta->vgId; + int32_t code = 0; + bool existed = false; streamMutexLock(&pTask->lock); + int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList); - bool existed = false; for (int i = 0; i < num; ++i) { SDownstreamTaskEpset* p = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, i); if (p == NULL) { @@ -391,15 +393,19 @@ void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { if (!existed) { SDownstreamTaskEpset t = {.nodeId = nodeId}; - void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); + + void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); if (p == NULL) { - // todo let's retry + code = terrno; + stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code)); + } else { + stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, + vgId, t.nodeId, (num + 1)); } - stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, - t.nodeId, (num + 1)); } streamMutexUnlock(&pTask->lock); + return code; } void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t 
startTs) { @@ -629,6 +635,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); + int32_t code = 0; pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { @@ -640,14 +647,13 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t taskId = *px; SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, taskId, &p); + if (p != NULL) { if (p->status != -1 || p->rspTs != 0) { - stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, pTask->id.idStr, i, - p->status, p->rspTs); + stError("s-task:%s invalid rsp record entry, index:%d, status:%d, rspTs:%" PRId64, id, i, p->status, p->rspTs); continue; } - - int32_t code = doSendCheckMsg(pTask, p); + code = doSendCheckMsg(pTask, p); } } @@ -666,7 +672,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { SDownstreamStatusInfo* p = NULL; findCheckRspStatus(pInfo, *pTaskId, &p); if (p != NULL) { - addIntoNodeUpdateList(pTask, p->vgId); + code = addIntoNodeUpdateList(pTask, p->vgId); stDebug("s-task:%s vgId:%d downstream task:0x%x (vgId:%d) timeout more than 100sec, add into nodeUpate list", id, vgId, p->taskId, p->vgId); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0417fb2182..514e25c689 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -368,8 +368,9 @@ void streamMetaRemoveDB(void* arg, char* key) { int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, FTaskExpand expandTaskFn, int32_t vgId, int64_t stage, startComplete_fn_t fn, SStreamMeta** p) { - int32_t code = 0; QRY_PARAM_CHECK(p); + int32_t code = 0; + int32_t lino = 0; SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); if (pMeta == NULL) { @@ -379,23 +380,18 @@ 
int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, int32_t len = strlen(path) + 64; char* tpath = taosMemoryCalloc(1, len); - if (tpath == NULL) { - code = terrno; - goto _err; - } + TSDB_CHECK_NULL(tpath, code, lino, _err, terrno); sprintf(tpath, "%s%s%s", path, TD_DIRSEP, "stream"); pMeta->path = tpath; code = streamMetaOpenTdb(pMeta); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); if ((code = streamMetaMayCvtDbFormat(pMeta)) < 0) { stError("vgId:%d convert sub info format failed, open stream meta failed, reason: %s", pMeta->vgId, tstrerror(terrno)); - goto _err; + TSDB_CHECK_CODE(code, lino, _err); } if ((code = streamMetaBegin(pMeta) < 0)) { @@ -405,28 +401,17 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); pMeta->pTasksMap = taosHashInit(64, fp, true, HASH_NO_LOCK); - if (pMeta->pTasksMap == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->pTasksMap, code, lino, _err, terrno); pMeta->updateInfo.pTasks = taosHashInit(64, fp, false, HASH_NO_LOCK); - if (pMeta->updateInfo.pTasks == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->updateInfo.pTasks, code, lino, _err, terrno); code = streamMetaInitStartInfo(&pMeta->startInfo); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); // task list pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId)); - if (pMeta->pTaskList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno); pMeta->scanInfo.scanCounter = 0; pMeta->vgId = vgId; @@ -440,59 +425,47 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, pMeta->startInfo.completeFn = fn; pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + 
TSDB_CHECK_NULL(pMeta->pTaskDbUnique, code, lino, _err, terrno); pMeta->numOfPausedTasks = 0; pMeta->numOfStreamTasks = 0; pMeta->closeFlag = false; stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage); - pMeta->rid = taosAddRef(streamMetaId, pMeta); // set the attribute when running on Linux OS TdThreadRwlockAttr attr; code = taosThreadRwlockAttrInit(&attr); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); #ifdef LINUX code = pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); #endif code = taosThreadRwlockInit(&pMeta->lock, &attr); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = taosThreadRwlockAttrDestroy(&attr); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); code = metaRefMgtAdd(pMeta->vgId, pRid); - if (code) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = createMetaHbInfo(pRid, &pMeta->pHbInfo); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); + TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno); code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt); - if (code != 0) { - goto _err; - } + TSDB_CHECK_CODE(code, lino, _err); code = taosThreadMutexInit(&pMeta->backendMutex, NULL); + TSDB_CHECK_CODE(code, lino, _err); *p = pMeta; return code; @@ -526,9 +499,10 @@ _err: if (pMeta->startInfo.pReadyTaskSet) taosHashCleanup(pMeta->startInfo.pReadyTaskSet); if (pMeta->startInfo.pFailedTaskSet) taosHashCleanup(pMeta->startInfo.pFailedTaskSet); if (pMeta->bkdChkptMgt) bkdMgtDestroy(pMeta->bkdChkptMgt); + taosMemoryFree(pMeta); - stError("failed to open stream meta, reason:%s", 
tstrerror(terrno)); + stError("vgId:%d failed to open stream meta, at line:%d reason:%s", vgId, lino, tstrerror(code)); return code; } @@ -1274,7 +1248,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { void streamMetaStartHb(SStreamMeta* pMeta) { int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); if (pRid == NULL) { - stError("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId); + stFatal("vgId:%d failed to prepare the metaHb to mnode, hbMsg will not started, code: out of memory", pMeta->vgId); return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fb2456e1cd..9a324084ff 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -487,8 +487,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i STaskOutputInfo* pOutputInfo = &pTask->outputInfo; pOutputInfo->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket)); if (pOutputInfo->pTokenBucket == NULL) { - stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, - tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(terrno)); return terrno; } From 1a90e9612db9748ffa2ec10675ac6f6da6ceb735 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 13 Sep 2024 23:04:41 +0800 Subject: [PATCH 4/7] refactor: check return value. 
--- include/libs/executor/executor.h | 4 +-- include/libs/executor/storageapi.h | 2 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 5 +-- source/dnode/vnode/src/tq/tqUtil.c | 5 +-- source/dnode/vnode/src/tqCommon/tqCommon.c | 20 +++++++---- source/dnode/vnode/src/tsdb/tsdbRead2.c | 9 +++-- source/dnode/vnode/src/vnd/vnodeInitApi.c | 2 +- source/libs/executor/src/executor.c | 40 ++++++++++++---------- source/libs/stream/src/streamState.c | 4 ++- 10 files changed, 56 insertions(+), 37 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ed56b7e6b2..ae26d5f2ae 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -74,7 +74,7 @@ typedef enum { * @param vgId * @return */ -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); +int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId); /** * Create the exec task for queue mode @@ -93,7 +93,7 @@ int32_t qGetTableList(int64_t suid, void* pVnode, void* node, SArray **tableList * @param taskId * @param queryId */ -void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); +int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 61ae034450..4fc7e5eac5 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -173,7 +173,7 @@ typedef struct TsdReader { int32_t (*tsdReaderOpen)(void* pVnode, SQueryTableDataCond* pCond, void* pTableList, int32_t numOfTables, SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables); void (*tsdReaderClose)(); - void (*tsdSetReaderTaskId)(void *pReader, const char *pId); + int32_t (*tsdSetReaderTaskId)(void *pReader, const char *pId); int32_t 
(*tsdSetQueryTableList)(); int32_t (*tsdNextDataBlock)(); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2f56aac7d6..827f7e2044 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -164,7 +164,7 @@ typedef struct STsdbReader STsdbReader; int32_t tsdbReaderOpen2(void *pVnode, SQueryTableDataCond *pCond, void *pTableList, int32_t numOfTables, SSDataBlock *pResBlock, void **ppReader, const char *idstr, SHashObj **pIgnoreTables); int32_t tsdbSetTableList2(STsdbReader *pReader, const void *pTableList, int32_t num); -void tsdbReaderSetId2(STsdbReader *pReader, const char *idstr); +int32_t tsdbReaderSetId(void *pReader, const char *idstr); void tsdbReaderClose2(STsdbReader *pReader); int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 19c5b5d481..69819c87dc 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -314,8 +314,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SReadHandle handle = {.vnode = pVnode, .initTqReader = 1, .skipRollup = 1, .pStateBackend = pStreamState}; initStorageAPI(&handle.api); - pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); - if (!pRSmaInfo->taskInfo[idx]) { + + code = qCreateStreamExecTaskInfo(&pRSmaInfo->taskInfo[idx], param->qmsg[idx], &handle, TD_VID(pVnode), 0); + if (!pRSmaInfo->taskInfo[idx] || (code != 0)) { TAOS_RETURN(TSDB_CODE_RSMA_QTASKINFO_CREATE); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index ff9f1e524e..495fcd771a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -145,12 +145,13 @@ static int32_t 
extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, terrno = 0; SMqDataRsp dataRsp = {0}; - int code = tqInitDataRsp(&dataRsp.common, *pOffset); + + int code = tqInitDataRsp(&dataRsp.common, *pOffset); if (code != 0) { goto end; } - qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); + code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { goto end; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c31b9b598c..3f4329f22b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -36,6 +36,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { int64_t st = taosGetTimestampMs(); int64_t streamId = 0; int32_t taskId = 0; + int32_t code = 0; tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId); @@ -52,7 +53,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { pTask->pState = streamStateOpen(pMeta->path, pTask, streamId, taskId); if (pTask->pState == NULL) { tqError("s-task:%s (vgId:%d) failed to open state for task, expand task failed", pTask->id.idStr, vgId); - return -1; + return terrno; } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } @@ -75,18 +76,23 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { initStorageAPI(&handle.api); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG) { - pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); - if (pTask->exec.pExecutor == NULL) { - tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return terrno; + code = qCreateStreamExecTaskInfo(&pTask->exec.pExecutor, pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); + if (code) { + tqError("s-task:%s failed to expand task, 
code:%s", pTask->id.idStr, tstrerror(code)); + return code; + } + + code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); + if (code) { + + return code; } - qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el); - return TSDB_CODE_SUCCESS; + return code; } void tqSetRestoreVersionInfo(SStreamTask* pTask) { diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index e39ac2e45d..731b733b52 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -5986,13 +5986,18 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact } // if failed, do nothing -void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { +int32_t tsdbReaderSetId(void* p, const char* idstr) { + STsdbReader* pReader = (STsdbReader*) p; taosMemoryFreeClear(pReader->idStr); + pReader->idStr = taosStrdup(idstr); if (pReader->idStr == NULL) { - // no need to do anything + tsdbError("%s failed to build reader id, code:%s", idstr, tstrerror(terrno)); + return terrno; } + pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr; + return 0; } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ } diff --git a/source/dnode/vnode/src/vnd/vnodeInitApi.c b/source/dnode/vnode/src/vnd/vnodeInitApi.c index 9be84b99f4..59a129cac8 100644 --- a/source/dnode/vnode/src/vnd/vnodeInitApi.c +++ b/source/dnode/vnode/src/vnd/vnodeInitApi.c @@ -59,7 +59,7 @@ void initTsdbReaderAPI(TsdReader* pReader) { pReader->tsdReaderGetNumOfInMemRows = tsdbGetNumOfRowsInMemTable2; // todo this function should be moved away pReader->tsdSetQueryTableList = tsdbSetTableList2; - pReader->tsdSetReaderTaskId = (void (*)(void*, const char*))tsdbReaderSetId2; + 
pReader->tsdSetReaderTaskId = tsdbReaderSetId; pReader->tsdSetFilesetDelimited = (void (*)(void*))tsdbSetFilesetDelimited; pReader->tsdSetSetNotifyCb = (void (*)(void*, TsdReaderNotifyCbFn, void*))tsdbReaderSetNotifyCb; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index ebec9f9373..0033e14a2d 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -204,28 +204,34 @@ _end: return code; } -void doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) { +int32_t doSetTaskId(SOperatorInfo* pOperator, SStorageAPI* pAPI) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { SStreamScanInfo* pStreamScanInfo = pOperator->info; if (pStreamScanInfo->pTableScanOp != NULL) { STableScanInfo* pScanInfo = pStreamScanInfo->pTableScanOp->info; if (pScanInfo->base.dataReader != NULL) { - pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str); + int32_t code = pAPI->tsdReader.tsdSetReaderTaskId(pScanInfo->base.dataReader, pTaskInfo->id.str); + if (code) { + qError("failed to set reader id for executor, code:%s", tstrerror(code)); + return code; + } } } } else { - doSetTaskId(pOperator->pDownstream[0], pAPI); + return doSetTaskId(pOperator->pDownstream[0], pAPI); } + + return 0; } -void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { +int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { SExecTaskInfo* pTaskInfo = tinfo; pTaskInfo->id.queryId = queryId; buildTaskId(taskId, queryId, pTaskInfo->id.str); // set the idstr for tsdbReader - doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); + return doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); } int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { @@ -337,33 +343,31 @@ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int3 return pTaskInfo; } -qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, 
SReadHandle* readers, int32_t vgId, int32_t taskId) { +int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle* readers, int32_t vgId, int32_t taskId) { if (msg == NULL) { - return NULL; + return TSDB_CODE_INVALID_PARA; } + *pTaskInfo = NULL; + SSubplan* pPlan = NULL; int32_t code = qStringToSubplan(msg, &pPlan); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; + return code; } - qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(readers, vgId, taskId, pPlan, &pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); + code = qCreateExecTask(readers, vgId, taskId, pPlan, pTaskInfo, NULL, 0, NULL, OPTR_EXEC_MODEL_STREAM); if (code != TSDB_CODE_SUCCESS) { - qDestroyTask(pTaskInfo); - terrno = code; - return NULL; + qDestroyTask(*pTaskInfo); + return code; } code = qStreamInfoResetTimewindowFilter(pTaskInfo); if (code != TSDB_CODE_SUCCESS) { - qDestroyTask(pTaskInfo); - terrno = code; - return NULL; + qDestroyTask(*pTaskInfo); } - return pTaskInfo; + + return code; } static int32_t filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr, diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index e6754d7bfd..0e2d31cc8f 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -101,8 +101,10 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) { SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, int32_t taskId) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); stDebug("open stream state %p, %s", pState, path); + if (pState == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; QUERY_CHECK_CODE(code, lino, _end); @@ -138,7 +140,7 @@ SStreamState* streamStateOpen(const char* path, void* pTask, int64_t streamId, i _end: if (code != TSDB_CODE_SUCCESS) { - qError("%s failed at line %d since %s", __func__, 
lino, tstrerror(code)); + qError("0x%x %s failed at line %d since %s", taskId, __func__, lino, tstrerror(code)); } return NULL; } From 3b9ca7b7be3a24bb888cf1446dd655a8e48bd57f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 14 Sep 2024 09:21:15 +0800 Subject: [PATCH 5/7] fix(stream): fix syntax error. --- source/libs/stream/src/streamCheckStatus.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 2bfab82805..6353904b07 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -397,7 +397,7 @@ int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { void* p = taosArrayPush(pTask->outputInfo.pNodeEpsetUpdateList, &t); if (p == NULL) { code = terrno; - stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, tstrerror(code)); + stError("s-task:%s vgId:%d failed to update epset, code:%s", pTask->id.idStr, vgId, tstrerror(code)); } else { stInfo("s-task:%s vgId:%d downstream nodeId:%d needs to be updated, total needs updated:%d", pTask->id.idStr, vgId, t.nodeId, (num + 1)); From 31354d47678ed2f7345598ab5854848056da600b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 14 Sep 2024 12:22:51 +0800 Subject: [PATCH 6/7] fix(stream): fix syntax error. 
--- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 8f575b540f..8cd04b48f6 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -330,7 +330,7 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt } } else { if (code == TSDB_CODE_OUT_OF_MEMORY) { - tqError("s-task:%s failed to put data into inputQ, since out of memory"); + tqError("s-task:%s failed to put data into inputQ, since out of memory", id); } else { tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); From b494163f28682e054ac48ce39e43804d6bfcf361 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 14 Sep 2024 13:39:23 +0800 Subject: [PATCH 7/7] refactor: check return value. --- source/libs/executor/src/executor.c | 8 +++++-- source/libs/executor/src/operator.c | 1 - source/libs/executor/src/scanoperator.c | 23 ++++++++++++------- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 16 +++++++++---- source/libs/stream/src/streamCheckStatus.c | 7 +++++- source/libs/stream/src/streamCheckpoint.c | 10 ++++++-- source/libs/stream/src/streamExec.c | 9 ++++++-- 8 files changed, 55 insertions(+), 21 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0033e14a2d..cd43c5c99e 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -362,7 +362,7 @@ int32_t qCreateStreamExecTaskInfo(qTaskInfo_t* pTaskInfo, void* msg, SReadHandle return code; } - code = qStreamInfoResetTimewindowFilter(pTaskInfo); + code = qStreamInfoResetTimewindowFilter(*pTaskInfo); if (code != TSDB_CODE_SUCCESS) { qDestroyTask(*pTaskInfo); } @@ -631,9 +631,13 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t 
taskId, // pSinkParam has been freed during create sinker. code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str); + if (code) { + qError("s-task:%s failed to create data sinker, code:%s", (*pTask)->id.str, tstrerror(code)); + } } - qDebug("subplan task create completed, TID:0x%" PRIx64 "QID:0x%" PRIx64, taskId, pSubplan->id.queryId); + qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64 " code:%s", taskId, pSubplan->id.queryId, + tstrerror(code)); _error: // if failed to add ref for all tables in this query, abort current query diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 88b9f6bf55..fe2f3f8dfe 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -394,7 +394,6 @@ int32_t createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHand } } - //pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); code = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo, &pOperator); if (code) { pTaskInfo->code = code; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4369b1df54..b6b5c5484e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3894,8 +3894,8 @@ static void destroyStreamScanOperatorInfo(void* param) { if (param == NULL) { return; } - SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; + SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { destroyOperator(pStreamScan->pTableScanOp); } @@ -3914,7 +3914,10 @@ static void destroyStreamScanOperatorInfo(void* param) { cleanupExprSupp(&pStreamScan->tbnameCalSup); cleanupExprSupp(&pStreamScan->tagCalSup); - pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); + if (pStreamScan->stateStore.updateInfoDestroy) { + 
pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); + } + blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); blockDataDestroy(pStreamScan->pDeleteDataRes); @@ -4127,16 +4130,13 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } pInfo->pBlockLists = taosArrayInit(4, sizeof(SPackedData)); - if (pInfo->pBlockLists == NULL) { - code = terrno; - goto _error; - } + TSDB_CHECK_NULL(pInfo->pBlockLists, code, lino, _error, terrno); if (pHandle->vnode) { SOperatorInfo* pTableScanOp = NULL; code = createTableScanOperatorInfo(pTableScanNode, pHandle, pTableListInfo, pTaskInfo, &pTableScanOp); if (pTableScanOp == NULL || code != 0) { - qError("createTableScanOperatorInfo error, errorcode: %d", pTaskInfo->code); + qError("createTableScanOperatorInfo error, code:%d", pTaskInfo->code); goto _error; } @@ -4180,6 +4180,7 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* // set the extract column id to streamHandle pAPI->tqReaderFn.tqReaderSetColIdList(pInfo->tqReader, pColIds); + SArray* tableIdList = NULL; code = extractTableIdList(((STableScanInfo*)(pInfo->pTableScanOp->info))->base.pTableListInfo, &tableIdList); QUERY_CHECK_CODE(code, lino, _error); @@ -4189,9 +4190,11 @@ int32_t createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* } else { taosArrayDestroy(pColIds); tableListDestroy(pTableListInfo); - pColIds = NULL; } + // clear the local variable to avoid repeatly free + pColIds = NULL; + // create the pseduo columns info if (pTableScanNode->scan.pScanPseudoCols != NULL) { code = createExprInfo(pTableScanNode->scan.pScanPseudoCols, NULL, &pInfo->pPseudoExpr, &pInfo->numOfPseudoExpr); @@ -4268,6 +4271,10 @@ _error: } if (pInfo != NULL) { + STableScanInfo* p = (STableScanInfo*) pInfo->pTableScanOp->info; + if (p != NULL) { + p->base.pTableListInfo = NULL; + } destroyStreamScanOperatorInfo(pInfo); } diff --git a/source/libs/stream/inc/streamInt.h 
b/source/libs/stream/inc/streamInt.h index 350bd35490..a5c5c1b775 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -238,7 +238,7 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t up int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId, int64_t checkpointId, SRpcMsg* pMsg); -void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); +int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); #ifdef __cplusplus diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d79e5eb143..d9b6671d9b 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -2573,11 +2573,15 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) { } STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { - char* err = NULL; - char** cfNames = NULL; - size_t nCf = 0; + char* err = NULL; + char** cfNames = NULL; + size_t nCf = 0; + int32_t code = 0; + int32_t lino = 0; STaskDbWrapper* pTaskDb = taosMemoryCalloc(1, sizeof(STaskDbWrapper)); + TSDB_CHECK_NULL(pTaskDb, code, lino, _EXIT, terrno); + pTaskDb->idstr = key ? taosStrdup(key) : NULL; pTaskDb->path = statePath ? 
taosStrdup(statePath) : NULL; @@ -2592,6 +2596,7 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err); if (pTaskDb->db == NULL) { stError("%s open state-backend failed, reason:%s", key, err); + code = TSDB_CODE_STREAM_INTERNAL_ERROR; goto _EXIT; } @@ -2608,11 +2613,12 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err); if (err != NULL) { stError("%s failed to create column-family, %s, %" PRIzu ", reason:%s", key, dbPath, nCf, err); + code = TSDB_CODE_STREAM_INTERNAL_ERROR; goto _EXIT; } } - if (taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf) != 0) { + if ((code = taskDbOpenCfs(pTaskDb, dbPath, cfNames, nCf)) != 0) { goto _EXIT; } @@ -2625,6 +2631,8 @@ STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) { return pTaskDb; _EXIT: + stError("%s taskDb open failed, %s at line:%d code:%s", key, __func__, lino, tstrerror(code)); + taskDbDestroy(pTaskDb, false); if (err) taosMemoryFree(err); if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 6353904b07..76e74db33f 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -258,6 +258,11 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); + if (buf == NULL) { + stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code)); + return terrno; + } + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); @@ -268,7 +273,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = 
sizeof(SMsgHead) + len, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); - code = (code >= 0)? 0:code; + code = TMIN(code, 0); return code; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b0f6f45110..35d5ba4e08 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -365,7 +365,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock // The transfer of state may generate new data that need to dispatch to downstream tasks, // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed // before the next checkpoint. - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (code) { + return code; + } if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); @@ -398,7 +401,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock code = streamTaskBuildCheckpoint(pTask); // todo: not handle error yet } else { // source & agg tasks need to forward the checkpoint msg downwards stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); - flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); + if (code) { + return code; + } // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by // this task already. 
And then, dispatch check point msg to all downstream tasks diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4fc00a6f59..88e40b247b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -676,7 +676,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock return code; } -void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { +int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { const char* id = pTask->id.idStr; // 1. transfer the ownership of executor state @@ -717,7 +717,12 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB } // 2. flush data in executor to K/V store, which should be completed before do checkpoint in the K/V. - doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); + int32_t code = doStreamTaskExecImpl(pTask, pCheckpointBlock, 1); + if(code) { + stError("s-task:%s failed to exec stream task before checkpoint, code:%s", id, tstrerror(code)); + } + + return code; } /**