Merge pull request #24418 from taosdata/fix/TD-28185
fix:[TD-28185]add pause & resume logic for snode
This commit is contained in:
commit
0ab308ef79
|
@ -33,5 +33,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
|
||||
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
|
||||
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
|
||||
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
|
||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
|
||||
|
||||
#endif // TDENGINE_TQ_COMMON_H
|
||||
|
|
|
@ -195,7 +195,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
|||
case TDMT_STREAM_TASK_DEPLOY: {
|
||||
void * pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
return tqStreamTaskProcessDeployReq(pSnode->pMeta, -1, pReq, len, true, true);
|
||||
return tqStreamTaskProcessDeployReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pReq, len, true, true);
|
||||
}
|
||||
|
||||
case TDMT_STREAM_TASK_DROP:
|
||||
|
@ -204,6 +204,10 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
|||
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
|
||||
case TDMT_VND_STREAM_TASK_RESET:
|
||||
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg);
|
||||
case TDMT_STREAM_TASK_PAUSE:
|
||||
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
|
||||
case TDMT_STREAM_TASK_RESUME:
|
||||
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
|
||||
default:
|
||||
ASSERT(0);
|
||||
}
|
||||
|
|
|
@ -1094,108 +1094,11 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)msg;
|
||||
|
||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||
pReq->taskId);
|
||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
||||
streamTaskPause(pTask, pMeta);
|
||||
|
||||
SStreamTask* pHistoryTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
||||
if (pHistoryTask == NULL) {
|
||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||
", it may have been dropped already",
|
||||
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||
|
||||
streamTaskPause(pHistoryTask, pMeta);
|
||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskResumeImpl(STQ* pTq, SStreamTask* pTask, int64_t sversion, int8_t igUntreated) {
|
||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamTaskResume(pTask);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SINK) {
|
||||
if (status == TASK_STATUS__UNINIT) {
|
||||
}
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
|
||||
// no lock needs to secure the access of the version
|
||||
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
|
||||
// discard all the data when the stream task is suspended.
|
||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
|
||||
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
|
||||
", schedStatus:%d",
|
||||
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
||||
} else { // from the previous paused version and go on
|
||||
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
|
||||
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
||||
}
|
||||
|
||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
|
||||
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
||||
tqScanWalAsync(pTq, false);
|
||||
} else {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
} else if (status == TASK_STATUS__UNINIT) {
|
||||
// todo: fill-history task init ?
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
|
||||
streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
}
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->streamId, pReq->taskId);
|
||||
int32_t code = tqProcessTaskResumeImpl(pTq, pTask, sversion, pReq->igUntreated);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||
SStreamTask* pHistoryTask = streamMetaAcquireTask(pTq->pStreamMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||
if (pHistoryTask) {
|
||||
code = tqProcessTaskResumeImpl(pTq, pHistoryTask, sversion, pReq->igUntreated);
|
||||
}
|
||||
|
||||
return code;
|
||||
return tqStreamTaskProcessTaskResumeReq(pTq, sversion, msg, true);
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
|
|
|
@ -752,4 +752,109 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg){
|
||||
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||
pReq->taskId);
|
||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s receive pause msg from mnode", pTask->id.idStr);
|
||||
streamTaskPause(pTask, pMeta);
|
||||
|
||||
SStreamTask* pHistoryTask = NULL;
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
pHistoryTask = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
|
||||
if (pHistoryTask == NULL) {
|
||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||
", it may have been dropped already",
|
||||
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
|
||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
tqDebug("s-task:%s fill-history task handle paused along with related stream task", pHistoryTask->id.idStr);
|
||||
|
||||
streamTaskPause(pHistoryTask, pMeta);
|
||||
streamMetaReleaseTask(pMeta, pHistoryTask);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t sversion, int8_t igUntreated, bool fromVnode) {
|
||||
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
||||
int32_t vgId = pMeta->vgId;
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamTaskResume(pTask);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask, NULL);
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SINK) {
|
||||
if (status == TASK_STATUS__UNINIT) {
|
||||
}
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (status == TASK_STATUS__READY || status == TASK_STATUS__SCAN_HISTORY || status == TASK_STATUS__CK) {
|
||||
// no lock needs to secure the access of the version
|
||||
if (igUntreated && level == TASK_LEVEL__SOURCE && !pTask->info.fillHistory) {
|
||||
// discard all the data when the stream task is suspended.
|
||||
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
|
||||
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
|
||||
", schedStatus:%d",
|
||||
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
||||
} else { // from the previous paused version and go on
|
||||
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
|
||||
vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer, sversion, pTask->status.schedStatus);
|
||||
}
|
||||
|
||||
if (level == TASK_LEVEL__SOURCE && pTask->info.fillHistory && status == TASK_STATUS__SCAN_HISTORY) {
|
||||
streamStartScanHistoryAsync(pTask, igUntreated);
|
||||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
||||
tqScanWalAsync((STQ*)handle, false);
|
||||
} else {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
} else if (status == TASK_STATUS__UNINIT) {
|
||||
// todo: fill-history task init ?
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT;
|
||||
streamTaskHandleEvent(pTask->status.pSM, event);
|
||||
}
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode){
|
||||
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
|
||||
SStreamMeta *pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||
SStreamTask* pHistoryTask = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId);
|
||||
if (pHistoryTask) {
|
||||
code = tqProcessTaskResumeImpl(handle, pHistoryTask, sversion, pReq->igUntreated, fromVnode);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
|
@ -377,16 +377,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
|||
pMeta->rid = taosAddRef(streamMetaId, pMeta);
|
||||
|
||||
// set the attribute when running on Linux OS
|
||||
#if defined LINUX
|
||||
TdThreadRwlockAttr attr;
|
||||
taosThreadRwlockAttrInit(&attr);
|
||||
|
||||
#ifdef LINUX
|
||||
pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);
|
||||
taosThreadRwlockInit(&pMeta->lock, &attr);
|
||||
|
||||
taosThreadRwlockAttrDestroy(&attr);
|
||||
#endif
|
||||
|
||||
taosThreadRwlockInit(&pMeta->lock, &attr);
|
||||
taosThreadRwlockAttrDestroy(&attr);
|
||||
|
||||
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
|
||||
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
|
||||
metaRefMgtAdd(pMeta->vgId, pRid);
|
||||
|
|
Loading…
Reference in New Issue