diff --git a/src/system/detail/src/vnodeFile.c b/src/system/detail/src/vnodeFile.c index 96b97f4059..c598be59ee 100644 --- a/src/system/detail/src/vnodeFile.c +++ b/src/system/detail/src/vnodeFile.c @@ -181,29 +181,24 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) { return 0; } -int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { - char name[TSDB_FILENAME_LEN]; - char dHeadName[TSDB_FILENAME_LEN] = "\0"; - char dLastName[TSDB_FILENAME_LEN] = "\0"; - int len = 0; - struct stat filestat; - int vnode = pVnode->vnode; - int fileId, numOfFiles, filesAdded = 0; - SVnodeCfg * pCfg = &pVnode->cfg; +int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) { + int numOfFiles = 0, fileId, filesAdded = 0; + int vnode = pVnode->vnode; + SVnodeCfg *pCfg = &(pVnode->cfg); if (pVnode->lastKeyOnFile == 0) { if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10; pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; - pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1; + pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1; pVnode->numOfFiles = 1; - vnodeCreateEmptyCompFile(vnode, pVnode->fileId); + if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1; } numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1; - dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", - vnode, pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles); + dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode, + pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles); if (numOfFiles >= pVnode->numOfFiles) { // create empty header files backward @@ -215,7 +210,7 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { #ifdef CLUSTER return vnodeRecoverFromPeer(pVnode, fileId); #else - return -1; + return -1; #endif } } else if (numOfFiles < 0) { @@ -225,20 +220,36 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { #ifdef CLUSTER return vnodeRecoverFromPeer(pVnode, pVnode->fileId); #else - return -1; + return -1; #endif - pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; + pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; filesAdded = 1; numOfFiles = 0; // hacker way } fileId = pVnode->fileId - numOfFiles; pVnode->commitLastKey = - pVnode->lastKeyOnFile - (int64_t)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; - pVnode->commitFirstKey = pVnode->commitLastKey - (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1; + pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; + pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1; pVnode->commitFileId = fileId; pVnode->numOfFiles = pVnode->numOfFiles + filesAdded; + return 0; +} + + +int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) { + char name[TSDB_FILENAME_LEN]; + char dHeadName[TSDB_FILENAME_LEN] = "\0"; + char dLastName[TSDB_FILENAME_LEN] = "\0"; + int len = 0; + struct stat filestat; + int vnode = pVnode->vnode; + int fileId, numOfFiles, filesAdded = 0; + SVnodeCfg * pCfg = &pVnode->cfg; + + if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1; + dTrace("vid:%d, commit fileId:%d, commitLastKey:%ld, vnodeLastKey:%ld, lastKeyOnFile:%ld numOfFiles:%d", vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles); diff --git a/src/system/detail/src/vnodeImport.c b/src/system/detail/src/vnodeImport.c index 906e5daf4e..fa14a53c58 100644 --- a/src/system/detail/src/vnodeImport.c +++ b/src/system/detail/src/vnodeImport.c @@ -31,6 +31,7 @@ extern void vnodeGetHeadDataLname(char *headName, char *dataName, char * extern int vnodeCreateEmptyCompFile(int vnode, int fileId); extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode); extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode); +extern int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode); #define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx))) typedef struct { @@ -169,7 +170,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi TSKEY firstKey, lastKey; payload = pSubmit->payLoad; - firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); rows = htons(pSubmit->numOfRows); assert(rows > 0); @@ -182,6 +182,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi // Check timestamp context. TSKEY minKey = 0, maxKey = 0; + firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0); lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1); assert(firstKey <= lastKey); vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey); @@ -242,7 +243,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi } else { pPool->commitInProcess = 1; pthread_mutex_unlock(&pPool->vmutex); - int code = vnodeImportData(pObj, &import); + code = vnodeImportData(pObj, &import); *pNumOfPoints = import.importedRows; } pVnode->version++; @@ -408,7 +409,7 @@ int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid } vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid); - symlink(dHeadName, pVnode->nfn); + if (symlink(dHeadName, pVnode->nfn) < 0) return -1; pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); if (pVnode->nfd < 0) { @@ -441,7 +442,7 @@ int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid lseek(pVnode->hfd, 0, SEEK_SET); lseek(pVnode->nfd, 0, SEEK_SET); if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) { - // TODO : deal with ERROR here + return -1; } // Leave a SCompInfo space here @@ -454,10 +455,10 @@ typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadM /* Function to load a block data at the requirement of mod */ -static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod) { +static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod, int *code) { size_t size; - int code = 0; SCompBlock *pBlock = pHandle->pBlocks + blockId; + *code = TSDB_CODE_SUCCESS; assert(pBlock->sversion == pObj->sversion); @@ -477,6 +478,7 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int if (pHandle->pField == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, size); + *code = TSDB_CODE_SERV_OUT_OF_MEMORY; return -1; } pHandle->pFieldSize = size; @@ -486,12 +488,14 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) { dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, pHandle->pFieldSize, strerror(errno)); + *code = TSDB_CODE_FILE_CORRUPTED; return -1; } if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) { dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->lfn); + *code = TSDB_CODE_FILE_CORRUPTED; return -1; } } @@ -504,6 +508,7 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int if (pHandle->buffer == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, size); + *code = TSDB_CODE_SERV_OUT_OF_MEMORY; return -1; } @@ -520,16 +525,18 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int if (pHandle->temp == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, size); + *code = TSDB_CODE_SERV_OUT_OF_MEMORY; return -1; } } if (pHandle->tempBuffer == NULL) { - pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES; + pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES + sizeof(TSCKSUM); pHandle->tempBuffer = malloc(pHandle->tempBufferSize); if (pHandle->tempBuffer == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, pHandle->tempBufferSize); + *code = TSDB_CODE_SERV_OUT_OF_MEMORY; return -1; } } @@ -537,21 +544,24 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int if ((loadMod & DATA_LOAD_TIMESTAMP) && (~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part - code = - vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX, + if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX, pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints, - pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize); + pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize) < 0) { + *code = TSDB_CODE_FILE_CORRUPTED; + return -1; + } - if (code != 0) return -1; pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP; } if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns for (int col = 1; col < pBlock->numOfCols; col++) { - code = vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data, - pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer, - pHandle->tempBufferSize); - if (code != 0) return -1; + if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data, + pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer, + pHandle->tempBufferSize) < 0) { + *code = TSDB_CODE_FILE_CORRUPTED; + return -1; + } } pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA; @@ -651,52 +661,6 @@ void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SDat } } -// TODO : Check the correctness -int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) { - int numOfFiles = 0, fileId, filesAdded = 0; - int vnode = pVnode->vnode; - SVnodeCfg *pCfg = &(pVnode->cfg); - - if (pVnode->lastKeyOnFile == 0) { - if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10; - pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; - pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1; - pVnode->numOfFiles = 1; - if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1; - } - - numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile; - if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1; - - dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode, - pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles); - - if (numOfFiles >= pVnode->numOfFiles) { - // create empty header files backward - filesAdded = numOfFiles - pVnode->numOfFiles + 1; - for (int i = 0; i < filesAdded; ++i) { - fileId = pVnode->fileId - pVnode->numOfFiles - i; - if (vnodeCreateEmptyCompFile(vnode, fileId) < 0) return -1; - } - } else if (numOfFiles < 0) { - // create empty header files forward - pVnode->fileId++; - if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1; - pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; - filesAdded = 1; - numOfFiles = 0; // hacker way - } - - fileId = pVnode->fileId - numOfFiles; - pVnode->commitLastKey = - pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile; - pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1; - pVnode->commitFileId = fileId; - pVnode->numOfFiles = pVnode->numOfFiles + filesAdded; - - return 0; -} - static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) { SMeterObj * pObj = (SMeterObj *)(pImport->pObj); SVnodeObj * pVnode = vnodeList + pObj->vnode; @@ -709,6 +673,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int SCompBlock compBlock; TSCKSUM checksum = 0; int pointsImported = 0; + int code = TSDB_CODE_SUCCESS; TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision]; TSKEY minFileKey = fid * delta; @@ -720,12 +685,12 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int // create neccessary files pVnode->commitFirstKey = firstKey; - if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1; + if (vnodeCreateNeccessaryFiles(pVnode) < 0) return TSDB_CODE_OTHERS; assert(pVnode->commitFileId == fid); // Open least files to import .head(hfd) .data(dfd) .last(lfd) - if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return -1; + if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return TSDB_CODE_OTHERS; memset(&importHandle, 0, sizeof(SImportHandle)); @@ -735,6 +700,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (importHandle.pHeader == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, importHandle.pHeaderSize); + code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error_merge; } @@ -742,12 +708,14 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) { dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode, pObj->sid, pObj->meterId, fid, strerror(errno)); + code = TSDB_CODE_OTHERS; goto _error_merge; } if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) { dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId, fid); + code = TSDB_CODE_FILE_CORRUPTED; goto _error_merge; } } @@ -759,6 +727,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (buffer == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, size); + code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error_merge; } @@ -766,6 +735,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (cbuffer == NULL) { dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid, pObj->meterId, size); + code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error_merge; } @@ -783,6 +753,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it _write_empty_point: if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { + code = TSDB_CODE_OTHERS; goto _error_merge; } importHandle.oldNumOfBlocks = 0; @@ -793,7 +764,6 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0); pointsImported += rowsToWrite; - // TODO : Write the block to the file compBlock.last = 1; if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) { // TODO: deal with ERROR here @@ -816,6 +786,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) { dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, strerror(errno)); + code = TSDB_CODE_FILE_CORRUPTED; goto _error_merge; } @@ -823,13 +794,13 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int (!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) { dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter); + code = TSDB_CODE_FILE_CORRUPTED; goto _error_merge; } - { // Check the context of SCompInfo part - if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter - goto _write_empty_point; - } + // Check the context of SCompInfo part + if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter + goto _write_empty_point; } importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks; @@ -840,18 +811,21 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (importHandle.pBlocks == NULL) { dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid, pObj->meterId, size); + code = TSDB_CODE_SERV_OUT_OF_MEMORY; goto _error_merge; } if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) { dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, strerror(errno)); + code = TSDB_CODE_FILE_CORRUPTED; goto _error_merge; } if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) { dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->cfn); + code = TSDB_CODE_FILE_CORRUPTED; goto _error_merge; } } @@ -876,6 +850,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int sizeof(SCompBlock) * blocksLeft) < 0) { dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno)); + code = TSDB_CODE_OTHERS; goto _error_merge; } } @@ -883,6 +858,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) { dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno)); + code = TSDB_CODE_OTHERS; goto _error_merge; } } @@ -1001,7 +977,8 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot; if (pBlock->sversion != pObj->sversion) { /*TODO*/ } - if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP) < 0) { + if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP, &code) < 0) { + goto _error_merge; } int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])( importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC); @@ -1027,6 +1004,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int // Open the new .t file if not opened yet. if (pVnode->nfd <= 0) { if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) { + code = TSDB_CODE_OTHERS; goto _error_merge; } } @@ -1039,6 +1017,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot), strerror(errno)); + code = TSDB_CODE_OTHERS; goto _error_merge; } @@ -1071,7 +1050,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int } else { // Merge block and payload from payloadIter if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, - DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA) < 0) { // Load neccessary blocks + DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA, &code) < 0) { // Load neccessary blocks goto _error_merge; } @@ -1172,6 +1151,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int // Write the SCompInfo part if (vnodeCloseImportFiles(pObj, &importHandle) < 0) { + code = TSDB_CODE_OTHERS; goto _error_merge; } @@ -1187,7 +1167,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int tfree(importHandle.temp); tfree(importHandle.tempBuffer); - return 0; + return code; _error_merge: tfree(buffer); @@ -1214,7 +1194,7 @@ _error_merge: remove(pVnode->nfn); } - return -1; + return code; } #define FORWARD_ITER(iter, step, slotLimit, posLimit) \ @@ -1316,7 +1296,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int pBuffer = (SMergeBuffer *)malloc(size); if (pBuffer == NULL) { dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size); - return code; + return TSDB_CODE_SERV_OUT_OF_MEMORY; } pBuffer->spos = 0; pBuffer->epos = 0; @@ -1532,7 +1512,7 @@ int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows) KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1))); code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid); - if (code != 0) break; + if (code != TSDB_CODE_SUCCESS) break; } return code; @@ -1548,6 +1528,7 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { // 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX, &srow, &nrows) >= 0) { + assert(nrows > 0); code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows); if (pImport->commit) { // Need to commit now pPool->commitInProcess = 0; @@ -1555,12 +1536,13 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) { return code; } - if (code != 0) return code; + if (code != TSDB_CODE_SUCCESS) return code; } // 2. import data (0, pObj->lastKeyOnFile) into files if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow, &nrows) >= 0) { + assert(nrows > 0); code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows); }