refactor(query): do some internal refactor.
This commit is contained in:
parent
e673041127
commit
95eb0c9f65
|
@ -122,6 +122,24 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList);
|
int32_t catalogGetDBVgInfo(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, SArray** pVgroupList);
|
||||||
|
|
||||||
|
typedef void *__async_cb_fn_t(void* pResult, void* param, int32_t code);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SCatalog* pCatalog;
|
||||||
|
void* pTransporter;
|
||||||
|
SEpSet* pMgmtEps;
|
||||||
|
char* pDbname;
|
||||||
|
} CatalogParamWrapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param pCatalogWrapper
|
||||||
|
* @param fp
|
||||||
|
* @param param
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t catalogGetDBVgInfo_a(CatalogParamWrapper* pCatalogWrapper, __async_cb_fn_t fp, void* param);
|
||||||
|
|
||||||
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
|
int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgInfo* dbInfo);
|
||||||
|
|
||||||
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
||||||
|
|
|
@ -121,7 +121,7 @@ struct SAppInstInfo {
|
||||||
SCorEpSet mgmtEp;
|
SCorEpSet mgmtEp;
|
||||||
SInstanceSummary summary;
|
SInstanceSummary summary;
|
||||||
SList* pConnList; // STscObj linked list
|
SList* pConnList; // STscObj linked list
|
||||||
int64_t clusterId;
|
uint64_t clusterId;
|
||||||
void* pTransporter;
|
void* pTransporter;
|
||||||
SAppHbMgr* pAppHbMgr;
|
SAppHbMgr* pAppHbMgr;
|
||||||
};
|
};
|
||||||
|
@ -282,6 +282,8 @@ void initMsgHandleFp();
|
||||||
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
|
||||||
uint16_t port, int connType);
|
uint16_t port, int connType);
|
||||||
|
|
||||||
|
SRequestObj* launchQuery(STscObj* pTscObj, const char* sql, int sqlLen);
|
||||||
|
|
||||||
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtCallback* pStmtCb);
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||||
|
|
|
@ -572,8 +572,10 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
|
||||||
int32_t retryNum = 0;
|
int32_t retryNum = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
size_t sqlLen = strlen(sql);
|
||||||
|
|
||||||
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
|
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
|
||||||
// pRequest = launchQuery(pTscObj, sql, sqlLen);
|
pRequest = launchQuery(taos, sql, sqlLen);
|
||||||
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
if (pRequest == NULL || TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -125,10 +125,10 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
|
||||||
if (usedbRsp.vgVersion >= 0) {
|
if (usedbRsp.vgVersion >= 0) {
|
||||||
int32_t code1 = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
uint64_t clusterId = pRequest->pTscObj->pAppInfo->clusterId;
|
||||||
|
int32_t code1 = catalogGetHandle(clusterId, &pCatalog);
|
||||||
if (code1 != TSDB_CODE_SUCCESS) {
|
if (code1 != TSDB_CODE_SUCCESS) {
|
||||||
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->pTscObj->pAppInfo->clusterId,
|
tscWarn("0x%" PRIx64 "catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pRequest->requestId, clusterId, tstrerror(code1));
|
||||||
tstrerror(code1));
|
|
||||||
} else {
|
} else {
|
||||||
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
|
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
|
||||||
}
|
}
|
||||||
|
@ -158,7 +158,7 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
|
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
|
||||||
taosMemoryFreeClear(output.dbVgroup);
|
taosMemoryFreeClear(output.dbVgroup);
|
||||||
|
|
||||||
tscError("failed to build use db output since %s", terrstr());
|
tscError("0x%" PRIx64" failed to build use db output since %s", pRequest->requestId, terrstr());
|
||||||
} else if (output.dbVgroup) {
|
} else if (output.dbVgroup) {
|
||||||
struct SCatalog* pCatalog = NULL;
|
struct SCatalog* pCatalog = NULL;
|
||||||
|
|
||||||
|
|
|
@ -563,6 +563,10 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//typedef void __taos_async_internal_fn_t(void* param, )
|
||||||
|
void ctgGetDBVgInfoFromMnode_a(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, __taos_async_fn_t fp) {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, SBuildUseDBInput *input, SUseDbOutput *out) {
|
||||||
char *msg = NULL;
|
char *msg = NULL;
|
||||||
|
@ -1768,7 +1772,7 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ctgGetDBVgInfo_a();
|
||||||
|
|
||||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
|
|
Loading…
Reference in New Issue