fix modification error
This commit is contained in:
parent
528b601287
commit
534aa8b11d
|
@ -171,15 +171,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
payload = pSubmit->payLoad;
|
payload = pSubmit->payLoad;
|
||||||
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
|
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
|
||||||
|
|
||||||
if (firstKey > pObj->lastKey) { // Just call insert
|
|
||||||
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
|
|
||||||
// TODO: Here may fail to set the state, add error handling.
|
|
||||||
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
|
|
||||||
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
|
|
||||||
// TODO: outside clear state function is invalid for this structure
|
|
||||||
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
|
|
||||||
} else { // trigger import
|
|
||||||
{
|
|
||||||
rows = htons(pSubmit->numOfRows);
|
rows = htons(pSubmit->numOfRows);
|
||||||
assert(rows > 0);
|
assert(rows > 0);
|
||||||
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
|
int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows);
|
||||||
|
@ -189,12 +180,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
return TSDB_CODE_WRONG_MSG_SIZE;
|
return TSDB_CODE_WRONG_MSG_SIZE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sversion != pObj->sversion) {
|
|
||||||
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
|
||||||
pObj->sversion, sversion);
|
|
||||||
return TSDB_CODE_OTHERS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check timestamp context.
|
// Check timestamp context.
|
||||||
TSKEY minKey = 0, maxKey = 0;
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
|
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
|
||||||
|
@ -208,12 +193,24 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
|
||||||
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
|
return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Retry here will cause duplicate commit log written
|
|
||||||
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
|
if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) {
|
||||||
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
|
if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG;
|
||||||
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
|
code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion);
|
||||||
if (code != 0) return code;
|
if (code != 0) return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (firstKey > pObj->lastKey) { // Just call insert
|
||||||
|
vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING);
|
||||||
|
// TODO: Here may fail to set the state, add error handling.
|
||||||
|
vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT);
|
||||||
|
code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, pNumOfPoints, now);
|
||||||
|
// TODO: outside clear state function is invalid for this structure
|
||||||
|
vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT);
|
||||||
|
} else { // trigger import
|
||||||
|
if (sversion != pObj->sversion) {
|
||||||
|
dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId,
|
||||||
|
pObj->sversion, sversion);
|
||||||
|
return TSDB_CODE_OTHERS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SImportInfo import;
|
SImportInfo import;
|
||||||
|
|
Loading…
Reference in New Issue