diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 94de72f86e..906e5daf4e 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -171,6 +171,34 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi payload = pSubmit->payLoad; firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); + rows = htons(pSubmit->numOfRows); + assert(rows > 0); + int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); + if (expectedLen != contLen) { + dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, + expectedLen, contLen); + return TSDB_CODE_WRONG_MSG_SIZE; + } + + // Check timestamp context. + TSKEY minKey = 0, maxKey = 0; + lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); + assert(firstKey <= lastKey); + vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); + if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) { + dError( + "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld " + "maxAllowedKey:%ld", + pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey); + return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; + } + + if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { + if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; + code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); + if (code != 0) return code; + } + if (firstKey > pObj->lastKey) { // Just call insert vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); // TODO: Here may fail to set the state, add error handling. @@ -179,41 +207,10 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi // TODO: outside clear state function is invalid for this structure vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { // trigger import - { - rows = htons(pSubmit->numOfRows); - assert(rows > 0); - int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); - if (expectedLen != contLen) { - dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, - expectedLen, contLen); - return TSDB_CODE_WRONG_MSG_SIZE; - } - - if (sversion != pObj->sversion) { - dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->sversion, sversion); - return TSDB_CODE_OTHERS; - } - - // Check timestamp context. - TSKEY minKey = 0, maxKey = 0; - lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); - assert(firstKey <= lastKey); - vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); - if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) { - dError( - "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld " - "maxAllowedKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey); - return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; - } - - // TODO: Retry here will cause duplicate commit log written - if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { - if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; - code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); - if (code != 0) return code; - } + if (sversion != pObj->sversion) { + dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, + pObj->sversion, sversion); + return TSDB_CODE_OTHERS; } SImportInfo import;