fix stream backend crash when transfer
This commit is contained in:
parent
0840e6a0d8
commit
1dea66d4f5
|
@ -14,10 +14,10 @@
|
|||
*/
|
||||
|
||||
#include "streamInt.h"
|
||||
#include "streamsm.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "wal.h"
|
||||
#include "streamsm.h"
|
||||
|
||||
#define SCANHISTORY_IDLE_TIME_SLICE 100 // 100ms
|
||||
#define SCANHISTORY_MAX_IDLE_TIME 10 // 10 sec
|
||||
|
@ -115,8 +115,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
|
|||
// release the task.
|
||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||
} else {
|
||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
|
||||
&pTask->schedHistoryInfo.pTimer);
|
||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,7 +134,7 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration)
|
|||
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref);
|
||||
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
|
||||
|
||||
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
||||
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
|
||||
|
@ -290,7 +289,8 @@ static void recheckDownstreamTasks(void* param, void* tmrId) {
|
|||
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
|
||||
}
|
||||
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage, int64_t* oldStage) {
|
||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||
int64_t* oldStage) {
|
||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
||||
ASSERT(pInfo != NULL);
|
||||
|
||||
|
@ -374,7 +374,7 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
|
|||
void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||
EStreamTaskEvent event;
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
event = HAS_RELATED_FILLHISTORY_TASK(pTask)? TASK_EVENT_INIT_STREAM_SCANHIST:TASK_EVENT_INIT;
|
||||
event = HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||
} else {
|
||||
event = TASK_EVENT_INIT_SCANHIST;
|
||||
}
|
||||
|
@ -418,7 +418,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
|
||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
||||
bool found = false;
|
||||
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
|
||||
for (int32_t i = 0; i < numOfReqs; i++) {
|
||||
|
@ -457,8 +456,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
} else { // not ready, wait for 100ms and retry
|
||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||
stError(
|
||||
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", "
|
||||
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
||||
", current stage:%" PRId64
|
||||
", "
|
||||
"not check wait for downstream task nodeUpdate, and all tasks restart",
|
||||
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
|
||||
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
|
||||
|
@ -476,17 +476,22 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
// automatically set the related fill-history task to be failed.
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
STaskId* pId = &pTask->hTaskInfo.id;
|
||||
int64_t current = taosGetTimestampMs();
|
||||
|
||||
SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pId->streamId, pId->taskId);
|
||||
streamMetaUpdateTaskDownstreamStatus(pHTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init,
|
||||
taosGetTimestampMs(), false);
|
||||
if (pHTask != NULL) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, pHTask->execInfo.init, current,
|
||||
false);
|
||||
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||
} else {
|
||||
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pId->streamId, pId->taskId, 0, current, false);
|
||||
}
|
||||
}
|
||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id,
|
||||
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
|
||||
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer);
|
||||
}
|
||||
|
@ -496,7 +501,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
|||
}
|
||||
|
||||
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
|
||||
SRpcHandleInfo *pRpcInfo, int32_t taskId) {
|
||||
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
|
||||
SEncoder encoder;
|
||||
int32_t code;
|
||||
int32_t len;
|
||||
|
@ -533,11 +538,11 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// source
|
||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
|
||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
|
||||
return qStreamSourceScanParamForHistoryScanStep1(pTask->exec.pExecutor, pVerRange, pWindow);
|
||||
}
|
||||
|
||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow) {
|
||||
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
|
||||
return qStreamSourceScanParamForHistoryScanStep2(pTask->exec.pExecutor, pVerRange, pWindow);
|
||||
}
|
||||
|
||||
|
@ -567,7 +572,7 @@ int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) {
|
|||
pBlock->info.rows = 1;
|
||||
pBlock->info.childId = pTask->info.selfChildId;
|
||||
|
||||
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
|
||||
pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock)); // pBlock;
|
||||
taosArrayPush(pTranstate->blocks, pBlock);
|
||||
|
||||
taosMemoryFree(pBlock);
|
||||
|
@ -601,8 +606,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
|||
ETaskStatus status = streamTaskGetStatus(pTask, &p);
|
||||
|
||||
if (status != TASK_STATUS__SCAN_HISTORY && status != TASK_STATUS__STREAM_SCAN_HISTORY) {
|
||||
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly",
|
||||
id, p, pReq->upstreamTaskId);
|
||||
stError("s-task:%s not in scan-history status, status:%s return upstream:0x%x scan-history finish directly", id, p,
|
||||
pReq->upstreamTaskId);
|
||||
|
||||
void* pBuf = NULL;
|
||||
int32_t len = 0;
|
||||
|
@ -612,8 +617,8 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
|||
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
|
||||
|
||||
tmsgSendRsp(&msg);
|
||||
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id,
|
||||
taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
|
||||
stDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", id, taskLevel,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -642,13 +647,13 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
|
|||
|
||||
// mnode will not send the pause/resume message to the sink task, so no need to enable the pause for sink tasks.
|
||||
if (taskLevel == TASK_LEVEL__AGG) {
|
||||
/*int32_t code = */streamTaskScanHistoryDataComplete(pTask);
|
||||
/*int32_t code = */ streamTaskScanHistoryDataComplete(pTask);
|
||||
} else { // for sink task, set normal
|
||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||
}
|
||||
} else {
|
||||
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d",
|
||||
id, pReq->upstreamTaskId, pReq->childId, left);
|
||||
stDebug("s-task:%s receive scan-history data finish msg from upstream:0x%x(index:%d), unfinished:%d", id,
|
||||
pReq->upstreamTaskId, pReq->childId, left);
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -687,9 +692,9 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
|
|||
|
||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||
" verRange:%" PRId64 " - %" PRId64", init:%"PRId64,
|
||||
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey,
|
||||
pRange->range.minVer, pRange->range.maxVer, pHTask->execInfo.init);
|
||||
" verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
|
||||
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
||||
pRange->range.maxVer, pHTask->execInfo.init);
|
||||
} else {
|
||||
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
|
||||
}
|
||||
|
@ -724,7 +729,6 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pInfo->id.streamId, pInfo->id.taskId);
|
||||
if (pTask != NULL) {
|
||||
|
||||
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
||||
|
||||
pHTaskInfo->tickCount -= 1;
|
||||
|
|
Loading…
Reference in New Issue