diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 44fe81ac09..48549fce42 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1639,11 +1639,11 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS // info if (pTask->info.taskLevel == TASK_LEVEL__SINK) { const char *sinkStr = "%.2fMiB"; - sprintf(buf, sinkStr, pe->sinkDataSize); + snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize); } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { // offset info const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]"; - sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); + snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer); } STR_TO_VARSTR(vbuf, buf); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 047b169ec9..934ff898a9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -184,16 +184,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { +static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) { int32_t code = TSDB_CODE_SUCCESS; if (taosArrayGetSize(pRes) > 0) { SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes); code = doOutputResultBlockImpl(pTask, pStreamBlocks); - if (code != TSDB_CODE_SUCCESS) { - stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); + if (code != TSDB_CODE_SUCCESS) { // should not have error code + stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code)); } } else { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } return code; } @@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } + // output queue is full, idle for 5 sec. + if (streamQueueIsFull(pTask->outputq.queue)) { + stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE); + } + + if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) { + stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id); + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); + } + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); if (pRes == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -284,19 +295,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { } // dispatch the generated results - /*int32_t code = */handleResultBlocks(pTask, pRes, size); - - int64_t el = taosGetTimestampMs() - st; - - // downstream task input queue is full, try in 5sec - if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); - } + /*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size); if (finished) { return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } + int64_t el = taosGetTimestampMs() - st; if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); @@ -558,7 +563,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { if (streamQueueIsFull(pTask->outputq.queue)) { stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id); - streamTaskSetIdleInfo(pTask, 500); + streamTaskSetIdleInfo(pTask, 1000); return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 5596eb3dee..247baea16f 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc } } - return TSDB_CODE_SUCCESS; + return code; } int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,