Merge pull request #22266 from taosdata/fix/3_liaohj
fix(stream): dump results to sink node before paused.
This commit is contained in:
commit
fb98ede512
|
@ -590,7 +590,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask);
|
|||
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
|
||||
|
||||
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz);
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
|
||||
|
||||
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
|
||||
|
||||
|
|
|
@ -186,11 +186,6 @@ void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) {
|
|||
doSetTaskId(pTaskInfo->pRoot, &pTaskInfo->storageAPI);
|
||||
}
|
||||
|
||||
//void qSetTaskCode(qTaskInfo_t tinfo, int32_t code) {
|
||||
// SExecTaskInfo* pTaskInfo = tinfo;
|
||||
// pTaskInfo->code = code;
|
||||
//}
|
||||
|
||||
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
|
||||
if (tinfo == NULL) {
|
||||
return TSDB_CODE_APP_ERROR;
|
||||
|
@ -652,23 +647,33 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
|
||||
*pRes = NULL;
|
||||
int64_t curOwner = 0;
|
||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||
|
||||
// todo extract method
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
bool isKilled = isTaskKilled(pTaskInfo);
|
||||
if (isKilled) {
|
||||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pTaskInfo->owner != 0) {
|
||||
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
|
||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
pTaskInfo->owner = threadId;
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
if (pTaskInfo->cost.start == 0) {
|
||||
pTaskInfo->cost.start = taosGetTimestampUs();
|
||||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
clearStreamBlock(pTaskInfo->pRoot);
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// error occurs, record the error code and return to client
|
||||
int32_t ret = setjmp(pTaskInfo->env);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -772,11 +777,13 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
|||
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||
|
||||
taosWLockLatch(&pTaskInfo->lock);
|
||||
while (qTaskIsExecuting(pTaskInfo)) {
|
||||
taosMsleep(10);
|
||||
}
|
||||
|
||||
pTaskInfo->code = rspCode;
|
||||
taosWUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -162,7 +162,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
||||
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) {
|
||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
|
@ -174,7 +174,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
if (streamTaskShouldPause(&pTask->status)) {
|
||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0;
|
||||
qDebug("s-task:%s paused from the scan-history task, elapsed time:%.2fsec", pTask->id.idStr, el);
|
||||
return 0;
|
||||
break;
|
||||
}
|
||||
|
||||
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
|
@ -190,23 +190,18 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (streamTaskShouldPause(&pTask->status)) {
|
||||
break;
|
||||
}
|
||||
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if (qExecTask(exec, &output, &ts) < 0) {
|
||||
code = qExecTask(exec, &output, &ts);
|
||||
if (code != TSDB_CODE_TSC_QUERY_KILLED && code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s scan-history data error occurred code:%s, continue scan", pTask->id.idStr, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
|
||||
if (output == NULL && qStreamRecoverScanFinished(exec)) {
|
||||
finished = true;
|
||||
// the generated results before fill-history task been paused, should be dispatched to sink node
|
||||
if (output == NULL) {
|
||||
finished = qStreamRecoverScanFinished(exec);
|
||||
break;
|
||||
} else {
|
||||
if (output == NULL) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock block = {0};
|
||||
|
@ -214,9 +209,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
block.info.childId = pTask->info.selfChildId;
|
||||
taosArrayPush(pRes, &block);
|
||||
|
||||
numOfBlocks++;
|
||||
qDebug("s-task:%s scan exec numOfBlocks:%d, limit:%d", pTask->id.idStr, numOfBlocks, batchSz);
|
||||
if (numOfBlocks >= batchSz) {
|
||||
if ((++numOfBlocks) >= batchSize) {
|
||||
qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -310,11 +304,12 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
|
||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||
|
||||
// todo. the dropping status should be append to the status after the halt completed.
|
||||
// It must be halted for a source stream task, since when the related scan-history-data task start scan the history
|
||||
// for the step 2. For a agg task
|
||||
// for the step 2.
|
||||
int8_t status = pStreamTask->status.taskStatus;
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
ASSERT(status == TASK_STATUS__HALT);
|
||||
ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING);
|
||||
} else {
|
||||
ASSERT(status == TASK_STATUS__SCAN_HISTORY);
|
||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||
|
|
Loading…
Reference in New Issue