From 23c4d7eb0a099dc53b8222b7cfce75249bca72d4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Aug 2023 18:10:19 +0800 Subject: [PATCH 1/4] fix:dead lock --- source/dnode/vnode/src/tq/tq.c | 118 +++++------------------- source/dnode/vnode/src/tq/tqRead.c | 5 + source/dnode/vnode/src/tq/tqUtil.c | 24 +---- source/libs/executor/src/scanoperator.c | 3 - 4 files changed, 32 insertions(+), 118 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8fe2052a0e..5ddf38b8b6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -416,86 +416,6 @@ end: rsp.code = code; tmsgSendRsp(&rsp); return 0; - -// SMqVgOffset vgOffset = {0}; -// int32_t vgId = TD_VID(pTq->pVnode); -// -// SDecoder decoder; -// tDecoderInit(&decoder, (uint8_t*)msg, msgLen); -// if (tDecodeMqVgOffset(&decoder, &vgOffset) < 0) { -// tqError("vgId:%d failed to decode seek msg", vgId); -// return -1; -// } -// -// tDecoderClear(&decoder); -// -// tqDebug("topic:%s, vgId:%d process offset seek by consumer:0x%" PRIx64 ", req offset:%" PRId64, -// vgOffset.offset.subKey, vgId, vgOffset.consumerId, vgOffset.offset.val.version); -// -// STqOffset* pOffset = &vgOffset.offset; -// if (pOffset->val.type != TMQ_OFFSET__LOG) { -// tqError("vgId:%d, subKey:%s invalid seek offset type:%d", vgId, pOffset->subKey, pOffset->val.type); -// return -1; -// } -// -// STqHandle* pHandle = taosHashGet(pTq->pHandle, pOffset->subKey, strlen(pOffset->subKey)); -// if (pHandle == NULL) { -// tqError("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", vgOffset.consumerId, vgId, pOffset->subKey); -// terrno = TSDB_CODE_INVALID_MSG; -// return -1; -// } -// -// // 2. check consumer-vg assignment status -// taosRLockLatch(&pTq->lock); -// if (pHandle->consumerId != vgOffset.consumerId) { -// tqDebug("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, -// vgOffset.consumerId, vgId, pOffset->subKey, pHandle->consumerId); -// terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; -// taosRUnLockLatch(&pTq->lock); -// return -1; -// } -// taosRUnLockLatch(&pTq->lock); -// -// // 3. check the offset info -// STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey); -// if (pSavedOffset != NULL) { -// if (pSavedOffset->val.type != TMQ_OFFSET__LOG) { -// tqError("invalid saved offset type, vgId:%d sub:%s", vgId, pOffset->subKey); -// return 0; // no need to update the offset value -// } -// -// if (pSavedOffset->val.version == pOffset->val.version) { -// tqDebug("vgId:%d subKey:%s no need to seek to %" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, -// pOffset->val.version, pSavedOffset->val.version); -// return 0; -// } -// } -// -// int64_t sver = 0, ever = 0; -// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); -// if (pOffset->val.version < sver) { -// pOffset->val.version = sver; -// } else if (pOffset->val.version > ever) { -// pOffset->val.version = ever; -// } -// -// // save the new offset value -// if (pSavedOffset != NULL) { -// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " prev offset:%" PRId64, vgId, pOffset->subKey, pOffset->val.version, -// pSavedOffset->val.version); -// } else { -// tqDebug("vgId:%d sub:%s seek to:%" PRId64 " not saved yet", vgId, pOffset->subKey, pOffset->val.version); -// } -// -// if (tqOffsetWrite(pTq->pOffsetStore, pOffset) < 0) { -// tqError("failed to save offset, vgId:%d sub:%s seek to %" PRId64, vgId, pOffset->subKey, pOffset->val.version); -// return -1; -// } -// -// tqDebug("topic:%s, vgId:%d consumer:0x%" PRIx64 " offset is update to:%" PRId64, vgOffset.offset.subKey, vgId, -// vgOffset.consumerId, vgOffset.offset.val.version); -// -// return 0; } int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId) { @@ -610,7 +530,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { taosWUnLockLatch(&pTq->lock); tqDebug("tmq poll: consumer:0x%" PRIx64 - "vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", + " vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosMsleep(10); } @@ -707,10 +627,10 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { taosRUnLockLatch(&pTq->lock); return -1; } - taosRUnLockLatch(&pTq->lock); int64_t sver = 0, ever = 0; walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever); + taosRUnLockLatch(&pTq->lock); SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, req.reqOffset); @@ -766,27 +686,35 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqInfo("vgId:%d, tq process delete sub req %s", vgId, pReq->subKey); int32_t code = 0; - taosWLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); if (pHandle) { - while (tqIsHandleExec(pHandle)) { - tqDebug("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, - pHandle->subKey, pHandle); - taosMsleep(10); - } + while (1) { + taosWLockLatch(&pTq->lock); + bool exec = tqIsHandleExec(pHandle); - if (pHandle->pRef) { - walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); - } + if(exec){ + tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + pHandle->subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + taosMsleep(10); + continue; + } + if (pHandle->pRef) { + walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId); + } - tqUnregisterPushHandle(pTq, pHandle); + tqUnregisterPushHandle(pTq, pHandle); - code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); - if (code != 0) { - tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + code = taosHashRemove(pTq->pHandle, pReq->subKey, strlen(pReq->subKey)); + if (code != 0) { + tqError("cannot process tq delete req %s, since no such handle", pReq->subKey); + } + taosWUnLockLatch(&pTq->lock); + break; } } + taosWLockLatch(&pTq->lock); code = tqOffsetDelete(pTq->pOffsetStore, pReq->subKey); if (code != 0) { tqError("cannot process tq delete req %s, since no such offset in cache", pReq->subKey); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 04e3e8c0df..cb5b1e8a7d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -366,6 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; + uint64_t st = taosGetTimestampMs(); while (1) { SArray* pBlockList = pReader->submit.aSubmitTbData; if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { @@ -439,6 +440,10 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->msg.msgStr = NULL; + + if(taosGetTimestampMs() - st > 5){ + return false; + } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index b7fd505784..dfde0d0bda 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -126,12 +126,6 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } -//static void setRequestVersion(STqOffsetVal* offset, int64_t ver){ -// if(offset->type == TMQ_OFFSET__LOG){ -// offset->version = ver; -// } -//} - static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; @@ -140,7 +134,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, *pOffset); -// dataRsp.reqOffset.type = pOffset->type; // store origin type for getting offset in tmq_get_vgroup_offset qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -161,7 +154,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, taosWUnLockLatch(&pTq->lock); } -// setRequestVersion(&dataRsp.reqOffset, pOffset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); end : { @@ -182,7 +174,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, *offset); -// taosxRsp.reqOffset.type = offset->type; // store origin type for getting offset in tmq_get_vgroup_offset if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -216,19 +207,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version; + uint64_t st = taosGetTimestampMs(); int totalRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; - } + ASSERT (savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); -// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -241,7 +227,6 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); -// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } @@ -269,9 +254,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } - if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { + if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); -// setRequestVersion(&taosxRsp.reqOffset, offset->version); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end; } else { @@ -310,7 +294,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); - } else { // todo handle the case where re-balance occurs. + } else { // for taosx return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &reqOffset); } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index af1740750c..a3eb052b23 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1727,9 +1727,6 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); if (pResult && pResult->info.rows > 0) { -// qDebug("queue scan tsdb return %" PRId64 " rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, -// pResult->info.rows, pResult->info.window.skey, pResult->info.window.ekey, -// pInfo->tqReader->pWalReader->curVersion); tqOffsetResetToData(&pTaskInfo->streamInfo.currentOffset, pResult->info.id.uid, pResult->info.window.ekey); return pResult; } From a4e29e31a6e67207c2db6241689033197cc85756 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Aug 2023 23:36:24 +0800 Subject: [PATCH 2/4] fix:modify poll delay bigger to avoid lose data --- tests/system-test/7-tmq/tmqDropStb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/system-test/7-tmq/tmqDropStb.py b/tests/system-test/7-tmq/tmqDropStb.py index 00affabafc..35bee61916 100644 --- a/tests/system-test/7-tmq/tmqDropStb.py +++ b/tests/system-test/7-tmq/tmqDropStb.py @@ -29,7 +29,7 @@ class TDTestCase: 'rowsPerTbl': 10000, 'batchNum': 2000, 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 - 'pollDelay': 20, + 'pollDelay': 50, 'showMsg': 1, 'showRow': 1} @@ -45,7 +45,7 @@ class TDTestCase: autoCommitInterval = 'auto.commit.interval.ms:1000' autoOffset = 'auto.offset.reset:earliest' - pollDelay = 20 + pollDelay = 50 showMsg = 1 showRow = 1 From 4f20359f4323da935362387a82c2e5e89801254a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Aug 2023 23:57:48 +0800 Subject: [PATCH 3/4] fix:modify log level to trace --- source/dnode/vnode/src/tq/tqRead.c | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index cb5b1e8a7d..93649a245a 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -494,7 +494,7 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); return true; } else { - tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, + tqInfo("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, taosHashGetSize(pReader->tbIdHash), idstr); } @@ -855,7 +855,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas tDeleteSchemaWrapper(pSW); goto FAIL; } - tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(block.pDataBlock)); block.info.id.uid = uid; @@ -872,7 +872,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas SSDataBlock* pBlock = taosArrayGetLast(blocks); - tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(blocks)); int32_t targetIdx = 0; @@ -954,7 +954,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas tDeleteSchemaWrapper(pSW); goto FAIL; } - tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + tqTrace("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(block.pDataBlock)); block.info.id.uid = uid; @@ -971,7 +971,7 @@ int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas SSDataBlock* pBlock = taosArrayGetLast(blocks); - tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + tqTrace("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, (int32_t)taosArrayGetSize(blocks)); int32_t targetIdx = 0; From 7f2a9a7262fd7e934d9bf464551c6c20727a0934 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 31 Aug 2023 09:07:40 +0800 Subject: [PATCH 4/4] fix:cancel the limit time for consume --- source/dnode/vnode/src/tq/tqRead.c | 8 ++++---- source/dnode/vnode/src/tq/tqUtil.c | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 93649a245a..90e1ebcd6c 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -366,7 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con bool tqNextBlockInWal(STqReader* pReader, const char* id) { SWalReader* pWalReader = pReader->pWalReader; - uint64_t st = taosGetTimestampMs(); +// uint64_t st = taosGetTimestampMs(); while (1) { SArray* pBlockList = pReader->submit.aSubmitTbData; if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { @@ -441,9 +441,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) { pReader->msg.msgStr = NULL; - if(taosGetTimestampMs() - st > 5){ - return false; - } +// if(taosGetTimestampMs() - st > 5){ +// return false; +// } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index dfde0d0bda..06dd8dd1e9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -207,7 +207,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, walReaderVerifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version; - uint64_t st = taosGetTimestampMs(); +// uint64_t st = taosGetTimestampMs(); int totalRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); @@ -254,7 +254,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } - if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { +// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) { + if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto end;