opti:common message process logic in vnode and snode

This commit is contained in:
wangmm0220 2023-11-22 16:53:33 +08:00
parent 04e96b6d92
commit 9a09e318ca
18 changed files with 864 additions and 1522 deletions

View File

@ -1,18 +0,0 @@
//
// Created by mingming wanng on 2023/11/15.
//
#ifndef TDENGINE_STREAM_H
#define TDENGINE_STREAM_H
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
typedef struct STaskUpdateEntry {
int64_t streamId;
int32_t taskId;
int32_t transId;
} STaskUpdateEntry;
#endif // TDENGINE_STREAM_H

View File

@ -51,6 +51,10 @@ extern "C" {
(_t)->hTaskInfo.id.streamId = 0; \ (_t)->hTaskInfo.id.streamId = 0; \
} while (0) } while (0)
#define STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID (-1)
#define STREAM_EXEC_START_ALL_TASKS_ID (-2)
#define STREAM_EXEC_RESTART_ALL_TASKS_ID (-3)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue; typedef struct SStreamQueue SStreamQueue;
typedef struct SStreamTaskSM SStreamTaskSM; typedef struct SStreamTaskSM SStreamTaskSM;
@ -849,6 +853,23 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed); int8_t isSucceed);
// message process
int32_t streamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart);
int32_t streamTaskProcesUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored);
int32_t streamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t streamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t streamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored);
int32_t streamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t streamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t startStreamTasks(SStreamMeta* pMeta);
int32_t resetStreamTaskStatus(SStreamMeta* pMeta);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -193,10 +193,6 @@ int32_t smPutNodeMsgToStreamQueue(SSnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SSingleWorker *pWorker = &pMgmt->streamWorker; SSingleWorker *pWorker = &pMgmt->streamWorker;
dTrace("msg:%p, put into worker %s", pMsg, pWorker->name); dTrace("msg:%p, put into worker %s", pMsg, pWorker->name);
if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
sndEnqueueStreamDispatch(pMgmt->pSnode, pMsg);
} else {
taosWriteQitem(pWorker->queue, pMsg); taosWriteQitem(pWorker->queue, pMsg);
}
return 0; return 0;
} }

View File

@ -206,11 +206,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
break; break;
case STREAM_QUEUE: case STREAM_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
if (pMsg->msgType == TDMT_STREAM_TASK_DISPATCH) {
vnodeEnqueueStreamMsg(pVnode->pImpl, pMsg);
} else {
taosWriteQitem(pVnode->pStreamQ, pMsg); taosWriteQitem(pVnode->pStreamQ, pMsg);
}
break; break;
case FETCH_QUEUE: case FETCH_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg); dGTrace("vgId:%d, msg:%p put into vnode-fetch queue", pVnode->vgId, pMsg);

View File

