enh: rsma code optimization
This commit is contained in:
parent
f9e0aad827
commit
0de7f3e3a7
|
@ -123,7 +123,6 @@ struct SRSmaInfoItem {
|
|||
uint8_t nSkipped; // number of skipped to fetch data from all active window
|
||||
int8_t fetchLevel;
|
||||
int32_t maxDelay; // ms
|
||||
int64_t lastFetch; // ms
|
||||
tmr_h tmrId;
|
||||
};
|
||||
|
||||
|
|
|
@ -19,9 +19,8 @@
|
|||
#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid
|
||||
#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt
|
||||
#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt
|
||||
#define RSMA_FETCH_DELAY_MAX (180000) // ms
|
||||
#define RSMA_FETCH_SKIP_MAX (10) // cnt
|
||||
#define RSMA_FETCH_ACTIVE_MAX (180) // ms
|
||||
#define RSMA_FETCH_DELAY_MAX (900000) // ms
|
||||
#define RSMA_FETCH_ACTIVE_MAX (1800) // ms
|
||||
|
||||
SSmaMgmt smaMgmt = {
|
||||
.inited = 0,
|
||||
|
@ -1646,26 +1645,20 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm
|
|||
}
|
||||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
// if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
|
||||
// pItem->nSkipped = 0;
|
||||
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed",
|
||||
// SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
|
||||
// } else {
|
||||
// if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
|
||||
// ++pItem->nSkipped;
|
||||
// smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
|
||||
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
|
||||
// continue;
|
||||
// } else {
|
||||
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
|
||||
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
|
||||
// }
|
||||
// }
|
||||
if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) {
|
||||
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed",
|
||||
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
|
||||
} else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) {
|
||||
++pItem->nSkipped;
|
||||
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ",
|
||||
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
|
||||
continue;
|
||||
} else {
|
||||
smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
|
||||
SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
|
||||
}
|
||||
|
||||
pItem->lastFetch = curMs;
|
||||
|
||||
// smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ",
|
||||
// SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv);
|
||||
pItem->nSkipped = 0;
|
||||
|
||||
if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) {
|
||||
goto _err;
|
||||
|
@ -1680,7 +1673,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm
|
|||
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay);
|
||||
} else {
|
||||
smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8
|
||||
" maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8,
|
||||
" maxDelay:%d, fetch not executed as fetch level is %" PRIi8,
|
||||
SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue