opti:change push mgr to consume msg for subscribe
This commit is contained in:
parent
168e6f8936
commit
1c63408b3e
|
@ -149,8 +149,6 @@ typedef struct SWalReader {
|
||||||
int64_t capacity;
|
int64_t capacity;
|
||||||
// int8_t curInvalid;
|
// int8_t curInvalid;
|
||||||
// int8_t curStopped;
|
// int8_t curStopped;
|
||||||
int64_t bodyCnt;
|
|
||||||
int64_t bodyTotalSize;
|
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SWalFilterCond cond;
|
SWalFilterCond cond;
|
||||||
// TODO remove it
|
// TODO remove it
|
||||||
|
|
|
@ -1702,7 +1702,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
|
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 100ms
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
|
||||||
tmq->epoch, pVg->vgId);
|
tmq->epoch, pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1710,7 +1710,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||||
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
||||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||||
tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
|
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
|
||||||
pVg->vgId, vgSkipCnt);
|
pVg->vgId, vgSkipCnt);
|
||||||
continue;
|
continue;
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -1805,12 +1805,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
" total:%" PRId64 " reqId:0x%" PRIx64,
|
" total:%" PRId64 " reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||||
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
|
||||||
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
} else { // build rsp
|
} else { // build rsp
|
||||||
int64_t numOfRows = 0;
|
int64_t numOfRows = 0;
|
||||||
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows);
|
||||||
tmq->totalRows += numOfRows;
|
tmq->totalRows += numOfRows;
|
||||||
|
pVg->emptyBlockReceiveTs = 0;
|
||||||
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
|
" vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
||||||
|
|
|
@ -262,8 +262,8 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||||
SWalCont *pReadHead = &pReader->pHead->head;
|
SWalCont *pReadHead = &pReader->pHead->head;
|
||||||
int64_t ver = pReadHead->version;
|
int64_t ver = pReadHead->version;
|
||||||
|
|
||||||
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total cnt:%"PRId64 ", total size:%"PRId64, pReader->pWal->cfg.vgId, ver,
|
wDebug("vgId:%d, wal starts to fetch body, ver:%" PRId64 " ,len:%d, total", pReader->pWal->cfg.vgId, ver,
|
||||||
pReadHead->bodyLen, pReader->bodyCnt, pReader->bodyTotalSize);
|
pReadHead->bodyLen);
|
||||||
|
|
||||||
if (pReader->capacity < pReadHead->bodyLen) {
|
if (pReader->capacity < pReadHead->bodyLen) {
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pReader->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
|
@ -300,8 +300,6 @@ static int32_t walFetchBodyNew(SWalReader *pReader) {
|
||||||
|
|
||||||
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver);
|
wDebug("vgId:%d, index:%" PRId64 " is fetched, cursor advance", pReader->pWal->cfg.vgId, ver);
|
||||||
pReader->curVersion = ver + 1;
|
pReader->curVersion = ver + 1;
|
||||||
pReader->bodyCnt++;
|
|
||||||
pReader->bodyTotalSize += pReadHead->bodyLen;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue