opti:modify tmq logic
This commit is contained in:
parent
0e2cf0ed73
commit
5b75ee4396
|
@ -1243,7 +1243,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
/*pRspWrapper->vgHandle = pVg;*/
|
/*pRspWrapper->vgHandle = pVg;*/
|
||||||
/*pRspWrapper->topicHandle = pTopic;*/
|
/*pRspWrapper->topicHandle = pTopic;*/
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
tsem_post(&tmq->rspSem);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
|
@ -1923,7 +1922,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||||
int32_t retryCnt = 0;
|
int32_t retryCnt = 0;
|
||||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||||
if (retryCnt++ > 40) {
|
if (retryCnt++ > 40) {
|
||||||
|
|
|
@ -1078,7 +1078,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
// let's seek to the next version in wal file
|
// let's seek to the next version in wal file
|
||||||
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
|
if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, pTaskInfo->id.str) < 0) {
|
||||||
qError("tqSeekVer failed ver:" PRId64, pOffset->version + 1);
|
qError("tqSeekVer failed ver:%" PRId64, pOffset->version + 1);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
} else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
|
|
Loading…
Reference in New Issue