fix(stream): remove this unused drop history task msg and corresponding functions.
This commit is contained in:
parent
8620e88965
commit
f8d0c52483
|
@ -301,7 +301,6 @@
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_HTASK_DROP, "stream-htask-drop", NULL, NULL)
|
|
||||||
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
|
||||||
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
|
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,6 @@ SArray *smGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
|
@ -835,7 +835,6 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_HTASK_DROP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -2917,81 +2917,6 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// static bool needDropRelatedFillhistoryTask(STaskStatusEntry *pTaskEntry, SStreamExecInfo *pExecNode) {
|
|
||||||
// if (pTaskEntry->status == TASK_STATUS__STREAM_SCAN_HISTORY && pTaskEntry->statusLastDuration >= 10) {
|
|
||||||
// if (!pTaskEntry->inputQChanging && pTaskEntry->inputQUnchangeCounter > 10) {
|
|
||||||
// int32_t numOfReady = 0;
|
|
||||||
// int32_t numOfTotal = 0;
|
|
||||||
// for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
|
|
||||||
// STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
|
|
||||||
// if (pTaskEntry->id.streamId == pId->streamId) {
|
|
||||||
// numOfTotal++;
|
|
||||||
//
|
|
||||||
// if (pTaskEntry->id.taskId != pId->taskId) {
|
|
||||||
// STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
|
|
||||||
// if (pEntry->status == TASK_STATUS__READY) {
|
|
||||||
// numOfReady++;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// if (numOfReady > 0) {
|
|
||||||
// mDebug("stream:0x%" PRIx64
|
|
||||||
// " %d tasks are ready, %d tasks in stream-scan-history for more than 50s, drop related fill-history
|
|
||||||
// task", pTaskEntry->id.streamId, numOfReady, numOfTotal - numOfReady);
|
|
||||||
// return true;
|
|
||||||
// } else {
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return false;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// currently only handle the sink task
|
|
||||||
// 1. sink task, drop related fill-history task msg is missing
|
|
||||||
// 2. other tasks are in ready state for at least 3 * hb_interval
|
|
||||||
static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *pTaskEntry, SStreamObj *pStream) {
|
|
||||||
SStreamTask *pTask = mndGetStreamTask(&pTaskEntry->id, pStream);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
mError("failed to get the stream task:0x%x, may have been dropped", (int32_t)pTaskEntry->id.taskId);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SVDropHTaskReq *pReq = rpcMallocCont(sizeof(SVDropHTaskReq));
|
|
||||||
if (pReq == NULL) {
|
|
||||||
mError("failed to malloc in drop related fill-history task, size:%" PRIzu ", code:%s", sizeof(SVDropHTaskReq),
|
|
||||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
|
||||||
pReq->taskId = pTask->id.taskId;
|
|
||||||
pReq->streamId = pTask->id.streamId;
|
|
||||||
|
|
||||||
SRpcMsg msg = {.info.noResp = 1};
|
|
||||||
|
|
||||||
initRpcMsg(&msg, TDMT_STREAM_HTASK_DROP, pReq, sizeof(SVDropHTaskReq));
|
|
||||||
|
|
||||||
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
|
|
||||||
|
|
||||||
SEpSet epset = {0};
|
|
||||||
bool hasEpset = false;
|
|
||||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasEpset) {
|
|
||||||
tmsgSendReq(&epset, &msg);
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||||
int32_t num = taosArrayGetSize(pNodeList);
|
int32_t num = taosArrayGetSize(pNodeList);
|
||||||
mInfo("set node expired for %d nodes", num);
|
mInfo("set node expired for %d nodes", num);
|
||||||
|
|
|
@ -1233,35 +1233,3 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: here we may receive this message more than once, so need to handle this case
|
|
||||||
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {
|
|
||||||
SVDropHTaskReq* pReq = (SVDropHTaskReq*)pMsg->pCont;
|
|
||||||
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
tqError("vgId:%d process drop fill-history task req, failed to acquire task:0x%x, it may have been dropped already",
|
|
||||||
pMeta->vgId, pReq->taskId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("s-task:%s receive drop fill-history msg from mnode", pTask->id.idStr);
|
|
||||||
if (pTask->hTaskInfo.id.taskId == 0) {
|
|
||||||
tqError("vgId:%d s-task:%s not have related fill-history task", pMeta->vgId, pTask->id.idStr);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
|
||||||
SStreamTaskId id = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
|
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &id);
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
// clear the scheduler status
|
|
||||||
streamTaskSetSchedStatusInactive(pTask);
|
|
||||||
tqDebug("s-task:%s set scheduler status:%d after drop fill-history task", pTask->id.idStr, pTask->status.schedStatus);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -595,11 +595,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
|
||||||
tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
tqProcessTaskResetReq(pVnode->pTq, pMsg);
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_STREAM_HTASK_DROP: {
|
|
||||||
if (pVnode->restored && vnodeIsLeader(pVnode)) {
|
|
||||||
tqProcessTaskDropHTask(pVnode->pTq, pMsg);
|
|
||||||
}
|
|
||||||
} break;
|
|
||||||
case TDMT_VND_ALTER_CONFIRM:
|
case TDMT_VND_ALTER_CONFIRM:
|
||||||
needCommit = pVnode->config.hashChange;
|
needCommit = pVnode->config.hashChange;
|
||||||
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {
|
||||||
|
|
Loading…
Reference in New Issue