hen(stream): support the rollback and checkdown stream tasks.

This commit is contained in:
Haojun Liao 2023-08-07 16:13:39 +08:00
parent d3bc7c3c94
commit 225c0e61dc
8 changed files with 151 additions and 109 deletions

View File

@ -183,7 +183,8 @@ int32_t streamInit();
void streamCleanUp(); void streamCleanUp();
SStreamQueue* streamQueueOpen(int64_t cap); SStreamQueue* streamQueueOpen(int64_t cap);
void streamQueueClose(SStreamQueue* queue); void streamQueueClose(SStreamQueue* pQueue);
void streamQueueCleanup(SStreamQueue* pQueue);
static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) { static FORCE_INLINE void streamQueueProcessSuccess(SStreamQueue* queue) {
ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING); ASSERT(atomic_load_8(&queue->status) == STREAM_QUEUE__PROCESSING);
@ -270,6 +271,7 @@ typedef struct SStreamStatus {
bool transferState; bool transferState;
int8_t timerActive; // timer is active int8_t timerActive; // timer is active
int8_t pauseAllowed; // allowed task status to be set to be paused int8_t pauseAllowed; // allowed task status to be set to be paused
int32_t stage; // rollback will increase this attribute one for each time
} SStreamStatus; } SStreamStatus;
typedef struct SHistDataRange { typedef struct SHistDataRange {
@ -399,6 +401,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo); int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo);
@ -460,6 +463,7 @@ typedef struct {
int32_t downstreamNodeId; int32_t downstreamNodeId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t childId; int32_t childId;
int32_t stage;
} SStreamTaskCheckReq; } SStreamTaskCheckReq;
typedef struct { typedef struct {
@ -470,6 +474,7 @@ typedef struct {
int32_t downstreamNodeId; int32_t downstreamNodeId;
int32_t downstreamTaskId; int32_t downstreamTaskId;
int32_t childId; int32_t childId;
int32_t stage;
int8_t status; int8_t status;
} SStreamTaskCheckRsp; } SStreamTaskCheckRsp;
@ -607,7 +612,6 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus);
bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskIsIdle(const SStreamTask* pTask);
int32_t streamTaskEndScanWAL(SStreamTask* pTask); int32_t streamTaskEndScanWAL(SStreamTask* pTask);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize); int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize);
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
@ -617,7 +621,9 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId);
void streamTaskCheckDownstreamTasks(SStreamTask* pTask); void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask); int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId); SRpcHandleInfo* pRpcInfo, int32_t taskId);
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp);

View File

@ -60,25 +60,11 @@ FAIL:
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, ver);
pTask->refCnt = 1; if (code != TSDB_CODE_SUCCESS) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); return code;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
return -1;
} }
pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pSnode->msgCb;
pTask->pMeta = pSnode->pMeta;
taosThreadMutexInit(&pTask->lock, NULL);
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
@ -91,10 +77,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0);
ASSERT(pTask->exec.pExecutor); ASSERT(pTask->exec.pExecutor);
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE,
pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel); pTask->id.idStr, pTask->chkInfo.checkpointVer, pTask->info.selfChildId, pTask->info.taskLevel);
@ -344,7 +327,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask, req.stage);
streamMetaReleaseTask(pSnode->pMeta, pTask); streamMetaReleaseTask(pSnode->pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
@ -352,9 +335,8 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
") from task:0x%x (vgId:%d), rsp status %d", taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} }
SEncoder encoder; SEncoder encoder;

View File

@ -193,40 +193,35 @@ static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
void tqNotifyClose(STQ* pTq) { void tqNotifyClose(STQ* pTq) {
if (pTq != NULL) { if (pTq != NULL) {
taosWLockLatch(&pTq->pStreamMeta->lock); SStreamMeta* pMeta = pTq->pStreamMeta;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) { if (pIter == NULL) {
break; break;
} }
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set closing flag", pTq->pStreamMeta->vgId, pTask->id.idStr); tqDebug("vgId:%d s-task:%s set closing flag", pMeta->vgId, pTask->id.idStr);
pTask->status.taskStatus = TASK_STATUS__STOP; streamTaskStop(pTask);
int64_t st = taosGetTimestampMs();
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pTq->pStreamMeta->vgId, pTask->id.idStr, el);
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pMeta->lock);
tqDebug("vgId:%d start to check all tasks", pTq->pStreamMeta->vgId); tqDebug("vgId:%d start to check all tasks", pMeta->vgId);
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pTq->pStreamMeta)) { while (hasStreamTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pTq->pStreamMeta->vgId); tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100); taosMsleep(100);
} }
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms",
pTq->pStreamMeta->vgId, el); pMeta->vgId, el);
} }
} }
@ -913,38 +908,21 @@ void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver);
pTask->refCnt = 1; if (code != TSDB_CODE_SUCCESS) {
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; return code;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
tqError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return -1;
} }
pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMsgCb = &pTq->pVnode->msgCb;
pTask->pMeta = pTq->pStreamMeta;
// backup the initial status, and set it to be TASK_STATUS__INIT
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
SStreamTask* pSateTask = pTask; SStreamTask* pStateTask = pTask;
SStreamTask task = {0}; SStreamTask task = {0};
if (pTask->info.fillHistory) { if (pTask->info.fillHistory) {
task.id = pTask->streamTaskId; task.id = pTask->streamTaskId;
task.pMeta = pTask->pMeta; task.pMeta = pTask->pMeta;
pSateTask = &task; pStateTask = &task;
} }
pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pSateTask, false, -1, -1); pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pStateTask, false, -1, -1);
if (pTask->pState == NULL) { if (pTask->pState == NULL) {
return -1; return -1;
} }
@ -1008,7 +986,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
int32_t ver1 = 1; int32_t ver1 = 1;
SMetaInfo info = {0}; SMetaInfo info = {0};
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
ver1 = info.skmVer; ver1 = info.skmVer;
} }
@ -1034,8 +1012,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->status.taskStatus = TASK_STATUS__NORMAL; pTask->status.taskStatus = TASK_STATUS__NORMAL;
} }
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask); streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo; SCheckpointInfo* pChkInfo = &pTask->chkInfo;
@ -1080,14 +1056,16 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
if (pTask != NULL) { if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask); rsp.status = streamTaskCheckStatus(pTask, req.stage);
rsp.stage = pTask->status.stage;
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); pTask->id.idStr, pStatus, rsp.stage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else { } else {
rsp.status = 0; rsp.status = 0;
rsp.stage = 0;
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d", ") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
@ -1882,10 +1860,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// streamTaskUpdateEpInfo(pHistoryTask); // streamTaskUpdateEpInfo(pHistoryTask);
} }
streamMetaReleaseTask(pMeta, pTask);
if (pHistoryTask != NULL) { if (pHistoryTask != NULL) {
streamTaskRestart(pHistoryTask, NULL);
streamMetaReleaseTask(pMeta, pHistoryTask); streamMetaReleaseTask(pMeta, pHistoryTask);
} }
streamTaskRestart(pTask, NULL);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -242,20 +242,20 @@ int32_t streamTaskGetInputQItems(const SStreamTask* pTask) {
return numOfItems1 + numOfItems2; return numOfItems1 + numOfItems2;
} }
// wait for the stream task to be idle
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
// wait for the stream task to be idle const char* id = pTask->id.idStr;
int64_t st = taosGetTimestampMs();
int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) { while (!streamTaskIsIdle(pStreamTask)) {
qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", pTask->id.idStr, qDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pTask->info.taskLevel, pStreamTask->id.idStr); pStreamTask->id.idStr);
taosMsleep(100); taosMsleep(100);
} }
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) { if (el > 0) {
qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", pTask->id.idStr, qDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el);
pStreamTask->id.idStr, el);
} }
} }

View File

@ -255,8 +255,6 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
// add to the ready tasks hash map, not the restored tasks hash map // add to the ready tasks hash map, not the restored tasks hash map
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) { int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded) {
*pAdded = false; *pAdded = false;
int64_t checkpointId = 0;
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) { if (p == NULL) {
if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) {

View File

@ -15,38 +15,45 @@
#include "streamInt.h" #include "streamInt.h"
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
SStreamQueue* streamQueueOpen(int64_t cap) { SStreamQueue* streamQueueOpen(int64_t cap) {
SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue));
if (pQueue == NULL) return NULL; if (pQueue == NULL) {
return NULL;
}
pQueue->queue = taosOpenQueue(); pQueue->queue = taosOpenQueue();
pQueue->qall = taosAllocateQall(); pQueue->qall = taosAllocateQall();
if (pQueue->queue == NULL || pQueue->qall == NULL) { if (pQueue->queue == NULL || pQueue->qall == NULL) {
goto FAIL; if (pQueue->queue) taosCloseQueue(pQueue->queue);
if (pQueue->qall) taosFreeQall(pQueue->qall);
taosMemoryFree(pQueue);
return NULL;
} }
pQueue->status = STREAM_QUEUE__SUCESS; pQueue->status = STREAM_QUEUE__SUCESS;
taosSetQueueCapacity(pQueue->queue, cap); taosSetQueueCapacity(pQueue->queue, cap);
taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024); taosSetQueueMemoryCapacity(pQueue->queue, cap * 1024);
return pQueue; return pQueue;
FAIL:
if (pQueue->queue) taosCloseQueue(pQueue->queue);
if (pQueue->qall) taosFreeQall(pQueue->qall);
taosMemoryFree(pQueue);
return NULL;
} }
void streamQueueClose(SStreamQueue* queue) { void streamQueueClose(SStreamQueue* pQueue) {
while (1) { streamQueueCleanup(pQueue);
void* qItem = streamQueueNextItem(queue);
if (qItem) { taosFreeQall(pQueue->qall);
streamFreeQitem(qItem); taosCloseQueue(pQueue->queue);
} else { taosMemoryFree(pQueue);
break; }
}
void streamQueueCleanup(SStreamQueue* pQueue) {
void* qItem = NULL;
while ((qItem = streamQueueNextItem(pQueue)) != NULL) {
streamFreeQitem(qItem);
} }
taosFreeQall(queue->qall); pQueue->status = STREAM_QUEUE__SUCESS;
taosCloseQueue(queue->queue);
taosMemoryFree(queue);
} }
#if 0 #if 0
@ -107,8 +114,6 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
} }
#endif #endif
#define MAX_STREAM_EXEC_BATCH_NUM 32
#define MIN_STREAM_EXEC_BATCH_NUM 4
// todo refactor: // todo refactor:
// read data from input queue // read data from input queue

View File

@ -193,8 +193,8 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p
return 0; return 0;
} }
int32_t streamTaskCheckStatus(SStreamTask* pTask) { int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage) {
return (pTask->status.downstreamReady == 1)? 1:0; return ((pTask->status.downstreamReady == 1) && (pTask->status.stage == stage))? 1:0;
} }
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
@ -261,10 +261,9 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask, 1); doProcessDownstreamReadyRsp(pTask, 1);
} }
} else { // not ready, wait for 100ms and retry } else { // not ready, wait for 100ms and retry
qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", id, pRsp->downstreamTaskId, qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->stage);
taosMsleep(100); taosMsleep(100);
streamRecheckDownstream(pTask, pRsp); streamRecheckDownstream(pTask, pRsp);
} }
@ -655,6 +654,7 @@ int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq*
if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->stage) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -668,6 +668,7 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->stage) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
} }
@ -681,6 +682,7 @@ int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp*
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->stage) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->status) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -695,6 +697,7 @@ int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp)
if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->childId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->stage) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->status) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -839,7 +842,7 @@ void streamTaskPause(SStreamTask* pTask) {
} }
int64_t el = taosGetTimestampMs() - st; int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s set pause flag, prev:%s, elapsed time:%dms", pMeta->vgId, pTask->id.idStr, qDebug("vgId:%d s-task:%s set pause flag, prev:%s, pause elapsed time:%dms", pMeta->vgId, pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el); streamGetTaskStatusStr(pTask->status.keepTaskStatus), (int32_t)el);
} }

View File

@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <libs/transport/trpc.h> #include "streamInt.h"
#include <streamInt.h>
#include "executor.h" #include "executor.h"
#include "tstream.h" #include "tstream.h"
#include "wal.h" #include "wal.h"
@ -308,6 +307,34 @@ void tFreeStreamTask(SStreamTask* pTask) {
taosMemoryFree(pTask); taosMemoryFree(pTask);
} }
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
pTask->refCnt = 1;
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
pTask->inputQueue = streamQueueOpen(512 << 10);
pTask->outputInfo.queue = streamQueueOpen(512 << 10);
if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) {
qError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
return -1;
}
pTask->tsInfo.init = taosGetTimestampMs();
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
pTask->pMeta = pMeta;
pTask->chkInfo.currentVer = ver;
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;
taosThreadMutexInit(&pTask->lock, NULL);
streamTaskOpenAllUpstreamInput(pTask);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0; return 0;
@ -397,3 +424,43 @@ void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SE
// do nothing // do nothing
} }
} }
int32_t streamTaskStop(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int64_t st = taosGetTimestampMs();
const char* id = pTask->id.idStr;
pTask->status.taskStatus = TASK_STATUS__STOP;
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
while (!streamTaskIsIdle(pTask)) {
qDebug("s-task:%s level:%d wait for task to be idle, check again in 100ms", id, pTask->info.taskLevel);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", pMeta->vgId, pTask->id.idStr, el);
return 0;
}
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
// 1. stop task
streamTaskStop(pTask);
// 2. clear state info
streamQueueCleanup(pTask->inputQueue);
streamQueueCleanup(pTask->outputInfo.queue);
taosArrayClear(pTask->checkReqIds);
taosArrayClear(pTask->pRspMsgList);
pTask->status.downstreamReady = 0;
pTask->status.stage += 1;
qDebug("s-task:%s reset downstream status and stage:%d, start to check downstream", pTask->id.idStr,
pTask->status.stage);
// 3. start to check the downstream status
streamTaskCheckDownstreamTasks(pTask);
return 0;
}