transaction multithread

This commit is contained in:
dmchen 2023-05-04 05:49:36 +00:00
parent cef0aba54d
commit 38f507d941
2 changed files with 12 additions and 1 deletions

View File

@ -177,6 +177,7 @@ typedef struct {
SArray* pRpcArray; SArray* pRpcArray;
SRWLatch lockRpcArray; SRWLatch lockRpcArray;
int64_t mTraceId; int64_t mTraceId;
TdThreadMutex mutex;
} STrans; } STrans;
typedef struct { typedef struct {

View File

@ -546,6 +546,7 @@ static void mndTransDropData(STrans *pTrans) {
pTrans->param = NULL; pTrans->param = NULL;
pTrans->paramLen = 0; pTrans->paramLen = 0;
} }
(void)taosThreadMutexDestroy(&pTrans->mutex);
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
@ -651,6 +652,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0; pTrans->mTraceId = pReq ? TRACE_GET_ROOTID(&pReq->info.traceId) : 0;
taosInitRWLatch(&pTrans->lockRpcArray); taosInitRWLatch(&pTrans->lockRpcArray);
taosThreadMutexInit(&pTrans->mutex, NULL);
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL || if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) { pTrans->pRpcArray == NULL) {
@ -1307,7 +1309,13 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
int32_t code = 0; int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code;
taosThreadMutexLock(&pTrans->mutex);
if (pTrans->redoActionPos >= numOfActions) {
taosThreadMutexUnlock(&pTrans->mutex);
return code;
}
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos);
@ -1377,6 +1385,8 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
} }
taosThreadMutexUnlock(&pTrans->mutex);
return code; return code;
} }