Merge pull request #25910 from taosdata/fix/TD-30028
fix(stream): check if outputQ is full before execute the scan history task. if it is full, idle for 5 sec.
This commit is contained in:
commit
499ca6c23b
|
@ -1639,11 +1639,11 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
|||
// info
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
const char *sinkStr = "%.2fMiB";
|
||||
sprintf(buf, sinkStr, pe->sinkDataSize);
|
||||
snprintf(buf, tListLen(buf), sinkStr, pe->sinkDataSize);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
// offset info
|
||||
const char *offsetStr = "%" PRId64 " [%" PRId64 ", %" PRId64 "]";
|
||||
sprintf(buf, offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
||||
snprintf(buf, tListLen(buf), offsetStr, pe->processedVer, pe->verRange.minVer, pe->verRange.maxVer);
|
||||
}
|
||||
|
||||
STR_TO_VARSTR(vbuf, buf);
|
||||
|
|
|
@ -184,16 +184,16 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t handleResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
|
||||
static int32_t handleSanhistoryResultBlocks(SStreamTask* pTask, SArray* pRes, int32_t size) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (taosArrayGetSize(pRes) > 0) {
|
||||
SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(NULL, pTask, size, pRes);
|
||||
code = doOutputResultBlockImpl(pTask, pStreamBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stDebug("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
if (code != TSDB_CODE_SUCCESS) { // should not have error code
|
||||
stError("s-task:%s dump fill-history results failed, code:%s", pTask->id.idStr, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
taosArrayDestroy(pRes);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -268,6 +268,17 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
|
||||
}
|
||||
|
||||
// output queue is full, idle for 5 sec.
|
||||
if (streamQueueIsFull(pTask->outputq.queue)) {
|
||||
stWarn("s-task:%s outputQ is full, idle for 1sec and retry", id);
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, STREAM_SCAN_HISTORY_TIMESLICE);
|
||||
}
|
||||
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
stWarn("s-task:%s downstream task inputQ blocked, idle for 5sec and retry", id);
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
|
||||
}
|
||||
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
if (pRes == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -284,19 +295,13 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
|
|||
}
|
||||
|
||||
// dispatch the generated results
|
||||
/*int32_t code = */handleResultBlocks(pTask, pRes, size);
|
||||
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
|
||||
// downstream task input queue is full, try in 5sec
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
|
||||
}
|
||||
/*int32_t code = */handleSanhistoryResultBlocks(pTask, pRes, size);
|
||||
|
||||
if (finished) {
|
||||
return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
|
||||
}
|
||||
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
|
||||
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
|
||||
pTask->info.fillHistory, el / 1000.0);
|
||||
|
@ -558,7 +563,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
|
||||
if (streamQueueIsFull(pTask->outputq.queue)) {
|
||||
stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
|
||||
streamTaskSetIdleInfo(pTask, 500);
|
||||
streamTaskSetIdleInfo(pTask, 1000);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -381,7 +381,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
|
|||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskInitTokenBucket(STokenBucket* pBucket, int32_t numCap, int32_t numRate, float quotaRate,
|
||||
|
|
Loading…
Reference in New Issue