diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 1172062e9b..62d6a40333 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -101,6 +101,7 @@ typedef struct { SWalRef* pRef; STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec + int8_t execStatus; // this handle is used to handle the poll requirement } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index c0d017e350..09ff844d95 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -192,6 +192,9 @@ void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); +int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp); +int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); + int tqCommit(STQ*); int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd); int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 026acd0e64..14bc51eb2c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -463,152 +463,120 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) { return 0; } -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { - SMqPollReq req = {0}; - int32_t code = 0; - STqOffsetVal fetchOffsetNew; - SWalCkHead* pCkHead = NULL; +static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, + SRpcMsg* pMsg, bool* pBlockReturned) { + uint64_t consumerId = pRequest->consumerId; + STqOffsetVal reqOffset = pRequest->reqOffset; + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey); + *pBlockReturned = false; - if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { - tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); - return -1; - } + // In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value. + if (pOffset != NULL) { + *pOffsetVal = pOffset->val; - int64_t consumerId = req.consumerId; - int32_t reqEpoch = req.epoch; - STqOffsetVal reqOffset = req.reqOffset; - - // 1. find handle - STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle == NULL) { - tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode), - req.subKey); - return -1; - } - - // 2. check rebalance - if (pHandle->consumerId != consumerId) { - tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, - consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); - terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; - return -1; - } - - // update epoch if need - int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - while (savedEpoch < reqEpoch) { - tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); - savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); - } - - char buf[80]; - tFormatOffset(buf, 80, &reqOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, - req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); - - // 2.reset offset if needed - if (reqOffset.type > 0) { - fetchOffsetNew = reqOffset; + char formatBuf[80]; + tFormatOffset(formatBuf, 80, pOffsetVal); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.", + consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf); + return 0; } else { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey); - if (pOffset != NULL) { - fetchOffsetNew = pOffset->val; - char formatBuf[80]; - tFormatOffset(formatBuf, 80, &fetchOffsetNew); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey, - TD_VID(pTq->pVnode), formatBuf); - } else { - if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { - if (req.useSnapshot) { - if (pHandle->fetchMeta) { - tqOffsetResetToMeta(&fetchOffsetNew, 0); - } else { - tqOffsetResetToData(&fetchOffsetNew, 0, 0); - } + // no poll occurs in this vnode for this topic, let's seek to the right offset value. + if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) { + if (pRequest->useSnapshot) { + if (pHandle->fetchMeta) { + tqOffsetResetToMeta(pOffsetVal, 0); } else { - pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); - if (pHandle->pRef == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1); + tqOffsetResetToData(pOffsetVal, 0, 0); + } + } else { + pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef); + if (pHandle->pRef == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } - } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); - tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, - pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); - if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { - code = -1; - } - tDeleteSMqDataRsp(&dataRsp); - return code; - } else { - STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, &req); - tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { - code = -1; - } - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { - tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64 - " in vg %d, subkey %s, reset none failed", - pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey); - terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; - return -1; + tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1); } + } else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) { + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + + tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, + pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp); + tDeleteSMqDataRsp(&dataRsp); + + *pBlockReturned = true; + return code; + } else { + STaosxRsp taosxRsp = {0}; + tqInitTaosxRsp(&taosxRsp, pRequest); + tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); + int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); + tDeleteSTaosxRsp(&taosxRsp); + + *pBlockReturned = true; + return code; + } + } else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) { + tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64 + " in vg %d, subkey %s, reset none failed", + pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey); + terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; + return -1; } } + return 0; +} + +static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { + int32_t code = -1; + STqOffsetVal reqOffset = pRequest->reqOffset; + STqOffsetVal fetchOffsetNew; + SWalCkHead* pCkHead = NULL; + uint64_t consumerId = pRequest->consumerId; + + // 1. reset the offset if needed + if (reqOffset.type > 0) { + fetchOffsetNew = reqOffset; + } else { // handle the reset offset cases, according to the consumer's choice. + bool blockReturned = false; + code = extractResetOffsetVal(&fetchOffsetNew, pTq, pHandle, pRequest, pMsg, &blockReturned); + if (code != 0) { + return code; + } + + // empty block returned, quit + if (blockReturned) { + return 0; + } + } + + // this is a normal subscription requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { SMqDataRsp dataRsp = {0}; - tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); + tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); // lock taosWLockLatch(&pTq->pushLock); - if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) { - return -1; - } + code = tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew); - // todo handle the case where re-balance occurs. - // till now, all data has been rsp to consumer, new data needs to push client once arrived. + // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && - dataRsp.reqOffset.version == dataRsp.rspOffset.version && (pHandle->execHandle.stop != false)) { - STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); - - if (pPushEntry != NULL) { - pPushEntry->pInfo = pMsg->info; - memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - dataRsp.withTbName = 0; - memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp)); - pPushEntry->dataRsp.head.consumerId = consumerId; - pPushEntry->dataRsp.head.epoch = reqEpoch; - pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*)); - - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr", - consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode)); - - // unlock - taosWUnLockLatch(&pTq->pushLock); - return 0; - } + dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { + code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp); + taosWUnLockLatch(&pTq->pushLock); + return code; } taosWUnLockLatch(&pTq->pushLock); - if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) { - code = -1; - } - + code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp); pHandle->execHandle.stop = false; - //NOTE: this pHandle->consumerId may have been changed already. + // NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "", consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid, dataRsp.rspOffset.ts); @@ -621,7 +589,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { // for taosx SMqMetaRsp metaRsp = {0}; STaosxRsp taosxRsp = {0}; - tqInitTaosxRsp(&taosxRsp, &req); + tqInitTaosxRsp(&taosxRsp, pRequest); if (fetchOffsetNew.type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) { @@ -629,11 +597,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (metaRsp.metaRspLen > 0) { - if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { - code = -1; - } + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64 - ",version:%" PRId64, + ",version:%" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.version); taosMemoryFree(metaRsp.metaRsp); @@ -642,9 +608,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } if (taosxRsp.blockNum > 0) { - if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { - code = -1; - } + code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); tDeleteSTaosxRsp(&taosxRsp); return code; } else { @@ -668,17 +632,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { walSetReaderCapacity(pHandle->pWalReader, 2048); while (1) { - savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > reqEpoch) { + // todo refactor: this is not correct. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + if (savedEpoch > pRequest->epoch) { tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64 ", found new consumer epoch %d, discard req epoch %d", - consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch); + consumerId, pRequest->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, + pRequest->epoch); break; } if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -689,7 +655,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { SWalCont* pHead = &pCkHead->head; tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, - req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); + pRequest->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SPackedData submit = { @@ -699,12 +665,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode), - req.subKey); + pRequest->subKey); return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) { + if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) { code = -1; } tDeleteSTaosxRsp(&taosxRsp); @@ -722,7 +688,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) { + if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { code = -1; taosMemoryFreeClear(pCkHead); tDeleteSTaosxRsp(&taosxRsp); @@ -741,6 +707,68 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { + SMqPollReq req = {0}; + if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) { + tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int64_t consumerId = req.consumerId; + int32_t reqEpoch = req.epoch; + STqOffsetVal reqOffset = req.reqOffset; + int32_t vgId = TD_VID(pTq->pVnode); + + // 1. find handle + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); + if (pHandle == NULL) { + tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->execStatus, 0, 1); + + // other thread has started to re-balance this handle, return empty block for this poll + if (oldVal != 0) { + SMqDataRsp dataRsp = {0}; + tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType); + int32_t code = tqSendDataRsp(pTq, pMsg, &req, &dataRsp); // here we return an empty block to client + tDeleteSMqDataRsp(&dataRsp); + return code; + } + + // pHandle->execStatus == 1, and we are safe now + // keep the epoch in the first plance, this value may be change by other threads. + int32_t savedEpoch = atomic_load_32(&pHandle->epoch); + + // 2. check rebalance status + if (pHandle->consumerId != consumerId) { + tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, + consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + atomic_store_8(&pHandle->execStatus, 0); // reset the flag + return -1; + } + + // 3. update the epoch value + while (savedEpoch < reqEpoch) { + tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch); + savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch); + } + + char buf[80]; + tFormatOffset(buf, 80, &reqOffset); + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId, + req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf); + + int32_t code = extractDataForMq(pTq, pHandle, &req, pMsg); + + atomic_store_8(&pHandle->execStatus, 0); // restore the flag + return code; +} + int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg; @@ -813,7 +841,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tDecodeSMqRebVgReq(msg, &req); // todo lock - tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey); + tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey, + req.oldConsumerId, req.newConsumerId); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { @@ -821,11 +850,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "", req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); } + if (req.newConsumerId == -1) { tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); taosMemoryFree(req.qmsg); return 0; } + STqHandle tqHandle = {0}; pHandle = &tqHandle; /*taosInitRWLatch(&pExec->lock);*/ @@ -853,6 +884,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg .initTqReader = true, .version = ver, }; + pHandle->snapshotVer = ver; if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { @@ -867,6 +899,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode); + pHandle->execHandle.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta, @@ -875,7 +908,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - pHandle->execHandle.execTb.suid = req.suid; SArray* tbUidList = taosArrayInit(0, sizeof(int64_t)); @@ -895,25 +927,39 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); - tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId, - oldConsumerId); + tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, + pHandle->consumerId, oldConsumerId); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { return -1; } } else { - // TODO handle qmsg and exec modification - tqInfo("vgId:%d switch consumer from Id:0x%"PRIx64" to Id:0x%"PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); +// ASSERT(pHandle->consumerId == req.oldConsumerId || req.oldConsumerId == -1 || pHandle->consumerId == -1); + if (pHandle->consumerId == req.newConsumerId) { // do nothing + tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, -1); + atomic_add_fetch_32(&pHandle->epoch, 1); + return tqMetaSaveHandle(pTq, req.subKey, pHandle); + } + + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); + + taosWLockLatch(&pTq->pushLock); atomic_store_32(&pHandle->epoch, -1); + + // remove if it has been register in the push manager, and return one empty block to consumer + tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); atomic_add_fetch_32(&pHandle->epoch, 1); taosMemoryFree(req.qmsg); pHandle->execHandle.stop = true; - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { qStreamCloseTsdbReader(pHandle->execHandle.task); } + taosWUnLockLatch(&pTq->pushLock); if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { return -1; } diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index a85e6e0a70..ce0aa144f9 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -283,7 +283,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) { tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { - STqHandle handle; + STqHandle handle = {0}; tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSTqHandle(&decoder, &handle); tDecoderClear(&decoder); diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 5a25d7e894..03bb14e447 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -344,3 +344,51 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) return 0; } + +int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, + SMqDataRsp* pDataRsp) { + uint64_t consumerId = pRequest->consumerId; + int32_t vgId = TD_VID(pTq->pVnode); + STqHandle* pTqHandle = pHandle; + + STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry)); + if (pPushEntry == NULL) { + tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId, + (int32_t)sizeof(STqPushEntry)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pPushEntry->pInfo = pRpcMsg->info; + memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); + pDataRsp->withTbName = 0; + + memcpy(&pPushEntry->dataRsp, pDataRsp, sizeof(SMqDataRsp)); + pPushEntry->dataRsp.head.consumerId = consumerId; + pPushEntry->dataRsp.head.epoch = pRequest->epoch; + pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); + + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId, + pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr)); + return 0; +} + +int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { + int32_t vgId = TD_VID(pTq->pVnode); + STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); + if (pEntry != NULL) { + uint64_t cId = (*pEntry)->dataRsp.head.consumerId; + ASSERT(consumerId == cId); + + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId, + (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); + taosHashRemove(pTq->pPushMgr, pKey, keyLen); + + if (rspConsumer) { // rsp the old consumer with empty block. + tqPushDataRsp(pTq, *pEntry); + } + } + + return 0; +}