Merge pull request #27677 from taosdata/feat/queryTblNotExistAsEmpty
feat: add query table not exist as empty
This commit is contained in:
commit
a00a6a6a88
|
@ -158,6 +158,7 @@ extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row lo
|
|||
|
||||
// query client
|
||||
extern int32_t tsQueryPolicy;
|
||||
extern bool tsQueryTbNotExistAsEmpty;
|
||||
extern int32_t tsQueryRspPolicy;
|
||||
extern int64_t tsQueryMaxConcurrentTables;
|
||||
extern int32_t tsQuerySmaOptimize;
|
||||
|
|
|
@ -65,6 +65,8 @@ typedef struct SParseCsvCxt {
|
|||
const char* pLastSqlPos; // the location of the last parsed sql
|
||||
} SParseCsvCxt;
|
||||
|
||||
typedef void(*setQueryFn)(int64_t);
|
||||
|
||||
typedef struct SParseContext {
|
||||
uint64_t requestId;
|
||||
int64_t requestRid;
|
||||
|
@ -98,6 +100,7 @@ typedef struct SParseContext {
|
|||
void* parseSqlParam;
|
||||
int8_t biMode;
|
||||
SArray* pSubMetaList;
|
||||
setQueryFn setQueryFp;
|
||||
} SParseContext;
|
||||
|
||||
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
|
||||
|
|
|
@ -52,11 +52,11 @@ enum {
|
|||
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||
#define SHOW_VARIABLES_RESULT_FIELD3_LEN (TSDB_CONFIG_SCOPE_LEN + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)res == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)res == RES_TYPE__TMQ_BATCH_META)
|
||||
#define TD_RES_QUERY(res) (*(int8_t*)(res) == RES_TYPE__QUERY)
|
||||
#define TD_RES_TMQ(res) (*(int8_t*)(res) == RES_TYPE__TMQ)
|
||||
#define TD_RES_TMQ_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_META)
|
||||
#define TD_RES_TMQ_METADATA(res) (*(int8_t*)(res) == RES_TYPE__TMQ_METADATA)
|
||||
#define TD_RES_TMQ_BATCH_META(res) (*(int8_t*)(res) == RES_TYPE__TMQ_BATCH_META)
|
||||
|
||||
typedef struct SAppInstInfo SAppInstInfo;
|
||||
|
||||
|
@ -284,6 +284,7 @@ typedef struct SRequestObj {
|
|||
bool isSubReq;
|
||||
bool inCallback;
|
||||
bool isStmtBind; // is statement bind parameter
|
||||
bool isQuery;
|
||||
uint32_t prevCode; // previous error code: todo refactor, add update flag for catalog
|
||||
uint32_t retry;
|
||||
int64_t allocatorRefId;
|
||||
|
@ -420,6 +421,7 @@ typedef struct SSqlCallbackWrapper {
|
|||
void* pPlanInfo;
|
||||
} SSqlCallbackWrapper;
|
||||
|
||||
void setQueryRequest(int64_t rId);
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void** res);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||
void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta, SSqlCallbackWrapper* pWrapper);
|
||||
|
|
|
@ -31,6 +31,15 @@
|
|||
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
|
||||
static int32_t buildConnectMsg(SRequestObj* pRequest, SMsgSendInfo** pMsgSendInfo);
|
||||
|
||||
void setQueryRequest(int64_t rId) {
|
||||
SRequestObj* pReq = acquireRequest(rId);
|
||||
if (pReq != NULL) {
|
||||
pReq->isQuery = true;
|
||||
(void)releaseRequest(rId);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static bool stringLengthCheck(const char* str, size_t maxsize) {
|
||||
if (str == NULL) {
|
||||
return false;
|
||||
|
@ -286,7 +295,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
|
|||
.enableSysInfo = pTscObj->sysInfo,
|
||||
.svrVer = pTscObj->sVer,
|
||||
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
|
||||
.isStmtBind = pRequest->isStmtBind};
|
||||
.isStmtBind = pRequest->isStmtBind,
|
||||
.setQueryFp = setQueryRequest};
|
||||
|
||||
cxt.mgmtEpSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
int32_t code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &cxt.pCatalog);
|
||||
|
@ -2953,6 +2963,10 @@ void taosAsyncFetchImpl(SRequestObj* pRequest, __taos_async_fn_t fp, void* param
|
|||
void doRequestCallback(SRequestObj* pRequest, int32_t code) {
|
||||
pRequest->inCallback = true;
|
||||
int64_t this = pRequest->self;
|
||||
if (tsQueryTbNotExistAsEmpty && TD_RES_QUERY(&pRequest->resType) && pRequest->isQuery && (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_TDB_TABLE_NOT_EXIST)) {
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
pRequest->type = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
}
|
||||
pRequest->body.queryFp(((SSyncQueryParam*)pRequest->body.interParam)->userParam, pRequest, code);
|
||||
SRequestObj* pReq = acquireRequest(this);
|
||||
if (pReq != NULL) {
|
||||
|
|
|
@ -1235,7 +1235,8 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt, SS
|
|||
.nodeOffline = (pTscObj->pAppInfo->onlineDnodes < pTscObj->pAppInfo->totalDnodes),
|
||||
.allocatorId = pRequest->allocatorRefId,
|
||||
.parseSqlFp = clientParseSql,
|
||||
.parseSqlParam = pWrapper};
|
||||
.parseSqlParam = pWrapper,
|
||||
.setQueryFp = setQueryRequest};
|
||||
int8_t biMode = atomic_load_8(&((STscObj *)pTscObj)->biMode);
|
||||
(*pCxt)->biMode = biMode;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -1241,7 +1241,9 @@ int stmtBindBatch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int32_t colIdx) {
|
|||
.msgLen = ERROR_MSG_BUF_DEFAULT_SIZE,
|
||||
.pTransporter = pStmt->taos->pAppInfo->pTransporter,
|
||||
.pStmtCb = NULL,
|
||||
.pUser = pStmt->taos->user};
|
||||
.pUser = pStmt->taos->user,
|
||||
.setQueryFp = setQueryRequest};
|
||||
|
||||
ctx.mgmtEpSet = getEpSet_s(&pStmt->taos->pAppInfo->mgmtEp);
|
||||
STMT_ERR_RET(catalogGetHandle(pStmt->taos->pAppInfo->clusterId, &ctx.pCatalog));
|
||||
|
||||
|
|
|
@ -162,6 +162,7 @@ int32_t tmqMaxTopicNum = 20;
|
|||
int32_t tmqRowSize = 4096;
|
||||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
bool tsQueryTbNotExistAsEmpty = false;
|
||||
int32_t tsQueryRspPolicy = 0;
|
||||
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
|
||||
bool tsEnableQueryHb = true;
|
||||
|
@ -569,6 +570,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(
|
||||
cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, CFG_SCOPE_BOTH, CFG_DYN_CLIENT));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 4, CFG_SCOPE_CLIENT, CFG_DYN_ENT_CLIENT));
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryTableNotExistAsEmpty", tsQueryTbNotExistAsEmpty, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableQueryHb", tsEnableQueryHb, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableScience", tsEnableScience, CFG_SCOPE_CLIENT, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT));
|
||||
|
@ -1181,6 +1183,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryPolicy");
|
||||
tsQueryPolicy = pItem->i32;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryTableNotExistAsEmpty");
|
||||
tsQueryTbNotExistAsEmpty = pItem->bval;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableQueryHb");
|
||||
tsEnableQueryHb = pItem->bval;
|
||||
|
||||
|
@ -2218,6 +2223,7 @@ static int32_t taosCfgDynamicOptionsForClient(SConfig *pCfg, const char *name) {
|
|||
{"numOfLogLines", &tsNumOfLogLines},
|
||||
{"querySmaOptimize", &tsQuerySmaOptimize},
|
||||
{"queryPolicy", &tsQueryPolicy},
|
||||
{"queryTableNotExistAsEmpty", &tsQueryTbNotExistAsEmpty},
|
||||
{"queryPlannerTrace", &tsQueryPlannerTrace},
|
||||
{"queryNodeChunkSize", &tsQueryNodeChunkSize},
|
||||
{"queryUseNodeAllocator", &tsQueryUseNodeAllocator},
|
||||
|
|
|
@ -4681,7 +4681,8 @@ int32_t translateTable(STranslateContext* pCxt, SNode** pTable, SNode* pJoinPare
|
|||
pCxt, toName(pCxt->pParseCxt->acctId, pRealTable->table.dbName, pRealTable->table.tableName, &name),
|
||||
&(pRealTable->pMeta), true);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
|
||||
(void)generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GET_META_ERROR, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
#ifdef TD_ENTERPRISE
|
||||
if (TSDB_VIEW_TABLE == pRealTable->pMeta->tableType && (!pCurrSmt->tagScan || pCxt->pParseCxt->biMode)) {
|
||||
|
@ -6783,6 +6784,10 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
|
|||
}
|
||||
|
||||
static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||
if (pCxt->pParseCxt && pCxt->pParseCxt->setQueryFp) {
|
||||
(*pCxt->pParseCxt->setQueryFp)(pCxt->pParseCxt->requestRid);
|
||||
}
|
||||
|
||||
if (NULL == pSelect->pFromTable) {
|
||||
return translateSelectWithoutFrom(pCxt, pSelect);
|
||||
} else {
|
||||
|
@ -6907,6 +6912,10 @@ static int32_t checkSetOperLimit(STranslateContext* pCxt, SLimitNode* pLimit) {
|
|||
}
|
||||
|
||||
static int32_t translateSetOperator(STranslateContext* pCxt, SSetOperator* pSetOperator) {
|
||||
if (pCxt->pParseCxt && pCxt->pParseCxt->setQueryFp) {
|
||||
(*pCxt->pParseCxt->setQueryFp)(pCxt->pParseCxt->requestRid);
|
||||
}
|
||||
|
||||
int32_t code = translateQuery(pCxt, pSetOperator->pLeft);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = resetHighLevelTranslateNamespace(pCxt);
|
||||
|
|
|
@ -366,9 +366,9 @@ TEST_F(ParserSelectTest, semanticCheck) {
|
|||
run("SELECT t1.c1, t1.cc1 FROM t1", TSDB_CODE_PAR_INVALID_COLUMN);
|
||||
|
||||
// TSDB_CODE_PAR_GET_META_ERROR
|
||||
run("SELECT * FROM t10", TSDB_CODE_PAR_GET_META_ERROR);
|
||||
run("SELECT * FROM t10", TSDB_CODE_PAR_TABLE_NOT_EXIST);
|
||||
|
||||
run("SELECT * FROM test.t10", TSDB_CODE_PAR_GET_META_ERROR);
|
||||
run("SELECT * FROM test.t10", TSDB_CODE_PAR_TABLE_NOT_EXIST);
|
||||
|
||||
// TSDB_CODE_PAR_TABLE_NOT_EXIST
|
||||
run("SELECT t2.c1 FROM t1", TSDB_CODE_PAR_TABLE_NOT_EXIST);
|
||||
|
|
Loading…
Reference in New Issue