fix(stream): fix error in pause stream.
This commit is contained in:
parent
f84bfc96fb
commit
ceb06635fe
|
@ -317,6 +317,60 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
const char* id) {
|
||||
int32_t retryTimes = 0;
|
||||
int32_t MAX_RETRY_TIMES = 5;
|
||||
|
||||
while (1) {
|
||||
if (streamTaskShouldPause(&pTask->status)) {
|
||||
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||
taosMsleep(10);
|
||||
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
|
||||
continue;
|
||||
}
|
||||
|
||||
qDebug("===stream===break batchSize:%d", *numOfBlocks);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// do not merge blocks for sink node
|
||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
*numOfBlocks = 1;
|
||||
*pInput = qItem;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pInput == NULL) {
|
||||
ASSERT((*numOfBlocks) == 0);
|
||||
*pInput = qItem;
|
||||
} else {
|
||||
// todo we need to sort the data block, instead of just appending into the array list.
|
||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||
if (newRet == NULL) {
|
||||
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pInput = newRet;
|
||||
}
|
||||
|
||||
*numOfBlocks += 1;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
|
||||
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||
|
@ -326,73 +380,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
|
||||
while (1) {
|
||||
int32_t batchSize = 0;
|
||||
int16_t times = 0;
|
||||
|
||||
SStreamQueueItem* pInput = NULL;
|
||||
|
||||
// merge multiple input data if possible in the input queue.
|
||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||
|
||||
while (1) {
|
||||
if (streamTaskShouldPause(&pTask->status)) {
|
||||
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize);
|
||||
if (batchSize > 0) {
|
||||
break;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
||||
times++;
|
||||
taosMsleep(10);
|
||||
qDebug("===stream===try again batchSize:%d", batchSize);
|
||||
continue;
|
||||
}
|
||||
|
||||
qDebug("===stream===break batchSize:%d", batchSize);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pInput == NULL) {
|
||||
batchSize += 1;
|
||||
|
||||
pInput = qItem;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// todo we need to sort the data block, instead of just appending into the array list.
|
||||
ASSERT(batchSize >= 1);
|
||||
|
||||
void* newRet = streamMergeQueueItem(pInput, qItem);
|
||||
if (newRet == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
break;
|
||||
} else {
|
||||
batchSize += 1;
|
||||
|
||||
pInput = newRet;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
|
||||
if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if (streamTaskShouldStop(&pTask->status)) {
|
||||
// if (pInput) {
|
||||
// streamFreeQitem(pInput);
|
||||
// }
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
|
||||
if (pInput == NULL) {
|
||||
ASSERT(batchSize == 0);
|
||||
break;
|
||||
|
@ -409,8 +402,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
while (pTask->taskLevel == TASK_LEVEL__SOURCE) {
|
||||
int8_t status = atomic_load_8(&pTask->status.taskStatus);
|
||||
if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) {
|
||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id,
|
||||
atomic_load_8(&pTask->status.taskStatus));
|
||||
qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status);
|
||||
taosMsleep(100);
|
||||
} else {
|
||||
break;
|
||||
|
@ -463,6 +455,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
||||
id, el, resSize / 1048576.0, totalBlocks);
|
||||
|
||||
streamFreeQitem(pInput);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue