Merge pull request #14832 from taosdata/feat/refactorClientQ
enh: refactor client queue
This commit is contained in:
commit
aadd4192dd
|
@ -325,7 +325,8 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery, SStmtC
|
||||||
|
|
||||||
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
|
||||||
|
|
||||||
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql, SRequestObj** pRequest);
|
int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param, bool validateSql,
|
||||||
|
SRequestObj** pRequest);
|
||||||
|
|
||||||
void taos_close_internal(void* taos);
|
void taos_close_internal(void* taos);
|
||||||
|
|
||||||
|
@ -359,9 +360,6 @@ int32_t removeMeta(STscObj* pTscObj, SArray* tbList); // todo move to clie
|
||||||
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
|
int32_t handleAlterTbExecRes(void* res, struct SCatalog* pCatalog); // todo move to xxx
|
||||||
bool qnodeRequired(SRequestObj* pRequest);
|
bool qnodeRequired(SRequestObj* pRequest);
|
||||||
|
|
||||||
void initTscQhandle();
|
|
||||||
void cleanupTscQhandle();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -35,22 +35,10 @@ SAppInfo appInfo;
|
||||||
int32_t clientReqRefPool = -1;
|
int32_t clientReqRefPool = -1;
|
||||||
int32_t clientConnRefPool = -1;
|
int32_t clientConnRefPool = -1;
|
||||||
|
|
||||||
void *tscQhandle = NULL;
|
|
||||||
|
|
||||||
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
|
||||||
volatile int32_t tscInitRes = 0;
|
volatile int32_t tscInitRes = 0;
|
||||||
|
|
||||||
void initTscQhandle() {
|
static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
|
||||||
// init handle
|
|
||||||
tscQhandle = taosInitScheduler(4096, 5, "tsc");
|
|
||||||
}
|
|
||||||
|
|
||||||
void cleanupTscQhandle() {
|
|
||||||
// destroy handle
|
|
||||||
taosCleanUpScheduler(tscQhandle);
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
|
|
||||||
// connection has been released already, abort creating request.
|
// connection has been released already, abort creating request.
|
||||||
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
|
pRequest->self = taosAddRef(clientReqRefPool, pRequest);
|
||||||
|
|
||||||
|
@ -72,7 +60,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj* pTscObj) {
|
||||||
static void deregisterRequest(SRequestObj *pRequest) {
|
static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
assert(pRequest != NULL);
|
assert(pRequest != NULL);
|
||||||
|
|
||||||
STscObj * pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
SAppClusterSummary *pActivity = &pTscObj->pAppInfo->summary;
|
||||||
|
|
||||||
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
|
||||||
|
@ -97,7 +85,8 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
|
||||||
|
|
||||||
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (NEED_REDIRECT_ERROR(code)) {
|
if (NEED_REDIRECT_ERROR(code)) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH ||
|
||||||
|
msgType == TDMT_SCH_MERGE_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
@ -251,7 +240,7 @@ void *createRequest(uint64_t connId, int32_t type) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj* pTscObj = acquireTscObj(connId);
|
STscObj *pTscObj = acquireTscObj(connId);
|
||||||
if (pTscObj == NULL) {
|
if (pTscObj == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -348,7 +337,6 @@ void taos_init_imp(void) {
|
||||||
// In the APIs of other program language, taos_cleanup is not available yet.
|
// In the APIs of other program language, taos_cleanup is not available yet.
|
||||||
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
|
// So, to make sure taos_cleanup will be invoked to clean up the allocated resource to suppress the valgrind warning.
|
||||||
atexit(taos_cleanup);
|
atexit(taos_cleanup);
|
||||||
initTscQhandle();
|
|
||||||
errno = TSDB_CODE_SUCCESS;
|
errno = TSDB_CODE_SUCCESS;
|
||||||
taosSeedRand(taosGetTimestampSec());
|
taosSeedRand(taosGetTimestampSec());
|
||||||
|
|
||||||
|
@ -407,7 +395,7 @@ int taos_options_imp(TSDB_OPTION option, const char *str) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SConfig * pCfg = taosGetCfg();
|
SConfig *pCfg = taosGetCfg();
|
||||||
SConfigItem *pItem = NULL;
|
SConfigItem *pItem = NULL;
|
||||||
|
|
||||||
switch (option) {
|
switch (option) {
|
||||||
|
|
|
@ -1274,8 +1274,8 @@ typedef struct SchedArg {
|
||||||
SEpSet* pEpset;
|
SEpSet* pEpset;
|
||||||
} SchedArg;
|
} SchedArg;
|
||||||
|
|
||||||
void doProcessMsgFromServer(SSchedMsg* schedMsg) {
|
int32_t doProcessMsgFromServer(void* param) {
|
||||||
SchedArg* arg = (SchedArg*)schedMsg->ahandle;
|
SchedArg* arg = (SchedArg*)param;
|
||||||
SRpcMsg* pMsg = &arg->msg;
|
SRpcMsg* pMsg = &arg->msg;
|
||||||
SEpSet* pEpSet = arg->pEpset;
|
SEpSet* pEpSet = arg->pEpset;
|
||||||
|
|
||||||
|
@ -1328,11 +1328,10 @@ void doProcessMsgFromServer(SSchedMsg* schedMsg) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
destroySendMsgInfo(pSendInfo);
|
destroySendMsgInfo(pSendInfo);
|
||||||
taosMemoryFree(arg);
|
taosMemoryFree(arg);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
SSchedMsg schedMsg = {0};
|
|
||||||
|
|
||||||
SEpSet* tEpSet = NULL;
|
SEpSet* tEpSet = NULL;
|
||||||
if (pEpSet != NULL) {
|
if (pEpSet != NULL) {
|
||||||
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
|
tEpSet = taosMemoryCalloc(1, sizeof(SEpSet));
|
||||||
|
@ -1343,9 +1342,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
arg->msg = *pMsg;
|
arg->msg = *pMsg;
|
||||||
arg->pEpset = tEpSet;
|
arg->pEpset = tEpSet;
|
||||||
|
|
||||||
schedMsg.fp = doProcessMsgFromServer;
|
taosAsyncExec(doProcessMsgFromServer, arg, NULL);
|
||||||
schedMsg.ahandle = arg;
|
|
||||||
taosScheduleTask(tscQhandle, &schedMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
|
TAOS* taos_connect_auth(const char* ip, const char* user, const char* auth, const char* db, uint16_t port) {
|
||||||
|
|
|
@ -72,7 +72,6 @@ void taos_cleanup(void) {
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
schedulerDestroy();
|
schedulerDestroy();
|
||||||
|
|
||||||
cleanupTscQhandle();
|
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
tscInfo("all local resources released");
|
tscInfo("all local resources released");
|
||||||
taosCleanupCfg();
|
taosCleanupCfg();
|
||||||
|
@ -242,7 +241,7 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
} else if (TD_RES_TMQ(res)) {
|
} else if (TD_RES_TMQ(res)) {
|
||||||
SMqRspObj * msg = ((SMqRspObj *)res);
|
SMqRspObj *msg = ((SMqRspObj *)res);
|
||||||
SReqResultInfo *pResultInfo;
|
SReqResultInfo *pResultInfo;
|
||||||
if (msg->resIter == -1) {
|
if (msg->resIter == -1) {
|
||||||
pResultInfo = tmqGetNextResInfo(res, true);
|
pResultInfo = tmqGetNextResInfo(res, true);
|
||||||
|
@ -418,7 +417,7 @@ int taos_affected_rows(TAOS_RES *res) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRequestObj * pRequest = (SRequestObj *)res;
|
SRequestObj *pRequest = (SRequestObj *)res;
|
||||||
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
|
||||||
return pResInfo->numOfRows;
|
return pResInfo->numOfRows;
|
||||||
}
|
}
|
||||||
|
@ -601,7 +600,7 @@ int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
|
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
|
||||||
TAOS_FIELD * pField = &pResInfo->userFields[columnIndex];
|
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
|
||||||
if (!IS_VAR_DATA_TYPE(pField->type)) {
|
if (!IS_VAR_DATA_TYPE(pField->type)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -645,8 +644,8 @@ const char *taos_get_server_info(TAOS *taos) {
|
||||||
typedef struct SqlParseWrapper {
|
typedef struct SqlParseWrapper {
|
||||||
SParseContext *pCtx;
|
SParseContext *pCtx;
|
||||||
SCatalogReq catalogReq;
|
SCatalogReq catalogReq;
|
||||||
SRequestObj * pRequest;
|
SRequestObj *pRequest;
|
||||||
SQuery * pQuery;
|
SQuery *pQuery;
|
||||||
} SqlParseWrapper;
|
} SqlParseWrapper;
|
||||||
|
|
||||||
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
||||||
|
@ -665,8 +664,8 @@ static void destorySqlParseWrapper(SqlParseWrapper *pWrapper) {
|
||||||
|
|
||||||
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
SqlParseWrapper *pWrapper = (SqlParseWrapper *)param;
|
||||||
SQuery * pQuery = pWrapper->pQuery;
|
SQuery *pQuery = pWrapper->pQuery;
|
||||||
SRequestObj * pRequest = pWrapper->pRequest;
|
SRequestObj *pRequest = pWrapper->pRequest;
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
|
code = qAnalyseSqlSemantic(pWrapper->pCtx, &pWrapper->catalogReq, pResultMeta, pQuery);
|
||||||
|
@ -684,7 +683,8 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
|
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
|
|
||||||
tscDebug("0x%"PRIx64" analysis semantics completed, start async query, reqId:0x%"PRIx64, pRequest->self, pRequest->requestId);
|
tscDebug("0x%" PRIx64 " analysis semantics completed, start async query, reqId:0x%" PRIx64, pRequest->self,
|
||||||
|
pRequest->requestId);
|
||||||
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
launchAsyncQuery(pRequest, pQuery, pResultMeta);
|
||||||
} else {
|
} else {
|
||||||
destorySqlParseWrapper(pWrapper);
|
destorySqlParseWrapper(pWrapper);
|
||||||
|
@ -705,7 +705,7 @@ void retrieveMetaCallback(SMetaData *pResultMeta, void *param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param) {
|
||||||
int64_t connId = *(int64_t*)taos;
|
int64_t connId = *(int64_t *)taos;
|
||||||
taosAsyncQueryImpl(connId, sql, fp, param, false);
|
taosAsyncQueryImpl(connId, sql, fp, param, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -739,7 +739,7 @@ int32_t createParseContext(const SRequestObj *pRequest, SParseContext **pCxt) {
|
||||||
|
|
||||||
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
SParseContext *pCxt = NULL;
|
SParseContext *pCxt = NULL;
|
||||||
STscObj * pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
if (pRequest->retry++ > REQUEST_TOTAL_EXEC_TIMES) {
|
||||||
|
@ -928,7 +928,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
int64_t connId = *(int64_t *)taos;
|
int64_t connId = *(int64_t *)taos;
|
||||||
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
const int32_t MAX_TABLE_NAME_LENGTH = 12 * 1024 * 1024; // 12MB list
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRequestObj * pRequest = NULL;
|
SRequestObj *pRequest = NULL;
|
||||||
SCatalogReq catalogReq = {0};
|
SCatalogReq catalogReq = {0};
|
||||||
|
|
||||||
if (NULL == tableNameList) {
|
if (NULL == tableNameList) {
|
||||||
|
@ -950,7 +950,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj *pTscObj = pRequest->pTscObj;
|
||||||
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
|
code = transferTableNameList(tableNameList, pTscObj->acctId, pTscObj->db, &catalogReq.pTableMeta);
|
||||||
if (code) {
|
if (code) {
|
||||||
goto _return;
|
goto _return;
|
||||||
|
@ -972,7 +972,7 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncQueryParam* pParam = pRequest->body.param;
|
SSyncQueryParam *pParam = pRequest->body.param;
|
||||||
tsem_wait(&pParam->sem);
|
tsem_wait(&pParam->sem);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
|
@ -282,7 +282,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
*str = '"';
|
*str = '"';
|
||||||
int32_t length = taosUcs4ToMbs((TdUcs4 *)buf, bufSize, str + 1);
|
int32_t length = taosUcs4ToMbs((TdUcs4*)buf, bufSize, str + 1);
|
||||||
if (length <= 0) {
|
if (length <= 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
@ -310,7 +310,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(len) *len = n;
|
if (len) *len = n;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -390,7 +390,7 @@ char* parseTagDatatoJson(void* p) {
|
||||||
end:
|
end:
|
||||||
cJSON_Delete(json);
|
cJSON_Delete(json);
|
||||||
taosArrayDestroy(pTagVals);
|
taosArrayDestroy(pTagVals);
|
||||||
if(string == NULL){
|
if (string == NULL) {
|
||||||
string = strdup(TSDB_DATA_NULL_STR_L);
|
string = strdup(TSDB_DATA_NULL_STR_L);
|
||||||
}
|
}
|
||||||
return string;
|
return string;
|
||||||
|
|
Loading…
Reference in New Issue