fix:add log for msg push
This commit is contained in:
parent
8677b56a4f
commit
db5b5c828e
|
@ -1377,7 +1377,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
|
tscDebug("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet);
|
||||||
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
|
pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg));
|
||||||
|
|
||||||
for (int32_t j = 0; j < vgNumGet; j++) {
|
for (int32_t j = 0; j < vgNumGet; j++) {
|
||||||
|
@ -1447,14 +1447,14 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
|
||||||
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
|
||||||
if (pTopicCur->vgs) {
|
if (pTopicCur->vgs) {
|
||||||
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
|
tscDebug("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur);
|
||||||
for (int32_t j = 0; j < vgNumCur; j++) {
|
for (int32_t j = 0; j < vgNumCur; j++) {
|
||||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
|
||||||
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
|
makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
tFormatOffset(buf, 80, &pVgCur->currentOffset);
|
||||||
tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
|
tscDebug("consumer:0x%" PRIx64 ", doUpdateLocalEp current vg, epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, tmq->epoch, pVgCur->vgId,
|
||||||
vgKey, buf);
|
vgKey, buf);
|
||||||
|
|
||||||
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
|
SVgroupSaveInfo info = {.offset = pVgCur->currentOffset, .numOfRows = pVgCur->numOfRows};
|
||||||
|
|
|
@ -553,8 +553,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
// tqUnregisterPushHandle(pTq, req.subKey, (int32_t)strlen(req.subKey), pHandle->consumerId, true);
|
||||||
taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t));
|
int32_t ret = taosHashRemove(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t));
|
||||||
|
tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId);
|
||||||
if(pHandle->msg != NULL) {
|
if(pHandle->msg != NULL) {
|
||||||
rpcFreeCont(pHandle->msg->pCont);
|
rpcFreeCont(pHandle->msg->pCont);
|
||||||
taosMemoryFree(pHandle->msg);
|
taosMemoryFree(pHandle->msg);
|
||||||
|
@ -1069,22 +1069,24 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
taosWLockLatch(&pTq->lock);
|
taosWLockLatch(&pTq->lock);
|
||||||
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
if(taosHashGetSize(pTq->pPushMgr) > 0){
|
||||||
while(pIter){
|
void *pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
while(pIter){
|
||||||
tqDebug("vgId:%d start set submit for pHandle:%p", vgId, pHandle);
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
if(ASSERT(pHandle->msg != NULL)){
|
tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId);
|
||||||
tqError("pHandle->msg should not be null");
|
if(ASSERT(pHandle->msg != NULL)){
|
||||||
break;
|
tqError("pHandle->msg should not be null");
|
||||||
}else{
|
break;
|
||||||
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
|
}else{
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
SRpcMsg msg = {.msgType = TDMT_VND_TMQ_CONSUME, .pCont = pHandle->msg->pCont, .contLen = pHandle->msg->contLen, .info = pHandle->msg->info};
|
||||||
taosMemoryFree(pHandle->msg);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, QUERY_QUEUE, &msg);
|
||||||
pHandle->msg = NULL;
|
taosMemoryFree(pHandle->msg);
|
||||||
|
pHandle->msg = NULL;
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
}
|
}
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
taosHashClear(pTq->pPushMgr);
|
||||||
}
|
}
|
||||||
taosHashClear(pTq->pPushMgr);
|
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
|
|
||||||
|
|
|
@ -190,8 +190,9 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
pHandle->msg->pCont = rpcMallocCont(pMsg->contLen);
|
||||||
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen);
|
||||||
pHandle->msg->contLen = pMsg->contLen;
|
pHandle->msg->contLen = pMsg->contLen;
|
||||||
tqDebug("data is over, register to handle:%p, pCont:%p, len:%d", pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
int32_t ret = taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES);
|
||||||
taosHashPut(pTq->pPushMgr, &pHandle->consumerId, sizeof(int64_t), &pHandle, POINTER_BYTES);
|
tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen);
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->lock);
|
taosWUnLockLatch(&pTq->lock);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -232,7 +232,7 @@ void saveConfigToLogFile() {
|
||||||
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
|
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
|
||||||
}
|
}
|
||||||
taosFprintfFile(g_fp, "\n");
|
taosFprintfFile(g_fp, "\n");
|
||||||
taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
|
taosFprintfFile(g_fp, " expect rows: %" PRId64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
|
|
Loading…
Reference in New Issue