refactor error code return

This commit is contained in:
Hongze Cheng 2019-11-25 15:25:04 +08:00
parent 534aa8b11d
commit 50c507c870
2 changed files with 86 additions and 93 deletions

View File

@ -181,29 +181,24 @@ int vnodeCreateEmptyCompFile(int vnode, int fileId) {
return 0;
}
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char name[TSDB_FILENAME_LEN];
char dHeadName[TSDB_FILENAME_LEN] = "\0";
char dLastName[TSDB_FILENAME_LEN] = "\0";
int len = 0;
struct stat filestat;
int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
int numOfFiles = 0, fileId, filesAdded = 0;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
SVnodeCfg *pCfg = &(pVnode->cfg);
if (pVnode->lastKeyOnFile == 0) {
if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
pVnode->lastKeyOnFile = (int64_t)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->numOfFiles = 1;
vnodeCreateEmptyCompFile(vnode, pVnode->fileId);
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
}
numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d",
vnode, pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
if (numOfFiles >= pVnode->numOfFiles) {
// create empty header files backward
@ -227,18 +222,34 @@ int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
#else
return -1;
#endif
pVnode->lastKeyOnFile += (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
filesAdded = 1;
numOfFiles = 0; // hacker way
}
fileId = pVnode->fileId - numOfFiles;
pVnode->commitLastKey =
pVnode->lastKeyOnFile - (int64_t)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
pVnode->commitFirstKey = pVnode->commitLastKey - (int64_t)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1;
pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1;
pVnode->commitFileId = fileId;
pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;
return 0;
}
int vnodeOpenCommitFiles(SVnodeObj *pVnode, int noTempLast) {
char name[TSDB_FILENAME_LEN];
char dHeadName[TSDB_FILENAME_LEN] = "\0";
char dLastName[TSDB_FILENAME_LEN] = "\0";
int len = 0;
struct stat filestat;
int vnode = pVnode->vnode;
int fileId, numOfFiles, filesAdded = 0;
SVnodeCfg * pCfg = &pVnode->cfg;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;
dTrace("vid:%d, commit fileId:%d, commitLastKey:%ld, vnodeLastKey:%ld, lastKeyOnFile:%ld numOfFiles:%d",
vnode, fileId, pVnode->commitLastKey, pVnode->lastKey, pVnode->lastKeyOnFile, pVnode->numOfFiles);

View File

@ -31,6 +31,7 @@ extern void vnodeGetHeadDataLname(char *headName, char *dataName, char *
extern int vnodeCreateEmptyCompFile(int vnode, int fileId);
extern int vnodeUpdateFreeSlot(SVnodeObj *pVnode);
extern SCacheBlock *vnodeGetFreeCacheBlock(SVnodeObj *pVnode);
extern int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode);
#define KEY_AT_INDEX(payload, step, idx) (*(TSKEY *)((char *)(payload) + (step) * (idx)))
typedef struct {
@ -169,7 +170,6 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
TSKEY firstKey, lastKey;
payload = pSubmit->payLoad;
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
rows = htons(pSubmit->numOfRows);
assert(rows > 0);
@ -182,6 +182,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
// Check timestamp context.
TSKEY minKey = 0, maxKey = 0;
firstKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, 0);
lastKey = KEY_AT_INDEX(payload, pObj->bytesPerPoint, rows - 1);
assert(firstKey <= lastKey);
vnodeGetValidDataRange(pObj->vnode, now, &minKey, &maxKey);
@ -242,7 +243,7 @@ int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
} else {
pPool->commitInProcess = 1;
pthread_mutex_unlock(&pPool->vmutex);
int code = vnodeImportData(pObj, &import);
code = vnodeImportData(pObj, &import);
*pNumOfPoints = import.importedRows;
}
pVnode->version++;
@ -408,7 +409,7 @@ int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid
}
vnodeGetHeadTname(pVnode->nfn, NULL, pVnode->vnode, fid);
symlink(dHeadName, pVnode->nfn);
if (symlink(dHeadName, pVnode->nfn) < 0) return -1;
pVnode->nfd = open(pVnode->nfn, O_RDWR | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
if (pVnode->nfd < 0) {
@ -441,7 +442,7 @@ int vnodeOpenTempFilesForImport(SImportHandle *pHandle, SMeterObj *pObj, int fid
lseek(pVnode->hfd, 0, SEEK_SET);
lseek(pVnode->nfd, 0, SEEK_SET);
if (tsendfile(pVnode->nfd, pVnode->hfd, NULL, pHandle->compInfoOffset) < 0) {
// TODO : deal with ERROR here
return -1;
}
// Leave a SCompInfo space here
@ -454,10 +455,10 @@ typedef enum { DATA_LOAD_TIMESTAMP = 0x1, DATA_LOAD_OTHER_DATA = 0x2 } DataLoadM
/* Function to load a block data at the requirement of mod
*/
static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod) {
static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int blockId, uint8_t loadMod, int *code) {
size_t size;
int code = 0;
SCompBlock *pBlock = pHandle->pBlocks + blockId;
*code = TSDB_CODE_SUCCESS;
assert(pBlock->sversion == pObj->sversion);
@ -477,6 +478,7 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int
if (pHandle->pField == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
pHandle->pFieldSize = size;
@ -486,12 +488,14 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int
if (read(dfd, (void *)(pHandle->pField), pHandle->pFieldSize) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to read data file, size:%ld reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pHandle->pFieldSize, strerror(errno));
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (!taosCheckChecksumWhole((uint8_t *)(pHandle->pField), pHandle->pFieldSize)) {
dError("vid:%d sid:%d meterId:%s, data file %s is broken since checksum mismatch", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->lfn);
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
}
@ -504,6 +508,7 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int
if (pHandle->buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
@ -520,16 +525,18 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int
if (pHandle->temp == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
}
if (pHandle->tempBuffer == NULL) {
pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES;
pHandle->tempBufferSize = pObj->maxBytes * pObj->pointsPerFileBlock + EXTRA_BYTES + sizeof(TSCKSUM);
pHandle->tempBuffer = malloc(pHandle->tempBufferSize);
if (pHandle->tempBuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, pHandle->tempBufferSize);
*code = TSDB_CODE_SERV_OUT_OF_MEMORY;
return -1;
}
}
@ -537,21 +544,24 @@ static int vnodeLoadNeededBlockData(SMeterObj *pObj, SImportHandle *pHandle, int
if ((loadMod & DATA_LOAD_TIMESTAMP) &&
(~(pHandle->blockLoadState & DATA_LOAD_TIMESTAMP))) { // load only timestamp part
code =
vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), PRIMARYKEY_TIMESTAMP_COL_INDEX,
pHandle->data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, sizeof(TSKEY) * pBlock->numOfPoints,
pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize);
pHandle->temp, pHandle->tempBuffer, pHandle->tempBufferSize) < 0) {
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
if (code != 0) return -1;
pHandle->blockLoadState |= DATA_LOAD_TIMESTAMP;
}
if ((loadMod & DATA_LOAD_OTHER_DATA) && (~(pHandle->blockLoadState & DATA_LOAD_OTHER_DATA))) { // load other columns
for (int col = 1; col < pBlock->numOfCols; col++) {
code = vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
if (vnodeReadColumnToMem(dfd, pBlock, &(pHandle->pField), col, pHandle->data[col]->data,
pBlock->numOfPoints * pObj->schema[col].bytes, pHandle->temp, pHandle->tempBuffer,
pHandle->tempBufferSize);
if (code != 0) return -1;
pHandle->tempBufferSize) < 0) {
*code = TSDB_CODE_FILE_CORRUPTED;
return -1;
}
}
pHandle->blockLoadState |= DATA_LOAD_OTHER_DATA;
@ -651,52 +661,6 @@ void vnodeConvertRowsToCols(SMeterObj *pObj, const char *payload, int rows, SDat
}
}
// TODO : Check the correctness
int vnodeCreateNeccessaryFiles(SVnodeObj *pVnode) {
int numOfFiles = 0, fileId, filesAdded = 0;
int vnode = pVnode->vnode;
SVnodeCfg *pCfg = &(pVnode->cfg);
if (pVnode->lastKeyOnFile == 0) {
if (pCfg->daysPerFile == 0) pCfg->daysPerFile = 10;
pVnode->fileId = pVnode->firstKey / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
pVnode->lastKeyOnFile = (long)(pVnode->fileId + 1) * pCfg->daysPerFile * tsMsPerDay[pVnode->cfg.precision] - 1;
pVnode->numOfFiles = 1;
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
}
numOfFiles = (pVnode->lastKeyOnFile - pVnode->commitFirstKey) / tsMsPerDay[pVnode->cfg.precision] / pCfg->daysPerFile;
if (pVnode->commitFirstKey > pVnode->lastKeyOnFile) numOfFiles = -1;
dTrace("vid:%d, commitFirstKey:%ld lastKeyOnFile:%ld numOfFiles:%d fileId:%d vnodeNumOfFiles:%d", pVnode->vnode,
pVnode->commitFirstKey, pVnode->lastKeyOnFile, numOfFiles, pVnode->fileId, pVnode->numOfFiles);
if (numOfFiles >= pVnode->numOfFiles) {
// create empty header files backward
filesAdded = numOfFiles - pVnode->numOfFiles + 1;
for (int i = 0; i < filesAdded; ++i) {
fileId = pVnode->fileId - pVnode->numOfFiles - i;
if (vnodeCreateEmptyCompFile(vnode, fileId) < 0) return -1;
}
} else if (numOfFiles < 0) {
// create empty header files forward
pVnode->fileId++;
if (vnodeCreateEmptyCompFile(vnode, pVnode->fileId) < 0) return -1;
pVnode->lastKeyOnFile += (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
filesAdded = 1;
numOfFiles = 0; // hacker way
}
fileId = pVnode->fileId - numOfFiles;
pVnode->commitLastKey =
pVnode->lastKeyOnFile - (long)numOfFiles * tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile;
pVnode->commitFirstKey = pVnode->commitLastKey - (long)tsMsPerDay[pVnode->cfg.precision] * pCfg->daysPerFile + 1;
pVnode->commitFileId = fileId;
pVnode->numOfFiles = pVnode->numOfFiles + filesAdded;
return 0;
}
static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int rows, int fid) {
SMeterObj * pObj = (SMeterObj *)(pImport->pObj);
SVnodeObj * pVnode = vnodeList + pObj->vnode;
@ -709,6 +673,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
SCompBlock compBlock;
TSCKSUM checksum = 0;
int pointsImported = 0;
int code = TSDB_CODE_SUCCESS;
TSKEY delta = pVnode->cfg.daysPerFile * tsMsPerDay[pVnode->cfg.precision];
TSKEY minFileKey = fid * delta;
@ -720,12 +685,12 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
// create neccessary files
pVnode->commitFirstKey = firstKey;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return -1;
if (vnodeCreateNeccessaryFiles(pVnode) < 0) return TSDB_CODE_OTHERS;
assert(pVnode->commitFileId == fid);
// Open least files to import .head(hfd) .data(dfd) .last(lfd)
if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return -1;
if (vnodeOpenMinFilesForImport(pObj->vnode, fid) < 0) return TSDB_CODE_OTHERS;
memset(&importHandle, 0, sizeof(SImportHandle));
@ -735,6 +700,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (importHandle.pHeader == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, importHandle.pHeaderSize);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
@ -742,12 +708,14 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (read(pVnode->hfd, (void *)(importHandle.pHeader), importHandle.pHeaderSize) < importHandle.pHeaderSize) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d failed to read SCompHeader part, reason:%s", pObj->vnode,
pObj->sid, pObj->meterId, fid, strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pHeader), importHandle.pHeaderSize)) {
dError("vid: %d, sid: %d, meterId: %s, fid: %d SCompHeader part is broken", pObj->vnode, pObj->sid, pObj->meterId,
fid);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
}
@ -759,6 +727,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (buffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
@ -766,6 +735,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (cbuffer == NULL) {
dError("vid: %d, sid: %d, meterId: %s, failed to allocate memory, size: %ul", pObj->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
@ -783,6 +753,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (importHandle.pHeader[pObj->sid].compInfoOffset == 0) { // No data in this file, just write it
_write_empty_point:
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
importHandle.oldNumOfBlocks = 0;
@ -793,7 +764,6 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
vnodeConvertRowsToCols(pObj, payload + rowsWritten * pObj->bytesPerPoint, rowsToWrite, data, 0);
pointsImported += rowsToWrite;
// TODO : Write the block to the file
compBlock.last = 1;
if (vnodeWriteBlockToFile(pObj, &compBlock, data, cdata, rowsToWrite) < 0) {
// TODO: deal with ERROR here
@ -816,6 +786,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (read(pVnode->hfd, (void *)(&(importHandle.compInfo)), sizeof(SCompInfo)) < sizeof(SCompInfo)) {
dError("vid:%d sid:%d meterId:%s, failed to read .head file, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
@ -823,14 +794,14 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
(!taosCheckChecksumWhole((uint8_t *)(&(importHandle.compInfo)), sizeof(SCompInfo)))) {
dError("vid:%d sid:%d meterId:%s, .head file %s is broken, delemeter:%x", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->cfn, importHandle.compInfo.delimiter);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
{ // Check the context of SCompInfo part
// Check the context of SCompInfo part
if (importHandle.compInfo.uid != pObj->uid) { // The data belongs to the other meter
goto _write_empty_point;
}
}
importHandle.oldNumOfBlocks = importHandle.compInfo.numOfBlocks;
importHandle.last = importHandle.compInfo.last;
@ -840,18 +811,21 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (importHandle.pBlocks == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate importHandle.pBlock, size:%ul", pVnode->vnode, pObj->sid,
pObj->meterId, size);
code = TSDB_CODE_SERV_OUT_OF_MEMORY;
goto _error_merge;
}
if (read(pVnode->hfd, (void *)(importHandle.pBlocks), size) < size) {
dError("vid:%d sid:%d meterId:%s, failed to read importHandle.pBlock, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, strerror(errno));
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
if (!taosCheckChecksumWhole((uint8_t *)(importHandle.pBlocks), size)) {
dError("vid:%d sid:%d meterId:%s, pBlock part is broken in %s", pVnode->vnode, pObj->sid, pObj->meterId,
pVnode->cfn);
code = TSDB_CODE_FILE_CORRUPTED;
goto _error_merge;
}
}
@ -876,6 +850,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
sizeof(SCompBlock) * blocksLeft) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode,
pObj->sid, pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * blocksLeft, strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
@ -883,6 +858,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
if (twrite(pVnode->nfd, (void *)(&checksum), sizeof(TSCKSUM)) < 0) {
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(TSCKSUM), strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
@ -1001,7 +977,8 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
SCompBlock *pBlock = importHandle.pBlocks + blockIter.slot;
if (pBlock->sversion != pObj->sversion) { /*TODO*/
}
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP) < 0) {
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot, DATA_LOAD_TIMESTAMP, &code) < 0) {
goto _error_merge;
}
int pos = (*vnodeSearchKeyFunc[pObj->searchAlgorithm])(
importHandle.data[PRIMARYKEY_TIMESTAMP_COL_INDEX]->data, pBlock->numOfPoints, key, TSQL_SO_ASC);
@ -1027,6 +1004,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
// Open the new .t file if not opened yet.
if (pVnode->nfd <= 0) {
if (vnodeOpenTempFilesForImport(&importHandle, pObj, fid) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
}
@ -1039,6 +1017,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
dError("vid:%d sid:%d meterId:%s, failed to write %s file, size:%ul, reason:%s", pVnode->vnode, pObj->sid,
pObj->meterId, pVnode->nfn, sizeof(SCompBlock) * (blockIter.slot - blockIter.oslot),
strerror(errno));
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
@ -1071,7 +1050,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
} else { // Merge block and payload from payloadIter
if (vnodeLoadNeededBlockData(pObj, &importHandle, blockIter.slot,
DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA) < 0) { // Load neccessary blocks
DATA_LOAD_TIMESTAMP | DATA_LOAD_OTHER_DATA, &code) < 0) { // Load neccessary blocks
goto _error_merge;
}
@ -1172,6 +1151,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
// Write the SCompInfo part
if (vnodeCloseImportFiles(pObj, &importHandle) < 0) {
code = TSDB_CODE_OTHERS;
goto _error_merge;
}
@ -1187,7 +1167,7 @@ static int vnodeMergeDataIntoFile(SImportInfo *pImport, const char *payload, int
tfree(importHandle.temp);
tfree(importHandle.tempBuffer);
return 0;
return code;
_error_merge:
tfree(buffer);
@ -1214,7 +1194,7 @@ _error_merge:
remove(pVnode->nfn);
}
return -1;
return code;
}
#define FORWARD_ITER(iter, step, slotLimit, posLimit) \
@ -1316,7 +1296,7 @@ int vnodeImportDataToCache(SImportInfo *pImport, const char *payload, const int
pBuffer = (SMergeBuffer *)malloc(size);
if (pBuffer == NULL) {
dError("vid:%d sid:%d meterId:%s, failed to allocate memory, size:%d", pObj->vnode, pObj->sid, pObj->meterId, size);
return code;
return TSDB_CODE_SERV_OUT_OF_MEMORY;
}
pBuffer->spos = 0;
pBuffer->epos = 0;
@ -1532,7 +1512,7 @@ int vnodeImportDataToFiles(SImportInfo *pImport, char *payload, const int rows)
KEY_AT_INDEX(payload, pObj->bytesPerPoint, (srow + nrows - 1)));
code = vnodeMergeDataIntoFile(pImport, payload + (srow * pObj->bytesPerPoint), nrows, fid);
if (code != 0) break;
if (code != TSDB_CODE_SUCCESS) break;
}
return code;
@ -1548,6 +1528,7 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
// 1. import data in range (pObj->lastKeyOnFile, INT64_MAX) into cache
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, pObj->lastKeyOnFile + 1, INT64_MAX,
&srow, &nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToCache(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
if (pImport->commit) { // Need to commit now
pPool->commitInProcess = 0;
@ -1555,12 +1536,13 @@ int vnodeImportData(SMeterObj *pObj, SImportInfo *pImport) {
return code;
}
if (code != 0) return code;
if (code != TSDB_CODE_SUCCESS) return code;
}
// 2. import data (0, pObj->lastKeyOnFile) into files
if (vnodeSearchKeyInRange(pImport->payload, pObj->bytesPerPoint, pImport->rows, 0, pObj->lastKeyOnFile - 1, &srow,
&nrows) >= 0) {
assert(nrows > 0);
code = vnodeImportDataToFiles(pImport, pImport->payload + pObj->bytesPerPoint * srow, nrows);
}