Merge branch 'fix/snodeBackendCrash' of https://github.com/taosdata/TDengine into fix/snodeBackendCrash
This commit is contained in:
commit
bb5f0d9112
|
@ -13,9 +13,9 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstream.h"
|
||||
#include "tmsgcb.h"
|
||||
#include "tq.h"
|
||||
#include "tstream.h"
|
||||
|
||||
typedef struct STaskUpdateEntry {
|
||||
int64_t streamId;
|
||||
|
@ -24,7 +24,7 @@ typedef struct STaskUpdateEntry {
|
|||
} STaskUpdateEntry;
|
||||
|
||||
int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
if (numOfTasks == 0) {
|
||||
|
@ -42,7 +42,7 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
|||
tqDebug("vgId:%d start all %d stream task(s) async", vgId, numOfTasks);
|
||||
pRunReq->head.vgId = vgId;
|
||||
pRunReq->streamId = 0;
|
||||
pRunReq->taskId = restart? STREAM_EXEC_RESTART_ALL_TASKS_ID:STREAM_EXEC_START_ALL_TASKS_ID;
|
||||
pRunReq->taskId = restart ? STREAM_EXEC_RESTART_ALL_TASKS_ID : STREAM_EXEC_START_ALL_TASKS_ID;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||
tmsgPutToQueue(cb, STREAM_QUEUE, &msg);
|
||||
|
@ -50,10 +50,10 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
|
||||
|
||||
SStreamTaskNodeUpdateMsg req = {0};
|
||||
|
||||
|
@ -72,7 +72,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
streamMetaWLock(pMeta);
|
||||
|
||||
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
|
||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask == NULL || *ppTask == NULL) {
|
||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", pMeta->vgId,
|
||||
|
@ -96,7 +96,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
}
|
||||
|
||||
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
||||
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
||||
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
||||
if (exist != NULL) {
|
||||
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
||||
req.transId);
|
||||
|
@ -166,7 +166,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
if (!restored) {
|
||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag",
|
||||
vgId);
|
||||
pMeta->startInfo.tasksWillRestart = 0;
|
||||
streamMetaWUnLock(pMeta);
|
||||
} else {
|
||||
|
@ -238,7 +239,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
SStreamDispatchReq req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
|
||||
if (tDecodeStreamDispatchReq(&decoder, &req) < 0) {
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -251,7 +252,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||
if (pTask) {
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0){
|
||||
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
|
||||
return -1;
|
||||
}
|
||||
tDeleteStreamDispatchReq(&req);
|
||||
|
@ -355,8 +356,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMs
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.downstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed",
|
||||
pMeta->vgId, req.downstreamTaskId);
|
||||
tqError("vgId:%d process scan history finish msg, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||
req.downstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -381,8 +382,8 @@ int32_t tqStreamTaskProcessScanHistoryFinishRsp(SStreamMeta* pMeta, SRpcMsg* pMs
|
|||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed",
|
||||
pMeta->vgId, req.upstreamTaskId);
|
||||
tqError("vgId:%d process scan history finish rsp, failed to find task:0x%x, it may be destroyed", pMeta->vgId,
|
||||
req.upstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -428,8 +429,9 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
// only the leader node handle the check request
|
||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
||||
tqError(
|
||||
"s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
|
||||
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
|
||||
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
|
||||
} else {
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
|
||||
|
@ -439,13 +441,14 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
char* p = NULL;
|
||||
streamTaskGetStatus(pTask, &p);
|
||||
tqDebug("s-task:%s status:%s, stage:%"PRId64" recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), check_status:%d",
|
||||
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64
|
||||
") task:0x%x (vgId:%d), check_status:%d",
|
||||
pTask->id.idStr, p, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
} else {
|
||||
rsp.status = TASK_DOWNSTREAM_NOT_READY;
|
||||
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
|
||||
") from task:0x%x (vgId:%d), rsp check_status %d",
|
||||
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
") from task:0x%x (vgId:%d), rsp check_status %d",
|
||||
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -472,7 +475,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
|
||||
tDecoderClear(&decoder);
|
||||
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(reqId:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d",
|
||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||
rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
|
||||
|
||||
if (!isLeader) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||
|
@ -485,7 +488,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
if (pTask == NULL) {
|
||||
streamMetaUpdateTaskDownstreamStatus(pMeta, rsp.streamId, rsp.upstreamTaskId, 0, taosGetTimestampMs(), false);
|
||||
tqError("tq failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
|
||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||
rsp.streamId, rsp.upstreamTaskId, vgId);
|
||||
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
@ -496,10 +499,10 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
|||
}
|
||||
|
||||
int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int32_t code = 0;
|
||||
|
||||
SStreamCheckpointReadyMsg req = {0};
|
||||
|
||||
|
@ -526,7 +529,8 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored) {
|
||||
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char* msg, int32_t msgLen, bool isLeader,
|
||||
bool restored) {
|
||||
int32_t code = 0;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
|
@ -538,7 +542,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
|||
tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId);
|
||||
|
||||
// 1.deserialize msg and build task
|
||||
int32_t size = sizeof(SStreamTask);
|
||||
int32_t size = sizeof(SStreamTask);
|
||||
SStreamTask* pTask = taosMemoryCalloc(1, size);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size);
|
||||
|
@ -566,7 +570,8 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
|||
streamMetaWUnLock(pMeta);
|
||||
|
||||
if (code < 0) {
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code));
|
||||
tqError("failed to add s-task:0x%x into vgId:%d meta, total:%d, code:%s", vgId, taskId, numOfTasks,
|
||||
tstrerror(code));
|
||||
tFreeStreamTask(pTask);
|
||||
return code;
|
||||
}
|
||||
|
@ -603,7 +608,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, int64_t sversion, char*
|
|||
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
|
||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
|
@ -634,8 +639,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
}
|
||||
|
||||
int32_t startStreamTasks(SStreamMeta* pMeta) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
tqDebug("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
|
||||
|
@ -679,7 +684,7 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
EStreamTaskEvent event = (HAS_RELATED_FILLHISTORY_TASK(pTask)) ? TASK_EVENT_INIT_STREAM_SCANHIST : TASK_EVENT_INIT;
|
||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
code = ret;
|
||||
}
|
||||
|
@ -692,8 +697,8 @@ int32_t startStreamTasks(SStreamMeta* pMeta) {
|
|||
}
|
||||
|
||||
int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
int32_t vgId = pMeta->vgId;
|
||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||
|
||||
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
|
||||
if (numOfTasks == 0) {
|
||||
|
@ -703,7 +708,7 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) {
|
|||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||
|
||||
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
streamTaskResetStatus(*pTask);
|
||||
}
|
||||
|
@ -716,7 +721,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
int32_t code = 0;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
|
||||
while(1) {
|
||||
while (1) {
|
||||
int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1);
|
||||
if (startVal == 0) {
|
||||
break;
|
||||
|
@ -739,7 +744,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
|||
streamMetaClear(pMeta);
|
||||
|
||||
int64_t el = taosGetTimestampMs() - st;
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.);
|
||||
tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el / 1000.);
|
||||
|
||||
code = streamMetaLoadAllTasks(pMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -780,11 +785,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
}
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId);
|
||||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||
if (pTask != NULL) { // even in halt status, the data in inputQ must be processed
|
||||
char* p = NULL;
|
||||
if (streamTaskReadyToRun(pTask, &p)) {
|
||||
tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
streamExecTask(pTask);
|
||||
} else {
|
||||
int8_t status = streamTaskSetSchedStatusInactive(pTask);
|
||||
|
@ -800,5 +805,3 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue