diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 2a9974b8fd..3e1827cb97 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -183,7 +183,8 @@ int32_t streamInit();
void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap);
-void streamQueueClose(SStreamQueue* queue);
+void streamQueueClose(SStreamQueue* pQueue);
+void streamQueueCleanup(SStreamQueue* pQueue);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
@@ -270,6 +271,7 @@ typedef struct SStreamStatus {
bool transferState;
int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused
+ int32_t stage; // rollback will increase this attribute one for each time
} SStreamStatus;
typedef struct SHistDataRange {
@@ -399,6 +401,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
+int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
@@ -460,6 +463,7 @@ typedef struct {
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
+ int32_t stage;
} SStreamTaskCheckReq;
typedef struct {
@@ -470,6 +474,7 @@ typedef struct {
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int32_t childId;
+ int32_t stage;
int8_t status;
} SStreamTaskCheckRsp;
@@ -607,7 +612,6 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask);
-SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@@ -617,7 +621,9 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
-int32_t streamTaskCheckStatus(SStreamTask* pTask);
+int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
+int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
+int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);
diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c
index cc394124f4..2a4ee047cd 100644
--- a/source/dnode/snode/src/snode.c
+++ b/source/dnode/snode/src/snode.c
@@ -60,25 +60,11 @@ FAIL:
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
-
- pTask->refCnt = 1;
- pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
-
- pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
- pTask->inputQueue = streamQueueOpen(512 << 10);
- pTask->outputInfo.queue = streamQueueOpen(512 << 10);
-
- if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
- return -1;
+ int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, ver);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
}
- pTask->tsInfo.init = taosGetTimestampMs();
- pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
- pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
- pTask->pMsgCb = &pSnode->msgCb;
- pTask->pMeta = pSnode->pMeta;
- taosThreadMutexInit(&pTask->lock, NULL);
-
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
@@ -91,10 +77,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor);
- taosThreadMutexInit(&pTask->lock, NULL);
- streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask);
-
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
@@ -344,7 +327,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask != NULL) {
- rsp.status = streamTaskCheckStatus(pTask);
+ rsp.status = streamTaskCheckStatus(pTask, req.stage);
streamMetaReleaseTask(pSnode->pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
@@ -352,9 +335,8 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = 0;
- qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
- ") from task:0x%x (vgId:%d), rsp status %d",
- taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
+ qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
+ taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index e94416ff50..cb053a9838 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -193,40 +193,35 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) {
- taosWLockLatch(&pTq->pStreamMeta->lock);
+ SStreamMeta* pMeta = pTq->pStreamMeta;
+ taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
- pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
+ pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
- tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
- pTask->status.taskStatus = TASK_STATUS__STOP;
-
- int64_t st = taosGetTimestampMs();
- qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
-
- int64_t el = taosGetTimestampMs() - st;
- tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
+ tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
+ streamTaskStop(pTask);
}
- taosWUnLockLatch(&pTq->pStreamMeta->lock);
+ taosWUnLockLatch(&pMeta->lock);
- tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId);
+ tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
int64_t st = taosGetTimestampMs();
- while (hasStreamTaskInTimer(pTq->pStreamMeta)) {
- tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId);
+ while (hasStreamTaskInTimer(pMeta)) {
+ tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms",
- pTq->pStreamMeta->vgId, el);
+ pMeta->vgId, el);
}
}
@@ -913,38 +908,21 @@ void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode);
- pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
- pTask->refCnt = 1;
- pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
- pTask->inputQueue = streamQueueOpen(512 << 10);
- pTask->outputInfo.queue = streamQueueOpen(512 << 10);
-
- if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
- tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
- return -1;
+ int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
+ if (code != TSDB_CODE_SUCCESS) {
+ return code;
}
- pTask->tsInfo.init = taosGetTimestampMs();
- pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
- pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
- pTask->pMsgCb = &pTq->pVnode->msgCb;
- pTask->pMeta = pTq->pStreamMeta;
-
- // backup the initial status, and set it to be TASK_STATUS__INIT
- pTask->chkInfo.currentVer = ver;
- pTask->dataRange.range.maxVer = ver;
- pTask->dataRange.range.minVer = ver;
-
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
- SStreamTask* pSateTask = pTask;
+ SStreamTask* pStateTask = pTask;
SStreamTask task = {0};
if (pTask->info.fillHistory) {
task.id = pTask->streamTaskId;
task.pMeta = pTask->pMeta;
- pSateTask = &task;
+ pStateTask = &task;
}
- pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1);
+ pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateTask, false, -1, -1);
if (pTask->pState == NULL) {
return -1;
}
@@ -1008,7 +986,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t ver1 = 1;
SMetaInfo info = {0};
- int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
+ code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
if (code == TSDB_CODE_SUCCESS) {
ver1 = info.skmVer;
}
@@ -1034,8 +1012,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->status.taskStatus = TASK_STATUS__NORMAL;
}
- taosThreadMutexInit(&pTask->lock, NULL);
- streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
@@ -1080,14 +1056,16 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) {
- rsp.status = streamTaskCheckStatus(pTask);
+ rsp.status = streamTaskCheckStatus(pTask, req.stage);
+ rsp.stage = pTask->status.stage;
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
- tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
- pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
+ tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
+ pTask->id.idStr, pStatus, rsp.stage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = 0;
+ rsp.stage = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
@@ -1882,10 +1860,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// streamTaskUpdateEpInfo(pHistoryTask);
}
- streamMetaReleaseTask(pMeta, pTask);
if (pHistoryTask != NULL) {
+ streamTaskRestart(pHistoryTask, NULL);
streamMetaReleaseTask(pMeta, pHistoryTask);
}
+ streamTaskRestart(pTask, NULL);
+ streamMetaReleaseTask(pMeta, pTask);
+
return TSDB_CODE_SUCCESS;
}
diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index e956f22fe6..00fdb6e1ae 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -242,20 +242,20 @@ int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
return numOfItems1 + numOfItems2;
}
+// wait for the stream task to be idle
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
- // wait for the stream task to be idle
- int64_t st = taosGetTimestampMs();
+ const char* id = pTask->id.idStr;
+ int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) {
- qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr,
- pTask->info.taskLevel, pStreamTask->id.idStr);
+ qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
+ pStreamTask->id.idStr);
taosMsleep(100);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) {
- qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr,
- pStreamTask->id.idStr, el);
+ qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el);
}
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 8f58843c0a..80f59e51ac 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -255,8 +255,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
// add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false;
- int64_t checkpointId = 0;
-
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 8bb0e3b506..2f90ad4512 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -15,38 +15,45 @@
#include "streamInt.h"
+#define MAX_STREAM_EXEC_BATCH_NUM 32
+#define MIN_STREAM_EXEC_BATCH_NUM 4
+
SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
- if (pQueue == NULL) return NULL;
+ if (pQueue == NULL) {
+ return NULL;
+ }
+
pQueue->queue = taosOpenQueue();
pQueue->qall = taosAllocateQall();
+
if (pQueue->queue == NULL || pQueue->qall == NULL) {
- goto FAIL;
+ if (pQueue->queue) taosCloseQueue(pQueue->queue);
+ if (pQueue->qall) taosFreeQall(pQueue->qall);
+ taosMemoryFree(pQueue);
+ return NULL;
}
+
pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->queue, cap);
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
return pQueue;
-
-FAIL:
- if (pQueue->queue) taosCloseQueue(pQueue->queue);
- if (pQueue->qall) taosFreeQall(pQueue->qall);
- taosMemoryFree(pQueue);
- return NULL;
}
-void streamQueueClose(SStreamQueue* queue) {
- while (1) {
- void* qItem = streamQueueNextItem(queue);
- if (qItem) {
- streamFreeQitem(qItem);
- } else {
- break;
- }
+void streamQueueClose(SStreamQueue* pQueue) {
+ streamQueueCleanup(pQueue);
+
+ taosFreeQall(pQueue->qall);
+ taosCloseQueue(pQueue->queue);
+ taosMemoryFree(pQueue);
+}
+
+void streamQueueCleanup(SStreamQueue* pQueue) {
+ void* qItem = NULL;
+ while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
+ streamFreeQitem(qItem);
}
- taosFreeQall(queue->qall);
- taosCloseQueue(queue->queue);
- taosMemoryFree(queue);
+ pQueue->status = STREAM_QUEUE__SUCESS;
}
#if 0
@@ -107,8 +114,6 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
}
#endif
-#define MAX_STREAM_EXEC_BATCH_NUM 32
-#define MIN_STREAM_EXEC_BATCH_NUM 4
// todo refactor:
// read data from input queue
diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c
index d77a07a516..7b24368007 100644
--- a/source/libs/stream/src/streamRecover.c
+++ b/source/libs/stream/src/streamRecover.c
@@ -193,8 +193,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
return 0;
}
-int32_t streamTaskCheckStatus(SStreamTask* pTask) {
- return (pTask->status.downstreamReady == 1)? 1:0;
+int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage) {
+ return ((pTask->status.downstreamReady == 1) && (pTask->status.stage == stage))? 1:0;
}
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
@@ -261,10 +261,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask, 1);
}
} else { // not ready, wait for 100ms and retry
- qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId,
- pRsp->downstreamNodeId);
+ qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
+ pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->stage);
taosMsleep(100);
-
streamRecheckDownstream(pTask, pRsp);
}
@@ -655,6 +654,7 @@ int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq*
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
+ if (tEncodeI32(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@@ -668,6 +668,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
@@ -681,6 +682,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp*
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
+ if (tEncodeI32(pEncoder, pRsp->stage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
@@ -695,6 +697,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
+ if (tDecodeI32(pDecoder, &pRsp->stage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder);
return 0;
@@ -839,7 +842,7 @@ void streamTaskPause(SStreamTask* pTask) {
}
int64_t el = taosGetTimestampMs() - st;
- qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
+ qDebug("vgId:%d s-task:%s set pause flag, prev:%s, pause elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
}
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 43aeeba40f..b3ec5d5868 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -13,8 +13,7 @@
* along with this program. If not, see .
*/
-#include
-#include
+#include "streamInt.h"
#include "executor.h"
#include "tstream.h"
#include "wal.h"
@@ -308,6 +307,34 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask);
}
+int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
+ pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
+ pTask->refCnt = 1;
+ pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
+ pTask->inputQueue = streamQueueOpen(512 << 10);
+ pTask->outputInfo.queue = streamQueueOpen(512 << 10);
+
+ if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
+ qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
+ return -1;
+ }
+
+ pTask->tsInfo.init = taosGetTimestampMs();
+ pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
+ pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
+ pTask->pMeta = pMeta;
+
+ pTask->chkInfo.currentVer = ver;
+ pTask->dataRange.range.maxVer = ver;
+ pTask->dataRange.range.minVer = ver;
+ pTask->pMsgCb = pMsgCb;
+
+ taosThreadMutexInit(&pTask->lock, NULL);
+ streamTaskOpenAllUpstreamInput(pTask);
+
+ return TSDB_CODE_SUCCESS;
+}
+
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0;
@@ -397,3 +424,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
// do nothing
}
}
+
+int32_t streamTaskStop(SStreamTask* pTask) {
+ SStreamMeta* pMeta = pTask->pMeta;
+ int64_t st = taosGetTimestampMs();
+ const char* id = pTask->id.idStr;
+
+ pTask->status.taskStatus = TASK_STATUS__STOP;
+ qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
+
+ while (!streamTaskIsIdle(pTask)) {
+ qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
+ taosMsleep(100);
+ }
+
+ int64_t el = taosGetTimestampMs() - st;
+ qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
+ return 0;
+}
+
+int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
+ // 1. stop task
+ streamTaskStop(pTask);
+
+ // 2. clear state info
+ streamQueueCleanup(pTask->inputQueue);
+ streamQueueCleanup(pTask->outputInfo.queue);
+ taosArrayClear(pTask->checkReqIds);
+ taosArrayClear(pTask->pRspMsgList);
+
+ pTask->status.downstreamReady = 0;
+ pTask->status.stage += 1;
+
+ qDebug("s-task:%s reset downstream status and stage:%d, start to check downstream", pTask->id.idStr,
+ pTask->status.stage);
+
+ // 3. start to check the downstream status
+ streamTaskCheckDownstreamTasks(pTask);
+ return 0;
+}
+