From c3498fbfe8be8e0fe30820cb1f5f4b4682978573 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 18 Sep 2023 14:35:07 +0800 Subject: [PATCH] fix:core in race condition for pTq->pExecStore & return if poll too long & fix test cases if submit empty --- source/dnode/vnode/src/tq/tq.c | 60 +++++++++++++------ source/dnode/vnode/src/tq/tqRead.c | 12 ++-- source/dnode/vnode/src/tq/tqSnapshot.c | 2 + source/dnode/vnode/src/tq/tqUtil.c | 5 +- tests/system-test/7-tmq/subscribeDb3.py | 2 +- .../tmqConsFromTsdb1-1ctb-funcNFilter.py | 2 +- .../7-tmq/tmqConsFromTsdb1-1ctb.py | 2 +- .../tmqConsFromTsdb1-mutilVg-mutilCtb.py | 2 +- .../7-tmq/tmqConsFromTsdb1-mutilVg.py | 2 +- tests/system-test/7-tmq/tmqConsFromTsdb1.py | 2 +- 10 files changed, 59 insertions(+), 32 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0bf9cba2dd..bd37d36352 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { } tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); + taosWLockLatch(&pTq->lock); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); code = 0; + taosWUnLockLatch(&pTq->lock); goto end; } // 2. check consumer-vg assignment status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != req.consumerId) { tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, req.consumerId, vgId, req.subKey, pHandle->consumerId); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto end; } @@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. tqUnregisterPushHandle(pTq, pHandle); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); end: rsp.code = code; @@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); // 1. find handle + taosRLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; + taosRUnLockLatch(&pTq->lock); return -1; } // 2. check re-balance status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, vgId, req.subKey, pHandle->consumerId); @@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg bool exec = tqIsHandleExec(pHandle); if(exec){ - tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); @@ -667,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = NULL; while (1) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + + if (ret < 0) { break; } } @@ -687,21 +697,33 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqDestroyTqHandle(&handle); goto end; } - ret = tqMetaSaveHandle(pTq, req.subKey, &handle); - } else { taosWLockLatch(&pTq->lock); - - if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); - } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); - } + ret = tqMetaSaveHandle(pTq, req.subKey, &handle); taosWUnLockLatch(&pTq->lock); + } else { + while(1){ + taosWLockLatch(&pTq->lock); + bool exec = tqIsHandleExec(pHandle); + if(exec){ + tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId, + pHandle->subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + taosMsleep(10); + continue; + } + if (pHandle->consumerId == req.newConsumerId) { // do nothing + tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); + } else { + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); + tqUnregisterPushHandle(pTq, pHandle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + } + taosWUnLockLatch(&pTq->lock); + break; + } } end: diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cadbc70c6f..6a4650eb9d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -367,7 +367,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; -// uint64_t st = taosGetTimestampMs(); + uint64_t st = taosGetTimestampMs(); while (1) { SArray* pBlockList = pReader->submit.aSubmitTbData; if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { @@ -442,9 +442,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { pReader->msg.msgStr = NULL; -// if(taosGetTimestampMs() - st > 5){ -// return false; -// } + if(taosGetTimestampMs() - st > 1000){ + return false; + } } } @@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle + taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1116,6 +1117,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); taosHashCancelIterate(pTq->pHandle, pIter); + taosWUnLockLatch(&pTq->lock); + return ret; } tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); @@ -1125,6 +1128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } + taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index 5c0649c109..16cbacd59d 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -200,7 +200,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tDecodeSTqHandle(pDecoder, &handle); if (code) goto _err; + taosWLockLatch(&pTq->lock); code = tqMetaSaveHandle(pTq, handle.subKey, &handle); + taosWUnLockLatch(&pTq->lock); if (code < 0) goto _err; tDecoderClear(pDecoder); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 62ef06fec2..73e7f9d44a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -218,7 +218,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version; -// uint64_t st = taosGetTimestampMs(); + uint64_t st = taosGetTimestampMs(); int totalRows = 0; while (1) { // int32_t savedEpoch = atomic_load_32(&pHandle->epoch); @@ -265,8 +265,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } -// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { - if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { + if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index b66334a6a6..6ddd829c4c 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows < 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py index 6a03f0f751..117c3ce637 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py @@ -218,7 +218,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py index c11159c6e5..2864240441 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py index 439845aa54..d8606efe58 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py index 53ff020b08..05aa82c929 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 4bb6cf463f..dcaa6ceb7c 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -217,7 +217,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)): + if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10)