Merge pull request #22860 from taosdata/fix/fixTransferCrash3dev
fix transfer crash
This commit is contained in:
commit
9a57aba33d
|
@ -165,7 +165,6 @@ struct SStreamTaskWriter {
|
||||||
STQ* pTq;
|
STQ* pTq;
|
||||||
int64_t sver;
|
int64_t sver;
|
||||||
int64_t ever;
|
int64_t ever;
|
||||||
TXN* txn;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
|
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter) {
|
||||||
|
@ -182,12 +181,6 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
|
||||||
pWriter->sver = sver;
|
pWriter->sver = sver;
|
||||||
pWriter->ever = ever;
|
pWriter->ever = ever;
|
||||||
|
|
||||||
if (tdbBegin(pTq->pStreamMeta->db, &pWriter->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
|
||||||
code = -1;
|
|
||||||
taosMemoryFree(pWriter);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppWriter = pWriter;
|
*ppWriter = pWriter;
|
||||||
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
|
tqDebug("vgId:%d, vnode stream-task snapshot writer opened", TD_VID(pTq->pVnode));
|
||||||
return code;
|
return code;
|
||||||
|
@ -203,30 +196,33 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STQ* pTq = pWriter->pTq;
|
STQ* pTq = pWriter->pTq;
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
|
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
|
||||||
if (rollback) {
|
if (rollback) {
|
||||||
tdbAbort(pWriter->pTq->pStreamMeta->db, pWriter->txn);
|
tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
|
||||||
} else {
|
} else {
|
||||||
code = tdbCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn);
|
code = tdbCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
code = tdbPostCommit(pWriter->pTq->pStreamMeta->db, pWriter->txn);
|
code = tdbPostCommit(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (tdbBegin(pTq->pStreamMeta->db, &pTq->pStreamMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, 0) < 0) {
|
||||||
|
code = -1;
|
||||||
|
taosMemoryFree(pWriter);
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
|
|
||||||
taosMemoryFree(pWriter);
|
taosMemoryFree(pWriter);
|
||||||
|
|
||||||
// restore from metastore
|
|
||||||
// if (tqMetaRestoreHandle(pTq) < 0) {
|
|
||||||
// goto _err;
|
|
||||||
// }
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
|
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
return code;
|
return code;
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData) {
|
||||||
|
@ -247,11 +243,15 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
|
||||||
}
|
}
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
|
// tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn)
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||||
int64_t key[2] = {task.streamId, task.taskId};
|
int64_t key[2] = {task.streamId, task.taskId};
|
||||||
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
|
||||||
nData - sizeof(SSnapDataHdr), pWriter->txn) < 0) {
|
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||||
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
|
||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
|
@ -425,20 +425,6 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pWriter->pStreamTaskWriter) {
|
|
||||||
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback);
|
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWriter->pStreamStateWriter) {
|
|
||||||
code = streamStateSnapWriterClose(pWriter->pStreamStateWriter, rollback);
|
|
||||||
if (code) goto _exit;
|
|
||||||
|
|
||||||
code = streamStateRebuildFromSnap(pWriter->pStreamStateWriter, 0);
|
|
||||||
pWriter->pStreamStateWriter = NULL;
|
|
||||||
if (code) goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pWriter->pRsmaSnapWriter) {
|
if (pWriter->pRsmaSnapWriter) {
|
||||||
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
code = rsmaSnapWriterClose(&pWriter->pRsmaSnapWriter, rollback);
|
||||||
if (code) goto _exit;
|
if (code) goto _exit;
|
||||||
|
|
|
@ -539,10 +539,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue