fix transfer crash

This commit is contained in:
yihaoDeng 2023-09-12 10:44:56 +08:00
parent af08289a06
commit 6ce85f5cae
2 changed files with 16 additions and 18 deletions

View File

@ -182,12 +182,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
pWriter->sver = sver; pWriter->sver = sver;
pWriter->ever = ever; pWriter->ever = ever;
if (tdbBegin(pTq->pStreamMeta->db, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
taosMemoryFree(pWriter);
goto _err;
}
*ppWriter = pWriter; *ppWriter = pWriter;
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
return code; return code;
@ -204,29 +198,31 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
taosWLockLatch(&pTq->pStreamMeta->lock);
if (rollback) { if (rollback) {
tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn); tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
} else { } else {
code = tdbCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); code = tdbCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
if (code) goto _err; if (code) goto _err;
code = tdbPostCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn); code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
if (code) goto _err; if (code) goto _err;
} }
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
code = -1;
goto _err;
}
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
// restore from metastore
// if (tqMetaRestoreHandle(pTq) < 0) {
// goto _err;
// }
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode), tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosWUnLockLatch(&pTq->pStreamMeta->lock);
return code; return code;
return 0;
} }
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
@ -251,7 +247,7 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
taosWLockLatch(&pTq->pStreamMeta->lock); taosWLockLatch(&pTq->pStreamMeta->lock);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) { nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
taosWUnLockLatch(&pTq->pStreamMeta->lock); taosWUnLockLatch(&pTq->pStreamMeta->lock);
return -1; return -1;
} }

View File

@ -129,7 +129,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) { if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb, 0) < 0) {
goto _err; goto _err;
} }
if (streamMetaBegin(pMeta) < 0) { if (streamMetaBegin(pMeta) < 0) {
goto _err; goto _err;
} }
@ -539,10 +538,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
} }
int32_t streamMetaBegin(SStreamMeta* pMeta) { int32_t streamMetaBegin(SStreamMeta* pMeta) {
taosWLockLatch(&pMeta->lock);
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
taosWUnLockLatch(&pMeta->lock);
return -1; return -1;
} }
taosWUnLockLatch(&pMeta->lock);
return 0; return 0;
} }