From fb24ed161d896fad08ffc18b25f920ba71c3650d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 12 Jun 2023 22:48:23 +0800 Subject: [PATCH] enh(stream): set correct task status. --- include/common/tmsgdef.h | 5 +++-- include/libs/stream/tstream.h | 6 +++-- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/tq/tq.c | 31 +++++++++++++++++++------- source/dnode/vnode/src/tq/tqRestore.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 19 +++++++--------- source/libs/stream/src/stream.c | 2 ++ source/libs/stream/src/streamExec.c | 18 +++++++++------ source/libs/stream/src/streamRecover.c | 10 +++++---- source/libs/stream/src/streamTask.c | 6 +++++ 10 files changed, 65 insertions(+), 36 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 11756c47ba..3103b6fd76 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -252,6 +252,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_RECOVER_FINISH, "stream-recover-finish", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) @@ -298,8 +299,8 @@ enum { TD_NEW_MSG_SEG(TDMT_VND_STREAM_MSG) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, "vnode-stream-recover1", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, "vnode-stream-recover2", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_SCAN_HISTORY, "vnode-stream-scan-history", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRANSFER_STATE, "vnode-stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e1af12389b..63af587794 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -45,8 +45,8 @@ enum { TASK_STATUS__FAIL, TASK_STATUS__STOP, TASK_STATUS__WAIT_DOWNSTREAM, - TASK_STATUS__RECOVER_PREPARE, - TASK_STATUS__RECOVER1, + TASK_STATUS__SCAN_HISTORY_PREPARE, + TASK_STATUS__HALT, // stream task halt to wait for the secondary scan history, this status is invisible for user TASK_STATUS__PAUSE, }; @@ -577,6 +577,8 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamSetParamForRecover(SStreamTask* pTask); int32_t streamRestoreParam(SStreamTask* pTask); int32_t streamSetStatusNormal(SStreamTask* pTask); +const char* streamGetTaskStatusStr(int32_t status); + // source level int32_t streamSourceRecoverPrepareStep1(SStreamTask* pTask, SVersionRange *pVerRange, STimeWindow* pWindow); int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* pReq); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 57e7ad8abf..cd97bd5753 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -244,7 +244,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskTransferStateReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 71e28c3bad..93b078ad7d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1065,7 +1065,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms return 0; } -int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { +int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { int32_t code = TSDB_CODE_SUCCESS; char* msg = pMsg->pCont; int32_t msgLen = pMsg->contLen; @@ -1091,10 +1091,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { tqDebug("s-task:%s start history data scan stage(step 1)", pTask->id.idStr); int64_t st = taosGetTimestampMs(); + // todo set the correct status flag + int8_t schedStatus = atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE, + TASK_SCHED_STATUS__WAITING); + if (schedStatus != TASK_SCHED_STATUS__INACTIVE) { + ASSERT(0); + return 0; + } + streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); - streamMetaReleaseTask(pMeta, pTask); return 0; } @@ -1109,19 +1116,23 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { // todo handle error } - // todo here we should use another flag to avoid user resume the task - pStreamTask->status.taskStatus = TASK_STATUS__PAUSE; + pStreamTask->status.taskStatus = TASK_STATUS__HALT; // if it's an source task, extract the last version in wal. int64_t ver = pTask->dataRange.range.maxVer; int64_t latestVer = walReaderGetCurrentVer(pStreamTask->exec.pWalReader); - ASSERT(latestVer >= ver); - ver = latestVer; + if (latestVer >= ver) { + ver = latestVer; + } else { + ASSERT(latestVer == -1); + } // 2. do secondary scan of the history data, the time window remain, and the version range is updated to [pTask->dataRange.range.maxVer, ver1] // 3. notify the downstream tasks to transfer executor state after handle all history blocks. + pTask->status.transferState = true; + code = streamDispatchTransferStateMsg(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle error @@ -1129,7 +1140,9 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { // 4. 1) transfer the ownership of executor state, 2) update the scan data range for source task. // 5. resume the related stream task. + streamTryExec(pTask); + streamMetaReleaseTask(pMeta, pStreamTask); streamMetaReleaseTask(pMeta, pTask); } else { // todo update the chkInfo version for current task. @@ -1318,10 +1331,12 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { pTask->chkInfo.version); streamProcessRunReq(pTask); } else { - if (streamTaskShouldPause(&pTask->status)) { + if (streamTaskShouldPause(&pTask->status) || (pTask->status.taskStatus == TASK_STATUS__HALT)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); } - tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr); + + tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, + pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index cafb64e44a..bf560a3fb9 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -135,7 +135,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || + if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__SCAN_HISTORY_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM || streamTaskShouldPause(&pTask->status)) { tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 47b899aa3d..99957431f8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -421,11 +421,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; - case TDMT_STREAM_TASK_CHECK_RSP: { - if (tqProcessStreamTaskCheckRsp(pVnode->pTq, ver, pReq, len) < 0) { - goto _err; - } - } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { @@ -574,24 +569,26 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRunReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH: return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); - case TDMT_STREAM_TASK_CHECK: - return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_DISPATCH_RSP: return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECK: + return tqProcessStreamTaskCheckReq(pVnode->pTq, pMsg); + case TDMT_STREAM_TASK_CHECK_RSP: + return tqProcessStreamTaskCheckRsp(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen); case TDMT_STREAM_RETRIEVE: return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg); case TDMT_STREAM_RETRIEVE_RSP: return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE: - return tqProcessTaskRecover1Req(pVnode->pTq, pMsg); - case TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE: + case TDMT_VND_STREAM_SCAN_HISTORY: + return tqProcessTaskScanHistory(pVnode->pTq, pMsg); + case TDMT_VND_STREAM_TRANSFER_STATE: return tqProcessTaskTransferStateReq(pVnode->pTq, 0, pMsg->pCont, pMsg->contLen); case TDMT_STREAM_RECOVER_FINISH: return tqProcessTaskRecoverFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_RECOVER_FINISH_RSP: return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg); default: - vError("unknown msg type:%d in fetch queue", pMsg->msgType); + vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 510c18ab65..e93c280d62 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -117,6 +117,8 @@ int32_t streamSchedExec(SStreamTask* pTask) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); qDebug("trigger to run s-task:%s", pTask->id.idStr); + } else { + qDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); } return 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 5264a5f043..688491fbdb 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -388,20 +388,23 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pInput == NULL) { if (pTask->info.fillHistory && pTask->status.transferState) { // todo transfer task state here - SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); - ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); - ASSERT(pStreamTask->status.taskStatus == STREAM_STATUS__PAUSE); + SStreamTask* pStreamTask = streamMetaAcquireTask(pTask->pMeta, pTask->streamTaskId.taskId); + qDebug("s-task:%s scan history task end, update stream task:%s info and launch it", pTask->id.idStr, pStreamTask->id.idStr); + + ASSERT(pStreamTask != NULL && pStreamTask->historyTaskId.taskId == pTask->id.taskId); + ASSERT(pStreamTask->status.taskStatus == TASK_STATUS__HALT); // update the scan data range for source task. - + STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; + qDebug("s-task:%s stream task window %"PRId64" - %"PRId64" transfer to %"PRId64" - %"PRId64", status:%d, sched-status:%d", pStreamTask->id.idStr, + pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN, pTimeWindow->ekey, TASK_STATUS__NORMAL, pStreamTask->status.schedStatus); + pTimeWindow->skey = INT64_MIN; streamSetStatusNormal(pStreamTask); streamSchedExec(pStreamTask); streamMetaReleaseTask(pTask->pMeta, pStreamTask); - - // todo set the task with specified status, to avoid execute this process again } break; } @@ -479,7 +482,8 @@ int32_t streamTryExec(SStreamTask* pTask) { // todo the task should be commit here atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%d", pTask->id.idStr, pTask->status.taskStatus); + qDebug("s-task:%s exec completed, status:%d, sched-status:%d", pTask->id.idStr, pTask->status.taskStatus, + pTask->status.schedStatus); if (!taosQueueEmpty(pTask->inputQueue->queue) && (!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 9efe6fec42..c5f1004784 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -21,16 +21,17 @@ const char* streamGetTaskStatusStr(int32_t status) { switch(status) { case TASK_STATUS__NORMAL: return "normal"; case TASK_STATUS__WAIT_DOWNSTREAM: return "wait-for-downstream"; - case TASK_STATUS__RECOVER_PREPARE: return "scan-history-prepare"; - case TASK_STATUS__RECOVER1: return "scan-history-data"; + case TASK_STATUS__SCAN_HISTORY_PREPARE: return "scan-history-prepare"; + case TASK_STATUS__HALT: return "halt"; default:return ""; } } + int32_t streamTaskLaunchRecover(SStreamTask* pTask) { qDebug("s-task:%s (vgId:%d) launch recover", pTask->id.idStr, pTask->info.nodeId); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); + atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__SCAN_HISTORY_PREPARE); SVersionRange* pRange = &pTask->dataRange.range; qDebug("s-task:%s set task status:%s and start to recover, ver:%" PRId64 "-%" PRId64, pTask->id.idStr, @@ -51,7 +52,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask) { memcpy(serializedReq, &req, len); - SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE }; + SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY }; if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { /*ASSERT(0);*/ } @@ -473,6 +474,7 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) { return -1; } + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamMetaSaveTask(pMeta, pTask); return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7261fd31cc..0ab096aea3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -89,6 +89,9 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1; if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1; + if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; + if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; @@ -149,6 +152,9 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1; if (tDecodeI32(pDecoder, &pTask->historyTaskId.taskId)) return -1; + if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; + if (tDecodeI32(pDecoder, &pTask->streamTaskId.taskId)) return -1; + if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1; if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1; if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;