From 2a4708d82af9ce8eeec02cb061572d12524e2469 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 16 Feb 2023 16:02:32 +0800 Subject: [PATCH] fix: save fill history state --- include/libs/stream/tstream.h | 3 ++- source/dnode/vnode/src/tq/tq.c | 5 ++++- source/libs/stream/src/streamMeta.c | 31 +++++++++++++++++++---------- 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c5352eee46..ac09d5dfde 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -616,6 +616,7 @@ typedef struct SStreamMeta { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t startVer, char* msg, int32_t msgLen); SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId); @@ -627,7 +628,7 @@ void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); -int32_t streamLoadTasks(SStreamMeta* pMeta); +int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); // checkpoint int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3e13eaa6e8..276de64bbd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -106,7 +106,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { return NULL; } - if (streamLoadTasks(pTq->pStreamMeta) < 0) { + if (streamLoadTasks(pTq->pStreamMeta, walGetCommittedVer(pVnode->pWal)) < 0) { return NULL; } @@ -1196,6 +1196,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t m return -1; } + atomic_store_8(&pTask->fillHistory, 0); + streamMetaSaveTask(pTq->pStreamMeta, pTask); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e345bc1c6e..63527e2b1c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -129,13 +129,8 @@ FAIL: } #endif -#if 1 -int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { - void* buf = NULL; - if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { - return -1; - } - +int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) { + void* buf = NULL; int32_t len; int32_t code; tEncodeSize(tEncodeSStreamTask, pTask, len, code); @@ -153,11 +148,23 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { tEncoderClear(&encoder); if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), buf, len, pMeta->txn) < 0) { - ASSERT(0); return -1; } taosMemoryFree(buf); + return 0; +} + +#if 1 +int32_t streamMetaAddTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask) { + if (pMeta->expandFunc(pMeta->ahandle, pTask, ver) < 0) { + return -1; + } + + if (streamMetaSaveTask(pMeta, pTask) < 0) { + return -1; + } + taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)); return 0; @@ -255,7 +262,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { return 0; } -int32_t streamLoadTasks(SStreamMeta* pMeta) { +int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { return -1; @@ -294,7 +301,11 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { tdbTbcClose(pCur); return -1; } - pTask->taskStatus = TASK_STATUS__NORMAL; + /*pTask->taskStatus = TASK_STATUS__NORMAL;*/ + if (pTask->fillHistory) { + pTask->taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + streamTaskCheckDownstream(pTask, ver); + } } tdbFree(pKey);