refactor backend

This commit is contained in:
yihaoDeng 2023-10-11 19:46:35 +08:00
parent 4f47162512
commit 031506aef8
3 changed files with 20 additions and 2 deletions

View File

@ -725,6 +725,7 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaReopen(SStreamMeta* pMeta, int64_t chkpId);
int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta);
void streamMetaNotifyClose(SStreamMeta* pMeta);
void* streamMetaGetBackendByTaskKey(SStreamMeta* pMeta, char* key, int64_t* ref);

View File

@ -169,14 +169,15 @@ int32_t streamStateSnapWriterClose(SStreamStateWriter* pWriter, int8_t rollback)
}
int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) {
tqDebug("vgId:%d, vnode %s start to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
// int32_t code = streamMetaReopen(pWriter->pTq->pStreamMeta, chkpId);
int32_t code = streamStateLoadTasks(pWriter);
tqDebug("vgId:%d, vnode %s succ to rebuild stream-state", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);
taosMemoryFree(pWriter);
return code;
}
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); }
int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) {
return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta);
}
int32_t streamStateSnapWrite(SStreamStateWriter* pWriter, uint8_t* pData, uint32_t nData) {
tqDebug("vgId:%d, vnode %s snapshot write data", TD_VID(pWriter->pTq->pVnode), STREAM_STATE_TRANSFER);

View File

@ -756,6 +756,22 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) {
taosArrayDestroy(pRecycleList);
}
int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) {
if (pMeta == NULL) return 0;
void* pIter = taosHashIterate(pMeta->pTaskBackendUnique, NULL);
while (pIter) {
STaskBackendWrapper* taskBackend = *(STaskBackendWrapper**)pIter;
if (taskBackend != NULL) {
taskBackendRemoveRef(taskBackend);
}
pIter = taosHashIterate(pMeta->pTaskBackendUnique, pIter);
}
taosHashClear(pMeta->pTaskBackendUnique);
return 0;
}
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL;