diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a106fe148b..dd7618adc3 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -13,9 +13,9 @@ * along with this program. If not, see . */ -#include "tstream.h" #include "tmsgcb.h" #include "tq.h" +#include "tstream.h" typedef struct STaskUpdateEntry { int64_t streamId; @@ -24,7 +24,7 @@ typedef struct STaskUpdateEntry { } STaskUpdateEntry; int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { @@ -42,7 +42,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks); pRunReq->head.vgId = vgId; pRunReq->streamId = 0; - pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID; + pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(cb, STREAM_QUEUE, &msg); @@ -50,10 +50,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) { } int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; SStreamTaskNodeUpdateMsg req = {0}; @@ -72,7 +72,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWLock(pMeta); // the task epset may be updated again and again, when replaying the WAL, the task may be in stop status. - STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; + STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask == NULL || *ppTask == NULL) { tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId, @@ -96,7 +96,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM } STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; - void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); + void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); if (exist != NULL) { tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId, req.transId); @@ -166,7 +166,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaWUnLock(pMeta); } else { if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId); + tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", + vgId); pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { @@ -238,7 +239,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamDispatchReq req = {0}; - SDecoder decoder; + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); if (tDecodeStreamDispatchReq(&decoder, &req) < 0) { tDecoderClear(&decoder); @@ -251,7 +252,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){ + if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) { return -1; } tDeleteStreamDispatchReq(&req); @@ -355,8 +356,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId); if (pTask == NULL) { - tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.downstreamTaskId); + tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId, + req.downstreamTaskId); return -1; } @@ -381,8 +382,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId); if (pTask == NULL) { - tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", - pMeta->vgId, req.upstreamTaskId); + tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId, + req.upstreamTaskId); return -1; } @@ -428,8 +429,9 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { // only the leader node handle the check request if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); + tqError( + "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); rsp.status = TASK_DOWNSTREAM_NOT_LEADER; } else { SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); @@ -439,13 +441,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { char* p = NULL; streamTaskGetStatus(pTask, &p); - tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 + ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = TASK_DOWNSTREAM_NOT_READY; tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + ") from task:0x%x (vgId:%d), rsp check_status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } } @@ -472,7 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe tDecoderClear(&decoder); tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", - rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); if (!isLeader) { streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); @@ -485,7 +488,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe if (pTask == NULL) { streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false); tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", - rsp.streamId, rsp.upstreamTaskId, vgId); + rsp.streamId, rsp.upstreamTaskId, vgId); terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST; return -1; } @@ -496,10 +499,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe } int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) { - int32_t vgId = pMeta->vgId; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + int32_t vgId = pMeta->vgId; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; SStreamCheckpointReadyMsg req = {0}; @@ -526,7 +529,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) return code; } -int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) { +int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, + bool restored) { int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -538,7 +542,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); // 1.deserialize msg and build task - int32_t size = sizeof(SStreamTask); + int32_t size = sizeof(SStreamTask); SStreamTask* pTask = taosMemoryCalloc(1, size); if (pTask == NULL) { tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); @@ -566,7 +570,8 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); + tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, + tstrerror(code)); tFreeStreamTask(pTask); return code; } @@ -603,7 +608,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; - int32_t vgId = pMeta->vgId; + int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); @@ -634,8 +639,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } int32_t startStreamTasks(SStreamMeta* pMeta) { - int32_t code = TSDB_CODE_SUCCESS; - int32_t vgId = pMeta->vgId; + int32_t code = TSDB_CODE_SUCCESS; + int32_t vgId = pMeta->vgId; int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); @@ -679,7 +684,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { } EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT; - int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); + int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event); if (ret != TSDB_CODE_SUCCESS) { code = ret; } @@ -692,8 +697,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) { } int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { - int32_t vgId = pMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks); if (numOfTasks == 0) { @@ -703,7 +708,7 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { for (int32_t i = 0; i < numOfTasks; ++i) { SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); - STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; + STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); streamTaskResetStatus(*pTask); } @@ -716,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { int32_t code = 0; int64_t st = taosGetTimestampMs(); - while(1) { + while (1) { int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); if (startVal == 0) { break; @@ -739,7 +744,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { streamMetaClear(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); + tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.); code = streamMetaLoadAllTasks(pMeta); if (code != TSDB_CODE_SUCCESS) { @@ -780,11 +785,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); - if (pTask != NULL) { // even in halt status, the data in inputQ must be processed + if (pTask != NULL) { // even in halt status, the data in inputQ must be processed char* p = NULL; if (streamTaskReadyToRun(pTask, &p)) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, - pTask->chkInfo.nextProcessVer); + pTask->chkInfo.nextProcessVer); streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInactive(pTask); @@ -800,5 +805,3 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead return -1; } } - -