fix:[TD24010] lost data if apply ver is small than commit ver
This commit is contained in:
parent
261f2736f7
commit
f9142c0ddb
|
@ -132,7 +132,7 @@ typedef struct {
|
||||||
} SWalRef;
|
} SWalRef;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t scanUncommited;
|
// int8_t scanUncommited;
|
||||||
int8_t scanNotApplied;
|
int8_t scanNotApplied;
|
||||||
int8_t scanMeta;
|
int8_t scanMeta;
|
||||||
int8_t enableRef;
|
int8_t enableRef;
|
||||||
|
|
|
@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
code = smlModifyDBSchemas(info);
|
code = smlModifyDBSchemas(info);
|
||||||
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS
|
||||||
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|
|| code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH
|
||||||
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break;
|
|| code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum);
|
||||||
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
} while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES);
|
||||||
|
|
|
@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||||
if (cond) {
|
if (cond) {
|
||||||
pReader->cond = *cond;
|
pReader->cond = *cond;
|
||||||
} else {
|
} else {
|
||||||
pReader->cond.scanUncommited = 0;
|
// pReader->cond.scanUncommited = 0;
|
||||||
pReader->cond.scanNotApplied = 0;
|
pReader->cond.scanNotApplied = 0;
|
||||||
pReader->cond.scanMeta = 0;
|
pReader->cond.scanMeta = 0;
|
||||||
pReader->cond.enableRef = 0;
|
pReader->cond.enableRef = 0;
|
||||||
|
@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) {
|
||||||
int64_t lastVer = walGetLastVer(pReader->pWal);
|
int64_t lastVer = walGetLastVer(pReader->pWal);
|
||||||
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
int64_t committedVer = walGetCommittedVer(pReader->pWal);
|
||||||
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
int64_t appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010]
|
||||||
|
wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer);
|
||||||
|
taosMsleep(1);
|
||||||
|
appliedVer = walGetAppliedVer(pReader->pWal);
|
||||||
|
}
|
||||||
|
// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer;
|
||||||
// endVer = TMIN(appliedVer, endVer);
|
// endVer = TMIN(appliedVer, endVer);
|
||||||
|
|
||||||
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64
|
||||||
", applied index:%" PRId64 ", end index:%" PRId64,
|
", applied index:%" PRId64,
|
||||||
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer);
|
pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer);
|
||||||
while (fetchVer <= endVer) {
|
while (fetchVer <= committedVer) {
|
||||||
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
if (walFetchHeadNew(pReader, fetchVer) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue