Merge pull request #21960 from taosdata/fix/3_liaohj
fix(stream): fix error for extract msg from inputQ
This commit is contained in:
commit
7a417a33b0
|
@ -377,7 +377,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
|
||||||
", status:%s, sched-status:%d",
|
", status:%s, sched-status:%d",
|
||||||
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||||
} else {
|
} else {
|
||||||
|
@ -399,6 +399,61 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||||
|
const char* id) {
|
||||||
|
int32_t retryTimes = 0;
|
||||||
|
int32_t MAX_RETRY_TIMES = 5;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
|
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
|
if (qItem == NULL) {
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||||
|
taosMsleep(10);
|
||||||
|
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("===stream===break batchSize:%d", *numOfBlocks);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// do not merge blocks for sink node
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
*numOfBlocks = 1;
|
||||||
|
*pInput = qItem;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*pInput == NULL) {
|
||||||
|
ASSERT((*numOfBlocks) == 0);
|
||||||
|
*pInput = qItem;
|
||||||
|
} else {
|
||||||
|
// todo we need to sort the data block, instead of just appending into the array list.
|
||||||
|
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||||
|
if (newRet == NULL) {
|
||||||
|
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
||||||
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pInput = newRet;
|
||||||
|
}
|
||||||
|
|
||||||
|
*numOfBlocks += 1;
|
||||||
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
|
||||||
|
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||||
|
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
* todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
|
||||||
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
* appropriate batch of blocks should be handled in 5 to 10 sec.
|
||||||
|
@ -407,73 +462,15 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t batchSize = 1;
|
int32_t batchSize = 0;
|
||||||
int16_t times = 0;
|
|
||||||
|
|
||||||
SStreamQueueItem* pInput = NULL;
|
SStreamQueueItem* pInput = NULL;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
while (1) {
|
|
||||||
// downstream task's input queue is blocked, stop immediately
|
|
||||||
if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
|
|
||||||
streamTaskShouldStop(&pTask->status)) {
|
|
||||||
if (batchSize > 1) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
qDebug("123 %s", pTask->id.idStr);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
|
||||||
if (qItem == NULL) {
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) {
|
|
||||||
times++;
|
|
||||||
taosMsleep(10);
|
|
||||||
qDebug("===stream===try again batchSize:%d", batchSize);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("===stream===break batchSize:%d", batchSize);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pInput == NULL) {
|
|
||||||
pInput = qItem;
|
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
// todo we need to sort the data block, instead of just appending into the array list.
|
|
||||||
void* newRet = NULL;
|
|
||||||
if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) {
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
batchSize++;
|
|
||||||
pInput = newRet;
|
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
|
||||||
|
|
||||||
if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) {
|
|
||||||
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id,
|
|
||||||
MAX_STREAM_EXEC_BATCH_NUM);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (streamTaskShouldStop(&pTask->status)) {
|
|
||||||
if (pInput) {
|
|
||||||
streamFreeQitem(pInput);
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id);
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
|
ASSERT(batchSize == 0);
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
int32_t code = streamTransferStateToStreamTask(pTask);
|
int32_t code = streamTransferStateToStreamTask(pTask);
|
||||||
}
|
}
|
||||||
|
@ -532,8 +529,8 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks);
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
||||||
id, batchSize, el, resSize / 1048576.0, totalBlocks);
|
id, el, resSize / 1048576.0, totalBlocks);
|
||||||
|
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue