enh: insert parser refactor
This commit is contained in:
parent
2aed458bf7
commit
292f0c6db7
|
@ -46,7 +46,6 @@ extern "C" {
|
|||
|
||||
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
|
||||
#define HEARTBEAT_INTERVAL 1500 // ms
|
||||
#define SYNC_ON_TOP_OF_ASYNC 1
|
||||
|
||||
enum {
|
||||
RES_TYPE__QUERY = 1,
|
||||
|
@ -271,9 +270,8 @@ void doFreeReqResultInfo(SReqResultInfo* pResInfo);
|
|||
int32_t transferTableNameList(const char* tbList, int32_t acctId, char* dbName, SArray** pReq);
|
||||
void syncCatalogFn(SMetaData* pResult, void* param, int32_t code);
|
||||
|
||||
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly);
|
||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||
TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly);
|
||||
void taosAsyncQueryImpl(uint64_t connId, const char* sql, __taos_async_fn_t fp, void* param, bool validateOnly);
|
||||
|
||||
int32_t getVersion1BlockMetaSize(const char* p, int32_t numOfCols);
|
||||
|
||||
|
@ -349,8 +347,6 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet);
|
|||
STscObj* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||
uint16_t port, int connType);
|
||||
|
||||
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly, bool inRetry);
|
||||
|
||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||
|
||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||
|
|
|
@ -1013,28 +1013,6 @@ SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQue
|
|||
return pRequest;
|
||||
}
|
||||
|
||||
SRequestObj* launchQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly, bool inRetry) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
SQuery* pQuery = NULL;
|
||||
|
||||
int32_t code = buildRequest(connId, sql, sqlLen, NULL, validateOnly, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
code = parseSql(pRequest, false, &pQuery, NULL);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pRequest->code = code;
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
pRequest->inRetry = inRetry;
|
||||
pRequest->stableQuery = pQuery->stableQuery;
|
||||
|
||||
return launchQueryImpl(pRequest, pQuery, false, NULL);
|
||||
}
|
||||
|
||||
static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultMeta,
|
||||
SSqlCallbackWrapper* pWrapper) {
|
||||
pRequest->type = pQuery->msgType;
|
||||
|
@ -1197,32 +1175,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// todo remove it soon
|
||||
SRequestObj* execQuery(uint64_t connId, const char* sql, int sqlLen, bool validateOnly) {
|
||||
SRequestObj* pRequest = NULL;
|
||||
int32_t retryNum = 0;
|
||||
int32_t code = 0;
|
||||
bool inRetry = false;
|
||||
|
||||
do {
|
||||
destroyRequest(pRequest);
|
||||
pRequest = launchQuery(connId, sql, sqlLen, validateOnly, inRetry);
|
||||
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
||||
break;
|
||||
}
|
||||
|
||||
code = refreshMeta(pRequest->pTscObj, pRequest);
|
||||
if (code) {
|
||||
pRequest->code = code;
|
||||
break;
|
||||
}
|
||||
|
||||
inRetry = true;
|
||||
} while (retryNum++ < REQUEST_TOTAL_EXEC_TIMES);
|
||||
|
||||
return pRequest;
|
||||
}
|
||||
|
||||
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
|
||||
pEpSet->version = 0;
|
||||
|
||||
|
@ -2291,7 +2243,6 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
SSyncQueryParam* param = taosMemoryCalloc(1, sizeof(SSyncQueryParam));
|
||||
tsem_init(¶m->sem, 0, 0);
|
||||
|
||||
|
@ -2301,15 +2252,4 @@ TAOS_RES* taosQueryImpl(TAOS* taos, const char* sql, bool validateOnly) {
|
|||
param->pRequest->syncQuery = true;
|
||||
}
|
||||
return param->pRequest;
|
||||
#else
|
||||
size_t sqlLen = strlen(sql);
|
||||
if (sqlLen > (size_t)TSDB_MAX_ALLOWED_SQL_LEN) {
|
||||
tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN);
|
||||
terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = execQuery(*(int64_t*)taos, sql, sqlLen, validateOnly);
|
||||
return pRes;
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -261,12 +261,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
return doAsyncFetchRows(pRequest, true, true);
|
||||
#else
|
||||
return doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
} else if (TD_RES_TMQ(res) || TD_RES_TMQ_METADATA(res)) {
|
||||
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||
SReqResultInfo *pResultInfo;
|
||||
|
@ -549,11 +544,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
doAsyncFetchRows(pRequest, false, true);
|
||||
#else
|
||||
doFetchRows(pRequest, true, true);
|
||||
#endif
|
||||
|
||||
// TODO refactor
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
@ -601,11 +592,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#if SYNC_ON_TOP_OF_ASYNC
|
||||
doAsyncFetchRows(pRequest, false, false);
|
||||
#else
|
||||
doFetchRows(pRequest, false, false);
|
||||
#endif
|
||||
|
||||
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
|
||||
|
||||
|
@ -989,7 +976,7 @@ const void *taos_get_raw_block(TAOS_RES *res) {
|
|||
return pRequest->body.resInfo.pData;
|
||||
}
|
||||
|
||||
int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInfo) {
|
||||
int taos_get_db_route_info(TAOS *taos, const char *db, TAOS_DB_ROUTE_INFO *dbInfo) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return terrno;
|
||||
|
@ -1001,16 +988,16 @@ int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInf
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_db_route_info";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_db_route_info";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SCatalog *pCtg = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -1024,7 +1011,7 @@ int taos_get_db_route_info(TAOS* taos, const char* db, TAOS_DB_ROUTE_INFO* dbInf
|
|||
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
snprintf(dbFName, sizeof(dbFName), "%d.%s", pTscObj->acctId, db);
|
||||
|
||||
|
||||
code = catalogGetDBVgInfo(pCtg, &conn, dbFName, dbInfo);
|
||||
if (code) {
|
||||
goto _return;
|
||||
|
@ -1038,7 +1025,7 @@ _return:
|
|||
return code;
|
||||
}
|
||||
|
||||
int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId) {
|
||||
int taos_get_table_vgId(TAOS *taos, const char *db, const char *table, int *vgId) {
|
||||
if (NULL == taos) {
|
||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||
return terrno;
|
||||
|
@ -1050,15 +1037,15 @@ int taos_get_table_vgId(TAOS* taos, const char* db, const char* table, int* vgId
|
|||
return terrno;
|
||||
}
|
||||
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_table_vgId";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
int64_t connId = *(int64_t *)taos;
|
||||
SRequestObj *pRequest = NULL;
|
||||
char *sql = "taos_get_table_vgId";
|
||||
int32_t code = buildRequest(connId, sql, strlen(sql), NULL, false, &pRequest);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
STscObj *pTscObj = pRequest->pTscObj;
|
||||
SCatalog *pCtg = NULL;
|
||||
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCtg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
|
|
Loading…
Reference in New Issue