support reopen stream state

This commit is contained in:
yihaoDeng 2023-08-07 08:51:01 +00:00
parent 7ef4df8752
commit eeb97351e8
6 changed files with 60 additions and 38 deletions

View File

@ -612,6 +612,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId); int32_t streamStateRebuild(SStreamMeta* pMeta, char* path, int64_t chkpId);
int32_t streamStateReopen(SStreamMeta *pMeta, int64_t chkpId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -331,6 +331,9 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId); int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId);
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter);
// SStreamTaskReader ====================================== // SStreamTaskReader ======================================
// SStreamStateWriter ===================================== // SStreamStateWriter =====================================
// SStreamStateReader ===================================== // SStreamStateReader =====================================

View File

@ -163,7 +163,16 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
return code; return code;
} }
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) { int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, char* path, int64_t chkpId) {
return streamStateRebuild(pWriter->pTq->pStreamMeta, path, chkpId); return streamStateReopen(pWriter->pTq->pStreamMeta, chkpId);
}
int32_t streamStateLoadTasksImpl(SStreamMeta* pMeta, int64_t ver) {
// impl later
return streamLoadTasks(pMeta, ver);
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
SWal* pWal = pWriter->pTq->pVnode->pWal;
return streamStateLoadTasksImpl(pWriter->pTq->pStreamMeta, walGetCommittedVer(pWal));
} }
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {

View File

@ -409,6 +409,12 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
if (pWriter->pStreamStateWriter) { if (pWriter->pStreamStateWriter) {
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback); code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
if (code) goto _exit; if (code) goto _exit;
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, NULL, 0);
if (code) goto _exit;
code = streamStateLoadTasks(pWriter->pStreamStateWriter);
if (code) goto _exit;
} }
if (pWriter->pRsmaSnapWriter) { if (pWriter->pRsmaSnapWriter) {

View File

@ -732,8 +732,8 @@ int stateSessionKeyDBComp(void* state, const char* aBuf, size_t aLen, const char
return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2)); return stateSessionKeyCmpr(&w1, sizeof(w1), &w2, sizeof(w2));
} }
int stateSessionKeyEncode(void* ses, char* buf) { int stateSessionKeyEncode(void* k, char* buf) {
SStateSessionKey* sess = ses; SStateSessionKey* sess = k;
int len = 0; int len = 0;
len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.skey);
len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey); len += taosEncodeFixedI64((void**)&buf, sess->key.win.ekey);
@ -741,8 +741,8 @@ int stateSessionKeyEncode(void* ses, char* buf) {
len += taosEncodeFixedI64((void**)&buf, sess->opNum); len += taosEncodeFixedI64((void**)&buf, sess->opNum);
return len; return len;
} }
int stateSessionKeyDecode(void* ses, char* buf) { int stateSessionKeyDecode(void* k, char* buf) {
SStateSessionKey* sess = ses; SStateSessionKey* sess = k;
int len = 0; int len = 0;
char* p = buf; char* p = buf;

View File

@ -128,48 +128,51 @@ _err:
return NULL; return NULL;
} }
void streamMetaReopen(SStreamMeta** ppMeta) { void streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId) {
SStreamMeta* pMeta = *ppMeta; // stop all running tasking and reopen later
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamMeta* pNewMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamTask* pTask = *(SStreamTask**)pIter;
pNewMeta->path = taosStrdup(pMeta->path); if (pTask->schedTimer) {
pNewMeta->vgId = pMeta->vgId; taosTmrStop(pTask->schedTimer);
pNewMeta->walScanCounter = 0; pTask->schedTimer = NULL;
pNewMeta->ahandle = pMeta->ahandle; }
pNewMeta->expandFunc = pMeta->expandFunc;
*ppMeta = pNewMeta; if (pTask->launchTaskTimer) {
taosTmrStop(pTask->launchTaskTimer);
pTask->launchTaskTimer = NULL;
}
streamMetaClose(pMeta); tFreeStreamTask(pTask);
}
// tdbAbort(pMeta->db, pMeta->txn); // close stream backend
// tdbTbClose(pMeta->pTaskDb); streamBackendCleanup(pMeta->streamBackend);
// tdbTbClose(pMeta->pCheckpointDb); taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
// tdbClose(pMeta->db); pMeta->streamBackendRid = -1;
pMeta->streamBackend = NULL;
// void* pIter = NULL; pMeta->streamBackend = streamBackendInit(pMeta->path, chkpId);
// while (1) { pMeta->streamBackendRid = taosAddRef(streamBackendId, pMeta->streamBackend);
// pIter = taosHashIterate(pMeta->pTasks, pIter);
// if (pIter == NULL) {
// break;
// }
// SStreamTask* pTask = *(SStreamTask**)pIter; taosHashClear(pMeta->pTasks);
// if (pTask->schedTimer) {
// taosTmrStop(pTask->schedTimer);
// pTask->schedTimer = NULL;
// }
// if (pTask->launchTaskTimer) { taosArrayClear(pMeta->pTaskList);
// taosTmrStop(pTask->launchTaskTimer);
// pTask->launchTaskTimer = NULL;
// }
// tFreeStreamTask(pTask); taosHashClear(pMeta->pTaskBackendUnique);
// }
// taosHashClear(pMeta->pTasks); taosArrayClear(pMeta->checkpointSaved);
// taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosArrayClear(pMeta->checkpointInUse);
// if (streamLoadTasks(pMeta,int64_t ver))
return;
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbAbort(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);