Merge pull request #22476 from taosdata/fix/3_liaohj
fix(stream): pause when outputQ is blocked.
This commit is contained in:
commit
10872240ae
|
@ -26,6 +26,9 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define ONE_MB_F (1048576.0)
|
||||
#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F)
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
void* timer;
|
||||
|
|
|
@ -18,9 +18,8 @@
|
|||
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||
#define ONE_MB_F (1048576.0)
|
||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||
|
||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||
SStreamGlobalEnv streamEnv;
|
||||
|
||||
int32_t streamInit() {
|
||||
|
@ -178,7 +177,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
|||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
// todo add log
|
||||
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
int32_t code = 0;
|
||||
int32_t type = pTask->outputInfo.type;
|
||||
|
@ -191,11 +189,12 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
} else {
|
||||
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
||||
if (code != 0) { // todo failed to add it into the output queue, free it.
|
||||
return code;
|
||||
if (code != 0) {
|
||||
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
|
||||
}
|
||||
|
||||
streamDispatchStreamBlock(pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -359,7 +358,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
|
||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
msgLen, ver, total, size + msgLen/1048576.0);
|
||||
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
||||
|
|
|
@ -18,7 +18,7 @@
|
|||
// maximum allowed processed block batches. One block may include several submit blocks
|
||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
||||
#define STREAM_RESULT_DUMP_THRESHOLD 100
|
||||
|
||||
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
||||
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
@ -51,7 +51,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
|||
}
|
||||
|
||||
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
|
||||
size / 1048576.0);
|
||||
SIZE_IN_MB(size));
|
||||
|
||||
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
|
||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
|
||||
|
@ -90,6 +90,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
|
||||
taosMsleep(1000);
|
||||
continue;
|
||||
}
|
||||
|
||||
SSDataBlock* output = NULL;
|
||||
uint64_t ts = 0;
|
||||
if ((code = qExecTask(pExecutor, &output, &ts)) < 0) {
|
||||
|
@ -137,10 +143,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
taosArrayPush(pRes, &block);
|
||||
|
||||
qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
||||
pTask->info.selfChildId, numOfBlocks, size / 1048576.0);
|
||||
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
|
||||
|
||||
// current output should be dispatched to down stream nodes
|
||||
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
|
||||
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
|
||||
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
|
||||
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -636,7 +642,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
|
||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
||||
id, el, resSize / 1048576.0, totalBlocks);
|
||||
id, el, SIZE_IN_MB(resSize), totalBlocks);
|
||||
|
||||
streamFreeQitem(pInput);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue