Merge pull request #22659 from taosdata/fix/TD-26035

fix(stream): do not restart the stream tasks when it is running.
This commit is contained in:
Haojun Liao 2023-08-31 10:16:59 +08:00 committed by GitHub
commit 0facde6ddc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 114 additions and 70 deletions

View File

@ -321,7 +321,7 @@ typedef struct {
struct SStreamTask { struct SStreamTask {
int64_t ver; int64_t ver;
SStreamTaskId id; SStreamTaskId id;
SSTaskBasicInfo info; SSTaskBasicInfo info;
STaskOutputInfo outputInfo; STaskOutputInfo outputInfo;
SDispatchMsgInfo msgInfo; SDispatchMsgInfo msgInfo;
@ -329,8 +329,8 @@ struct SStreamTask {
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
STaskExec exec; STaskExec exec;
SHistDataRange dataRange; SHistDataRange dataRange;
SStreamTaskId historyTaskId; SStreamTaskId historyTaskId;
SStreamTaskId streamTaskId; SStreamTaskId streamTaskId;
int32_t nextCheckId; int32_t nextCheckId;
SArray* checkpointInfo; // SArray<SStreamCheckpointInfo> SArray* checkpointInfo; // SArray<SStreamCheckpointInfo>
STaskTimestamp tsInfo; STaskTimestamp tsInfo;
@ -654,7 +654,7 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen)
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
// recover and fill history // recover and fill history
void streamTaskCheckDownstreamTasks(SStreamTask* pTask); void streamTaskCheckDownstream(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
@ -718,7 +718,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId); int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta);
// checkpoint // checkpoint

View File

@ -182,7 +182,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr, qDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks); streamGetTaskStatusStr(pTask->status.taskStatus), numOfTasks);
streamTaskCheckDownstreamTasks(pTask); streamTaskCheckDownstream(pTask);
return 0; return 0;
} }

View File

@ -45,8 +45,8 @@ extern "C" {
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
// tqPush // tqPush
#define EXTRACT_DATA_FROM_WAL_ID (-1) #define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_TASK_STATUS_CHECK_ID (-2) #define STREAM_EXEC_TASK_STATUS_CHECK_ID (-2)
// tqExec // tqExec
typedef struct { typedef struct {
@ -163,8 +163,9 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqStreamTasksScanWal(STQ* pTq); int32_t tqScanWalForStreamTasks(STQ* pTq);
int32_t tqStreamTasksStatusCheck(STQ* pTq); int32_t tqSetStreamTasksReady(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq);
// tq util // tq util
int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock); int32_t extractDelDataBlock(const void* pData, int32_t len, int64_t ver, SStreamRefDataBlock** pRefBlock);

View File

@ -223,11 +223,11 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, tmsg_t msgType); int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg); int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle); int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed. int tqStartStreamTasksAsync(STQ* pTq, bool ckPause); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq); int32_t tqSetStreamTasksReadyAsync(STQ* pTq);
int tqCommit(STQ*); int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);

View File

