enh(stream): set correct task status.
parent 78a240be8f
commit fb24ed161d
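This commit renames the stream-task recover statuses (RECOVER_PREPARE -> SCAN_HISTORY_PREPARE, RECOVER1 -> HALT) and makes the fill-history task halt its related stream task, rather than pausing it, while the secondary history scan and state transfer run. The sketch below is only an illustrative model of that hand-off, using simplified stand-in types and names rather than the actual TDengine definitions:

#include <stdio.h>

/* Illustrative stand-ins for the renamed statuses (not the real TDengine enum). */
typedef enum {
  STATUS_NORMAL,
  STATUS_SCAN_HISTORY_PREPARE,  /* was TASK_STATUS__RECOVER_PREPARE */
  STATUS_HALT,                  /* was TASK_STATUS__RECOVER1; hidden from users */
  STATUS_PAUSE,
} TaskStatus;

typedef struct { TaskStatus status; } Task;

/* The fill-history task halts the related stream task (HALT rather than PAUSE,
   so a user-issued resume cannot interfere), performs the secondary history
   scan and state transfer, then restores the stream task to NORMAL and
   reschedules it. */
static void scanHistoryHandoff(Task* pStreamTask) {
  pStreamTask->status = STATUS_HALT;    /* step 1: halt the stream task      */
  /* step 2: secondary scan of the history data                              */
  /* step 3: transfer executor state to the downstream tasks                 */
  pStreamTask->status = STATUS_NORMAL;  /* steps 4-5: restore and resume     */
}

int main(void) {
  Task streamTask = { STATUS_NORMAL };
  scanHistoryHandoff(&streamTask);
  printf("stream task status: %d\n", streamTask.status);  /* 0 == NORMAL */
  return 0;
}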
@@ -252,6 +252,7 @@ enum {
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
@@ -298,8 +299,8 @@ enum {
 
 TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL)
-  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL)
+  TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRANSFER_STATE, "vnode-stream-transfer-state", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
   TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
 
@@ -45,8 +45,8 @@ enum {
   TASK_STATUS__FAIL,
   TASK_STATUS__STOP,
   TASK_STATUS__WAIT_DOWNSTREAM,
-  TASK_STATUS__RECOVER_PREPARE,
-  TASK_STATUS__RECOVER1,
+  TASK_STATUS__SCAN_HISTORY_PREPARE,
+  TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user
   TASK_STATUS__PAUSE,
 };
 
@@ -577,6 +577,8 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask);
 int32_t streamSetParamForRecover(SStreamTask* pTask);
 int32_t streamRestoreParam(SStreamTask* pTask);
 int32_t streamSetStatusNormal(SStreamTask* pTask);
+const char* streamGetTaskStatusStr(int32_t status);
+
 // source level
 int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow);
 int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq);
@@ -244,7 +244,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec);
 int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
-int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
+int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
 int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
 int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
@@ -1065,7 +1065,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
   return 0;
 }
 
-int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
+int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
   int32_t code = TSDB_CODE_SUCCESS;
   char*   msg = pMsg->pCont;
   int32_t msgLen = pMsg->contLen;
@@ -1091,10 +1091,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
   tqDebug("s-task:%s start history data scan stage(step 1)", pTask->id.idStr);
   int64_t st = taosGetTimestampMs();
 
+  // todo set the correct status flag
+  int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE,
+                                                     TASK_SCHED_STATUS__WAITING);
+  if (schedStatus != TASK_SCHED_STATUS__INACTIVE) {
+    ASSERT(0);
+    return 0;
+  }
+
   streamSourceRecoverScanStep1(pTask);
   if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) {
     tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr);
 
     streamMetaReleaseTask(pMeta, pTask);
     return 0;
   }
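In the hunk above, atomic_val_compare_exchange_8 acts as a claim: only the caller that wins the INACTIVE -> WAITING transition on schedStatus proceeds with step 1, so a concurrent run request cannot start the history scan twice; any loser returns immediately. A minimal, self-contained C11 sketch of the same pattern (the names and values here are simplified stand-ins, not TDengine's actual macros):

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

/* Simplified stand-ins for the sched-status values (illustrative only). */
enum { SCHED_INACTIVE = 0, SCHED_WAITING = 1 };

/* Claim the task for execution: only the caller that wins the
   INACTIVE -> WAITING transition may proceed; everyone else backs off. */
static bool claimTask(_Atomic int8_t* schedStatus) {
  int8_t expected = SCHED_INACTIVE;
  return atomic_compare_exchange_strong(schedStatus, &expected, SCHED_WAITING);
}

int main(void) {
  _Atomic int8_t schedStatus = SCHED_INACTIVE;
  printf("first claim:  %s\n", claimTask(&schedStatus) ? "won" : "lost");  /* won  */
  printf("second claim: %s\n", claimTask(&schedStatus) ? "won" : "lost");  /* lost */
  atomic_store(&schedStatus, SCHED_INACTIVE);  /* release, as the task does when it finishes */
  return 0;
}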
@@ -1109,19 +1116,23 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
       // todo handle error
     }
 
-    pStreamTask->status.taskStatus = TASK_STATUS__PAUSE;
+    // todo here we should use another flag to avoid user resume the task
+    pStreamTask->status.taskStatus = TASK_STATUS__HALT;
 
     // if it's an source task, extract the last version in wal.
     int64_t ver = pTask->dataRange.range.maxVer;
     int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader);
-    ASSERT(latestVer >= ver);
-    ver = latestVer;
+    if (latestVer >= ver) {
+      ver = latestVer;
+    } else {
+      ASSERT(latestVer == -1);
+    }
 
     // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1]
 
     // 3. notify the downstream tasks to transfer executor state after handle all history blocks.
+    pTask->status.transferState = true;
+
     code = streamDispatchTransferStateMsg(pTask);
     if (code != TSDB_CODE_SUCCESS) {
       // todo handle error
@@ -1129,7 +1140,9 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) {
 
     // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task.
     // 5. resume the related stream task.
+    streamTryExec(pTask);
 
+    streamMetaReleaseTask(pMeta, pStreamTask);
     streamMetaReleaseTask(pMeta, pTask);
   } else {
     // todo update the chkInfo version for current task.
@@ -1318,10 +1331,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
             pTask->chkInfo.version);
     streamProcessRunReq(pTask);
   } else {
-    if (streamTaskShouldPause(&pTask->status)) {
+    if (streamTaskShouldPause(&pTask->status) || (pTask->status.taskStatus == TASK_STATUS__HALT)) {
       atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
     }
-    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
+
+    tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
+            pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
   }
 
   streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@@ -135,7 +135,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
       continue;
     }
 
-    if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE ||
+    if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__SCAN_HISTORY_PREPARE ||
         status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) {
       tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status);
       streamMetaReleaseTask(pStreamMeta, pTask);
@@ -421,11 +421,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
         goto _err;
       }
     } break;
-    case TDMT_STREAM_TASK_CHECK_RSP: {
-      if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) {
-        goto _err;
-      }
-    } break;
     case TDMT_VND_ALTER_CONFIRM:
       needCommit = pVnode->config.hashChange;
       if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
@@ -574,24 +569,26 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
       return tqProcessTaskRunReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_TASK_DISPATCH:
       return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
-    case TDMT_STREAM_TASK_CHECK:
-      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_TASK_DISPATCH_RSP:
       return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
+    case TDMT_STREAM_TASK_CHECK:
+      return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg);
+    case TDMT_STREAM_TASK_CHECK_RSP:
+      return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
     case TDMT_STREAM_RETRIEVE:
       return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_RETRIEVE_RSP:
      return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
-    case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE:
-      return tqProcessTaskRecover1Req(pVnode->pTq, pMsg);
-    case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE:
+    case TDMT_VND_STREAM_SCAN_HISTORY:
+      return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
+    case TDMT_VND_STREAM_TRANSFER_STATE:
       return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen);
     case TDMT_STREAM_RECOVER_FINISH:
       return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg);
     case TDMT_STREAM_RECOVER_FINISH_RSP:
       return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
     default:
-      vError("unknown msg type:%d in fetch queue", pMsg->msgType);
+      vError("unknown msg type:%d in stream queue", pMsg->msgType);
       return TSDB_CODE_APP_ERROR;
   }
 }
@@ -117,6 +117,8 @@ int32_t streamSchedExec(SStreamTask* pTask) {
     SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
     tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg);
     qDebug("trigger to run s-task:%s", pTask->id.idStr);
+  } else {
+    qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus);
   }
 
   return 0;
@@ -388,20 +388,23 @@ int32_t streamExecForAll(SStreamTask* pTask) {
     if (pInput == NULL) {
       if (pTask->info.fillHistory && pTask->status.transferState) {
         // todo transfer task state here
-        SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
-        ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
-
-        ASSERT(pStreamTask->status.taskStatus == STREAM_STATUS__PAUSE);
+        SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId);
+        qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr);
+
+        ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId);
+        ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT);
 
         // update the scan data range for source task.
+        STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
+        qDebug("s-task:%s stream task window %"PRId64" - %"PRId64" transfer to %"PRId64" - %"PRId64", status:%d, sched-status:%d", pStreamTask->id.idStr,
+               pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, TASK_STATUS__NORMAL, pStreamTask->status.schedStatus);
+        pTimeWindow->skey = INT64_MIN;
+
         streamSetStatusNormal(pStreamTask);
         streamSchedExec(pStreamTask);
 
         streamMetaReleaseTask(pTask->pMeta, pStreamTask);
-
-        // todo set the task with specified status, to avoid execute this process again
       }
       break;
     }
@@ -479,7 +482,8 @@ int32_t streamTryExec(SStreamTask* pTask) {
 
     // todo the task should be commit here
     atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
-    qDebug("s-task:%s exec completed, status:%d", pTask->id.idStr, pTask->status.taskStatus);
+    qDebug("s-task:%s exec completed, status:%d, sched-status:%d", pTask->id.idStr, pTask->status.taskStatus,
+           pTask->status.schedStatus);
 
     if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) &&
         (!streamTaskShouldPause(&pTask->status))) {
@@ -21,16 +21,17 @@ const char* streamGetTaskStatusStr(int32_t status) {
   switch(status) {
     case TASK_STATUS__NORMAL: return "normal";
     case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream";
-    case TASK_STATUS__RECOVER_PREPARE: return "scan-history-prepare";
-    case TASK_STATUS__RECOVER1: return "scan-history-data";
+    case TASK_STATUS__SCAN_HISTORY_PREPARE: return "scan-history-prepare";
+    case TASK_STATUS__HALT: return "halt";
     default:return "";
   }
 }
 
 int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
   qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId);
 
   if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
-    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE);
+    atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__SCAN_HISTORY_PREPARE);
 
     SVersionRange* pRange = &pTask->dataRange.range;
     qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr,
@@ -51,7 +52,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) {
 
     memcpy(serializedReq, &req, len);
 
-    SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE };
+    SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY };
     if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) {
       /*ASSERT(0);*/
     }
@@ -473,6 +474,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
     return -1;
   }
 
+  atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
   streamMetaSaveTask(pMeta, pTask);
   return 0;
 }
@@ -89,6 +89,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
 
   if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
   if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1;
+  if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
+  if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId)) return -1;
+
   if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
   if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
   if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
@@ -149,6 +152,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
 
   if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
   if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1;
+  if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
+  if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1;
+
   if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
   if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;
   if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;