fix(stream): dump results to sink node before paused.

This commit is contained in:
Haojun Liao 2023-08-01 00:50:30 +08:00
parent 1e680d4df1
commit 0e3fd5277d
2 changed files with 4 additions and 6 deletions

View File

@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI); doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
} }
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
// SExecTaskInfo* pTaskInfo = tinfo;
// pTaskInfo->code = code;
//}
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
if (tinfo == NULL) { if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR; return TSDB_CODE_APP_ERROR;

View File

@ -197,10 +197,13 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
SSDataBlock* output = NULL; SSDataBlock* output = NULL;
uint64_t ts = 0; uint64_t ts = 0;
if (qExecTask(exec, &output, &ts) < 0) { code = qExecTask(exec, &output, &ts);
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
continue; continue;
} }
// the generated results before fill-history task been paused, should be dispatched to sink node
if (output == NULL && qStreamRecoverScanFinished(exec)) { if (output == NULL && qStreamRecoverScanFinished(exec)) {
finished = true; finished = true;
break; break;