fix:[TD-28202]move tq timer to write thread in mnode
This commit is contained in:
parent
bba8532a96
commit
d3659db2ff
|
@ -1968,7 +1968,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
void* rspObj = NULL;
|
void* rspObj = NULL;
|
||||||
int64_t startTime = taosGetTimestampMs();
|
int64_t startTime = taosGetTimestampMs();
|
||||||
|
|
||||||
tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
|
tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
|
||||||
timeout);
|
timeout);
|
||||||
|
|
||||||
// in no topic status, delayed task also need to be processed
|
// in no topic status, delayed task also need to be processed
|
||||||
|
@ -2015,7 +2015,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
int64_t currentTime = taosGetTimestampMs();
|
int64_t currentTime = taosGetTimestampMs();
|
||||||
int64_t elapsedTime = currentTime - startTime;
|
int64_t elapsedTime = currentTime - startTime;
|
||||||
if (elapsedTime > timeout) {
|
if (elapsedTime > timeout) {
|
||||||
tscInfo("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||||
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -145,7 +145,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
void *pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TMQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -747,7 +747,7 @@ static int32_t mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
|
||||||
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
int32_t hbStatus = atomic_add_fetch_32(&pConsumer->hbStatus, 1);
|
||||||
int32_t status = atomic_load_32(&pConsumer->status);
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
|
||||||
mInfo("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
mDebug("check for consumer:0x%" PRIx64 " status:%d(%s), sub-time:%" PRId64 ", createTime:%" PRId64 ", hbstatus:%d",
|
||||||
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
pConsumer->consumerId, status, mndConsumerStatusName(status), pConsumer->subscribeTime, pConsumer->createTime,
|
||||||
hbStatus);
|
hbStatus);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue