[td-225] handle error code.

This commit is contained in:
Haojun Liao 2020-06-28 16:58:25 +08:00
parent 1d3a13b4c9
commit d748013039
3 changed files with 48 additions and 25 deletions

View File

@ -120,7 +120,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) { if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno)); TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
errno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
goto _err; goto _err;
} }
@ -143,7 +143,7 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
return tsdbLoadCompIdx(pHelper, NULL); return tsdbLoadCompIdx(pHelper, NULL);
_err: _err:
return -1; return terrno;
} }
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {

View File

@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->type = TSDB_QUERY_TYPE_ALL; pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = -1; pQueryHandle->cur.fid = -1;
pQueryHandle->cur.win = TSWINDOW_INITIALIZER; pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order); pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qinfo = qinfo; pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock; pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
} }
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) { static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
// todo check open file failed
SFileGroup* fileGroup = pQueryHandle->pFileGroup; SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0); assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
//open file failed, return error code to client
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// load all the comp offset value for all tables in this file // load all the comp offset value for all tables in this file
*numOfBlocks = 0; *numOfBlocks = 0;
@ -593,8 +597,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
if (pCheckInfo->pDataCols == NULL) { if (pCheckInfo->pDataCols == NULL) {
STsdbMeta* pMeta = tsdbGetMeta(pRepo); STsdbMeta* pMeta = tsdbGetMeta(pRepo);
// TODO // TODO
pCheckInfo->pDataCols = pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
} }
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj)); tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
@ -1359,16 +1362,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
} }
// todo opt for only one table case // todo opt for only one table case
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
pQueryHandle->numOfBlocks = 0; pQueryHandle->numOfBlocks = 0;
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
int32_t numOfBlocks = 0; int32_t numOfBlocks = 0;
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL; int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) { if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
break; break;
} }
@ -1393,20 +1398,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
// no data in file anymore // no data in file anymore
if (pQueryHandle->numOfBlocks <= 0) { if (pQueryHandle->numOfBlocks <= 0) {
assert(pQueryHandle->pFileGroup == NULL); if (code == TSDB_CODE_SUCCESS) {
assert(pQueryHandle->pFileGroup == NULL);
}
cur->fid = -1; // denote that there are no data in file anymore cur->fid = -1; // denote that there are no data in file anymore
*exists = false;
return false; return code;
} }
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1; cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
cur->fid = pQueryHandle->pFileGroup->fileId; cur->fid = pQueryHandle->pFileGroup->fileId;
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) { static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb); STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
SQueryFilePos* cur = &pQueryHandle->cur; SQueryFilePos* cur = &pQueryHandle->cur;
@ -1419,7 +1429,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order); tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid); tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// check if current file block is all consumed // check if current file block is all consumed
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
@ -1430,7 +1440,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) || if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) { (cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists // all data blocks in current file has been checked already, try next file if exists
return getDataBlocksInFilesImpl(pQueryHandle); return getDataBlocksInFilesImpl(pQueryHandle, exists);
} else { } else {
// next block of the same file // next block of the same file
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1; int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
@ -1440,11 +1450,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
cur->blockCompleted = false; cur->blockCompleted = false;
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot]; STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo); *exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
return TSDB_CODE_SUCCESS;
} }
} else { } else {
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo); handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
return pQueryHandle->realNumOfRows > 0; *exists = pQueryHandle->realNumOfRows > 0;
return TSDB_CODE_SUCCESS;
} }
} }
} }
@ -1576,8 +1590,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
} }
if (pQueryHandle->checkFiles) { if (pQueryHandle->checkFiles) {
if (getDataBlocksInFiles(pQueryHandle)) { bool exists = true;
return true; int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
return false;
}
if (exists) {
return exists;
} }
pQueryHandle->activeIndex = 0; pQueryHandle->activeIndex = 0;

View File

@ -110,11 +110,14 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
pRet->rsp = pRsp; pRet->rsp = pRsp;
// current connect is broken // current connect is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) { if ((code == TSDB_CODE_SUCCESS) &&
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle); (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS)) {
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo,
pReadMsg->rpcMsg.handle);
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
//NOTE: there two refcount, needs to kill twice, todo refactor // NOTE: there two refcount, needs to kill twice, todo refactor
qKillQuery(pQInfo, vnodeRelease, pVnode); qKillQuery(pQInfo, vnodeRelease, pVnode);
qKillQuery(pQInfo, vnodeRelease, pVnode); qKillQuery(pQInfo, vnodeRelease, pVnode);