fix(stream): pause when outputQ is blocked.
This commit is contained in:
parent
f8f8e7a8f0
commit
bb4f8515bd
|
@ -26,6 +26,9 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define ONE_MB_F (1048576.0)
|
||||||
|
#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
void* timer;
|
void* timer;
|
||||||
|
|
|
@ -18,9 +18,8 @@
|
||||||
|
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
#define STREAM_TASK_INPUT_QUEUE_CAPACITY 20480
|
||||||
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
#define STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE (30)
|
||||||
#define ONE_MB_F (1048576.0)
|
|
||||||
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
|
||||||
|
|
||||||
|
#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q) / ONE_MB_F)
|
||||||
SStreamGlobalEnv streamEnv;
|
SStreamGlobalEnv streamEnv;
|
||||||
|
|
||||||
int32_t streamInit() {
|
int32_t streamInit() {
|
||||||
|
@ -178,7 +177,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo add log
|
|
||||||
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t type = pTask->outputInfo.type;
|
int32_t type = pTask->outputInfo.type;
|
||||||
|
@ -191,11 +189,12 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
||||||
} else {
|
} else {
|
||||||
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
|
||||||
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
||||||
if (code != 0) { // todo failed to add it into the output queue, free it.
|
if (code != 0) {
|
||||||
return code;
|
qError("s-task:%s failed to put res into outputQ", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamDispatchStreamBlock(pTask);
|
streamDispatchStreamBlock(pTask);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -359,7 +358,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
|
||||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
msgLen, ver, total, size + msgLen/1048576.0);
|
msgLen, ver, total, size + SIZE_IN_MB(msgLen));
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
if (/*(pTask->info.taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
||||||
|
|
|
@ -18,7 +18,7 @@
|
||||||
// maximum allowed processed block batches. One block may include several submit blocks
|
// maximum allowed processed block batches. One block may include several submit blocks
|
||||||
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
#define MAX_STREAM_EXEC_BATCH_NUM 32
|
||||||
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
#define MIN_STREAM_EXEC_BATCH_NUM 4
|
||||||
#define MAX_STREAM_RESULT_DUMP_THRESHOLD 100
|
#define STREAM_RESULT_DUMP_THRESHOLD 100
|
||||||
|
|
||||||
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
static int32_t updateCheckPointInfo(SStreamTask* pTask);
|
||||||
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
|
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask);
|
||||||
|
@ -51,7 +51,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
|
qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
|
||||||
size / 1048576.0);
|
SIZE_IN_MB(size));
|
||||||
|
|
||||||
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
|
code = streamTaskOutputResultBlock(pTask, pStreamBlocks);
|
||||||
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
|
if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position
|
||||||
|
@ -137,10 +137,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
||||||
taosArrayPush(pRes, &block);
|
taosArrayPush(pRes, &block);
|
||||||
|
|
||||||
qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
qDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
|
||||||
pTask->info.selfChildId, numOfBlocks, size / 1048576.0);
|
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size));
|
||||||
|
|
||||||
// current output should be dispatched to down stream nodes
|
// current output should be dispatched to down stream nodes
|
||||||
if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) {
|
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD) {
|
||||||
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
|
ASSERT(numOfBlocks == taosArrayGetSize(pRes));
|
||||||
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
|
code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -570,6 +570,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
qWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", pTask->id.idStr);
|
||||||
|
taosMsleep(1000);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
qDebug("s-task:%s start to extract data block from inputQ", id);
|
||||||
|
|
||||||
|
@ -636,7 +642,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d",
|
||||||
id, el, resSize / 1048576.0, totalBlocks);
|
id, el, SIZE_IN_MB(resSize), totalBlocks);
|
||||||
|
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue