From d54ae0c840991998f4fcf8846383179dd2130885 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 7 Mar 2023 17:49:20 +0800 Subject: [PATCH] fix(tmq): free reader after check the reader status. --- source/dnode/vnode/src/inc/tq.h | 31 +++++++++---------------- source/dnode/vnode/src/tq/tq.c | 20 ++++++++++++---- source/dnode/vnode/src/tq/tqExec.c | 15 +++++++++++- source/libs/executor/src/executor.c | 2 -- source/libs/executor/src/executorimpl.c | 21 +++++++++++++++-- source/libs/executor/src/scanoperator.c | 2 +- 6 files changed, 61 insertions(+), 30 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 792fed2309..1172062e9b 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -79,8 +79,7 @@ typedef struct { } STqExecDb; typedef struct { - int8_t subType; - + int8_t subType; STqReader* pExecReader; qTaskInfo_t task; union { @@ -89,27 +88,19 @@ typedef struct { STqExecDb execDb; }; int32_t numOfCols; // number of out pout column, temporarily used + bool stop; // denote if needs to be stopped or not } STqExecHandle; typedef struct { - // info - char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - int64_t consumerId; - int32_t epoch; - int8_t fetchMeta; - - int64_t snapshotVer; - - SWalReader* pWalReader; - - SWalRef* pRef; - - // push - STqPushHandle pushHandle; - - // exec - STqExecHandle execHandle; - + char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int64_t consumerId; + int32_t epoch; + int8_t fetchMeta; + int64_t snapshotVer; + SWalReader* pWalReader; + SWalRef* pRef; + STqPushHandle pushHandle; // push + STqExecHandle execHandle; // exec } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4d21a2e7f3..026acd0e64 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -569,16 +569,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); + // lock taosWLockLatch(&pTq->pushLock); if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { return -1; } + // todo handle the case where re-balance occurs. // till now, all data has been rsp to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version) { + dataRsp.reqOffset.version == dataRsp.rspOffset.version && (pHandle->execHandle.stop != false)) { STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); + if (pPushEntry != NULL) { pPushEntry->pInfo = pMsg->info; memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); @@ -591,17 +594,21 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr", consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode)); + // unlock taosWUnLockLatch(&pTq->pushLock); return 0; } } - taosWUnLockLatch(&pTq->pushLock); + taosWUnLockLatch(&pTq->pushLock); if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { code = -1; } + pHandle->execHandle.stop = false; + + //NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -610,6 +617,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return code; } + // todo handle the case where re-balance occurs. // for taosx SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; @@ -894,11 +902,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } } else { // TODO handle qmsg and exec modification - tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId); + tqInfo("vgId:%d switch consumer from Id:0x%"PRIx64" to Id:0x%"PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_32(&pHandle->epoch, -1); atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); + + pHandle->execHandle.stop = true; + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pHandle->execHandle.task); } @@ -906,7 +917,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { return -1; } - // close handle + + pHandle->execHandle.stop = false; } return 0; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index f97c5ce93c..81b68f8473 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -46,11 +46,13 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); + // TODO add reference to gurantee success if (metaGetTableEntryByUidCache(&mr, uid) < 0) { metaReaderClear(&mr); return -1; } + for (int32_t i = 0; i < n; i++) { char* tbName = taosStrdup(mr.me.name); taosArrayPush(pRsp->blockTbName, &tbName); @@ -83,13 +85,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; + tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId); if (qExecTask(task, &pDataBlock, &ts) < 0) { tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr()); return -1; } + tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock); + // current scan should be stopped asap, since the rebalance occurs. if (pDataBlock == NULL) { break; } @@ -99,7 +104,15 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { rowCnt += pDataBlock->info.rows; - if (rowCnt >= 4096) break; + if (rowCnt >= 4096) { + break; + } + } + + if (pExec->stop) { + tqDebug("vgId:%d, current vgroups has been transferred to other consumer, return results asap", + TD_VID(pTq->pVnode)); + break; } } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9fe0f4f8a7..d00f9d9843 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -709,7 +709,6 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; - if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } @@ -717,7 +716,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); setTaskKilled(pTaskInfo, rspCode); - qStopTaskOperators(pTaskInfo); return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 0dcefec93d..54e443d1da 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2771,11 +2771,17 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta } void qStreamCloseTsdbReader(void* task) { - if (task == NULL) return; + if (task == NULL) { + return; + } + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task; SOperatorInfo* pOp = pTaskInfo->pRoot; - qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid, + + qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.lastStatus.uid, pTaskInfo->streamInfo.lastStatus.ts); + + // todo refactor, other thread may already use this read to extract data. pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0}; while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) { SOperatorInfo* pDownstreamOp = pOp->pDownstream[0]; @@ -2783,8 +2789,19 @@ void qStreamCloseTsdbReader(void* task) { SStreamScanInfo* pInfo = pDownstreamOp->info; if (pInfo->pTableScanOp) { STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + + setOperatorCompleted(pInfo->pTableScanOp); + while(pTaskInfo->owner != 0) { + taosMsleep(100); + qDebug("wait for the reader stopping"); + } + tsdbReaderClose(pTSInfo->base.dataReader); pTSInfo->base.dataReader = NULL; + + // restore the status, todo refactor. + pInfo->pTableScanOp->status = OP_OPENED; + pTaskInfo->status = TASK_NOT_COMPLETED; return; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8ccd0c78fc..e98c27fb88 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -751,7 +751,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { while (1) { SSDataBlock* result = doGroupedTableScan(pOperator); - if (result) { + if (result || (pOperator->status == OP_EXEC_DONE)) { return result; }