enh: streamMetaCommit in sync with vnodeCommit
This commit is contained in:
parent
92e05b8ece
commit
6fc47beb71
|
@ -124,6 +124,7 @@ FAIL:
|
||||||
}
|
}
|
||||||
|
|
||||||
void sndClose(SSnode *pSnode) {
|
void sndClose(SSnode *pSnode) {
|
||||||
|
streamMetaCommit(pSnode->pMeta);
|
||||||
streamMetaClose(pSnode->pMeta);
|
streamMetaClose(pSnode->pMeta);
|
||||||
taosMemoryFree(pSnode->path);
|
taosMemoryFree(pSnode->path);
|
||||||
taosMemoryFree(pSnode);
|
taosMemoryFree(pSnode);
|
||||||
|
|
|
@ -15,4 +15,11 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
|
|
||||||
int tqCommit(STQ* pTq) { return tqOffsetCommitFile(pTq->pOffsetStore); }
|
int tqCommit(STQ* pTq) {
|
||||||
|
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
|
||||||
|
tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return tqOffsetCommitFile(pTq->pOffsetStore);
|
||||||
|
}
|
||||||
|
|
|
@ -251,10 +251,9 @@ void vnodePreClose(SVnode *pVnode) {
|
||||||
|
|
||||||
void vnodeClose(SVnode *pVnode) {
|
void vnodeClose(SVnode *pVnode) {
|
||||||
if (pVnode) {
|
if (pVnode) {
|
||||||
|
tsem_wait(&pVnode->canCommit);
|
||||||
vnodeSyncClose(pVnode);
|
vnodeSyncClose(pVnode);
|
||||||
vnodeQueryClose(pVnode);
|
vnodeQueryClose(pVnode);
|
||||||
|
|
||||||
tsem_wait(&pVnode->canCommit);
|
|
||||||
walClose(pVnode->pWal);
|
walClose(pVnode->pWal);
|
||||||
tqClose(pVnode->pTq);
|
tqClose(pVnode->pTq);
|
||||||
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
|
||||||
|
|
|
@ -69,8 +69,7 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaClose(SStreamMeta* pMeta) {
|
void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
tdbCommit(pMeta->db, pMeta->txn);
|
tdbTxnClose(pMeta->txn);
|
||||||
tdbPostCommit(pMeta->db, pMeta->txn);
|
|
||||||
tdbTbClose(pMeta->pTaskDb);
|
tdbTbClose(pMeta->pTaskDb);
|
||||||
tdbTbClose(pMeta->pCheckpointDb);
|
tdbTbClose(pMeta->pCheckpointDb);
|
||||||
tdbClose(pMeta->db);
|
tdbClose(pMeta->db);
|
||||||
|
@ -88,6 +87,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
/*streamMetaReleaseTask(pMeta, pTask);*/
|
/*streamMetaReleaseTask(pMeta, pTask);*/
|
||||||
}
|
}
|
||||||
taosHashCleanup(pMeta->pTasks);
|
taosHashCleanup(pMeta->pTasks);
|
||||||
|
taosHashCleanup(pMeta->pRecoverStatus);
|
||||||
taosMemoryFree(pMeta->path);
|
taosMemoryFree(pMeta->path);
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue