diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ba0930955c..9ea1b53564 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -976,6 +976,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_STREAM_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x4105) #define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106) #define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107) +#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108) // TDLite #define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3911822068..2d1d3ed357 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1007,9 +1007,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode)); + if (code) { + tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code)); + return code; + } // let's continue scan data in the wal files - if (code == 0 && (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK)) { + if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) { code = tqScanWalAsync(pTq, false); // it's ok to failed if (code) { tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e85c3a5f3a..eb5d6aafe7 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -351,11 +351,8 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con if (data == NULL) { // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then // retry - code = TSDB_CODE_OUT_OF_MEMORY; - terrno = code; - tqError("vgId:%d, failed to copy submit data for stream 
processing, since out of memory", 0); - return code; + return terrno; } (void)memcpy(data, pBody, len); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6558012551..8f575b540f 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); -static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. 
@@ -40,7 +40,7 @@ int32_t tqScanWal(STQ* pTq) { int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); if (code) { - tqError("vgId:%d failed to start all tasks, try next time", vgId); + tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); return code; } @@ -293,9 +293,11 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; + int32_t code = 0; + *pSucc = false; while (1) { if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { @@ -304,7 +306,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems } SStreamQueueItem* pItem = NULL; - int32_t code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); + code = extractMsgFromWal(pTask->exec.pWalReader, (void**)&pItem, maxVer, id); if (code != TSDB_CODE_SUCCESS || pItem == NULL) { // failed, continue int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); bool itemInFillhistory = handleFillhistoryScanComplete(pTask, currentVer); @@ -327,10 +329,17 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems break; } } else { - tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer); - code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); - if (code) { - tqError("s-task:%s failed to seek ver to:%"PRId64 " in wal", id, pTask->chkInfo.nextProcessVer); + if (code == TSDB_CODE_OUT_OF_MEMORY) { + tqError("s-task:%s failed to put data into inputQ, since out of memory", id); + } else { + tqTrace("s-task:%s append input queue failed, code:inputQ is full, ver:%" PRId64, id, + pTask->chkInfo.nextProcessVer); + code = 
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + if (code) { + tqError("s-task:%s failed to seek ver to:%" PRId64 " in wal", id, pTask->chkInfo.nextProcessVer); + } + + code = 0; // reset the error code } break; @@ -339,7 +348,8 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems } *numOfItems += numOfNewItems; - return numOfNewItems > 0; + *pSucc = (numOfNewItems > 0); + return code; } int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { @@ -358,6 +368,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL); streamMetaWUnLock(pStreamMeta); if (pTaskList == NULL) { + tqError("vgId:%d failed to create task list dup, code:%s", vgId, tstrerror(terrno)); return terrno; } @@ -405,7 +416,8 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems); + bool hasNewData = false; + code = doPutDataIntoInputQ(pTask, maxVer, &numOfItems, &hasNewData); streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 83cbb4d3b9..c31b9b598c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -78,7 +78,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, vgId, pTask->id.taskId); if (pTask->exec.pExecutor == NULL) { tqError("s-task:%s failed to create exec taskInfo, failed to expand task", pTask->id.idStr); - return -1; + return terrno; } qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } @@ -840,7 +840,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead int32_t idle = taosGetTimestampMs() - execTs; tqDebug("s-task:%s task resume to 
run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs); - streamResumeTask(pTask); + code = streamResumeTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, @@ -849,7 +849,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead streamMetaReleaseTask(pMeta, pTask); } - return 0; + return code; } SStreamTask* pTask = NULL; diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index cfa49fd92e..6c3685bb0f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -287,7 +287,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_STREAM_INPUTQ_FULL; } int32_t msgLen = px->submit.msgLen; @@ -312,7 +312,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_QUEUE_CAPACITY_IN_SIZE, total, size); streamFreeQitem(pItem); - return TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_STREAM_INPUTQ_FULL; } int32_t code = taosWriteQitem(pQueue, pItem); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ccf30438bd..2c9de4526e 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -816,6 +816,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream 
internal error") TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode") +TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full") // TDLite TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")