diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb843ced91..0c0d28e4b7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3040,6 +3040,13 @@ typedef struct { int32_t taskId; } SVDropStreamTaskReq; +typedef struct { + SMsgHead head; + int64_t streamId; + int32_t taskId; + int64_t dataVer; +} SVStreamTaskVerUpdateReq; + typedef struct { int8_t reserved; } SVDropStreamTaskRsp; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index fb2c780724..279cf72f0b 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -308,6 +308,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_VERUPDATE, "vnode-stream-ver-update", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 88ba7f995a..40b2be72bc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -723,6 +723,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); +int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); @@ -761,6 +762,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId); +int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver); int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask, int8_t isSucceed); int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 0251b9b636..9d6b18c677 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -801,6 +801,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_VERUPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a13c6371b1..f2e783d64f 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -232,7 +232,7 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, SEpSet* pEpset, bool isFillhistory) { - int64_t uid = (isFillhistory)? pStream->uid:pStream->hTaskUid; + int64_t uid = (isFillhistory)? pStream->hTaskUid:pStream->uid; SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory); if (pTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index dbbd68fa08..799c784f38 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -177,6 +177,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); + // send msg to update the nextProcessedVer attribute for this task if it is a stream task + streamBuildAndSendVerUpdateMsg(pTask->pMsgCb, pSnode->pMeta->vgId, &pTask->id, 0); + streamTaskCheckDownstream(pTask); return 0; } diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3a62f52bdd..1e07c87cb2 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -227,6 +227,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause); int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp); int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen); int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3b11b5b764..b523faec7f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -964,18 +964,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms if (tsDisableStream) { tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); - return 0; + return code; } tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); // 1.deserialize msg and build task - SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + int32_t size = sizeof(SStreamTask); + SStreamTask* pTask = taosMemoryCalloc(1, size); if (pTask == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, - (int32_t)sizeof(SStreamTask)); - return -1; + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); + return TSDB_CODE_OUT_OF_MEMORY; } SDecoder decoder; @@ -983,9 +982,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms code = tDecodeStreamTask(&decoder, pTask); tDecoderClear(&decoder); - if (code < 0) { + if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(pTask); - return -1; + return TSDB_CODE_INVALID_MSG; } SStreamMeta* pStreamMeta = pTq->pStreamMeta; @@ -1001,9 +1000,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms taosWUnLockLatch(&pStreamMeta->lock); if (code < 0) { - tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); + tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); tFreeStreamTask(pTask); - return -1; + return code; } // added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if @@ -1012,6 +1011,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // only handled in the leader node if (vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks); +#if 0 + if (pTq->pVnode->restored) { + SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); + if (p != NULL) { + // send msg to update the nextProcessedVer attribute for this task if it is a stream task + streamBuildAndSendVerUpdateMsg(p->pMsgCb, vgId, &p->id, sversion); + streamMetaReleaseTask(pStreamMeta, p); + } + } +#endif SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId); bool restored = pTq->pVnode->restored; @@ -1035,7 +1044,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tFreeStreamTask(pTask); } - return 0; + return code; } int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { @@ -1498,14 +1507,15 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); - int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId); + int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated); if (code != 0) { return code; } - SStreamTask* pHistoryTask = - streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId); if (pHistoryTask) { code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated); } @@ -1524,9 +1534,11 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { tDecodeStreamRetrieveReq(&decoder, &req); tDecoderClear(&decoder); + int32_t vgId = pTq->pStreamMeta->vgId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId); if (pTask == NULL) { - // tDeleteStreamDispatchReq(&req); + tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId, + req.dstTaskId); return -1; } @@ -1887,3 +1899,35 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { taosArrayDestroy(req.pNodeList); return rsp.code; } + +int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen) { + SStreamMeta* pMeta = pTq->pStreamMeta; + int32_t vgId = pMeta->vgId; + + SVStreamTaskVerUpdateReq* pReq = (SVStreamTaskVerUpdateReq*) pMsg; + tqDebug("vgId:%d receive msg to update task dataVer, task:0x%x dataVer:%" PRId64, vgId, pReq->taskId, pReq->dataVer); + + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); + if (pTask == NULL) { + tqError("vgId:%d process dataVer msg, failed to find task:0x%x, it may have been destroyed", vgId, pReq->taskId); + return -1; + } + + // commit the dataVer update + streamTaskUpdateDataVer(pTask, pReq->dataVer); + + if (vnodeIsLeader(pTq->pVnode)) { + if (pTq->pVnode->restored) { + ASSERT(pTask->execInfo.init == 0); + + pTask->execInfo.init = taosGetTimestampMs(); + tqDebug("s-task:%s set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.init); + streamTaskCheckDownstream(pTask); + } else { + tqWarn("s-task:%s not launched since vnode (vgId:%d) not ready", pTask->id.idStr, vgId); + } + } + + streamMetaReleaseTask(pMeta, pTask); + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 97f484849c..7a1e60f075 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -468,7 +468,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg void *ptr = NULL; void *pReq; int32_t len; - int32_t ret; if (ver <= pVnode->state.applied) { vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver, @@ -561,7 +560,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - if (tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) { + int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; goto _err; } } break; @@ -582,13 +583,15 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg goto _err; } } break; + case TDMT_STREAM_TASK_VERUPDATE: + tqProcessTaskDataVerUpdateReq(pVnode->pTq, pMsg->pCont, pMsg->contLen); + break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) { goto _err; } break; - case TDMT_VND_ALTER_CONFIG: vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp); break; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index da1acc6965..840eee98a8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -394,28 +394,28 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa *pAdded = false; STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; - void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); - if (p == NULL) { - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - tFreeStreamTask(pTask); - return -1; - } - - taosArrayPush(pMeta->pTaskList, &pTask->id); - - if (streamMetaSaveTask(pMeta, pTask) < 0) { - tFreeStreamTask(pTask); - return -1; - } - - if (streamMetaCommit(pMeta) < 0) { - tFreeStreamTask(pTask); - return -1; - } - } else { + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p != NULL) { return 0; } + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + tFreeStreamTask(pTask); + return -1; + } + + taosArrayPush(pMeta->pTaskList, &pTask->id); + +// if (streamMetaSaveTask(pMeta, pTask) < 0) { +// tFreeStreamTask(pTask); +// return -1; +// } +// +// if (streamMetaCommit(pMeta) < 0) { +// tFreeStreamTask(pTask); +// return -1; +// } + taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); if (pTask->info.fillHistory == 0) { atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1); @@ -716,6 +716,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { taosArrayPush(pMeta->pTaskList, &pTask->id); } else { + stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId); + ASSERT(0); + tdbFree(pKey); tdbFree(pVal); taosMemoryFree(pTask); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f31a16ec85..3e491fa1bf 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -119,7 +119,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) { streamTaskEnablePause(pTask); } } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); + if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { + stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr); + } } return 0; } @@ -144,10 +146,10 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { req.downstreamTaskId = pTask->fixedDispatcher.taskId; pTask->checkReqId = req.reqId; - stDebug("s-task:%s stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 + stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64, - pTask->id.idStr, req.reqId, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, - pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); + pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, + pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -583,6 +585,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) SDataRange* pRange = &pHTask->dataRange; pRange->range.minVer = 0; + // todo remove this // the query version range should be limited to the already processed data pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1; if (pRange->range.maxVer < pRange->range.minVer) { @@ -725,7 +728,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { if (pTask->hTaskInfo.pTimer == NULL) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer); - if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer + if (pTask->hTaskInfo.pTimer == NULL) { atomic_sub_fetch_32(&pTask->status.timerActive, 1); stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, pTask->status.timerActive); @@ -883,20 +886,20 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory } void streamTaskSetRangeStreamCalc(SStreamTask* pTask) { + SDataRange* pRange = &pTask->dataRange; + if (pTask->hTaskInfo.id.taskId == 0) { - SDataRange* pRange = &pTask->dataRange; if (pTask->info.fillHistory == 1) { stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } else { - stDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 - "-%" PRId64, - pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); + stDebug( + "s-task:%s no related fill-history task, stream time window and verRange are not set. default stream time " + "window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, + pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } } else { - SDataRange* pRange = &pTask->dataRange; - int64_t ekey = 0; if (pRange->window.ekey < INT64_MAX) { ekey = pRange->window.ekey + 1; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 2907923d03..8dffbec09f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -413,6 +413,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMeta = pMeta; + pTask->chkInfo.checkpointVer = ver - 1; pTask->chkInfo.nextProcessVer = ver; pTask->dataRange.range.maxVer = ver; pTask->dataRange.range.minVer = ver; @@ -688,6 +689,63 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI return code; } +int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver) { + SVStreamTaskVerUpdateReq* pReq = rpcMallocCont(sizeof(SVStreamTaskVerUpdateReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pReq->head.vgId = vgId; + pReq->taskId = pTaskId->taskId; + pReq->streamId = pTaskId->streamId; + pReq->dataVer = ver; + + SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_VERUPDATE, .pCont = pReq, .contLen = sizeof(SVStreamTaskVerUpdateReq)}; + int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg); + if (code != TSDB_CODE_SUCCESS) { + stError("vgId:%d failed to send update task:0x%x dataVer msg, code:%s", vgId, pTaskId->taskId, tstrerror(code)); + return code; + } + + stDebug("vgId:%d build and send update table:0x%x dataVer:%"PRId64" msg", vgId, pTaskId->taskId, ver); + return code; +} + +int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver) { + SStreamMeta* pMeta = pTask->pMeta; + + // commit the dataVer update + int64_t prevVer = 0; + taosThreadMutexLock(&pTask->lock); + + if (pTask->chkInfo.checkpointId == 0) { + prevVer = pTask->chkInfo.nextProcessVer; + pTask->chkInfo.nextProcessVer = ver; + taosThreadMutexUnlock(&pTask->lock); + + taosWLockLatch(&pMeta->lock); + if (streamMetaSaveTask(pMeta, pTask) < 0) { +// return -1; + } + + if (streamMetaCommit(pMeta) < 0) { + // persist to disk + } + + stDebug("s-task:%s nextProcessedVer is update from %" PRId64 " to %" PRId64 " checkpointId:%" PRId64 + " checkpointVer:%" PRId64, + pTask->id.idStr, prevVer, ver, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); + taosWUnLockLatch(&pMeta->lock); + } else { + stDebug("s-task:%s not update the dataVer, existed:%" PRId64 ", checkpointId:%" PRId64 " checkpointVer:%" PRId64, + pTask->id.idStr, pTask->chkInfo.nextProcessVer, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer); + taosThreadMutexUnlock(&pTask->lock); + } + + return TSDB_CODE_SUCCESS; +} + STaskId streamTaskExtractKey(const SStreamTask* pTask) { STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; return id;