refactor: refactor the lock type.

This commit is contained in:
Haojun Liao 2023-12-22 16:14:49 +08:00
parent dce0a6c74a
commit 7ce545bcd4
4 changed files with 21 additions and 22 deletions

View File

@ -497,7 +497,7 @@ typedef struct SStreamMeta {
int32_t role; int32_t role;
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower. bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo; STaskStartInfo startInfo;
SRWLatch lock; TdThreadRwlock lock;
int32_t walScanCounter; int32_t walScanCounter;
void* streamBackend; void* streamBackend;
int64_t streamBackendRid; int64_t streamBackendRid;

View File

@ -240,23 +240,23 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) { static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId}; STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
taosRLockLatch(&pMeta->lock); streamMetaRLock(pMeta);
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask && *ppTask) { if (ppTask && *ppTask) {
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer; pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
pItem->fetchResultVer = (*ppTask)->info.triggerParam; pItem->fetchResultVer = (*ppTask)->info.triggerParam;
} }
taosRUnLockLatch(&pMeta->lock); streamMetaRUnLock(pMeta);
} }
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaUnregisterTask(pMeta, streamId, taskId); streamMetaUnregisterTask(pMeta, streamId, taskId);
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks); smaDebug("vgId:%d, rsma task:%" PRIi64 ",%d dropped, remain tasks:%d", pMeta->vgId, streamId, taskId, numOfTasks);
} }
@ -1301,14 +1301,14 @@ _checkpoint:
checkpointBuilt = true; checkpointBuilt = true;
} }
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
if (streamMetaSaveTask(pMeta, pTask)) { if (streamMetaSaveTask(pMeta, pTask)) {
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
taosHashCancelIterate(pInfoHash, infoHash); taosHashCancelIterate(pInfoHash, infoHash);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64 smaDebug("vgId:%d, rsma commit, succeed to commit task:%p, submitReqVer:%" PRIi64 ", fetchResultVer:%" PRIi64
", table:%" PRIi64 ", level:%d", ", table:%" PRIi64 ", level:%d",
TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1); TD_VID(pVnode), pTask, pItem->submitReqVer, pItem->fetchResultVer, pRSmaInfo->suid, i + 1);
@ -1316,13 +1316,13 @@ _checkpoint:
} }
} }
if (pMeta) { if (pMeta) {
taosWLockLatch(&pMeta->lock); streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta)) { if (streamMetaCommit(pMeta)) {
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY; code = terrno ? terrno : TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
taosWUnLockLatch(&pMeta->lock); streamMetaWUnLock(pMeta);
} }
if (checkpointBuilt) { if (checkpointBuilt) {
smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId); smaInfo("vgId:%d, rsma commit, succeed to commit checkpoint:%" PRIi64, TD_VID(pVnode), checkpointId);

View File

@ -196,7 +196,7 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
taosWLockLatch(&pTq->pStreamMeta->lock); streamMetaWLock(pTq->pStreamMeta);
tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode)); tqDebug("vgId:%d, vnode stream-task snapshot writer closed", TD_VID(pTq->pVnode));
if (rollback) { if (rollback) {
tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn); tdbAbort(pTq->pStreamMeta->db, pTq->pStreamMeta->txn);
@ -212,14 +212,14 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
goto _err; goto _err;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
return code; return code;
_err: _err:
tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode), tqError("vgId:%d, vnode stream-task snapshot writer failed to close since %s", TD_VID(pWriter->pTq->pVnode),
tstrerror(code)); tstrerror(code));
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
return code; return code;
} }
@ -240,13 +240,13 @@ int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t
tDecoderClear(&decoder); tDecoderClear(&decoder);
int64_t key[2] = {taskId.streamId, taskId.taskId}; int64_t key[2] = {taskId.streamId, taskId.taskId};
taosWLockLatch(&pTq->pStreamMeta->lock); streamMetaWLock(pTq->pStreamMeta);
if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr), if (tdbTbUpsert(pTq->pStreamMeta->pTaskDb, key, sizeof(int64_t) << 1, (uint8_t*)pData + sizeof(SSnapDataHdr),
nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) { nData - sizeof(SSnapDataHdr), pTq->pStreamMeta->txn) < 0) {
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
return -1; return -1;
} }
taosWUnLockLatch(&pTq->pStreamMeta->lock); streamMetaWUnLock(pTq->pStreamMeta);
} else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) { } else if (pHdr->type == SNAP_DATA_STREAM_TASK_CHECKPOINT) {
// do nothing // do nothing
} }

View File

@ -1293,24 +1293,23 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
void streamMetaRLock(SStreamMeta* pMeta) { void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId); stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock); taosThreadRwlockRdlock(&pMeta->lock);
} }
void streamMetaRUnLock(SStreamMeta* pMeta) { void streamMetaRUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-runlock", pMeta->vgId); stTrace("vgId:%d meta-runlock", pMeta->vgId);
taosRUnLockLatch(&pMeta->lock); taosThreadRwlockUnlock(&pMeta->lock);
} }
void streamMetaWLock(SStreamMeta* pMeta) { void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId); stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock); taosThreadRwlockWrlock(&pMeta->lock);
ASSERT(pMeta->lock != 0x40000001);
stTrace("vgId:%d meta-wlock completed", pMeta->vgId); stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
} }
void streamMetaWUnLock(SStreamMeta* pMeta) { void streamMetaWUnLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wunlock", pMeta->vgId); stTrace("vgId:%d meta-wunlock", pMeta->vgId);
taosWUnLockLatch(&pMeta->lock); taosThreadRwlockUnlock(&pMeta->lock);
} }
static void execHelper(struct SSchedMsg* pSchedMsg) { static void execHelper(struct SSchedMsg* pSchedMsg) {