@ -133,7 +133,7 @@ int32_t tqInitialize(STQ* pTq) {
return -1; return -1;
} }
if (streamLoadTasks(pTq->pStreamMeta) < 0) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
return -1; return -1;
} }
@ -896,7 +896,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
// reset the task status from unfinished transaction // reset the task status from unfinished transaction
if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { if (pTask->status.taskStatus == TASK_STATUS__PAUSE) {
tqWarn("s-task:%s reset task status to be normal, kept in meta status: Paused", pTask->id.idStr); tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__NORMAL; pTask->status.taskStatus = TASK_STATUS__NORMAL;
} }
@ -1052,7 +1052,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
bool restored = pTq->pVnode->restored; bool restored = pTq->pVnode->restored;
if (p != NULL && restored) { if (p != NULL && restored) {
streamTaskCheckDownstreamTasks(p); streamTaskCheckDownstream(p);
} else if (!restored) { } else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId); tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
} }
@ -1199,7 +1199,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
} }
tqStartStreamTasks(pTq, false); tqStartStreamTasksAsync(pTq, false);
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
@ -1320,7 +1320,7 @@ int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1); int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) { if (remain > 0) {
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, remain:%d not send finish rsp", tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
pTask->id.idStr, req.downstreamId, remain); pTask->id.idStr, req.downstreamId, remain);
} else { } else {
tqDebug( tqDebug(
@ -1340,13 +1340,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_TASK_STATUS_CHECK_ID) { if (taskId == STREAM_EXEC_TASK_STATUS_CHECK_ID) {
tqStreamTasksStatusCheck(pTq); tqSetStreamTasksReady(pTq);
return 0; return 0;
} }
if (taskId == EXTRACT_DATA_FROM_WAL_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqStreamTasksScanWal(pTq); tqScanWalForStreamTasks(pTq);
return 0; return 0;
} }
@ -1365,7 +1365,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
} }
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq, false); tqStartStreamTasksAsync(pTq, false);
return 0; return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec. } else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this // todo add one function to handle this
@ -1505,7 +1505,7 @@ int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion,
pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) { pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY) {
streamStartScanHistoryAsync(pTask, igUntreated); streamStartScanHistoryAsync(pTask, igUntreated);
} else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) { } else if (level == TASK_LEVEL__SOURCE && (taosQueueItemSize(pTask->inputQueue->queue) == 0)) {
tqStartStreamTasks(pTq, false); tqStartStreamTasksAsync(pTq, false);
} else { } else {
streamSchedExec(pTask); streamSchedExec(pTask);
} }
@ -1815,7 +1815,7 @@ _end:
return -1; return -1;
} }
if (streamLoadTasks(pTq->pStreamMeta) < 0) { if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId); tqError("vgId:%d failed to load stream tasks", vgId);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
@ -1824,7 +1824,7 @@ _end:
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d, restart all stream tasks", vgId); vInfo("vgId:%d, restart all stream tasks", vgId);
tqCheckStreamStatus(pTq); tqSetStreamTasksReadyAsync(pTq);
} }
} }
} }

View File

@ -46,7 +46,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
// 2. the vnode should be the leader. // 2. the vnode should be the leader.
// 3. the stream is not suspended yet. // 3. the stream is not suspended yet.
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) { if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
tqStartStreamTasks(pTq, true); tqStartStreamTasksAsync(pTq, true);
} }
return 0; return 0;

View File

@ -19,9 +19,8 @@
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId); static int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId);
// this function should be executed by stream threads.
// extract submit block from WAL, and add them into the input queue for the sources tasks. // extract submit block from WAL, and add them into the input queue for the sources tasks.
int32_t tqStreamTasksScanWal(STQ* pTq) { int32_t tqScanWalForStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
@ -57,7 +56,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStreamTasksStatusCheck(STQ* pTq) { int32_t tqSetStreamTasksReady(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -80,7 +79,23 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
continue; continue;
} }
streamTaskCheckDownstreamTasks(pTask); if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// todo: how about the fill-history task?
if (pTask->status.downstreamReady == 1) {
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
streamSetStatusNormal(pTask);
streamTaskCheckDownstream(pTask);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} }
@ -88,7 +103,7 @@ int32_t tqStreamTasksStatusCheck(STQ* pTq) {
return 0; return 0;
} }
int32_t tqCheckStreamStatus(STQ* pTq) { int32_t tqSetStreamTasksReadyAsync(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -109,10 +124,10 @@ int32_t tqCheckStreamStatus(STQ* pTq) {
return -1; return -1;
} }
tqDebug("vgId:%d check for stream tasks status, numOfTasks:%d", vgId, numOfTasks); tqDebug("vgId:%d check %d stream task(s) status async", vgId, numOfTasks);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = STREAM_TASK_STATUS_CHECK_ID; pRunReq->taskId = STREAM_EXEC_TASK_STATUS_CHECK_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@ -121,7 +136,7 @@ int32_t tqCheckStreamStatus(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) { int32_t tqStartStreamTasksAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -168,7 +183,7 @@ int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) {
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks); tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d", vgId, numOfTasks);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;
pRunReq->taskId = EXTRACT_DATA_FROM_WAL_ID; pRunReq->taskId = STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
@ -177,6 +192,37 @@ int32_t tqStartStreamTasks(STQ* pTq, bool ckPause) {
return 0; return 0;
} }
int32_t tqStopStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
taosWLockLatch(&pMeta->lock);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosWUnLockLatch(&pMeta->lock);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
streamTaskStop(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return 0;
}
int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) { int32_t doSetOffsetForWalReader(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

View File

@ -177,7 +177,7 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId)
return code; return code;
} }
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamLoadTasks(pWriter->pTq->pStreamMeta); } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER); tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);

View File

@ -560,10 +560,10 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
tqCheckStreamStatus(pVnode->pTq); tqSetStreamTasksReadyAsync(pVnode->pTq);
} }
} else { } else {
vInfo("vgId:%d, sync restore finished, no launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
} }
} }
@ -578,6 +578,8 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
tsem_post(&pVnode->syncSem); tsem_post(&pVnode->syncSem);
} }
taosThreadMutexUnlock(&pVnode->lock); taosThreadMutexUnlock(&pVnode->lock);
tqStopStreamTasks(pVnode->pTq);
} }
static void vnodeBecomeLearner(const SSyncFSM *pFsm) { static void vnodeBecomeLearner(const SSyncFSM *pFsm) {

View File

@ -929,7 +929,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i); SStreamContinueExecInfo* pInfo = taosArrayGet(pTask->pRspMsgList, i);
tmsgSendRsp(&pInfo->msg); tmsgSendRsp(&pInfo->msg);
qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data from WAL", pTask->id.idStr, pTask->info.taskLevel, qDebug("s-task:%s level:%d notify upstream:0x%x to continue process data in WAL", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId); pInfo->taskId);
} }

View File

@ -605,7 +605,7 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
taosArrayDestroy(pRecycleList); taosArrayDestroy(pRecycleList);
} }
int32_t streamLoadTasks(SStreamMeta* pMeta) { int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL; TBC* pCur = NULL;
qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId); qInfo("vgId:%d load stream tasks from meta files", pMeta->vgId);

View File

@ -26,7 +26,6 @@ typedef struct SStreamTaskRetryInfo {
} SStreamTaskRetryInfo; } SStreamTaskRetryInfo;
static int32_t streamSetParamForScanHistory(SStreamTask* pTask); static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
static void launchFillHistoryTask(SStreamTask* pTask);
static void streamTaskSetRangeStreamCalc(SStreamTask* pTask); static void streamTaskSetRangeStreamCalc(SStreamTask* pTask);
static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated); static int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
@ -109,7 +108,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
} }
// check status // check status
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) { static int32_t doCheckDownstreamStatus(SStreamTask* pTask) {
SHistDataRange* pRange = &pTask->dataRange; SHistDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window; STimeWindow* pWindow = &pRange->window;
@ -121,7 +120,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
.stage = pTask->pMeta->stage, .stage = pTask->pMeta->stage,
}; };
// serialize // serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
@ -160,8 +159,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask) {
streamTaskSetReady(pTask, 0); streamTaskSetReady(pTask, 0);
streamTaskSetRangeStreamCalc(pTask); streamTaskSetRangeStreamCalc(pTask);
streamTaskLaunchScanHistory(pTask); streamTaskLaunchScanHistory(pTask);
streamLaunchFillHistoryTask(pTask);
launchFillHistoryTask(pTask);
} }
return 0; return 0;
@ -242,7 +240,7 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
} }
// when current stream task is ready, check the related fill history task. // when current stream task is ready, check the related fill history task.
launchFillHistoryTask(pTask); streamLaunchFillHistoryTask(pTask);
} }
// todo handle error // todo handle error
@ -437,7 +435,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory
initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len); initRpcMsg(&msg, 0, pBuf, sizeof(SMsgHead) + len);
tmsgSendRsp(&msg); tmsgSendRsp(&msg);
qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data from WAL", pTask->id.idStr, qDebug("s-task:%s level:%d notify upstream:0x%x(vgId:%d) to continue process data in WAL", pTask->id.idStr,
pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId); pTask->info.taskLevel, pReq->upstreamTaskId, pReq->upstreamNodeId);
return 0; return 0;
} }
@ -504,7 +502,7 @@ int32_t streamProcessScanHistoryFinishRsp(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) { static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
pHTask->dataRange.range.minVer = 0; pHTask->dataRange.range.minVer = 0;
pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer; pHTask->dataRange.range.maxVer = pTask->chkInfo.currentVer;
@ -518,7 +516,7 @@ static void doCheckDownstreamStatus(SStreamTask* pTask, SStreamTask* pHTask) {
} }
// check if downstream tasks have been ready // check if downstream tasks have been ready
streamTaskDoCheckDownstreamTasks(pHTask); doCheckDownstreamStatus(pHTask);
} }
static void tryLaunchHistoryTask(void* param, void* tmrId) { static void tryLaunchHistoryTask(void* param, void* tmrId) {
@ -565,7 +563,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
} }
if (pHTask != NULL) { if (pHTask != NULL) {
doCheckDownstreamStatus(pTask, pHTask); checkFillhistoryTaskStatus(pTask, pHTask);
streamMetaReleaseTask(pMeta, pHTask); streamMetaReleaseTask(pMeta, pHTask);
} }
@ -582,10 +580,20 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) {
// todo fix the bug: 2. race condition // todo fix the bug: 2. race condition
// an fill history task needs to be started. // an fill history task needs to be started.
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId;
if (tId == 0) {
return TSDB_CODE_SUCCESS;
}
ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId);
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t hTaskId = pTask->historyTaskId.taskId; int32_t hTaskId = pTask->historyTaskId.taskId;
int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId}; int64_t keys[2] = {pTask->historyTaskId.streamId, pTask->historyTaskId.taskId};
// Set the execute conditions, including the query time window and the version range // Set the execute conditions, including the query time window and the version range
SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys)); SStreamTask** pHTask = taosHashGet(pMeta->pTasks, keys, sizeof(keys));
if (pHTask == NULL) { if (pHTask == NULL) {
@ -612,11 +620,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer); taosTmrReset(tryLaunchHistoryTask, 100, pInfo, streamEnv.timer, &pTask->launchTaskTimer);
} }
// try again in 500ms // try again in 100ms
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
doCheckDownstreamStatus(pTask, *pHTask); checkFillhistoryTaskStatus(pTask, *pHTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -786,28 +794,15 @@ void streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
} }
} }
void launchFillHistoryTask(SStreamTask* pTask) {
int32_t tId = pTask->historyTaskId.taskId;
if (tId == 0) {
return;
}
ASSERT(pTask->status.downstreamReady == 1);
qDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", pTask->id.idStr,
pTask->historyTaskId.streamId, tId);
// launch associated fill history task
streamLaunchFillHistoryTask(pTask);
}
// only the downstream tasks are ready, set the task to be ready to work. // only the downstream tasks are ready, set the task to be ready to work.
void streamTaskCheckDownstreamTasks(SStreamTask* pTask) { void streamTaskCheckDownstream(SStreamTask* pTask) {
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr); qDebug("s-task:%s fill history task, wait for being launched", pTask->id.idStr);
return; return;
} }
ASSERT(pTask->status.downstreamReady == 0); ASSERT(pTask->status.downstreamReady == 0);
streamTaskDoCheckDownstreamTasks(pTask); doCheckDownstreamStatus(pTask);
} }
// normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause // normal -> pause, pause/stop/dropping -> pause, halt -> pause, scan-history -> pause
@ -882,10 +877,10 @@ void streamTaskResume(SStreamTask* pTask, SStreamMeta* pMeta) {
pTask->status.taskStatus = pTask->status.keepTaskStatus; pTask->status.taskStatus = pTask->status.keepTaskStatus;
pTask->status.keepTaskStatus = TASK_STATUS__NORMAL; pTask->status.keepTaskStatus = TASK_STATUS__NORMAL;
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); qInfo("vgId:%d s-task:%s resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) { } else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1); int32_t num = atomic_sub_fetch_32(&pMeta->pauseTaskNum, 1);
qInfo("vgId:%d s-task:%s sink task.resume from pause, status%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num); qInfo("vgId:%d s-task:%s sink task.resume from pause, status:%s. pause task num:%d", pMeta->vgId, pTask->id.idStr, streamGetTaskStatusStr(status), num);
} else { } else {
qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); qError("s-task:%s not in pause, failed to resume, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
} }