refactor(stream): back pressure is active, which is triggered by dispatch rsp.
This commit is contained in:
parent
725db16af9
commit
c6a322e0ef
|
@ -62,15 +62,12 @@ enum {
|
||||||
enum {
|
enum {
|
||||||
TASK_INPUT_STATUS__NORMAL = 1,
|
TASK_INPUT_STATUS__NORMAL = 1,
|
||||||
TASK_INPUT_STATUS__BLOCKED,
|
TASK_INPUT_STATUS__BLOCKED,
|
||||||
TASK_INPUT_STATUS__RECOVER,
|
|
||||||
TASK_INPUT_STATUS__STOP,
|
|
||||||
TASK_INPUT_STATUS__FAILED,
|
TASK_INPUT_STATUS__FAILED,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_OUTPUT_STATUS__NORMAL = 1,
|
TASK_OUTPUT_STATUS__NORMAL = 1,
|
||||||
TASK_OUTPUT_STATUS__WAIT,
|
TASK_OUTPUT_STATUS__WAIT,
|
||||||
TASK_OUTPUT_STATUS__BLOCKED,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
|
|
@ -761,17 +761,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver, int64_t checkpoi
|
||||||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||||
pTask->pMeta = pTq->pStreamMeta;
|
pTask->pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
// checkpoint exists, restore from the last checkpoint
|
|
||||||
// if (pTask->chkInfo.checkpointId != 0) {
|
|
||||||
// ASSERT(pTask->chkInfo.checkpointVer > 0);
|
|
||||||
// pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer;
|
|
||||||
// pTask->dataRange.range.maxVer = pTask->chkInfo.checkpointVer;
|
|
||||||
// pTask->dataRange.range.minVer = pTask->chkInfo.checkpointVer;
|
|
||||||
// } else {
|
|
||||||
pTask->chkInfo.currentVer = ver;
|
pTask->chkInfo.currentVer = ver;
|
||||||
pTask->dataRange.range.maxVer = ver;
|
pTask->dataRange.range.maxVer = ver;
|
||||||
pTask->dataRange.range.minVer = ver;
|
pTask->dataRange.range.minVer = ver;
|
||||||
//}
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
SStreamTask* pSateTask = pTask;
|
SStreamTask* pSateTask = pTask;
|
||||||
|
|
|
@ -240,8 +240,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = pTask->status.taskStatus;
|
int32_t status = pTask->status.taskStatus;
|
||||||
|
const char* pStatus = streamGetTaskStatusStr(status);
|
||||||
if (status != TASK_STATUS__NORMAL) {
|
if (status != TASK_STATUS__NORMAL) {
|
||||||
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus);
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -252,6 +253,12 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
tqDebug("s-task:%s inputQ is blocked, do nothing", pTask->id.idStr);
|
||||||
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
*pScanIdle = false;
|
*pScanIdle = false;
|
||||||
|
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
|
|
|
@ -244,10 +244,10 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) {
|
||||||
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
uint32_t* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
|
SStreamTask* p = *(SStreamTask**)taosHashGet(pMeta->pTasks, pTaskId, sizeof(*pTaskId));
|
||||||
|
|
||||||
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
|
||||||
p->chkInfo.checkpointId = p->checkpointingId;
|
|
||||||
|
|
||||||
int8_t prev = p->status.taskStatus;
|
int8_t prev = p->status.taskStatus;
|
||||||
|
ASSERT(p->chkInfo.checkpointId < p->checkpointingId && p->checkpointingId == checkpointId);
|
||||||
|
|
||||||
|
p->chkInfo.checkpointId = p->checkpointingId;
|
||||||
p->status.taskStatus = TASK_STATUS__NORMAL;
|
p->status.taskStatus = TASK_STATUS__NORMAL;
|
||||||
|
|
||||||
// save the task
|
// save the task
|
||||||
|
|
|
@ -794,13 +794,13 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo record the idle time for dispatch data
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// dispatch message failed: network error, or node not available.
|
// dispatch message failed: network error, or node not available.
|
||||||
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
|
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
|
||||||
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||||
// happened too fast. todo handle the shuffle dispatch failure
|
// happened too fast.
|
||||||
|
// todo handle the shuffle dispatch failure
|
||||||
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
|
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
|
||||||
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||||
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||||
|
@ -810,7 +810,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
|
qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus,
|
||||||
|
code);
|
||||||
|
|
||||||
// there are other dispatch message not response yet
|
// there are other dispatch message not response yet
|
||||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
@ -828,14 +829,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
|
|
||||||
// the input queue of the (down stream) task that receive the output data is full,
|
// the input queue of the (down stream) task that receive the output data is full,
|
||||||
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||||
// todo blocking the output status
|
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
|
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||||
|
|
||||||
int32_t waitDuration = 300; // 300 ms
|
|
||||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
|
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
|
||||||
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration);
|
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
streamRetryDispatchStreamBlock(pTask, waitDuration);
|
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
} else { // pipeline send data in output queue
|
} else { // pipeline send data in output queue
|
||||||
// this message has been sent successfully, let's try next one.
|
// this message has been sent successfully, let's try next one.
|
||||||
destroyStreamDataBlock(pTask->msgInfo.pData);
|
destroyStreamDataBlock(pTask->msgInfo.pData);
|
||||||
|
@ -843,8 +842,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
|
|
||||||
if (pTask->msgInfo.blockingTs != 0) {
|
if (pTask->msgInfo.blockingTs != 0) {
|
||||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
|
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
|
||||||
qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%" PRId64 "ms", pTask->id.idStr, el);
|
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
||||||
|
pTask->id.idStr, pRsp->downstreamTaskId, el);
|
||||||
pTask->msgInfo.blockingTs = 0;
|
pTask->msgInfo.blockingTs = 0;
|
||||||
|
|
||||||
|
// put data into inputQ of current task is also allowed
|
||||||
|
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// now ready for next data output
|
// now ready for next data output
|
||||||
|
|
|
@ -512,7 +512,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// blocked by downstream task
|
// blocked by downstream task
|
||||||
if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) {
|
if (pTask->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue