fix: add lock for trans
This commit is contained in:
parent
a3a81e6b02
commit
38c1f7d48d
|
@ -181,7 +181,7 @@ typedef struct {
|
||||||
SArray* pRpcArray;
|
SArray* pRpcArray;
|
||||||
SRWLatch lockRpcArray;
|
SRWLatch lockRpcArray;
|
||||||
int64_t mTraceId;
|
int64_t mTraceId;
|
||||||
TdThreadMutex mutex;
|
int8_t lock;
|
||||||
} STrans;
|
} STrans;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -470,7 +470,6 @@ void mndTransDropData(STrans *pTrans) {
|
||||||
pTrans->param = NULL;
|
pTrans->param = NULL;
|
||||||
pTrans->paramLen = 0;
|
pTrans->paramLen = 0;
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexDestroy(&pTrans->mutex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
static int32_t mndTransDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
|
||||||
|
@ -543,10 +542,6 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
|
||||||
STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
|
STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
|
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
|
||||||
} else {
|
|
||||||
#ifdef WINDOWS
|
|
||||||
taosThreadMutexInit(&pTrans->mutex, NULL);
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
return pTrans;
|
return pTrans;
|
||||||
}
|
}
|
||||||
|
@ -582,7 +577,6 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
||||||
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
|
||||||
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
|
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : tGenIdPI64();
|
||||||
taosInitRWLatch(&pTrans->lockRpcArray);
|
taosInitRWLatch(&pTrans->lockRpcArray);
|
||||||
taosThreadMutexInit(&pTrans->mutex, NULL);
|
|
||||||
|
|
||||||
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
|
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
|
||||||
pTrans->pRpcArray == NULL) {
|
pTrans->pRpcArray == NULL) {
|
||||||
|
@ -1264,10 +1258,10 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
|
||||||
if (numOfActions == 0) return code;
|
if (numOfActions == 0) return code;
|
||||||
|
|
||||||
taosThreadMutexLock(&pTrans->mutex);
|
if (atomic_val_compare_exchange_8(&pTrans->lock, 0, 1) != 0) return code;
|
||||||
|
|
||||||
if (pTrans->redoActionPos >= numOfActions) {
|
if (pTrans->redoActionPos >= numOfActions) {
|
||||||
taosThreadMutexUnlock(&pTrans->mutex);
|
atomic_store_8(&pTrans->lock, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1339,7 +1333,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTrans->mutex);
|
atomic_store_8(&pTrans->lock, 0);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue