fix(tmq): add lock before closing the tsdbreader.
parent 41f26148a2
commit 03aa391859
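The headline change is in tqProcessSubscribeReq (the @@ -895 hunk below): during consumer rebalance the vnode now takes the pushLock write latch before calling qStreamCloseTsdbReader on a column-subscription task, and the poll path (extractDataForMq) scans under the same latch, so the reader can no longer be torn down while another thread is reading from it. A minimal sketch of that ordering, assuming a plain pthread rwlock and simplified handle/reader types in place of the TDengine structures (this is illustrative, not the TDengine code):

/* Hedged sketch: the reader is only closed while the latch is held, and the scan
 * path holds the same latch, so a poll can never observe a reader that is being
 * freed. SReader/SHandle and the helpers are stand-ins, not TDengine APIs. */
#include <pthread.h>
#include <stdlib.h>

typedef struct { int placeholder; } SReader;

typedef struct {
  pthread_rwlock_t lock;    /* plays the role of pTq->pushLock                  */
  SReader*         reader;  /* plays the role of the tsdb reader inside the task */
} SHandle;

static void rebalance_close_reader(SHandle* h) {
  pthread_rwlock_wrlock(&h->lock);   /* take the latch first ...                 */
  if (h->reader != NULL) {
    free(h->reader);                 /* ... then closing/freeing the reader is safe */
    h->reader = NULL;
  }
  pthread_rwlock_unlock(&h->lock);
}

static int poll_scan(SHandle* h) {
  int blocks = 0;
  pthread_rwlock_wrlock(&h->lock);   /* poll path holds the same latch while scanning */
  if (h->reader != NULL) {
    blocks = 1;                      /* pretend one block was read */
  }
  pthread_rwlock_unlock(&h->lock);
  return blocks;
}

The actual commit additionally guards the whole poll with the new execStatus flag, shown further down.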
@@ -101,6 +101,7 @@ typedef struct {
SWalRef* pRef;
STqPushHandle pushHandle; // push
STqExecHandle execHandle; // exec
int8_t execStatus; // this handle is used to handle the poll requirement
} STqHandle;

typedef struct {
@@ -192,6 +192,9 @@ void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp);
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);

int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
@@ -463,152 +463,120 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
return 0;
}

int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
int32_t code = 0;
STqOffsetVal fetchOffsetNew;
SWalCkHead* pCkHead = NULL;
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
SRpcMsg* pMsg, bool* pBlockReturned) {
uint64_t consumerId = pRequest->consumerId;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
*pBlockReturned = false;

if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
return -1;
}
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
if (pOffset != NULL) {
*pOffsetVal = pOffset->val;

int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;

// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
req.subKey);
return -1;
}

// 2. check rebalance
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
return -1;
}

// update epoch if need
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
}

char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

// 2.reset offset if needed
if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset;
char formatBuf[80];
tFormatOffset(formatBuf, 80, pOffsetVal);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
return 0;
} else {
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
if (pOffset != NULL) {
fetchOffsetNew = pOffset->val;
char formatBuf[80];
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode), formatBuf);
} else {
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (req.useSnapshot) {
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(&fetchOffsetNew, 0);
} else {
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
}
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
if (pRequest->useSnapshot) {
if (pHandle->fetchMeta) {
tqOffsetResetToMeta(pOffsetVal, 0);
} else {
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
tqOffsetResetToData(pOffsetVal, 0, 0);
}
} else {
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
if (pHandle->pRef == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);

tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
}
tDeleteSMqDataRsp(&dataRsp);
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp);
tDeleteSMqDataRsp(&dataRsp);

*pBlockReturned = true;
return code;
} else {
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, pRequest);
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
tDeleteSTaosxRsp(&taosxRsp);

*pBlockReturned = true;
return code;
}
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
" in vg %d, subkey %s, reset none failed",
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
return -1;
}
}

return 0;
}

static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
int32_t code = -1;
STqOffsetVal reqOffset = pRequest->reqOffset;
STqOffsetVal fetchOffsetNew;
SWalCkHead* pCkHead = NULL;
uint64_t consumerId = pRequest->consumerId;

// 1. reset the offset if needed
if (reqOffset.type > 0) {
fetchOffsetNew = reqOffset;
} else { // handle the reset offset cases, according to the consumer's choice.
bool blockReturned = false;
code = extractResetOffsetVal(&fetchOffsetNew, pTq, pHandle, pRequest, pMsg, &blockReturned);
if (code != 0) {
return code;
}

// empty block returned, quit
if (blockReturned) {
return 0;
}
}

// this is a normal subscription requirement
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);

// lock
taosWLockLatch(&pTq->pushLock);
if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
return -1;
}
code = tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew);

// todo handle the case where re-balance occurs.
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
dataRsp.reqOffset.version == dataRsp.rspOffset.version && (pHandle->execHandle.stop != false)) {
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));

if (pPushEntry != NULL) {
pPushEntry->pInfo = pMsg->info;
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
dataRsp.withTbName = 0;
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
pPushEntry->dataRsp.head.consumerId = consumerId;
pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));

tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));

// unlock
taosWUnLockLatch(&pTq->pushLock);
return 0;
}
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp);
taosWUnLockLatch(&pTq->pushLock);
return code;
}

taosWUnLockLatch(&pTq->pushLock);
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
code = -1;
}

code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp);
pHandle->execHandle.stop = false;

//NOTE: this pHandle->consumerId may have been changed already.
// NOTE: this pHandle->consumerId may have been changed already.
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
@@ -621,7 +589,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
// for taosx
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, &req);
tqInitTaosxRsp(&taosxRsp, pRequest);

if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
@@ -629,11 +597,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}

if (metaRsp.metaRspLen > 0) {
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
code = -1;
}
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
",version:%" PRId64,
",version:%" PRId64,
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
metaRsp.rspOffset.version);
taosMemoryFree(metaRsp.metaRsp);
@@ -642,9 +608,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}

if (taosxRsp.blockNum > 0) {
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
code = -1;
}
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
tDeleteSTaosxRsp(&taosxRsp);
return code;
} else {
@@ -668,17 +632,19 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
walSetReaderCapacity(pHandle->pWalReader, 2048);

while (1) {
savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > reqEpoch) {
// todo refactor: this is not correct.
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
if (savedEpoch > pRequest->epoch) {
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
", found new consumer epoch %d, discard req epoch %d",
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
consumerId, pRequest->epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch,
pRequest->epoch);
break;
}

if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
@@ -689,7 +655,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SWalCont* pHead = &pCkHead->head;

tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
pRequest->epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);

if (pHead->msgType == TDMT_VND_SUBMIT) {
SPackedData submit = {
@@ -699,12 +665,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
};
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
req.subKey);
pRequest->subKey);
return -1;
}
if (taosxRsp.blockNum > 0 /* threshold */) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
if (tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp) < 0) {
code = -1;
}
tDeleteSTaosxRsp(&taosxRsp);
@@ -722,7 +688,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
metaRsp.metaRsp = pHead->body;
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
code = -1;
taosMemoryFreeClear(pCkHead);
tDeleteSTaosxRsp(&taosxRsp);
@@ -741,6 +707,68 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}

int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
SMqPollReq req = {0};
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}

int64_t consumerId = req.consumerId;
int32_t reqEpoch = req.epoch;
STqOffsetVal reqOffset = req.reqOffset;
int32_t vgId = TD_VID(pTq->pVnode);

// 1. find handle
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}

int8_t oldVal = atomic_val_compare_exchange_8(&pHandle->execStatus, 0, 1);

// other thread has started to re-balance this handle, return empty block for this poll
if (oldVal != 0) {
SMqDataRsp dataRsp = {0};
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
int32_t code = tqSendDataRsp(pTq, pMsg, &req, &dataRsp); // here we return an empty block to client
tDeleteSMqDataRsp(&dataRsp);
return code;
}

// pHandle->execStatus == 1, and we are safe now
// keep the epoch in the first plance, this value may be change by other threads.
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);

// 2. check rebalance status
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
atomic_store_8(&pHandle->execStatus, 0); // reset the flag
return -1;
}

// 3. update the epoch value
while (savedEpoch < reqEpoch) {
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
}

char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);

int32_t code = extractDataForMq(pTq, pHandle, &req, pMsg);

atomic_store_8(&pHandle->execStatus, 0); // restore the flag
return code;
}

int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;

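The hunk above also shows how the new execStatus field declared in tq.h is used: the first thread to swap it from 0 to 1 owns the handle and runs extractDataForMq, any concurrent poll is answered with an empty block, and the flag is restored to 0 on every exit path. A compressed sketch of that guard, using C11 atomics as a stand-in for atomic_val_compare_exchange_8 (names are illustrative, not the TDengine code):

#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
  atomic_char execStatus;  /* 0: idle, 1: a poll (or rebalance) owns the handle */
} Handle;

/* Try to take ownership of the handle, mirroring the CAS on pHandle->execStatus. */
static bool try_enter_poll(Handle* h) {
  char expected = 0;
  return atomic_compare_exchange_strong(&h->execStatus, &expected, 1);
}

static void leave_poll(Handle* h) {
  atomic_store(&h->execStatus, 0);   /* mirrors atomic_store_8(&execStatus, 0) */
}

static int process_poll(Handle* h) {
  if (!try_enter_poll(h)) {
    return 0;                        /* another thread owns the handle: reply with an empty block */
  }
  int code = 0;                      /* ... extract data for the consumer here ... */
  leave_poll(h);
  return code;
}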
@@ -813,7 +841,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tDecodeSMqRebVgReq(msg, &req);
// todo lock

tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
req.oldConsumerId, req.newConsumerId);

STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
@@ -821,11 +850,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
}

if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
taosMemoryFree(req.qmsg);
return 0;
}

STqHandle tqHandle = {0};
pHandle = &tqHandle;
/*taosInitRWLatch(&pExec->lock);*/
@@ -853,6 +884,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
.initTqReader = true,
.version = ver,
};

pHandle->snapshotVer = ver;

if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
@@ -867,6 +899,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);

pHandle->execHandle.execDb.pFilterOutTbUid =
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
@@ -875,7 +908,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);

pHandle->execHandle.execTb.suid = req.suid;

SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
@@ -895,25 +927,39 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}

taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
oldConsumerId);
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
pHandle->consumerId, oldConsumerId);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
} else {
// TODO handle qmsg and exec modification
tqInfo("vgId:%d switch consumer from Id:0x%"PRIx64" to Id:0x%"PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
// ASSERT(pHandle->consumerId == req.oldConsumerId || req.oldConsumerId == -1 || pHandle->consumerId == -1);
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, -1);
atomic_add_fetch_32(&pHandle->epoch, 1);
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
}

tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);

taosWLockLatch(&pTq->pushLock);
atomic_store_32(&pHandle->epoch, -1);

// remove if it has been register in the push manager, and return one empty block to consumer
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true);

atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_add_fetch_32(&pHandle->epoch, 1);
taosMemoryFree(req.qmsg);

pHandle->execHandle.stop = true;

if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
qStreamCloseTsdbReader(pHandle->execHandle.task);
}

taosWUnLockLatch(&pTq->pushLock);
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
return -1;
}
@@ -283,7 +283,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
tdbTbcMoveToFirst(pCur);

while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
STqHandle handle;
STqHandle handle = {0};
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
tDecodeSTqHandle(&decoder, &handle);
tDecoderClear(&decoder);
@@ -344,3 +344,51 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)

return 0;
}

int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
SMqDataRsp* pDataRsp) {
uint64_t consumerId = pRequest->consumerId;
int32_t vgId = TD_VID(pTq->pVnode);
STqHandle* pTqHandle = pHandle;

STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
if (pPushEntry == NULL) {
tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId,
(int32_t)sizeof(STqPushEntry));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}

pPushEntry->pInfo = pRpcMsg->info;
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
pDataRsp->withTbName = 0;

memcpy(&pPushEntry->dataRsp, pDataRsp, sizeof(SMqDataRsp));
pPushEntry->dataRsp.head.consumerId = consumerId;
pPushEntry->dataRsp.head.epoch = pRequest->epoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));

tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId,
pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
return 0;
}

int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
int32_t vgId = TD_VID(pTq->pVnode);
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
if (pEntry != NULL) {
uint64_t cId = (*pEntry)->dataRsp.head.consumerId;
ASSERT(consumerId == cId);

tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
taosHashRemove(pTq->pPushMgr, pKey, keyLen);

if (rspConsumer) { // rsp the old consumer with empty block.
tqPushDataRsp(pTq, *pEntry);
}
}

return 0;
}
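The two helpers above replace the inline push-manager bookkeeping that used to live in the poll path: when a poll finds no new data, the pending RPC is parked in pTq->pPushMgr keyed by subKey, and on removal (for example during rebalance) the old consumer can be answered with an empty block so its poll does not hang. A rough sketch of that lifecycle, with a toy linked list standing in for the hash table and invented names throughout (not the TDengine API):

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct PushEntry {
  char              subKey[64];   /* subscription key, as in STqPushEntry   */
  uint64_t          consumerId;   /* consumer that is waiting for new data  */
  struct PushEntry* next;
} PushEntry;

static PushEntry* registry = NULL;  /* stands in for pTq->pPushMgr */

/* Park a pending poll: called when a scan returned zero blocks. */
static int register_push_entry(const char* subKey, uint64_t consumerId) {
  PushEntry* e = calloc(1, sizeof(PushEntry));
  if (e == NULL) return -1;         /* mirrors the out-of-memory branch */
  snprintf(e->subKey, sizeof(e->subKey), "%s", subKey);
  e->consumerId = consumerId;
  e->next = registry;
  registry = e;
  return 0;
}

/* Drop a parked poll; optionally answer the old consumer with an empty block. */
static void remove_push_entry(const char* subKey, bool rspConsumer) {
  for (PushEntry** pp = &registry; *pp != NULL; pp = &(*pp)->next) {
    if (strcmp((*pp)->subKey, subKey) == 0) {
      PushEntry* e = *pp;
      *pp = e->next;
      if (rspConsumer) {
        /* the real code calls tqPushDataRsp() here so the old consumer's poll returns */
      }
      free(e);
      return;
    }
  }
}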