fix(stream): enable new time range for stream task.
This commit is contained in:
parent
6fbcf4b3d7
commit
bb4ba54f28
|
@ -3040,6 +3040,13 @@ typedef struct {
|
|||
int32_t taskId;
|
||||
} SVDropStreamTaskReq;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int64_t dataVer;
|
||||
} SVStreamTaskVerUpdateReq;
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
} SVDropStreamTaskRsp;
|
||||
|
|
|
@ -308,6 +308,7 @@ enum { // WARN: new msg should be appended to segment tail
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_CHECK_POINT_SOURCE, "vnode-stream-checkpoint-source", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_UPDATE, "vnode-stream-update", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_VERUPDATE, "vnode-stream-ver-update", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG)
|
||||
|
|
|
@ -723,6 +723,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
|
|||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver);
|
||||
|
||||
// source level
|
||||
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
|
||||
|
@ -761,6 +762,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
|
|||
|
||||
int32_t streamAlignTransferState(SStreamTask* pTask);
|
||||
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId);
|
||||
int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver);
|
||||
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask,
|
||||
int8_t isSucceed);
|
||||
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
|
||||
|
|
|
@ -801,6 +801,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_VERUPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -232,7 +232,7 @@ int32_t doAddShuffleSinkTask(SMnode* pMnode, SArray* pTaskList, SStreamObj* pStr
|
|||
|
||||
int32_t doAddSinkTask(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup,
|
||||
SEpSet* pEpset, bool isFillhistory) {
|
||||
int64_t uid = (isFillhistory)? pStream->uid:pStream->hTaskUid;
|
||||
int64_t uid = (isFillhistory)? pStream->hTaskUid:pStream->uid;
|
||||
SStreamTask* pTask = tNewStreamTask(uid, TASK_LEVEL__SINK, isFillhistory, 0, pTaskList, pStream->conf.fillHistory);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -177,6 +177,9 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
|
|||
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
|
||||
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
|
||||
|
||||
// send msg to update the nextProcessedVer attribute for this task if it is a stream task
|
||||
streamBuildAndSendVerUpdateMsg(pTask->pMsgCb, pSnode->pMeta->vgId, &pTask->id, 0);
|
||||
|
||||
streamTaskCheckDownstream(pTask);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -227,6 +227,7 @@ int tqScanWalAsync(STQ* pTq, bool ckPause);
|
|||
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
||||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen);
|
||||
int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq);
|
||||
|
||||
int tqCommit(STQ*);
|
||||
|
|
|
@ -964,18 +964,17 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
|
||||
if (tsDisableStream) {
|
||||
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
||||
|
||||
// 1.deserialize msg and build task
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||
int32_t size = sizeof(SStreamTask);
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
||||
if (pTask == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId,
|
||||
(int32_t)sizeof(SStreamTask));
|
||||
return -1;
|
||||
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SDecoder decoder;
|
||||
|
@ -983,9 +982,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
code = tDecodeStreamTask(&decoder, pTask);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
if (code < 0) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pTask);
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
|
||||
|
@ -1001,9 +1000,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
taosWUnLockLatch(&pStreamMeta->lock);
|
||||
|
||||
if (code < 0) {
|
||||
tqError("vgId:%d failed to add s-task:0x%x, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
return code;
|
||||
}
|
||||
|
||||
// added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
|
||||
|
@ -1012,6 +1011,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
// only handled in the leader node
|
||||
if (vnodeIsRoleLeader(pTq->pVnode)) {
|
||||
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
|
||||
#if 0
|
||||
if (pTq->pVnode->restored) {
|
||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
|
||||
if (p != NULL) {
|
||||
// send msg to update the nextProcessedVer attribute for this task if it is a stream task
|
||||
streamBuildAndSendVerUpdateMsg(p->pMsgCb, vgId, &p->id, sversion);
|
||||
streamMetaReleaseTask(pStreamMeta, p);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
|
||||
|
||||
bool restored = pTq->pVnode->restored;
|
||||
|
@ -1035,7 +1044,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
|
|||
tFreeStreamTask(pTask);
|
||||
}
|
||||
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
@ -1498,14 +1507,15 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
|
|||
|
||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SStreamTask* pHistoryTask =
|
||||
streamMetaAcquireTask(pTq->pStreamMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||
if (pHistoryTask) {
|
||||
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
||||
}
|
||||
|
@ -1524,9 +1534,11 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
tDecodeStreamRetrieveReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
|
||||
if (pTask == NULL) {
|
||||
// tDeleteStreamDispatchReq(&req);
|
||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId,
|
||||
req.dstTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -1887,3 +1899,35 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
taosArrayDestroy(req.pNodeList);
|
||||
return rsp.code;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskDataVerUpdateReq(STQ* pTq, char* pMsg, int32_t msgLen) {
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
SVStreamTaskVerUpdateReq* pReq = (SVStreamTaskVerUpdateReq*) pMsg;
|
||||
tqDebug("vgId:%d receive msg to update task dataVer, task:0x%x dataVer:%" PRId64, vgId, pReq->taskId, pReq->dataVer);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process dataVer msg, failed to find task:0x%x, it may have been destroyed", vgId, pReq->taskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// commit the dataVer update
|
||||
streamTaskUpdateDataVer(pTask, pReq->dataVer);
|
||||
|
||||
if (vnodeIsLeader(pTq->pVnode)) {
|
||||
if (pTq->pVnode->restored) {
|
||||
ASSERT(pTask->execInfo.init == 0);
|
||||
|
||||
pTask->execInfo.init = taosGetTimestampMs();
|
||||
tqDebug("s-task:%s set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.init);
|
||||
streamTaskCheckDownstream(pTask);
|
||||
} else {
|
||||
tqWarn("s-task:%s not launched since vnode (vgId:%d) not ready", pTask->id.idStr, vgId);
|
||||
}
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -468,7 +468,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
void *ptr = NULL;
|
||||
void *pReq;
|
||||
int32_t len;
|
||||
int32_t ret;
|
||||
|
||||
if (ver <= pVnode->state.applied) {
|
||||
vError("vgId:%d, duplicate write request. ver: %" PRId64 ", applied: %" PRId64 "", TD_VID(pVnode), ver,
|
||||
|
@ -561,7 +560,9 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
}
|
||||
break;
|
||||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
if (tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len) < 0) {
|
||||
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
goto _err;
|
||||
}
|
||||
} break;
|
||||
|
@ -582,13 +583,15 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
|||
goto _err;
|
||||
}
|
||||
} break;
|
||||
case TDMT_STREAM_TASK_VERUPDATE:
|
||||
tqProcessTaskDataVerUpdateReq(pVnode->pTq, pMsg->pCont, pMsg->contLen);
|
||||
break;
|
||||
case TDMT_VND_ALTER_CONFIRM:
|
||||
needCommit = pVnode->config.hashChange;
|
||||
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
break;
|
||||
|
||||
case TDMT_VND_ALTER_CONFIG:
|
||||
vnodeProcessAlterConfigReq(pVnode, ver, pReq, len, pRsp);
|
||||
break;
|
||||
|
|
|
@ -394,28 +394,28 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
*pAdded = false;
|
||||
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p == NULL) {
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
|
||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (p != NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
|
||||
tFreeStreamTask(pTask);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
|
||||
// if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||
// tFreeStreamTask(pTask);
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// if (streamMetaCommit(pMeta) < 0) {
|
||||
// tFreeStreamTask(pTask);
|
||||
// return -1;
|
||||
// }
|
||||
|
||||
taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
|
@ -716,6 +716,9 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
|
|||
|
||||
taosArrayPush(pMeta->pTaskList, &pTask->id);
|
||||
} else {
|
||||
stError("s-task:0x%x already added into table meta by replaying WAL, need check", pTask->id.taskId);
|
||||
ASSERT(0);
|
||||
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
taosMemoryFree(pTask);
|
||||
|
|
|
@ -119,7 +119,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
|
|||
streamTaskEnablePause(pTask);
|
||||
}
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
||||
if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
|
||||
stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -144,10 +146,10 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
|
|||
req.downstreamTaskId = pTask->fixedDispatcher.taskId;
|
||||
pTask->checkReqId = req.reqId;
|
||||
|
||||
stDebug("s-task:%s stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
|
||||
pTask->id.idStr, req.reqId, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
|
||||
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId,
|
||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||
|
||||
streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
|
@ -583,6 +585,7 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
|
|||
SDataRange* pRange = &pHTask->dataRange;
|
||||
pRange->range.minVer = 0;
|
||||
|
||||
// todo remove this
|
||||
// the query version range should be limited to the already processed data
|
||||
pRange->range.maxVer = pTask->chkInfo.nextProcessVer - 1;
|
||||
if (pRange->range.maxVer < pRange->range.minVer) {
|
||||
|
@ -725,7 +728,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
|||
if (pTask->hTaskInfo.pTimer == NULL) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer);
|
||||
if (pTask->hTaskInfo.pTimer == NULL) { // todo failed to create timer
|
||||
if (pTask->hTaskInfo.pTimer == NULL) {
|
||||
atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr,
|
||||
pTask->status.timerActive);
|
||||
|
@ -883,20 +886,20 @@ int32_t tDecodeStreamScanHistoryFinishReq(SDecoder* pDecoder, SStreamScanHistory
|
|||
}
|
||||
|
||||
void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
|
||||
if (pTask->hTaskInfo.id.taskId == 0) {
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
if (pTask->info.fillHistory == 1) {
|
||||
stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
|
||||
"-%" PRId64,
|
||||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
} else {
|
||||
stDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64
|
||||
"-%" PRId64,
|
||||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
stDebug(
|
||||
"s-task:%s no related fill-history task, stream time window and verRange are not set. default stream time "
|
||||
"window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
|
||||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||
}
|
||||
} else {
|
||||
SDataRange* pRange = &pTask->dataRange;
|
||||
|
||||
int64_t ekey = 0;
|
||||
if (pRange->window.ekey < INT64_MAX) {
|
||||
ekey = pRange->window.ekey + 1;
|
||||
|
|
|
@ -413,6 +413,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
pTask->pMeta = pMeta;
|
||||
|
||||
pTask->chkInfo.checkpointVer = ver - 1;
|
||||
pTask->chkInfo.nextProcessVer = ver;
|
||||
pTask->dataRange.range.maxVer = ver;
|
||||
pTask->dataRange.range.minVer = ver;
|
||||
|
@ -688,6 +689,63 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t streamBuildAndSendVerUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, int64_t ver) {
|
||||
SVStreamTaskVerUpdateReq* pReq = rpcMallocCont(sizeof(SVStreamTaskVerUpdateReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReq->head.vgId = vgId;
|
||||
pReq->taskId = pTaskId->taskId;
|
||||
pReq->streamId = pTaskId->streamId;
|
||||
pReq->dataVer = ver;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_VERUPDATE, .pCont = pReq, .contLen = sizeof(SVStreamTaskVerUpdateReq)};
|
||||
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("vgId:%d failed to send update task:0x%x dataVer msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
stDebug("vgId:%d build and send update table:0x%x dataVer:%"PRId64" msg", vgId, pTaskId->taskId, ver);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamTaskUpdateDataVer(SStreamTask* pTask, int64_t ver) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
// commit the dataVer update
|
||||
int64_t prevVer = 0;
|
||||
taosThreadMutexLock(&pTask->lock);
|
||||
|
||||
if (pTask->chkInfo.checkpointId == 0) {
|
||||
prevVer = pTask->chkInfo.nextProcessVer;
|
||||
pTask->chkInfo.nextProcessVer = ver;
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
taosWLockLatch(&pMeta->lock);
|
||||
if (streamMetaSaveTask(pMeta, pTask) < 0) {
|
||||
// return -1;
|
||||
}
|
||||
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
}
|
||||
|
||||
stDebug("s-task:%s nextProcessedVer is update from %" PRId64 " to %" PRId64 " checkpointId:%" PRId64
|
||||
" checkpointVer:%" PRId64,
|
||||
pTask->id.idStr, prevVer, ver, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
|
||||
taosWUnLockLatch(&pMeta->lock);
|
||||
} else {
|
||||
stDebug("s-task:%s not update the dataVer, existed:%" PRId64 ", checkpointId:%" PRId64 " checkpointVer:%" PRId64,
|
||||
pTask->id.idStr, pTask->chkInfo.nextProcessVer, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STaskId streamTaskExtractKey(const SStreamTask* pTask) {
|
||||
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
|
||||
return id;
|
||||
|
|
Loading…
Reference in New Issue