@ -3,7 +3,6 @@ add_library(snode STATIC ${SNODE_SRC})
target_include_directories( target_include_directories(
snode snode
PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode" PUBLIC "${TD_SOURCE_DIR}/include/dnode/snode"
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
private "${CMAKE_CURRENT_SOURCE_DIR}/inc" private "${CMAKE_CURRENT_SOURCE_DIR}/inc"
) )
target_link_libraries( target_link_libraries(

View File

@ -18,7 +18,6 @@
#include "sndInt.h" #include "sndInt.h"
#include "tstream.h" #include "tstream.h"
#include "tuuid.h" #include "tuuid.h"
#include "stream.h"
#define sndError(...) \ #define sndError(...) \
do { \ do { \
@ -41,45 +40,6 @@
} \ } \
} while (0) } while (0)
void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamDispatchReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
goto FAIL;
}
tDecoderClear(&decoder);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
if (pTask) {
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
streamProcessDispatchMsg(pTask, &req, &rsp);
tDeleteStreamDispatchReq(&req);
streamMetaReleaseTask(pSnode->pMeta, pTask);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
return;
} else {
tDeleteStreamDispatchReq(&req);
return;
}
FAIL:
if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = { .code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
rpcFreeCont(pMsg->pCont);
taosFreeQitem(pMsg);
}
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
@ -130,159 +90,29 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1; pTask->chkInfo.nextProcessVer = pTask->chkInfo.checkpointVer + 1;
sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr, sndInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " nextProcessVer:%" PRId64, pTask->id.idStr,
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer); pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
} else {
if (pTask->chkInfo.nextProcessVer == -1) {
pTask->chkInfo.nextProcessVer = 0;
}
} }
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pTask, &p); streamTaskGetStatus(pTask, &p);
sndInfo("snode:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 if (pTask->info.fillHistory) {
" nextProcessVer:%" PRId64 " child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms", sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory, pTask->info.triggerParam); pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else {
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam);
}
return 0; return 0;
} }
int32_t sndStartStreamTasks(SSnode* pSnode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = SNODE_HANDLE;
SStreamMeta* pMeta = pSnode->pMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
sndDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
sndDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return code;
}
int32_t sndResetStreamTaskStatus(SSnode* pSnode) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
sndDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
}
int32_t sndRestartStreamTasks(SSnode* pSnode) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
sndDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
sndInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
sndDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
sndError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
sndInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pMeta);
if (code != TSDB_CODE_SUCCESS) {
sndError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
sndInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
sndResetStreamTaskStatus(pSnode);
streamMetaWUnLock(pMeta);
sndStartStreamTasks(pSnode);
code = terrno;
return code;
}
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode)); SSnode *pSnode = taosMemoryCalloc(1, sizeof(SSnode));
if (pSnode == NULL) { if (pSnode == NULL) {
@ -318,8 +148,8 @@ FAIL:
} }
int32_t sndInit(SSnode * pSnode) { int32_t sndInit(SSnode * pSnode) {
sndResetStreamTaskStatus(pSnode); resetStreamTaskStatus(pSnode->pMeta);
sndStartStreamTasks(pSnode); startStreamTasks(pSnode->pMeta);
return 0; return 0;
} }
@ -331,577 +161,26 @@ void sndClose(SSnode *pSnode) {
taosMemoryFree(pSnode); taosMemoryFree(pSnode);
} }
int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad) { return 0; }
int32_t sndStartStreamTaskAsync(SSnode* pSnode, bool restart) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
sndDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sndError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
return -1;
}
sndDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pSnode->msgCb, STREAM_QUEUE, &msg);
return 0;
}
int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) {
int32_t code;
// 1.deserialize msg and build task
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msg, msgLen);
code = tDecodeStreamTask(&decoder, pTask);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG);
// 2.save task
streamMetaWLock(pSnode->pMeta);
bool added = false;
code = streamMetaRegisterTask(pSnode->pMeta, -1, pTask, &added);
if (code < 0) {
streamMetaWUnLock(pSnode->pMeta);
return -1;
}
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
streamMetaWUnLock(pSnode->pMeta);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
sndDebug("snode:%d s-task:%s is deployed on snode and add into meta, status:%s, numOfTasks:%d", SNODE_HANDLE,
pTask->id.idStr, p, numOfTasks);
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(pTask->status.pSM, event);
return 0;
}
int32_t sndProcessTaskDropReq(SSnode *pSnode, char *msg, int32_t msgLen) {
SVDropStreamTaskReq *pReq = (SVDropStreamTaskReq *)msg;
sndDebug("snode:%d receive msg to drop stream task:0x%x", pSnode->pMeta->vgId, pReq->taskId);
streamMetaUnregisterTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
// commit the update
streamMetaWLock(pSnode->pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pSnode->pMeta);
sndDebug("vgId:%d task:0x%x dropped, remain tasks:%d", pSnode->pMeta->vgId, pReq->taskId, numOfTasks);
if (streamMetaCommit(pSnode->pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pSnode->pMeta);
return 0;
}
int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTaskRunReq *pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
sndStartStreamTasks(pSnode);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
sndRestartStreamTasks(pSnode);
return 0;
}
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId);
if (pTask) {
streamExecTask(pTask);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
} else {
return -1;
}
}
int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp);
tDeleteStreamDispatchReq(&req);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
} else {
tDeleteStreamDispatchReq(&req);
return -1;
}
}
int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamRetrieveReq req;
SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.dstTaskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
tDeleteStreamRetrieveReq(&req);
return 0;
} else {
return -1;
}
}
int32_t sndProcessTaskDispatchRsp(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamDispatchRsp *pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
pRsp->stage = htobe64(pRsp->stage);
pRsp->msgId = htonl(pRsp->msgId);
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
} else {
return -1;
}
}
int32_t sndProcessTaskRetrieveRsp(SSnode *pSnode, SRpcMsg *pMsg) {
//
return 0;
}
int32_t sndProcessTaskScanHistoryFinishReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamScanHistoryFinishReq req;
SDecoder decoder;
tDecoderInit(&decoder, msg, msgLen);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
// find task
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
return -1;
}
// do process request
if (streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info) < 0) {
streamMetaReleaseTask(pSnode->pMeta, pTask);
return -1;
}
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
int32_t sndProcessTaskScanHistoryFinishRsp(SSnode *pSnode, SRpcMsg *pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamCompleteHistoryMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
sndError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pSnode->pMeta->vgId, req.upstreamTaskId);
return -1;
}
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
sndDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
pTask->id.idStr, req.downstreamId, remain);
} else {
sndDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pSnode->pMeta, pTask);
return 0;
}
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
int32_t sndProcessTaskCheckpointReadyMsg(SSnode *pSnode, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pSnode->pMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamCheckpointReadyMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
sndError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", pMeta->vgId, req.downstreamTaskId);
return code;
}
sndDebug("snode vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", pMeta->vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
char *msgStr = pMsg->pCont;
char *msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t *)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pSnode->pMeta, pTask);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
sndDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, p, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
sndDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
SEncoder encoder;
int32_t code;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, &rsp, len, code);
if (code < 0) {
sndError("vgId:%d failed to encode task check rsp, task:0x%x", pSnode->pMeta->vgId, taskId);
return -1;
}
void *buf = rpcMallocCont(sizeof(SMsgHead) + len);
((SMsgHead *)buf)->vgId = htonl(req.upstreamNodeId);
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t *)abuf, len);
tEncodeStreamTaskCheckRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t sndProcessStreamTaskCheckRsp(SSnode* pSnode, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
tDecoderClear(&decoder);
return -1;
}
tDecoderClear(&decoder);
sndDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pSnode->pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
sndError("tq failed to locate the stream task:0x%x (vgId:%d), it may have been destroyed", rsp.upstreamTaskId,
pSnode->pMeta->vgId);
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pSnode->pMeta, pTask);
return code;
}
int32_t sndProcessTaskUpdateReq(SSnode* pSnode, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pSnode->pMeta;
int32_t vgId = SNODE_HANDLE;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
sndError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return rsp.code;
}
tDecoderClear(&decoder);
// update the nodeEpset when it exists
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
sndError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
SStreamTask* pTask = *ppTask;
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
sndInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
sndDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
}
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) {
sndDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
streamMetaWUnLock(pMeta);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
sndError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
sndDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
// keep the already handled info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
sndDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
sndDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
}
rsp.code = 0;
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) {
sndDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else {
sndDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1
sndStartStreamTaskAsync(pSnode, true);
streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return sndProcessTaskRunReq(pSnode, pMsg); return streamTaskProcessRunReq(pSnode->pMeta, pMsg, true);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
return sndProcessTaskDispatchReq(pSnode, pMsg, true); return streamTaskProcessDispatchReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
return sndProcessTaskDispatchRsp(pSnode, pMsg); return streamTaskProcessDispatchRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE: case TDMT_STREAM_RETRIEVE:
return sndProcessTaskRetrieveReq(pSnode, pMsg); return streamTaskProcessRetrieveReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_RSP:
return sndProcessTaskRetrieveRsp(pSnode, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH: case TDMT_VND_STREAM_SCAN_HISTORY_FINISH:
return sndProcessTaskScanHistoryFinishReq(pSnode, pMsg); return streamTaskProcessScanHistoryFinishReq(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP: case TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP:
return sndProcessTaskScanHistoryFinishRsp(pSnode, pMsg); return streamTaskProcessScanHistoryFinishRsp(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_TASK_CHECK: case TDMT_VND_STREAM_TASK_CHECK:
return sndProcessStreamTaskCheckReq(pSnode, pMsg); return streamTaskProcessCheckReq(pSnode->pMeta, pMsg);
case TDMT_VND_STREAM_TASK_CHECK_RSP: case TDMT_VND_STREAM_TASK_CHECK_RSP:
return sndProcessStreamTaskCheckRsp(pSnode, pMsg); return streamTaskProcessCheckRsp(pSnode->pMeta, pMsg, true);
case TDMT_STREAM_TASK_CHECKPOINT_READY: case TDMT_STREAM_TASK_CHECKPOINT_READY:
return sndProcessTaskCheckpointReadyMsg(pSnode, pMsg); return streamTaskProcessCheckpointReadyMsg(pSnode->pMeta, pMsg);
default: default:
ASSERT(0); ASSERT(0);
} }
@ -913,14 +192,13 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); void *pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
return sndProcessTaskDeployReq(pSnode, pReq, len); return streamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true);
} }
case TDMT_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
return sndProcessTaskDropReq(pSnode, pMsg->pCont, pMsg->contLen); return streamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE:
sndProcessTaskUpdateReq(pSnode, pMsg); return streamTaskProcesUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
break;
default: default:
ASSERT(0); ASSERT(0);
} }

