enh: rsma checkpoint
This commit is contained in:
parent
96b5024347
commit
722777f8c9
|
@ -1106,7 +1106,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
}
|
}
|
||||||
// stream state: process checkpoint response in async mode
|
// stream state: process checkpoint response in async mode
|
||||||
int32_t nStreamFlushed = 0;
|
int32_t nStreamFlushed = 0;
|
||||||
int32_t nMSleep = 0;
|
int32_t nSleep = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
|
while ((infoHash = taosHashIterate(pInfoHash, infoHash))) {
|
||||||
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash;
|
||||||
|
@ -1120,16 +1120,16 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
pRSmaInfo->suid, &pResList, &streamFlushed);
|
pRSmaInfo->suid, &pResList, &streamFlushed);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
|
if (streamFlushed && (++nStreamFlushed >= nTaskInfo)) {
|
||||||
smaInfo("%s:%d checkpoint ready, %d ms consumed, received/total: %d/%d", __func__, __LINE__, nMSleep,
|
smaInfo("%s:%d checkpoint ready, %d us consumed, received/total: %d/%d", __func__, __LINE__, nSleep * 10,
|
||||||
nStreamFlushed, nTaskInfo);
|
nStreamFlushed, nTaskInfo);
|
||||||
goto _checkpoint;
|
goto _checkpoint;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMsleep(1);
|
taosUsleep(10);
|
||||||
++nMSleep;
|
++nSleep;
|
||||||
smaInfo("%s:%d wait for checkpoint ready, %d ms elapsed, received/total: %d/%d", __func__, __LINE__, nMSleep,
|
smaInfo("%s:%d wait for checkpoint ready, %d us elapsed, received/total: %d/%d", __func__, __LINE__, nSleep * 10,
|
||||||
nStreamFlushed, nTaskInfo);
|
nStreamFlushed, nTaskInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue