fix:change push mgr from SArray to Hash
This commit is contained in:
parent
db5b5c828e
commit
1ee1b0422c
|
@ -147,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
|||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp, int32_t* totalRows);
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle);
|
||||
|
||||
// tqMeta
|
||||
int32_t tqMetaOpen(STQ* pTq);
|
||||
|
|
|
@ -193,9 +193,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode);
|
|||
void tqNotifyClose(STQ*);
|
||||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
|
||||
int32_t type);
|
||||
int tqUnregisterPushHandle(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||
int tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
|
||||
|
||||
int tqCommit(STQ*);
|
||||
|
|
|
@ -98,7 +98,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
taosHashSetFreeFp(pTq->pHandle, destroyTqHandle);
|
||||
|
||||
taosInitRWLatch(&pTq->lock);
|
||||
pTq->pPushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
pTq->pPushMgr = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
||||
|
||||
pTq->pCheckInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||
taosHashSetFreeFp(pTq->pCheckInfo, (FDelete)tDeleteSTqCheckInfo);
|
||||
|
@ -220,17 +220,19 @@ static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqData
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||
int32_t tqPushDataRsp(STQ* pTq, STqHandle* pHandle) {
|
||||
SMqDataRsp dataRsp = {0};
|
||||
dataRsp.head.consumerId = pHandle->consumerId;
|
||||
dataRsp.head.epoch = pHandle->epoch;
|
||||
dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||
doSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP);
|
||||
|
||||
char buf1[80] = {0};
|
||||
char buf2[80] = {0};
|
||||
tFormatOffset(buf1, tListLen(buf1), &pRsp->reqOffset);
|
||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||
tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
|
||||
tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
|
||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||
TD_VID(pTq->pVnode), dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -552,14 +554,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
|||
atomic_store_32(&pHandle->epoch, -1);
|
||||
|
||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||
// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
||||
int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t));
|
||||
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
||||
if(pHandle->msg != NULL) {
|
||||
rpcFreeCont(pHandle->msg->pCont);
|
||||
taosMemoryFree(pHandle->msg);
|
||||
pHandle->msg = NULL;
|
||||
}
|
||||
tqUnregisterPushHandle(pTq, pHandle);
|
||||
|
||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||
|
||||
|
|
|
@ -300,3 +300,36 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t tqRegisterPushEntry(STQ* pTq, void* handle, SRpcMsg* pMsg) {
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
STqHandle* pHandle = (STqHandle*) handle;
|
||||
if(pHandle->msg == NULL){
|
||||
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
}
|
||||
|
||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||
pHandle->msg->contLen = pMsg->contLen;
|
||||
int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES);
|
||||
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) {
|
||||
STqHandle *pHandle = (STqHandle*)handle;
|
||||
int32_t vgId = TD_VID(pTq->pVnode);
|
||||
|
||||
int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey));
|
||||
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
||||
if(pHandle->msg != NULL) {
|
||||
tqPushDataRsp(pTq, pHandle);
|
||||
|
||||
rpcFreeCont(pHandle->msg->pCont);
|
||||
taosMemoryFree(pHandle->msg);
|
||||
pHandle->msg = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -178,21 +178,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||
// code = tqRegisterPushHandle(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||
// lock
|
||||
taosWLockLatch(&pTq->lock);
|
||||
// tqDebug("data is over, register to handle:%p, msg:%p", pHandle, pHandle->msg);
|
||||
if(pHandle->msg == NULL){
|
||||
pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
||||
}
|
||||
|
||||
memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg));
|
||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||
pHandle->msg->contLen = pMsg->contLen;
|
||||
int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES);
|
||||
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||
|
||||
code = tqRegisterPushEntry(pTq, pHandle, pMsg);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
tDeleteSMqDataRsp(&dataRsp);
|
||||
return code;
|
||||
|
|
Loading…
Reference in New Issue