fix(stream): set the fill history status.
This commit is contained in:
parent
fb24ed161d
commit
97da2a8fac
|
@ -300,7 +300,6 @@ enum {
|
||||||
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRANSFER_STATE, "vnode-stream-transfer-state", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ enum {
|
||||||
TASK_STATUS__FAIL,
|
TASK_STATUS__FAIL,
|
||||||
TASK_STATUS__STOP,
|
TASK_STATUS__STOP,
|
||||||
TASK_STATUS__WAIT_DOWNSTREAM,
|
TASK_STATUS__WAIT_DOWNSTREAM,
|
||||||
TASK_STATUS__SCAN_HISTORY_PREPARE,
|
TASK_STATUS__SCAN_HISTORY,
|
||||||
TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user
|
TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user
|
||||||
TASK_STATUS__PAUSE,
|
TASK_STATUS__PAUSE,
|
||||||
};
|
};
|
||||||
|
@ -271,6 +271,7 @@ typedef struct SStreamStatus {
|
||||||
int8_t schedStatus;
|
int8_t schedStatus;
|
||||||
int8_t keepTaskStatus;
|
int8_t keepTaskStatus;
|
||||||
bool transferState;
|
bool transferState;
|
||||||
|
TdThreadMutex lock;
|
||||||
} SStreamStatus;
|
} SStreamStatus;
|
||||||
|
|
||||||
typedef struct SHistDataRange {
|
typedef struct SHistDataRange {
|
||||||
|
@ -585,7 +586,7 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req*
|
||||||
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
|
int32_t streamSourceRecoverScanStep1(SStreamTask* pTask);
|
||||||
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
|
int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* pReq);
|
||||||
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
|
int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver);
|
||||||
int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask);
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
|
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -820,7 +820,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->dataRange.range.minVer = ver;
|
pTask->dataRange.range.minVer = ver;
|
||||||
|
|
||||||
// expand executor
|
// expand executor
|
||||||
pTask->status.taskStatus = /*(pTask->info.fillHistory) ? */TASK_STATUS__WAIT_DOWNSTREAM /*: TASK_STATUS__NORMAL*/;
|
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1);
|
||||||
|
@ -923,8 +923,8 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%d, rsp status %d",
|
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
|
||||||
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, pTask->status.taskStatus, rsp.status);
|
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = 0;
|
rsp.status = 0;
|
||||||
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
||||||
|
@ -1088,7 +1088,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// do recovery step 1
|
// do recovery step 1
|
||||||
tqDebug("s-task:%s start history data scan stage(step 1)", pTask->id.idStr);
|
tqDebug("s-task:%s start history data scan stage(step 1), status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
// todo set the correct status flag
|
// todo set the correct status flag
|
||||||
|
@ -1116,7 +1117,19 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// todo handle error
|
// todo handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
|
// wait for the stream task get ready for scan history data
|
||||||
|
while (pStreamTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM ||
|
||||||
|
pStreamTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||||
|
tqDebug("s-task:%s level:%d not ready for halt, wait for 100ms and recheck", pStreamTask->id.idStr,
|
||||||
|
pStreamTask->info.taskLevel);
|
||||||
|
taosMsleep(100);
|
||||||
|
}
|
||||||
|
|
||||||
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
pStreamTask->status.taskStatus = TASK_STATUS__HALT;
|
||||||
|
tqDebug("s-task:%s level:%d status is set to halt by history scan task:%s", pStreamTask->id.idStr,
|
||||||
|
pStreamTask->info.taskLevel, pTask->id.idStr);
|
||||||
|
|
||||||
// if it's an source task, extract the last version in wal.
|
// if it's an source task, extract the last version in wal.
|
||||||
int64_t ver = pTask->dataRange.range.maxVer;
|
int64_t ver = pTask->dataRange.range.maxVer;
|
||||||
|
@ -1158,14 +1171,20 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// notify the downstream tasks to transfer executor state after handle all history blocks.
|
// notify the downstream tasks to transfer executor state after handle all history blocks.
|
||||||
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SStreamTransferReq* pReq = (SStreamTransferReq*)msg;
|
SStreamTransferReq req;
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
|
||||||
|
int32_t code = tDecodeStreamRecoverFinishReq(&decoder, &req);
|
||||||
|
|
||||||
|
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
|
tqError("failed to find task:0x%x", req.taskId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1212,7 +1231,7 @@ int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t sversion, char* msg, int
|
||||||
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
|
tqDebug("s-task:%s step2 recover finished, el:%.2fs", pTask->id.idStr, el);
|
||||||
|
|
||||||
// dispatch recover finish req to all related downstream task
|
// dispatch recover finish req to all related downstream task
|
||||||
code = streamDispatchRecoverFinishMsg(pTask);
|
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -135,9 +135,8 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__SCAN_HISTORY_PREPARE ||
|
if (status != TASK_STATUS__NORMAL) {
|
||||||
status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
|
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
|
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -573,16 +573,22 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
||||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_CHECK:
|
case TDMT_STREAM_TASK_CHECK:
|
||||||
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
|
return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_CHECK_RSP:
|
case TDMT_STREAM_TASK_CHECK_RSP: {
|
||||||
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
|
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pReq, len);
|
||||||
|
}
|
||||||
case TDMT_STREAM_RETRIEVE:
|
case TDMT_STREAM_RETRIEVE:
|
||||||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_RETRIEVE_RSP:
|
case TDMT_STREAM_RETRIEVE_RSP:
|
||||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||||
case TDMT_VND_STREAM_TRANSFER_STATE:
|
case TDMT_STREAM_TRANSFER_STATE: {
|
||||||
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
|
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pReq, len);
|
||||||
|
}
|
||||||
case TDMT_STREAM_RECOVER_FINISH:
|
case TDMT_STREAM_RECOVER_FINISH:
|
||||||
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_RECOVER_FINISH_RSP:
|
case TDMT_STREAM_RECOVER_FINISH_RSP:
|
||||||
|
|
|
@ -312,8 +312,7 @@ int32_t streamDoDispatchRecoverFinishMsg(SStreamTask* pTask, const SStreamRecove
|
||||||
msg.info.noResp = 1;
|
msg.info.noResp = 1;
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &msg);
|
tmsgSendReq(pEpSet, &msg);
|
||||||
qDebug("s-task:%s dispatch recover finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
|
qDebug("s-task:%s dispatch scan-history-data finish msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, pReq->taskId, vgId);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -326,15 +326,16 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
SStreamQueueItem* pInput = NULL;
|
SStreamQueueItem* pInput = NULL;
|
||||||
|
|
||||||
// merge multiple input data if possible in the input queue.
|
// merge multiple input data if possible in the input queue.
|
||||||
qDebug("s-task:%s start to extract data block from inputQ", id);
|
qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// downstream task's input queue is blocked, stop immediately
|
// downstream task's input queue is blocked, stop immediately
|
||||||
if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_INPUT_STATUS__BLOCKED) ||
|
if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) ||
|
||||||
streamTaskShouldStop(&pTask->status)) {
|
streamTaskShouldStop(&pTask->status)) {
|
||||||
if (batchSize > 1) {
|
if (batchSize > 1) {
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
qDebug("123 %s", pTask->id.idStr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -382,10 +383,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
if (pInput) {
|
if (pInput) {
|
||||||
streamFreeQitem(pInput);
|
streamFreeQitem(pInput);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("456 %s", pTask->id.idStr);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInput == NULL) {
|
if (pInput == NULL) {
|
||||||
|
qDebug("789 %s", pTask->id.idStr);
|
||||||
|
|
||||||
if (pTask->info.fillHistory && pTask->status.transferState) {
|
if (pTask->info.fillHistory && pTask->status.transferState) {
|
||||||
// todo transfer task state here
|
// todo transfer task state here
|
||||||
|
|
||||||
|
@ -393,12 +398,25 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
|
qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
|
||||||
|
|
||||||
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
|
||||||
|
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||||
|
|
||||||
|
if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
|
ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
|
||||||
|
|
||||||
// update the scan data range for source task.
|
// update the scan data range for source task.
|
||||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " transfer to %" PRId64 " - %" PRId64
|
||||||
qDebug("s-task:%s stream task window %"PRId64" - %"PRId64" transfer to %"PRId64" - %"PRId64", status:%d, sched-status:%d", pStreamTask->id.idStr,
|
", status:%s, sched-status:%d",
|
||||||
pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, TASK_STATUS__NORMAL, pStreamTask->status.schedStatus);
|
pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
|
||||||
|
pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
|
||||||
|
} else {
|
||||||
|
// for agg task and sink task, they are continue to execute, no need to be halt.
|
||||||
|
// the process should be stopped for a while, during the term of transfer task state.
|
||||||
|
// OR wait for the inputQ && outputQ of agg tasks are all consumed, and then start the state transfer
|
||||||
|
|
||||||
|
|
||||||
|
qDebug("s-task:%s no need to update time window", pStreamTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
pTimeWindow->skey = INT64_MIN;
|
pTimeWindow->skey = INT64_MIN;
|
||||||
|
|
||||||
streamSetStatusNormal(pStreamTask);
|
streamSetStatusNormal(pStreamTask);
|
||||||
|
@ -406,6 +424,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
|
|
||||||
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
|
streamMetaReleaseTask(pTask->pMeta, pStreamTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -482,7 +501,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
||||||
|
|
||||||
// todo the task should be commit here
|
// todo the task should be commit here
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
qDebug("s-task:%s exec completed, status:%d, sched-status:%d", pTask->id.idStr, pTask->status.taskStatus,
|
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->status.schedStatus);
|
pTask->status.schedStatus);
|
||||||
|
|
||||||
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
|
if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
|
||||||
|
|
|
@ -21,22 +21,19 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
||||||
switch(status) {
|
switch(status) {
|
||||||
case TASK_STATUS__NORMAL: return "normal";
|
case TASK_STATUS__NORMAL: return "normal";
|
||||||
case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
|
case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
|
||||||
case TASK_STATUS__SCAN_HISTORY_PREPARE: return "scan-history-prepare";
|
case TASK_STATUS__SCAN_HISTORY: return "scan-history";
|
||||||
case TASK_STATUS__HALT: return "halt";
|
case TASK_STATUS__HALT: return "halt";
|
||||||
|
case TASK_STATUS__PAUSE: return "paused";
|
||||||
default:return "";
|
default:return "";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
|
int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
|
||||||
qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId);
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__SCAN_HISTORY_PREPARE);
|
|
||||||
|
|
||||||
SVersionRange* pRange = &pTask->dataRange.range;
|
SVersionRange* pRange = &pTask->dataRange.range;
|
||||||
qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr,
|
qDebug("s-task:%s vgId:%d task status:%s and start to scan-history-data task, ver:%" PRId64 " - %" PRId64,
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->dataRange.range.minVer,
|
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||||
pTask->dataRange.range.maxVer);
|
pTask->dataRange.range.minVer, pTask->dataRange.range.maxVer);
|
||||||
|
|
||||||
streamSetParamForRecover(pTask);
|
streamSetParamForRecover(pTask);
|
||||||
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
|
streamSourceRecoverPrepareStep1(pTask, pRange, &pTask->dataRange.window);
|
||||||
|
@ -63,7 +60,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
|
||||||
streamAggRecoverPrepare(pTask);
|
streamAggRecoverPrepare(pTask);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
streamSetStatusNormal(pTask);
|
streamSetStatusNormal(pTask);
|
||||||
qDebug("s-task:%s sink task convert to normal immediately", pTask->id.idStr);
|
qDebug("s-task:%s sink task convert to normal status immediately", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -181,7 +178,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
taosArrayDestroy(pTask->checkReqIds);
|
taosArrayDestroy(pTask->checkReqIds);
|
||||||
pTask->checkReqIds = NULL;
|
pTask->checkReqIds = NULL;
|
||||||
|
|
||||||
qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", id, numOfReqs);
|
qDebug("s-task:%s all %d downstream tasks are ready, now enter into scan-history-data stage", id, numOfReqs);
|
||||||
streamTaskLaunchRecover(pTask);
|
streamTaskLaunchRecover(pTask);
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
|
qDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, remain not ready:%d", id,
|
||||||
|
@ -192,7 +189,12 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s fixed downstream tasks is ready, now enter into recover stage", id);
|
ASSERT(pTask->status.taskStatus != TASK_STATUS__HALT);
|
||||||
|
pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
|
||||||
|
|
||||||
|
qDebug("s-task:%s fixed downstream task is ready, now enter into scan-history-data stage, status:%s", id,
|
||||||
|
streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
streamTaskLaunchRecover(pTask);
|
streamTaskLaunchRecover(pTask);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -219,6 +221,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
int32_t streamSetStatusNormal(SStreamTask* pTask) {
|
||||||
|
qDebug("s-task:%s set task status to be normal, prev:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
|
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -264,25 +267,29 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamDispatchRecoverFinishMsg(SStreamTask* pTask) {
|
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
||||||
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
|
SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
|
qDebug("s-task:%s send scan-history-data complete msg to downstream (fix-dispatch) to taskId:0x%x, status:%s", pTask->id.idStr,
|
||||||
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
|
pTask->fixedEpDispatcher.taskId, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
|
||||||
req.taskId = pTask->fixedEpDispatcher.taskId;
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
streamDoDispatchRecoverFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||||
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
int32_t vgSz = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
for (int32_t i = 0; i < vgSz; i++) {
|
|
||||||
|
qDebug("s-task:%s send scan-history-data complete msg to downstream (shuffle-dispatch) %d tasks, status:%s", pTask->id.idStr,
|
||||||
|
numOfVgs, streamGetTaskStatusStr(pTask->status.taskStatus));
|
||||||
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
req.taskId = pVgInfo->taskId;
|
req.taskId = pVgInfo->taskId;
|
||||||
streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamDoDispatchRecoverFinishMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -333,7 +340,7 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// serialize
|
// serialize
|
||||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
|
qDebug("s-task:%s send transfer state msg to downstream (fix-dispatch) to taskId:0x%x, status:%d", pTask->id.idStr,
|
||||||
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
|
pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus);
|
||||||
|
|
||||||
req.taskId = pTask->fixedEpDispatcher.taskId;
|
req.taskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
@ -375,11 +382,15 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) {
|
||||||
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) {
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
int32_t left = atomic_sub_fetch_32(&pTask->numOfWaitingUpstream, 1);
|
||||||
qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left);
|
|
||||||
ASSERT(left >= 0);
|
ASSERT(left >= 0);
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
|
qDebug("s-task:%s all %d upstream tasks finish scan-history data", pTask->id.idStr, left);
|
||||||
streamAggChildrenRecoverFinish(pTask);
|
streamAggChildrenRecoverFinish(pTask);
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s remain unfinished upstream tasks:%d", pTask->id.idStr, left);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -388,10 +399,14 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
|
||||||
pHTask->dataRange.range.minVer = 0;
|
pHTask->dataRange.range.minVer = 0;
|
||||||
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
|
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
|
||||||
|
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
|
qDebug("s-task:%s set the launch condition for fill history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||||
" ver range:%" PRId64 " - %" PRId64,
|
" ver range:%" PRId64 " - %" PRId64,
|
||||||
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
|
pTask->id.idStr, pHTask->id.idStr, pHTask->dataRange.window.skey, pHTask->dataRange.window.ekey,
|
||||||
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
|
pHTask->dataRange.range.minVer, pHTask->dataRange.range.maxVer);
|
||||||
|
} else {
|
||||||
|
qDebug("s-task:%s no fill history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
|
||||||
|
}
|
||||||
|
|
||||||
// check if downstream tasks have been ready
|
// check if downstream tasks have been ready
|
||||||
streamTaskCheckDownstreamTasks(pHTask);
|
streamTaskCheckDownstreamTasks(pHTask);
|
||||||
|
@ -444,13 +459,6 @@ int32_t streamTaskStartHistoryTask(SStreamTask* pTask, int64_t ver) {
|
||||||
|
|
||||||
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
SStreamMeta* pMeta = pTask->pMeta;
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
qDebug("s-task:%s set start wal scan start ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer);
|
|
||||||
ASSERT(walReaderGetCurrentVer(pTask->exec.pWalReader) == -1);
|
|
||||||
|
|
||||||
// walReaderSeekVer(pTask->exec.pWalReader, sversion);
|
|
||||||
// pTask->chkInfo.currentVer = sversion;
|
|
||||||
|
|
||||||
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
|
if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -462,17 +470,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// dispatch recover finish req to all related downstream task
|
// dispatch recover finish req to all related downstream task
|
||||||
code = streamDispatchRecoverFinishMsg(pTask);
|
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set status normal
|
ASSERT(pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY);
|
||||||
qDebug("s-task:%s set the status to be normal, and start wal scan", pTask->id.idStr);
|
/*code = */streamSetStatusNormal(pTask);
|
||||||
code = streamSetStatusNormal(pTask);
|
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
|
Loading…
Reference in New Issue