support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 08:59:04 +00:00
parent eeb97351e8
commit b6c991f896
4 changed files with 15 additions and 14 deletions

View File

@ -611,8 +611,8 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); // int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId); int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -163,7 +163,11 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
return code; return code;
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) {
return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId); int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
if (code == 0) {
code = streamStateLoadTasks(pWriter);
}
return code;
} }
int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) { int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) {

View File

@ -412,9 +412,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0); code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0);
if (code) goto _exit; if (code) goto _exit;
code = streamStateLoadTasks(pWriter->pStreamStateWriter);
if (code) goto _exit;
} }
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {

View File

@ -36,14 +36,14 @@ void streamMetaCleanup() {
taosCloseRef(streamBackendCfWrapperId); taosCloseRef(streamBackendCfWrapperId);
} }
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) { // int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId) {
int32_t code = 0; // int32_t code = 0;
int32_t nTask = taosHashGetSize(pMeta->pTasks); // int32_t nTask = taosHashGetSize(pMeta->pTasks);
assert(nTask == 0); // assert(nTask == 0);
return code; // return code;
} // }
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId) {
int32_t code = -1; int32_t code = -1;
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
@ -128,7 +128,7 @@ _err:
return NULL; return NULL;
} }
void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) { int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// stop all running tasking and reopen later // stop all running tasking and reopen later
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
@ -172,7 +172,7 @@ void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
// if (streamLoadTasks(pMeta,int64_t ver)) // if (streamLoadTasks(pMeta,int64_t ver))
return; return 0;
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);