Merge pull request #24444 from taosdata/fix/3_liaohj

fix(stream): close the inputQ of the related stream task.
This commit is contained in:
Haojun Liao 2024-01-12 13:41:30 +08:00 committed by GitHub
commit ee24bb9b03
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 83 additions and 35 deletions

View File

@ -562,6 +562,7 @@ struct SStreamDispatchReq {
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t upstreamChildId; int32_t upstreamChildId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
int32_t upstreamRelTaskId;
int32_t blockNum; int32_t blockNum;
int64_t totalLen; int64_t totalLen;
SArray* dataLen; // SArray<int32_t> SArray* dataLen; // SArray<int32_t>

View File

@ -963,7 +963,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// let's decide which step should be executed now // let's decide which step should be executed now
if (pTask->execInfo.step1Start == 0) { if (pTask->execInfo.step1Start == 0) {
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
pTask->execInfo.step1Start = ts; pTask->execInfo.step1Start = ts;
tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts); tqDebug("s-task:%s start scan-history stage(step 1), status:%s, step1 startTs:%" PRId64, id, pStatus, ts);
} else { } else {

View File

@ -820,15 +820,23 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return 0; return 0;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
ASSERT(streamTaskReadyToRun(pTask, NULL));
int64_t execTs = pTask->status.lastExecTs;
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamResumeTask(pTask); if (pTask != NULL) {
char* pStatus = NULL;
if (streamTaskReadyToRun(pTask, &pStatus)) {
int64_t execTs = pTask->status.lastExecTs;
int32_t idle = taosGetTimestampMs() - execTs;
tqDebug("s-task:%s task resume to run after idle for:%dms from:%" PRId64, pTask->id.idStr, idle, execTs);
streamResumeTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, pStatus, status);
}
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
return 0; return 0;
} }

View File

@ -922,8 +922,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan
pStreamInfo->fillHistoryWindow = *pWindow; pStreamInfo->fillHistoryWindow = *pWindow;
pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2; pStreamInfo->recoverStep = STREAM_RECOVER_STEP__PREPARE2;
qDebug("%s step 2. set param for stream scanner for scan-history data, verRange:%" PRId64 " - %" PRId64 qDebug("%s step 2. set param for stream scanner scan wal, verRange:%" PRId64 " - %" PRId64 ", window:%" PRId64
", window:%" PRId64 " - %" PRId64, " - %" PRId64,
GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey, GET_TASKID(pTaskInfo), pStreamInfo->fillHistoryVer.minVer, pStreamInfo->fillHistoryVer.maxVer, pWindow->skey,
pWindow->ekey); pWindow->ekey);
return 0; return 0;
@ -1129,7 +1129,7 @@ int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow; STimeWindow* pWindow = &pTaskInfo->streamInfo.fillHistoryWindow;
qDebug("%s remove scan-history filter window:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64, qDebug("%s remove timeWindow filter:%" PRId64 "-%" PRId64 ", set new window:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX); GET_TASKID(pTaskInfo), pWindow->skey, pWindow->ekey, INT64_MIN, INT64_MAX);
pWindow->skey = INT64_MIN; pWindow->skey = INT64_MIN;

View File

@ -214,8 +214,9 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
} }
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int32_t status = 0; int32_t status = 0;
const char* id = pTask->id.idStr; SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr;
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id, stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
@ -223,7 +224,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL); ASSERT(pInfo != NULL);
if (pTask->pMeta->role == NODE_ROLE_FOLLOWER) { if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id); stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
status = TASK_INPUT_STATUS__REFUSED; status = TASK_INPUT_STATUS__REFUSED;
} else { } else {
@ -244,6 +245,22 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
} else if (pReq->type == STREAM_INPUT__TRANS_STATE) {
atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
// disable the related stream task here to avoid it to receive the newly arrived data after the transfer-state
STaskId* pRelTaskId = &pTask->streamTaskId;
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pRelTaskId->streamId, pRelTaskId->taskId);
if (pStreamTask != NULL) {
atomic_add_fetch_32(&pStreamTask->upstreamInfo.numOfClosed, 1);
streamTaskCloseUpstreamInput(pStreamTask, pReq->upstreamRelTaskId);
streamMetaReleaseTask(pMeta, pStreamTask);
}
stDebug("s-task:%s close inputQ for upstream:0x%x since trans-state msgId:%d recv, rel stream-task:0x%" PRIx64
" close inputQ for upstream:0x%x",
id, pReq->upstreamTaskId, pReq->msgId, pTask->streamTaskId.taskId, pReq->upstreamRelTaskId);
} }
status = streamTaskAppendInputBlocks(pTask, pReq); status = streamTaskAppendInputBlocks(pTask, pReq);
@ -252,9 +269,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
} }
// disable the data from upstream tasks // disable the data from upstream tasks
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) { // if (streamTaskGetStatus(pTask)->state == TASK_STATUS__HALT) {
status = TASK_INPUT_STATUS__BLOCKED; // status = TASK_INPUT_STATUS__BLOCKED;
} // }
{ {
// do send response with the input status // do send response with the input status
@ -295,6 +312,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
} }
pTask->upstreamInfo.numOfClosed = 0; pTask->upstreamInfo.numOfClosed = 0;
stDebug("s-task:%s opening up inputQ from upstream tasks", pTask->id.idStr);
} }
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {

View File

@ -58,6 +58,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamRelTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1;
ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum);
@ -84,6 +85,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamRelTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1;
@ -114,6 +116,7 @@ static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTas
pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId; pReq->upstreamNodeId = pTask->info.nodeId;
pReq->upstreamRelTaskId = pTask->streamTaskId.taskId;
pReq->blockNum = numOfBlocks; pReq->blockNum = numOfBlocks;
pReq->taskId = dstTaskId; pReq->taskId = dstTaskId;
pReq->type = type; pReq->type = type;

View File

@ -371,7 +371,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// In case of sink tasks, no need to halt them. // In case of sink tasks, no need to halt them.
// In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to
// start the task state transfer procedure. // start the task state transfer procedure.
// char* p = NULL;
SStreamTaskState* pState = streamTaskGetStatus(pStreamTask); SStreamTaskState* pState = streamTaskGetStatus(pStreamTask);
status = pState->state; status = pState->state;
char* p = pState->name; char* p = pState->name;
@ -392,8 +391,12 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
} }
// 1. expand the query time window for stream task of WAL scanner // 1. expand the query time window for stream task of WAL scanner
pTimeWindow->skey = INT64_MIN; if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); pTimeWindow->skey = INT64_MIN;
qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor);
} else {
stDebug("s-task:%s non-source task no need to reset filter window", pStreamTask->id.idStr);
}
// 2. transfer the ownership of executor state // 2. transfer the ownership of executor state
streamTaskReleaseState(pTask); streamTaskReleaseState(pTask);
@ -407,10 +410,13 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
// 4. free it and remove fill-history task from disk meta-store // 4. free it and remove fill-history task from disk meta-store
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id); streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
// 5. save to disk // 5. assign the status to the value that will be kept in disk
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state;
// 6. add empty delete block // 6. open the inputQ for all upstream tasks
streamTaskOpenAllUpstreamInput(pStreamTask);
// 7. add empty delete block
if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) { if ((pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) && taosQueueEmpty(pStreamTask->inputq.queue->pQueue)) {
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0); SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);
@ -430,6 +436,8 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SStreamMeta* pMeta = pTask->pMeta;
ASSERT(pTask->status.appendTranstateBlock == 1); ASSERT(pTask->status.appendTranstateBlock == 1);
int32_t level = pTask->info.taskLevel; int32_t level = pTask->info.taskLevel;
@ -439,8 +447,14 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states.
code = streamDoTransferStateToStreamTask(pTask); code = streamDoTransferStateToStreamTask(pTask);
} else { // drop fill-history task } else { // drop fill-history task and open inputQ of sink task
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &pTask->id); SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
if (pStreamTask != NULL) {
streamTaskOpenAllUpstreamInput(pStreamTask);
streamMetaReleaseTask(pMeta, pStreamTask);
}
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
} }
return code; return code;
@ -496,16 +510,17 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
} }
} }
int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t level = pTask->info.taskLevel;
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) { if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
int32_t remain = streamAlignTransferState(pTask); int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) { if (remain > 0) {
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); stDebug("s-task:%s receive upstream trans-state msg, not sent remain:%d", id, remain);
return 0; return 0;
} }
} }
@ -536,7 +551,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
} }
} else { // non-dispatch task, do task state transfer directly } else { // non-dispatch task, do task state transfer directly
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, pTask->info.taskLevel); stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask); code = streamTransferStateToStreamTask(pTask);
@ -604,7 +619,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
} }
if (type == STREAM_INPUT__TRANS_STATE) { if (type == STREAM_INPUT__TRANS_STATE) {
streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput); streamProcessTransstateBlock(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} }

View File

@ -1520,8 +1520,6 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, taskId);
streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pMeta, streamId, taskId, 0, taosGetTimestampMs(), false);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS; return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
} }

View File

@ -147,8 +147,6 @@ const char* streamQueueItemGetTypeStr(int32_t type) {
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize) { int32_t* blockSize) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel; int32_t taskLevel = pTask->info.taskLevel;

View File

@ -1029,6 +1029,11 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
} }
} else { } else {
ASSERT(pTask->info.fillHistory == 0);
if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
return;
}
int64_t ekey = 0; int64_t ekey = 0;
if (pRange->window.ekey < INT64_MAX) { if (pRange->window.ekey < INT64_MAX) {
ekey = pRange->window.ekey + 1; ekey = pRange->window.ekey + 1;
@ -1043,10 +1048,13 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
pRange->range.minVer = 0; pRange->range.minVer = 0;
pRange->range.maxVer = ver; pRange->range.maxVer = ver;
stDebug("s-task:%s level:%d related fill-history task exists, update stream calc time window:%" PRId64 " - %" PRId64 stDebug("s-task:%s level:%d related fill-history task exists, set stream task timeWindow:%" PRId64 " - %" PRId64
", verRang:%" PRId64 " - %" PRId64, ", verRang:%" PRId64 " - %" PRId64,
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, ver, INT64_MAX);
pRange->range.maxVer);
SVersionRange verRange = {.minVer = ver, .maxVer = INT64_MAX};
STimeWindow win = pRange->window;
streamSetParamForStreamScannerStep2(pTask, &verRange, &win);
} }
} }