fix(tmq): do some internal refactor.
This commit is contained in:
parent
ca29dd344f
commit
c3acab2070
|
@ -1811,7 +1811,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||||
taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
||||||
|
|
||||||
if (pRspWrapper == NULL) {
|
if (pRspWrapper == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1831,7 +1830,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
|
SMqDataRsp* pDataRsp = &pollRspWrapper->dataRsp;
|
||||||
|
|
||||||
if (pDataRsp->head.epoch == consumerEpoch) {
|
if (pDataRsp->head.epoch == consumerEpoch) {
|
||||||
// todo fix it: race condition
|
|
||||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||||
|
|
||||||
// update the epset
|
// update the epset
|
||||||
|
@ -1843,6 +1841,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
pVg->epSet = *pollRspWrapper->pEpset;
|
pVg->epSet = *pollRspWrapper->pEpset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// update the local offset value only for the returned values.
|
||||||
pVg->currentOffset = pDataRsp->rspOffset;
|
pVg->currentOffset = pDataRsp->rspOffset;
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
|
|
||||||
|
|
|
@ -109,23 +109,18 @@ typedef struct {
|
||||||
} STqPushEntry;
|
} STqPushEntry;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
int64_t walLogLastVer;
|
int64_t walLogLastVer;
|
||||||
|
SRWLatch lock;
|
||||||
SRWLatch pushLock;
|
SHashObj* pPushMgr; // consumerId -> STqPushEntry
|
||||||
|
SHashObj* pHandle; // subKey -> STqHandle
|
||||||
SHashObj* pPushMgr; // consumerId -> STqPushEntry
|
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
||||||
SHashObj* pHandle; // subKey -> STqHandle
|
|
||||||
SHashObj* pCheckInfo; // topic -> SAlterCheckInfo
|
|
||||||
|
|
||||||
STqOffsetStore* pOffsetStore;
|
STqOffsetStore* pOffsetStore;
|
||||||
|
TDB* pMetaDB;
|
||||||
TDB* pMetaDB;
|
TTB* pExecStore;
|
||||||
TTB* pExecStore;
|
TTB* pCheckStore;
|
||||||
TTB* pCheckStore;
|
SStreamMeta* pStreamMeta;
|
||||||
|
|
||||||
SStreamMeta* pStreamMeta;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -51,7 +51,7 @@ void tqCleanUp() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void destroySTqHandle(void* data) {
|
static void destroyTqHandle(void* data) {
|
||||||
STqHandle* pData = (STqHandle*)data;
|
STqHandle* pData = (STqHandle*)data;
|
||||||
qDestroyTask(pData->execHandle.task);
|
qDestroyTask(pData->execHandle.task);
|
||||||
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pData->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -89,9 +89,9 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
||||||
|
|
||||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
|
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
|
||||||
|
|
||||||
taosInitRWLatch(&pTq->pushLock);
|
taosInitRWLatch(&pTq->lock);
|
||||||
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
|
taosHashSetFreeFp(pTq->pPushMgr, tqPushEntryFree);
|
||||||
|
|
||||||
|
@ -236,38 +236,6 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// int32_t len = 0;
|
|
||||||
// int32_t code = 0;
|
|
||||||
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
|
||||||
// if (code < 0) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
// void* buf = rpcMallocCont(tlen);
|
|
||||||
// if (buf == NULL) {
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
|
||||||
//
|
|
||||||
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
//
|
|
||||||
// SEncoder encoder = {0};
|
|
||||||
// tEncoderInit(&encoder, abuf, len);
|
|
||||||
// tEncodeSMqDataRsp(&encoder, pRsp);
|
|
||||||
// tEncoderClear(&encoder);
|
|
||||||
//
|
|
||||||
// SRpcMsg rsp = {
|
|
||||||
// .info = pPushEntry->pInfo,
|
|
||||||
// .pCont = buf,
|
|
||||||
// .contLen = tlen,
|
|
||||||
// .code = 0,
|
|
||||||
// };
|
|
||||||
//
|
|
||||||
// tmsgSendRsp(&rsp);
|
|
||||||
//
|
|
||||||
|
|
||||||
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||||
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||||
|
|
||||||
|
@ -444,7 +412,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
|
|
||||||
char formatBuf[80];
|
char formatBuf[80];
|
||||||
tFormatOffset(formatBuf, 80, pOffsetVal);
|
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.",
|
||||||
consumerId, pHandle->subKey, vgId, formatBuf);
|
consumerId, pHandle->subKey, vgId, formatBuf);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -502,7 +470,45 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
#define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0)
|
||||||
|
|
||||||
|
static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
|
SRpcMsg* pMsg, STqOffsetVal* pOffset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
|
// lock
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
|
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
|
code = tqScanData(pTq, pHandle, &dataRsp, pOffset);
|
||||||
|
|
||||||
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
|
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
|
", ts:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||||
|
dataRsp.rspOffset.ts, pRequest->reqId);
|
||||||
|
|
||||||
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t doPollDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
STqOffsetVal offset = {0};
|
STqOffsetVal offset = {0};
|
||||||
SWalCkHead* pCkHead = NULL;
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
@ -512,9 +518,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
|
||||||
// 1. reset the offset if needed
|
// 1. reset the offset if needed
|
||||||
if (reqOffset.type > 0) {
|
if (IS_OFFSET_RESET_TYPE(reqOffset.type)) {
|
||||||
offset = reqOffset;
|
// handle the reset offset cases, according to the consumer's choice.
|
||||||
} else { // handle the reset offset cases, according to the consumer's choice.
|
|
||||||
bool blockReturned = false;
|
bool blockReturned = false;
|
||||||
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -525,38 +530,41 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
if (blockReturned) {
|
if (blockReturned) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
} else { // use the consumer specified offset
|
||||||
|
offset = reqOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is a normal subscription requirement
|
// this is a normal subscribe requirement
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
SMqDataRsp dataRsp = {0};
|
return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset);
|
||||||
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
// SMqDataRsp dataRsp = {0};
|
||||||
|
// tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
// lock
|
//
|
||||||
taosWLockLatch(&pTq->pushLock);
|
// // lock
|
||||||
|
// taosWLockLatch(&pTq->lock);
|
||||||
qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
//
|
||||||
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
// qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId);
|
||||||
|
// code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
//
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
// // till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
// if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
// dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
return code;
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
// return code;
|
||||||
|
// }
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
//
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
// taosWUnLockLatch(&pTq->lock);
|
||||||
|
// code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
//
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
// // NOTE: this pHandle->consumerId may have been changed already.
|
||||||
", ts:%" PRId64", reqId:0x%"PRIx64,
|
// tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
// ", ts:%" PRId64", reqId:0x%"PRIx64,
|
||||||
dataRsp.rspOffset.ts, pRequest->reqId);
|
// consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||||
|
// dataRsp.rspOffset.ts, pRequest->reqId);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
//
|
||||||
return code;
|
// tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
// return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the case where re-balance occurs.
|
// todo handle the case where re-balance occurs.
|
||||||
|
@ -700,31 +708,31 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. check re-balance status
|
// 2. check re-balance status
|
||||||
taosRLockLatch(&pTq->pushLock);
|
taosRLockLatch(&pTq->lock);
|
||||||
if (pHandle->consumerId != consumerId) {
|
if (pHandle->consumerId != consumerId) {
|
||||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
taosRUnLockLatch(&pTq->pushLock);
|
taosRUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pTq->pushLock);
|
taosRUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pushLock);
|
|
||||||
// 3. update the epoch value
|
// 3. update the epoch value
|
||||||
|
taosWLockLatch(&pTq->lock);
|
||||||
int32_t savedEpoch = pHandle->epoch;
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
if (savedEpoch < reqEpoch) {
|
if (savedEpoch < reqEpoch) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||||
pHandle->epoch = reqEpoch;
|
pHandle->epoch = reqEpoch;
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
return extractDataForMq(pTq, pHandle, &req, pMsg);
|
return doPollDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
|
@ -732,12 +740,12 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
tqDebug("vgId:%d, tq process delete sub req %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->lock);
|
||||||
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
int32_t code = taosHashRemove(pTq->pPushMgr, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
tqDebug("vgId:%d, tq remove push handle %s", pTq->pVnode->config.vgId, pReq->subKey);
|
||||||
}
|
}
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pReq->subKey, strlen(pReq->subKey));
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
|
@ -801,18 +809,18 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
SVnode* pVnode = pTq->pVnode;
|
SVnode* pVnode = pTq->pVnode;
|
||||||
int32_t vgId = TD_VID(pVnode);
|
int32_t vgId = TD_VID(pVnode);
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
tqDebug("vgId:%d, tq process sub req:%s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pVnode->config.vgId, req.subKey,
|
||||||
req.oldConsumerId, req.newConsumerId);
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
if (req.oldConsumerId != -1) {
|
if (req.oldConsumerId != -1) {
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
|
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId:0x%" PRIx64,
|
||||||
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.newConsumerId == -1) {
|
if (req.newConsumerId == -1) {
|
||||||
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -902,28 +910,28 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
}
|
} else {
|
||||||
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
|
req.newConsumerId);
|
||||||
|
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
taosWLockLatch(&pTq->lock);
|
||||||
req.newConsumerId);
|
atomic_store_32(&pHandle->epoch, -1);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->pushLock);
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
atomic_store_32(&pHandle->epoch, -1);
|
tqRemovePushEntry(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
||||||
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
||||||
|
}
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
taosWUnLockLatch(&pTq->lock);
|
||||||
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
}
|
taosMemoryFree(req.qmsg);
|
||||||
|
return -1;
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
}
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
|
||||||
taosMemoryFree(req.qmsg);
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -213,7 +213,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->lock);
|
||||||
|
|
||||||
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
||||||
if (numOfRegisteredPush > 0) {
|
if (numOfRegisteredPush > 0) {
|
||||||
|
@ -231,7 +231,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
taosArrayDestroy(cachedKeyLens);
|
taosArrayDestroy(cachedKeyLens);
|
||||||
|
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -320,7 +320,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
}
|
}
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
|
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode)) {
|
||||||
|
|
Loading…
Reference in New Issue