enh(stream): support resume from the latest data, and do some internal refactor.

This commit is contained in:
Haojun Liao 2023-04-27 15:07:54 +08:00
parent dbdc31539c
commit b7040ed15e
4 changed files with 20 additions and 14 deletions

View File

@ -51,7 +51,7 @@ enum {
TASK_STATUS__RECOVER_PREPARE, TASK_STATUS__RECOVER_PREPARE,
TASK_STATUS__RECOVER1, TASK_STATUS__RECOVER1,
TASK_STATUS__RECOVER2, TASK_STATUS__RECOVER2,
TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint TASK_STATUS__RESTORE, // only available for source task to replay WAL from the checkpoint, todo remove it
TASK_STATUS__PAUSE, TASK_STATUS__PAUSE,
}; };

View File

@ -1220,11 +1220,22 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) { if (pTask) {
tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
streamSetStatusNormal(pTask); streamSetStatusNormal(pTask);
// no lock needs to secure the access of the version
if (pReq->igUntreated) { // discard all the data when the stream task is suspended.
pTask->chkInfo.currentVer = sversion;
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64, pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask);
tqStartStreamTasks(pTq); tqStartStreamTasks(pTq);
} }
return 0; return 0;
} }
@ -1329,7 +1340,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exists", vgId); tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return 0; return 0;
} }

View File

@ -273,17 +273,11 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
} }
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
qDebug("task %d receive retrieve req from node %d task %d", pTask->id.taskId, pReq->srcNodeId, pReq->srcTaskId); qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId);
streamTaskEnqueueRetrieve(pTask, pReq, pRsp); streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
streamSchedExec(pTask); streamSchedExec(pTask);
/*streamTryExec(pTask);*/
/*streamDispatch(pTask);*/
return 0; return 0;
} }

View File

@ -206,8 +206,9 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) {
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t left = atomic_sub_fetch_32(&pTask->refCnt, 1);
ASSERT(left >= 0); if (left < 0) {
if (left == 0) { qError("task ref is invalid, ref:%d, %s", left, pTask->id.idStr);
} else if (left == 0) {
ASSERT(streamTaskShouldStop(&pTask->status)); ASSERT(streamTaskShouldStop(&pTask->status));
tFreeStreamTask(pTask); tFreeStreamTask(pTask);
} }
@ -238,12 +239,12 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
int32_t streamMetaCommit(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, pMeta->txn) < 0) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
ASSERT(0); qError("failed to commit stream meta");
return -1; return -1;
} }
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
ASSERT(0); qError("failed to commit stream meta");
return -1; return -1;
} }