View File

@ -138,11 +138,6 @@ else()
endif() endif()
endif() endif()
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/include/dnode/vnode"
)
target_link_libraries( target_link_libraries(
vnode vnode
PUBLIC os PUBLIC os

View File

@ -253,8 +253,6 @@ bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr); int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet);
int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg);
// sma // sma
int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days); int32_t smaGetTSmaDays(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days);

View File

@ -153,7 +153,6 @@ char* tqOffsetBuildFName(const char* path, int32_t fVer);
int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname); int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
// tqStream // tqStream
int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq);
// tq util // tq util

View File

@ -233,7 +233,6 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart);
int32_t tqRestartStreamTasks(STQ* pTq); int32_t tqRestartStreamTasks(STQ* pTq);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);
@ -262,7 +261,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t version, char* msg, int32_t msg
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec); int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -15,7 +15,6 @@
#include "tq.h" #include "tq.h"
#include "vnd.h" #include "vnd.h"
#include "stream.h"
typedef struct { typedef struct {
int8_t inited; int8_t inited;
@ -891,172 +890,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
} }
int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; return streamTaskProcessCheckReq(pTq->pStreamMeta, pMsg);
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
} }
int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); return streamTaskProcessCheckRsp(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pTq->pStreamMeta->vgId;
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
tqError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
return -1;
}
tDecoderClear(&decoder);
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code;
}
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, pTq->pStreamMeta->vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code;
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
int32_t code = 0; return streamTaskProcessDeployReq(pTq->pStreamMeta, sversion, msg, msgLen, vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
int32_t vgId = TD_VID(pTq->pVnode);
if (tsDisableStream) {
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
return code;
}
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
// 1.deserialize msg and build task
int32_t size = sizeof(SStreamTask);
SStreamTask* pTask = taosMemoryCalloc(1, size);
if (pTask == NULL) {
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
return TSDB_CODE_OUT_OF_MEMORY;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTask);
return TSDB_CODE_INVALID_MSG;
}
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the latest commit version as the initial start version of stream task.
int32_t taskId = pTask->id.taskId;
int64_t streamId = pTask->id.streamId;
bool added = false;
streamMetaWLock(pStreamMeta);
code = streamMetaRegisterTask(pStreamMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pStreamMeta);
streamMetaWUnLock(pStreamMeta);
if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
tFreeStreamTask(pTask);
return code;
}
// added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
// it is added into the meta store
if (added) {
// only handled in the leader node
if (vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
bool restored = pTq->pVnode->restored;
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
}
if (p != NULL) {
streamMetaReleaseTask(pStreamMeta, p);
}
} else {
tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
}
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
}
return code;
} }
static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) { static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask, STQ* pTq) {
@ -1244,190 +1086,39 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
// only the agg tasks and the sink tasks will receive this message from upstream tasks // only the agg tasks and the sink tasks will receive this message from upstream tasks
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); return streamTaskProcessScanHistoryFinishReq(pTq->pStreamMeta, pMsg);
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamScanHistoryFinishReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.downstreamTaskId);
return -1;
}
tqDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return code;
} }
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); return streamTaskProcessScanHistoryFinishRsp(pTq->pStreamMeta, pMsg);
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamCompleteHistoryMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pTq->pStreamMeta->vgId, req.upstreamTaskId);
return -1;
}
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
tqDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
pTask->id.idStr, req.downstreamId, remain);
} else {
tqDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} }
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont; SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
int32_t vgId = TD_VID(pTq->pVnode);
if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal if (taskId == STREAM_EXEC_EXTRACT_DATA_IN_WAL_ID) { // all tasks are extracted submit data from the wal
tqScanWal(pTq); tqScanWal(pTq);
return 0; return 0;
} else if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
tqStartStreamTasks(pTq);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
tqRestartStreamTasks(pTq);
return 0;
} }
int32_t code = streamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, taskId); if(code == 0 && taskId > 0){
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, p, status);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqScanWalAsync(pTq, false); tqScanWalAsync(pTq, false);
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
return -1;
} }
return code;
} }
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; return streamTaskProcessDispatchReq(pTq->pStreamMeta, pMsg);
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessDispatchMsg(pTask, &req, &rsp);
tDeleteStreamDispatchReq(&req);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
} else {
tqError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
pTq->pStreamMeta->vgId, req.taskId);
tDeleteStreamDispatchReq(&req);
return -1;
}
} }
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); return streamTaskProcessDispatchRsp(pTq->pStreamMeta, pMsg);
int32_t vgId = pTq->pStreamMeta->vgId;
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
pRsp->stage = htobe64(pRsp->stage);
pRsp->msgId = htonl(pRsp->msgId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return TSDB_CODE_SUCCESS;
} else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return terrno;
}
} }
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; return streamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
// drop the related fill-history task firstly
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id;
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
tqDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
}
streamMetaReleaseTask(pMeta, pTask);
}
// drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
// commit the update
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pMeta);
return 0;
} }
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
@ -1537,30 +1228,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont; return streamTaskProcessRetrieveReq(pTq->pStreamMeta, pMsg);
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
SStreamRetrieveReq req;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.dstTaskId);
if (pTask == NULL) {
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", vgId,
req.dstTaskId);
return -1;
}
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tDeleteStreamRetrieveReq(&req);
return 0;
} }
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
@ -1779,219 +1447,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task // downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); return streamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamCheckpointReadyMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return code;
}
tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
} }
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamMeta* pMeta = pTq->pStreamMeta; return streamTaskProcesUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored);
int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
tqError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return rsp.code;
}
tDecoderClear(&decoder);
// update the nodeEpset when it exists
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
SStreamTask* pTask = *ppTask;
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
}
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) {
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
streamMetaWUnLock(pMeta);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
// keep the already handled info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
}
rsp.code = 0;
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1
tqStartStreamTaskAsync(pTq, true);
streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
tqResetStreamTaskStatus(pTq);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
}
taosArrayDestroy(req.pNodeList);
return rsp.code;
} }
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -15,7 +15,6 @@
#include "tq.h" #include "tq.h"
#include "vnd.h" #include "vnd.h"
#include "stream.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3 #define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100 #define SCAN_WAL_IDLE_DURATION 100
@ -61,154 +60,6 @@ int32_t tqScanWal(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStartStreamTasks(STQ* pTq) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
tqDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return code;
}
int32_t tqRestartStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pTq->pStreamMeta);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqResetStreamTaskStatus(pTq);
streamMetaWUnLock(pMeta);
tqStartStreamTasks(pTq);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
code = terrno;
return code;
}
int32_t tqStartStreamTaskAsync(STQ* pTq, bool restart) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
return -1;
}
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
return 0;
}
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
@ -298,27 +149,6 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0; return 0;
} }
int32_t tqResetStreamTaskStatus(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
}
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
// seek the stored version and extract data from WAL // seek the stored version and extract data from WAL
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader); int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);

View File

@ -463,7 +463,6 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
break; break;
} }
_exit:
if (code) { if (code) {
vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code), vError("vgId:%d, failed to preprocess write request since %s, msg type:%s", TD_VID(pVnode), tstrerror(code),
TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
@ -762,7 +761,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TASK_RUN: case TDMT_STREAM_TASK_RUN:
return tqProcessTaskRunReq(pVnode->pTq, pMsg); return tqProcessTaskRunReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH: case TDMT_STREAM_TASK_DISPATCH:
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true); return tqProcessTaskDispatchReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_DISPATCH_RSP: case TDMT_STREAM_TASK_DISPATCH_RSP:
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg); return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TASK_CHECK: case TDMT_VND_STREAM_TASK_CHECK:

View File

@ -569,8 +569,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqResetStreamTaskStatus(pVnode->pTq); resetStreamTaskStatus(pVnode->pTq->pStreamMeta);
tqStartStreamTaskAsync(pVnode->pTq, false); streamTaskStartAsync(pMeta, &pVnode->msgCb, false);
} }
} else { } else {
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);

View File

@ -52,6 +52,12 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct STaskUpdateEntry {
int64_t streamId;
int32_t taskId;
int32_t transId;
} STaskUpdateEntry;
typedef struct { typedef struct {
int8_t type; int8_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;

View File

@ -276,6 +276,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code)); stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
terrno = code;
return code; return code;
} }

View File

@ -411,7 +411,6 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
stDebug("s-task:%s should stop, do not do check downstream again", id); stDebug("s-task:%s should stop, do not do check downstream again", id);
@ -463,14 +462,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", " "s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%"PRId64", current stage:%"PRId64", "
"not check wait for downstream task nodeUpdate, and all tasks restart", "not check wait for downstream task nodeUpdate, and all tasks restart",
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage); id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
} else { } else {
stError( stError(
"s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check " "s-task:%s downstream taskId:0x%x (vgId:%d) not leader, self dispatch epset needs to be updated, not check "
"downstream again, nodeUpdate needed", "downstream again, nodeUpdate needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
} }
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false); streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, taosGetTimestampMs(), false);
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms

View File

@ -780,3 +780,786 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->activeCheckpointId = pSrc->activeCheckpointId;
pDst->checkpointFailed = pSrc->checkpointFailed; pDst->checkpointFailed = pSrc->checkpointFailed;
} }
int32_t streamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
stDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
return -1;
}
stDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
pRunReq->head.vgId = vgId;
pRunReq->streamId = 0;
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
return 0;
}
int32_t streamTaskProcesUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamTaskUpdateMsg(&decoder, &req) < 0) {
rsp.code = TSDB_CODE_MSG_DECODE_ERROR;
stError("vgId:%d failed to decode task update msg, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return rsp.code;
}
tDecoderClear(&decoder);
// update the nodeEpset when it exists
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
SStreamTask* pTask = *ppTask;
if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId;
stInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", pTask->id.idStr, req.transId);
// info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks);
} else {
stDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
}
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
if (exist != NULL) {
stDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
streamMetaWUnLock(pMeta);
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask);
// continue after lock the meta again
streamMetaWLock(pMeta);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
stError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
pMeta->vgId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
stDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
}
}
{
streamMetaSaveTask(pMeta, pTask);
if (ppHTask != NULL) {
streamMetaSaveTask(pMeta, *ppHTask);
}
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
}
streamTaskStop(pTask);
// keep the already handled info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
stDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
stDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
}
rsp.code = 0;
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.tasksWillRestart = 1;
if (updateTasks < numOfTasks) {
stDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta);
} else {
if (!restored) {
stDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
stDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
#if 1
streamTaskStartAsync(pMeta, cb, true);
streamMetaWUnLock(pMeta);
#else
streamMetaWUnLock(pMeta);
// For debug purpose.
// the following procedure consume many CPU resource, result in the re-election of leader
// with high probability. So we employ it as a test case for the stream processing framework, with
// checkpoint/restart/nodeUpdate etc.
while (1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
tqDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
int32_t code = streamMetaReopen(pMeta);
if (code != 0) {
tqError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
streamMetaInitBackend(pMeta);
if (streamMetaLoadAllTasks(pTq->pStreamMeta) < 0) {
tqError("vgId:%d failed to load stream tasks", vgId);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return -1;
}
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
tqInfo("vgId:%d start all stream tasks after all being updated", vgId);
resetStreamTaskStatus(pTq->pStreamMeta);
tqStartStreamTaskAsync(pTq, false);
} else {
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
}
streamMetaWUnLock(pMeta);
#endif
}
}
taosArrayDestroy(req.pNodeList);
return rsp.code;
}
int32_t streamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamDispatchReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
tDecoderClear(&decoder);
return TSDB_CODE_MSG_DECODE_ERROR;
}
tDecoderClear(&decoder);
stDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){
return -1;
}
tDeleteStreamDispatchReq(&req);
streamMetaReleaseTask(pMeta, pTask);
return 0;
} else {
stError("vgId:%d failed to find task:0x%x to handle the dispatch req, it may have been destroyed already",
pMeta->vgId, req.taskId);
SMsgHead* pRspHead = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
if (pRspHead == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stError("s-task:0x%x send dispatch error rsp, out of memory", req.taskId);
return -1;
}
pRspHead->vgId = htonl(req.upstreamNodeId);
ASSERT(pRspHead->vgId != 0);
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pRspHead, sizeof(SMsgHead));
pRsp->streamId = htobe64(req.streamId);
pRsp->upstreamTaskId = htonl(req.upstreamTaskId);
pRsp->upstreamNodeId = htonl(req.upstreamNodeId);
pRsp->downstreamNodeId = htonl(pMeta->vgId);
pRsp->downstreamTaskId = htonl(req.taskId);
pRsp->msgId = htonl(req.msgId);
pRsp->stage = htobe64(req.stage);
pRsp->inputStatus = TASK_OUTPUT_STATUS__NORMAL;
int32_t len = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
SRpcMsg rsp = {.code = TSDB_CODE_STREAM_TASK_NOT_EXIST, .info = pMsg->info, .contLen = len, .pCont = pRspHead};
stError("s-task:0x%x send dispatch error rsp, no task", req.taskId);
tmsgSendRsp(&rsp);
tDeleteStreamDispatchReq(&req);
return 0;
}
}
int32_t streamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t vgId = pMeta->vgId;
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
pRsp->stage = htobe64(pRsp->stage);
pRsp->msgId = htonl(pRsp->msgId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
} else {
stDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return terrno;
}
}
int32_t streamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
SStreamRetrieveReq req;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId);
if (pTask == NULL) {
stError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.dstTaskId);
return -1;
}
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamProcessRetrieveReq(pTask, &req, &rsp);
streamMetaReleaseTask(pMeta, pTask);
tDeleteStreamRetrieveReq(&req);
return 0;
}
int32_t streamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamScanHistoryFinishReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
stError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
pMeta->vgId, req.downstreamTaskId);
return -1;
}
stDebug("s-task:%s receive scan-history finish msg from task:0x%x", pTask->id.idStr, req.upstreamTaskId);
int32_t code = streamProcessScanHistoryFinishReq(pTask, &req, &pMsg->info);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t streamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
// deserialize
SStreamCompleteHistoryMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
tDecodeCompleteHistoryDataMsg(&decoder, &req);
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
stError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
pMeta->vgId, req.upstreamTaskId);
return -1;
}
int32_t remain = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
if (remain > 0) {
stDebug("s-task:%s scan-history finish rsp received from downstream task:0x%x, unfinished remain:%d",
pTask->id.idStr, req.downstreamId, remain);
} else {
stDebug(
"s-task:%s scan-history finish rsp received from downstream task:0x%x, all downstream tasks rsp scan-history "
"completed msg",
pTask->id.idStr, req.downstreamId);
streamProcessScanHistoryFinishRsp(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
int32_t streamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
SStreamTaskCheckReq req;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamTaskCheckReq(&decoder, &req);
tDecoderClear(&decoder);
int32_t taskId = req.downstreamTaskId;
SStreamTaskCheckRsp rsp = {
.reqId = req.reqId,
.streamId = req.streamId,
.childId = req.childId,
.downstreamNodeId = req.downstreamNodeId,
.downstreamTaskId = req.downstreamTaskId,
.upstreamNodeId = req.upstreamNodeId,
.upstreamTaskId = req.upstreamTaskId,
};
// only the leader node handle the check request
if (pMeta->role == NODE_ROLE_FOLLOWER) {
stError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage);
streamMetaReleaseTask(pMeta, pTask);
char* p = NULL;
streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
}
int32_t streamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t vgId = pMeta->vgId;
int32_t code;
SStreamTaskCheckRsp rsp;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
code = tDecodeStreamTaskCheckRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
stError("vgId:%d failed to parse check rsp msg, code:%s", vgId, tstrerror(terrno));
return -1;
}
tDecoderClear(&decoder);
stDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!isLeader) {
stError("vgId:%d not leader, task:0x%x not handle the check rsp, downstream:0x%x (vgId:%d)", vgId,
rsp.upstreamTaskId, rsp.downstreamTaskId, rsp.downstreamNodeId);
return code;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
if (pTask == NULL) {
stError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
rsp.streamId, rsp.upstreamTaskId, vgId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return -1;
}
code = streamProcessCheckRsp(pTask, &rsp);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t streamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamCheckpointReadyMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
stError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return code;
}
stDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t streamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
if (tsDisableStream) {
stInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
return code;
}
stDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
// 1.deserialize msg and build task
int32_t size = sizeof(SStreamTask);
SStreamTask* pTask = taosMemoryCalloc(1, size);
if (pTask == NULL) {
stError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
return TSDB_CODE_OUT_OF_MEMORY;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pTask);
return TSDB_CODE_INVALID_MSG;
}
// 2.save task, use the latest commit version as the initial start version of stream task.
int32_t taskId = pTask->id.taskId;
int64_t streamId = pTask->id.streamId;
bool added = false;
streamMetaWLock(pMeta);
code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
streamMetaWUnLock(pMeta);
if (code < 0) {
stError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
tFreeStreamTask(pTask);
return code;
}
// added into meta store, pTask cannot be reference since it may have been destroyed by other threads already now if
// it is added into the meta store
if (added) {
// only handled in the leader node
if (isLeader) {
stDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pMeta, streamId, taskId);
if (p != NULL && restored && p->info.fillHistory == 0) {
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(p)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
streamTaskHandleEvent(p->status.pSM, event);
} else if (!restored) {
stWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
}
if (p != NULL) {
streamMetaReleaseTask(pMeta, p);
}
} else {
stDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
}
} else {
stWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
}
return code;
}
int32_t streamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask != NULL) {
// drop the related fill-history task firstly
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHTaskId = &pTask->hTaskInfo.id;
streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
stDebug("vgId:%d drop fill-history task:0x%x dropped firstly", vgId, (int32_t)pHTaskId->taskId);
}
streamMetaReleaseTask(pMeta, pTask);
}
// drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
// commit the update
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pMeta);
return 0;
}
int32_t startStreamTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
if (pTask->status.downstreamReady == 1) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
streamMetaUpdateTaskDownstreamStatus(pTask, pTask->execInfo.init, pTask->execInfo.start, true);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
if (ret != TSDB_CODE_SUCCESS) {
code = ret;
}
streamMetaReleaseTask(pMeta, pTask);
}
taosArrayDestroy(pTaskList);
return code;
}
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
}
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
int64_t st = taosGetTimestampMs();
while(1) {
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
if (startVal == 0) {
break;
}
stDebug("vgId:%d in start stream tasks procedure, wait for 500ms and recheck", vgId);
taosMsleep(500);
}
terrno = 0;
stInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
pMeta->updateInfo.transId);
while (streamMetaTaskInTimer(pMeta)) {
stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
code = streamMetaReopen(pMeta);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to reopen stream meta", vgId);
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
streamMetaInitBackend(pMeta);
int64_t el = taosGetTimestampMs() - st;
stInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
code = streamMetaLoadAllTasks(pMeta);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to load stream tasks, code:%s", vgId, tstrerror(terrno));
streamMetaWUnLock(pMeta);
code = terrno;
return code;
}
if (isLeader && !tsDisableStream) {
stInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
resetStreamTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
startStreamTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta);
stInfo("vgId:%d, follower node not start stream tasks", vgId);
}
code = terrno;
return code;
}
int32_t streamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId;
int32_t vgId = pMeta->vgId;
if (taskId == STREAM_EXEC_START_ALL_TASKS_ID) {
startStreamTasks(pMeta);
return 0;
} else if (taskId == STREAM_EXEC_RESTART_ALL_TASKS_ID) {
restartStreamTasks(pMeta, isLeader);
return 0;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
stDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.nextProcessVer);
streamExecTask(pTask);
} else {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
stDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, p, status);
}
streamMetaReleaseTask(pMeta, pTask);
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
stError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, taskId);
return -1;
}
}