[td-225] return error code in tsdb to client during the data retrieval.
This commit is contained in:
parent
607d4fd238
commit
7bcd7447eb
|
@ -211,8 +211,8 @@ static void *dnodeProcessReadQueue(void *param) {
|
||||||
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
||||||
} else {
|
} else {
|
||||||
if (code == TSDB_CODE_QRY_HAS_RSP) {
|
if (code == TSDB_CODE_QRY_HAS_RSP) {
|
||||||
dnodeSendRpcReadRsp(pVnode, pReadMsg, TSDB_CODE_SUCCESS);
|
dnodeSendRpcReadRsp(pVnode, pReadMsg, pReadMsg->rpcMsg.code);
|
||||||
} else {
|
} else { // code == TSDB_CODE_NOT_READY, do not return msg to client
|
||||||
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
|
dnodeDispatchNonRspMsg(pVnode, pReadMsg, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2338,6 +2338,11 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) { // load data block failed, abort query
|
||||||
|
longjmp(pRuntimeEnv->env, terrno);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
|
// query start position can not move into tableApplyFunctionsOnBlock due to limit/offset condition
|
||||||
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
|
pQuery->pos = QUERY_IS_ASC_QUERY(pQuery)? 0 : blockInfo.rows - 1;
|
||||||
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
|
int32_t numOfRes = tableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, binarySearchForKey, pDataBlock);
|
||||||
|
|
|
@ -679,7 +679,13 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
|
|
||||||
if (pCheckInfo->pDataCols == NULL) {
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
||||||
|
|
||||||
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
|
tsdbError("%p failed to malloc buf, %p", pQueryHandle, pQueryHandle->qinfo);
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return blockLoaded;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
|
STSchema* pSchema = tsdbGetTableSchema(pCheckInfo->pTableObj);
|
||||||
|
@ -745,7 +751,11 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot);
|
// return error, add test cases
|
||||||
|
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
|
@ -1714,9 +1724,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
|
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
|
||||||
STableCheckInfo info = {
|
STableCheckInfo info = {
|
||||||
.lastKey = pSecQueryHandle->window.skey,
|
.lastKey = pSecQueryHandle->window.skey,
|
||||||
//.tableId = pCheckInfo->tableId,
|
|
||||||
.pTableObj = pCheckInfo->pTableObj,
|
.pTableObj = pCheckInfo->pTableObj,
|
||||||
};
|
};
|
||||||
|
|
||||||
info.tableId = pCheckInfo->tableId;
|
info.tableId = pCheckInfo->tableId;
|
||||||
|
|
||||||
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
|
taosArrayPush(pSecQueryHandle->pTableCheckInfo, &info);
|
||||||
|
@ -1726,8 +1736,9 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
|
tsdbInitCompBlockLoadInfo(&pSecQueryHandle->compBlockLoadInfo);
|
||||||
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
|
pSecQueryHandle->defaultLoadColumn = taosArrayClone(pQueryHandle->defaultLoadColumn);
|
||||||
|
|
||||||
bool ret = tsdbNextDataBlock((void*) pSecQueryHandle);
|
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
|
||||||
assert(ret);
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
|
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
|
||||||
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
|
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
|
||||||
|
@ -1770,7 +1781,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
bool exists = true;
|
bool exists = true;
|
||||||
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (exists) {
|
if (exists) {
|
||||||
|
@ -2048,8 +2059,10 @@ SArray* tsdbRetrieveDataBlock(TsdbQueryHandleT* pQueryHandle, SArray* pIdList) {
|
||||||
return pHandle->pColumns;
|
return pHandle->pColumns;
|
||||||
} else { // only load the file block
|
} else { // only load the file block
|
||||||
SCompBlock* pBlock = pBlockInfo->compBlock;
|
SCompBlock* pBlock = pBlockInfo->compBlock;
|
||||||
doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot);
|
|
||||||
|
|
||||||
|
if (!doLoadFileDataBlock(pHandle, pBlock, pCheckInfo, pHandle->cur.slot)) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
// todo refactor
|
// todo refactor
|
||||||
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
int32_t numOfRows = copyDataFromFileBlock(pHandle, pHandle->outputCapacity, 0, 0, pBlock->numOfRows - 1);
|
||||||
|
|
||||||
|
|
|
@ -93,8 +93,11 @@ static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle,
|
||||||
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
|
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
SRetrieveTableRsp* pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
|
pRsp->completed = true;
|
||||||
|
|
||||||
|
pRet->rsp = pRsp;
|
||||||
*freeHandle = true;
|
*freeHandle = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,18 +203,18 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
|
||||||
pReadMsg->rpcMsg.handle);
|
pReadMsg->rpcMsg.handle);
|
||||||
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
|
|
||||||
|
|
||||||
// todo test the error code case
|
// set the real rsp error code
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
pReadMsg->rpcMsg.code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
|
||||||
code = TSDB_CODE_QRY_HAS_RSP;
|
|
||||||
}
|
// NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client
|
||||||
|
code = TSDB_CODE_QRY_HAS_RSP;
|
||||||
} else {
|
} else {
|
||||||
freehandle = qQueryCompleted(*qhandle);
|
freehandle = qQueryCompleted(*qhandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
|
||||||
// if not build result, free it not by forced.
|
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
|
||||||
if (freehandle || (!buildRes)) {
|
if (freehandle || (!buildRes)) {
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue