fix(stream): drop task in synch model in write thread.

This commit is contained in:
Haojun Liao 2025-02-17 10:06:09 +08:00
parent 012ee53d35
commit bf1f162aa0
9 changed files with 38 additions and 30 deletions

View File

@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored); bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);

View File

@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo);
*/ */
int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode);
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting); int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration);
bool qTaskIsExecuting(qTaskInfo_t qinfo); bool qTaskIsExecuting(qTaskInfo_t qinfo);

View File

@ -760,7 +760,6 @@ int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
@ -800,6 +799,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta); bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);

View File

@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
} }
case TDMT_STREAM_TASK_DROP: case TDMT_STREAM_TASK_DROP:
return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen); return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
case TDMT_VND_STREAM_TASK_RESET: case TDMT_VND_STREAM_TASK_RESET:

View File

@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
} }
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen); return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
} }
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {

View File

@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
return code; return code;
} }
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) { int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t code = 0; int32_t code = 0;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -860,7 +860,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code; return code;
} else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) {
code = streamMetaDropTask(pMeta, req.streamId, req.taskId); code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId);
return code; return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;

View File

@ -972,34 +972,43 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo == NULL) { if (pTaskInfo == NULL) {
return TSDB_CODE_QRY_INVALID_QHANDLE; return TSDB_CODE_QRY_INVALID_QHANDLE;
} }
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); if (waitDuration > 0) {
qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0);
} else {
qDebug("%s async killed execTask", GET_TASKID(pTaskInfo));
}
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
// while (1) { if (waitDuration > 0) {
// taosWLockLatch(&pTaskInfo->lock); while (1) {
// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again taosWLockLatch(&pTaskInfo->lock);
// taosWUnLockLatch(&pTaskInfo->lock); if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again
// taosWUnLockLatch(&pTaskInfo->lock);
// taosMsleep(200);
// taosMsleep(200);
// int64_t d = taosGetTimestampMs() - st;
// if (d >= waitingDuration && waitingDuration >= 0) { int64_t d = taosGetTimestampMs() - st;
// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); if (d >= waitDuration && waitDuration >= 0) {
// return TSDB_CODE_SUCCESS; qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0);
// } return TSDB_CODE_SUCCESS;
// } else { // not running now }
// pTaskInfo->code = rspCode; } else { // not running now
// taosWUnLockLatch(&pTaskInfo->lock); pTaskInfo->code = rspCode;
taosWUnLockLatch(&pTaskInfo->lock);
return TSDB_CODE_SUCCESS;
}
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
// }
// }
} }
bool qTaskIsExecuting(qTaskInfo_t qinfo) { bool qTaskIsExecuting(qTaskInfo_t qinfo) {

View File

@ -897,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
// let's kill the query procedure within stream, to end it ASAP. // let's kill the query procedure within stream, to end it ASAP.
if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) {
code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
} }
@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
return 0; return 0;
} }
int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
SStreamTask* pTask = NULL; SStreamTask* pTask = NULL;
int32_t code = 0; int32_t code = 0;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;

View File

@ -447,7 +447,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
int64_t refId = pTask->id.refId;
int32_t ret = streamTaskStop(pTask); int32_t ret = streamTaskStop(pTask);
if (ret) { if (ret) {
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret)); stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));