fix(stream): close the inputQ of the related stream task when the fill-history task recv the trans-state msg from upstream.
This commit is contained in:
parent
9703018f56
commit
b36cc97236
|
@ -562,6 +562,7 @@ struct SStreamDispatchReq {
|
|||
int32_t upstreamTaskId;
|
||||
int32_t upstreamChildId;
|
||||
int32_t upstreamNodeId;
|
||||
int32_t upstreamRelTaskId;
|
||||
int32_t blockNum;
|
||||
int64_t totalLen;
|
||||
SArray* dataLen; // SArray<int32_t>
|
||||
|
|
|
@ -963,7 +963,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
// let's decide which step should be executed now
|
||||
if (pTask->execInfo.step1Start == 0) {
|
||||
int64_t ts = taosGetTimestampMs();
|
||||
|
||||
pTask->execInfo.step1Start = ts;
|
||||
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
|
||||
} else {
|
||||
|
|
|
@ -922,8 +922,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
|
|||
pStreamInfo->fillHistoryWindow = *pWindow;
|
||||
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
|
||||
|
||||
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64
|
||||
", window:%" PRId64 " - %" PRId64,
|
||||
qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
|
||||
" - %" PRId64,
|
||||
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
|
||||
pWindow->ekey);
|
||||
return 0;
|
||||
|
@ -1129,7 +1129,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
|
|||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
|
||||
|
||||
qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
|
||||
qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
|
||||
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
|
||||
|
||||
pWindow->skey = INT64_MIN;
|
||||
|
|
|
@ -214,8 +214,9 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
int32_t status = 0;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t status = 0;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
|
||||
|
@ -223,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
||||
ASSERT(pInfo != NULL);
|
||||
|
||||
if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
||||
status = TASK_INPUT_STATUS__REFUSED;
|
||||
} else {
|
||||
|
@ -244,6 +245,22 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
|
||||
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
|
||||
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
|
||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||
|
||||
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
|
||||
STaskId* pRelTaskId = &pTask->streamTaskId;
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
|
||||
if (pStreamTask != NULL) {
|
||||
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
|
||||
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
}
|
||||
|
||||
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
|
||||
" close inputQ for upstream:0x%x",
|
||||
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
|
||||
}
|
||||
|
||||
status = streamTaskAppendInputBlocks(pTask, pReq);
|
||||
|
@ -252,9 +269,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
}
|
||||
|
||||
// disable the data from upstream tasks
|
||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
|
||||
status = TASK_INPUT_STATUS__BLOCKED;
|
||||
}
|
||||
// if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
|
||||
// status = TASK_INPUT_STATUS__BLOCKED;
|
||||
// }
|
||||
|
||||
{
|
||||
// do send response with the input status
|
||||
|
@ -295,6 +312,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
pTask->upstreamInfo.numOfClosed = 0;
|
||||
stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr);
|
||||
}
|
||||
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
|
||||
|
|
|
@ -58,6 +58,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
|
||||
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
|
||||
|
@ -84,6 +85,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
|
||||
|
||||
|
@ -114,6 +116,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
|
|||
pReq->upstreamTaskId = pTask->id.taskId;
|
||||
pReq->upstreamChildId = pTask->info.selfChildId;
|
||||
pReq->upstreamNodeId = pTask->info.nodeId;
|
||||
pReq->upstreamRelTaskId = pTask->streamTaskId.taskId;
|
||||
pReq->blockNum = numOfBlocks;
|
||||
pReq->taskId = dstTaskId;
|
||||
pReq->type = type;
|
||||
|
|
|
@ -371,7 +371,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
// In case of sink tasks, no need to halt them.
|
||||
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
|
||||
// start the task state transfer procedure.
|
||||
// char* p = NULL;
|
||||
SStreamTaskState* pState = streamTaskGetStatus(pStreamTask);
|
||||
status = pState->state;
|
||||
char* p = pState->name;
|
||||
|
@ -392,8 +391,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// 1. expand the query time window for stream task of WAL scanner
|
||||
pTimeWindow->skey = INT64_MIN;
|
||||
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
pTimeWindow->skey = INT64_MIN;
|
||||
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
|
||||
} else {
|
||||
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
// 2. transfer the ownership of executor state
|
||||
streamTaskReleaseState(pTask);
|
||||
|
@ -407,10 +410,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
// 4. free it and remove fill-history task from disk meta-store
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
|
||||
// 5. save to disk
|
||||
// 5. assign the status to the value that will be kept in disk
|
||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
|
||||
|
||||
// 6. add empty delete block
|
||||
// 6. open the inputQ for all upstream tasks
|
||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||
|
||||
// 7. add empty delete block
|
||||
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
|
||||
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
|
||||
|
||||
|
@ -430,6 +436,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
ASSERT(pTask->status.appendTranstateBlock == 1);
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
|
@ -439,8 +447,14 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
|
||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
|
||||
code = streamDoTransferStateToStreamTask(pTask);
|
||||
} else { // drop fill-history task
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id);
|
||||
} else { // drop fill-history task and open inputQ of sink task
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
if (pStreamTask != NULL) {
|
||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
}
|
||||
|
||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -496,16 +510,17 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
|
||||
int32_t remain = streamAlignTransferState(pTask);
|
||||
|
||||
if (remain > 0) {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
stDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
|
||||
stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -536,7 +551,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
}
|
||||
} else { // non-dispatch task, do task state transfer directly
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel);
|
||||
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
|
@ -604,7 +619,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
if (type == STREAM_INPUT__TRANS_STATE) {
|
||||
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput);
|
||||
streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -147,8 +147,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
|
|||
|
||||
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
|
||||
int32_t* blockSize) {
|
||||
int32_t retryTimes = 0;
|
||||
int32_t MAX_RETRY_TIMES = 5;
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t taskLevel = pTask->info.taskLevel;
|
||||
|
||||
|
|
|
@ -1029,6 +1029,11 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
|||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
}
|
||||
} else {
|
||||
ASSERT(pTask->info.fillHistory == 0);
|
||||
if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t ekey = 0;
|
||||
if (pRange->window.ekey < INT64_MAX) {
|
||||
ekey = pRange->window.ekey + 1;
|
||||
|
@ -1043,10 +1048,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
|||
pRange->range.minVer = 0;
|
||||
pRange->range.maxVer = ver;
|
||||
|
||||
stDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64
|
||||
stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64
|
||||
", verRang:%" PRId64 " - %" PRId64,
|
||||
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
||||
pRange->range.maxVer);
|
||||
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX);
|
||||
|
||||
SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX};
|
||||
STimeWindow win = pRange->window;
|
||||
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue