From 6e48ca27d3735aba7fb18d20a89ac93b39e2b854 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 12 Nov 2019 15:11:36 +0800 Subject: [PATCH] Fix several bug --- src/system/detail/src/vnodeImport.c | 87 ++++++++++++++++------------- 1 file changed, 47 insertions(+), 40 deletions(-) diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index bc1cbd14f7..d774a55f38 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -230,54 +230,21 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi int *pNumOfPoints, TSKEY now) { SSubmitMsg *pSubmit = (SSubmitMsg *)cont; SVnodeObj * pVnode = vnodeList + pObj->vnode; - int rows; - char * payload; + int rows = 0; + char * payload = NULL; int code = TSDB_CODE_ACTION_IN_PROGRESS; SCachePool *pPool = (SCachePool *)(pVnode->pCachePool); SShellObj * pShell = (SShellObj *)param; int pointsImported = 0; - TSKEY minKey, maxKey; + TSKEY firstKey, lastKey; - rows = htons(pSubmit->numOfRows); - int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); - if (expectedLen != contLen) { - dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, - expectedLen, contLen); - return TSDB_CODE_WRONG_MSG_SIZE; - } - - // FIXME: check sversion here should not be here (Take import convert to insert case into consideration) - if (sversion != pObj->sversion) { - dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, - pObj->sversion, sversion); - return TSDB_CODE_OTHERS; - } - - // Check timestamp context. payload = pSubmit->payLoad; - TSKEY firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); - TSKEY lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); - assert(firstKey <= lastKey); - vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); - if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) { - dError( - "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld " - "maxAllowedKey:%ld", - pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey); - return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; - } - - // FIXME: Commit log here is invalid (Take retry into consideration) - if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { - if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; - code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); - if (code != 0) return code; - } + firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); if (firstKey > pObj->lastKey) { // Just call insert vnodeClearMeterState(pObj, TSDB_METER_STATE_IMPORTING); vnodeSetMeterState(pObj, TSDB_METER_STATE_INSERT); - code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, pObj->sversion, &pointsImported, now); + code = vnodeInsertPoints(pObj, cont, contLen, TSDB_DATA_SOURCE_LOG, NULL, sversion, &pointsImported, now); if (pShell) { pShell->code = code; @@ -286,6 +253,43 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi vnodeClearMeterState(pObj, TSDB_METER_STATE_INSERT); } else { // trigger import + { + rows = htons(pSubmit->numOfRows); + assert(rows > 0); + int expectedLen = rows * pObj->bytesPerPoint + sizeof(pSubmit->numOfRows); + if (expectedLen != contLen) { + dError("vid:%d sid:%d id:%s, invalid import, expected:%d, contLen:%d", pObj->vnode, pObj->sid, pObj->meterId, + expectedLen, contLen); + return TSDB_CODE_WRONG_MSG_SIZE; + } + + if (sversion != pObj->sversion) { + dError("vid:%d sid:%d id:%s, invalid sversion, expected:%d received:%d", pObj->vnode, pObj->sid, pObj->meterId, + pObj->sversion, sversion); + return TSDB_CODE_OTHERS; + } + + // Check timestamp context. + TSKEY minKey = 0, maxKey = 0; + lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); + assert(firstKey <= lastKey); + vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); + if (firstKey < minKey || firstKey > maxKey || lastKey < minKey || lastKey > maxKey) { + dError( + "vid:%d sid:%d id:%s, invalid timestamp to import, rows:%d firstKey: %ld lastKey: %ld minAllowedKey:%ld " + "maxAllowedKey:%ld", + pObj->vnode, pObj->sid, pObj->meterId, rows, firstKey, lastKey, minKey, maxKey); + return TSDB_CODE_TIMESTAMP_OUT_OF_RANGE; + } + + // TODO: Retry here will cause duplicate commit log written + if (pVnode->cfg.commitLog && source != TSDB_DATA_SOURCE_LOG) { + if (pVnode->logFd < 0) return TSDB_CODE_INVALID_COMMIT_LOG; + code = vnodeWriteToCommitLog(pObj, TSDB_ACTION_IMPORT, cont, contLen, sversion); + if (code != 0) return code; + } + } + SImportInfo *pNew, import; dTrace("vid:%d sid:%d id:%s, try to import %d rows data, firstKey:%ld, lastKey:%ld, object lastKey:%ld", @@ -1343,7 +1347,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int int code = -1; SCacheInfo * pInfo = (SCacheInfo *)(pObj->pCache); int payloadIter; - SCachePool * pPool = pVnode->pCachePool; + SCachePool * pPool = (SCachePool *)(pVnode->pCachePool); int isCacheIterEnd = 0; int spayloadIter = 0; int isAppendData = 0; @@ -1369,7 +1373,10 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int if (pInfo->numOfBlocks == 0) { if (vnodeAllocateCacheBlock(pObj) < 0) { - // TODO: deal with the ERROR here + pImport->importedRows = 0; + pImport->commit = 1; + code = TSDB_CODE_ACTION_IN_PROGRESS; + return code; } }