refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2022-06-02 23:33:38 +08:00
parent 6b49352a81
commit ad5be523e6
7 changed files with 36 additions and 54 deletions

View File

@ -46,7 +46,7 @@ extern "C" {
#define ERROR_MSG_BUF_DEFAULT_SIZE 512 #define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms #define HEARTBEAT_INTERVAL 1500 // ms
#define SYNC_ON_TOP_OF_ASYNC 0 #define SYNC_ON_TOP_OF_ASYNC 1
enum { enum {
RES_TYPE__QUERY = 1, RES_TYPE__QUERY = 1,

View File

@ -706,18 +706,8 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
pWrapper->pRequest = pRequest; pWrapper->pRequest = pRequest;
pWrapper->catalogReq = catalogReq; pWrapper->catalogReq = catalogReq;
//todo refactor move to asyncGetAllMeta function code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId,
bool allNull = (catalogReq.pUdf == NULL && catalogReq.pUser == NULL && catalogReq.pDbCfg == NULL &&
catalogReq.pIndex == NULL && catalogReq.pDbInfo == NULL && catalogReq.pDbVgroup == NULL &&
catalogReq.pTableHash == NULL && catalogReq.pTableMeta == NULL && catalogReq.qNodeRequired == false);
if (allNull) {
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
retrieveMetaCallback(pMetaData, pWrapper, TSDB_CODE_SUCCESS);
} else {
code = catalogAsyncGetAllMeta(pCxt->pCatalog, pCxt->pTransporter, &pCxt->mgmtEpSet, pRequest->requestId,
&catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob); &catalogReq, retrieveMetaCallback, pWrapper, &pRequest->body.queryJob);
}
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;

View File

@ -104,7 +104,7 @@ TEST(testCase, connect_Test) {
} }
taos_close(pConn); taos_close(pConn);
} }
#if 0
TEST(testCase, create_user_Test) { TEST(testCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
@ -773,6 +773,7 @@ TEST(testCase, agg_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
} }
#endif
/* /*
--- copy the following script in the shell to setup the environment --- --- copy the following script in the shell to setup the environment ---
@ -789,7 +790,7 @@ TEST(testCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr); ASSERT_NE(pConn, nullptr);
taos_query_a(pConn, "select ts from test.m1", queryCallback, pConn); taos_query_a(pConn, "drop table test.tm0", queryCallback, pConn);
getchar(); getchar();
taos_close(pConn); taos_close(pConn);
} }

View File

@ -490,7 +490,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STabl
int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnode(CTG_PARAMS, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask);
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param); int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum);
int32_t ctgLaunchJob(SCtgJob *pJob); int32_t ctgLaunchJob(SCtgJob *pJob);
int32_t ctgMakeAsyncRes(SCtgJob *pJob); int32_t ctgMakeAsyncRes(SCtgJob *pJob);

View File

@ -1028,9 +1028,14 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmt
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
int32_t code = 0; int32_t code = 0, taskNum = 0;
SCtgJob *pJob = NULL; SCtgJob *pJob = NULL;
CTG_ERR_JRET(ctgInitJob(CTG_PARAMS_LIST(), &pJob, reqId, pReq, fp, param)); CTG_ERR_JRET(ctgInitJob(CTG_PARAMS_LIST(), &pJob, reqId, pReq, fp, param, &taskNum));
if (taskNum <= 0) {
SMetaData* pMetaData = taosMemoryCalloc(1, sizeof(SMetaData));
fp(pMetaData, param, TSDB_CODE_SUCCESS);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
CTG_ERR_JRET(ctgLaunchJob(pJob)); CTG_ERR_JRET(ctgLaunchJob(pJob));

View File

@ -233,7 +233,7 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
} }
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param) { int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
int32_t code = 0; int32_t code = 0;
int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta); int32_t tbMetaNum = (int32_t)taosArrayGetSize(pReq->pTableMeta);
int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup); int32_t dbVgNum = (int32_t)taosArrayGetSize(pReq->pDbVgroup);
@ -245,15 +245,15 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser); int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo); int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum; *taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum;
if (taskNum <= 0) { if (*taskNum <= 0) {
ctgError("empty input for job, taskNum:%d", taskNum); ctgError("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); return TSDB_CODE_SUCCESS;
} }
*job = taosMemoryCalloc(1, sizeof(SCtgJob)); *job = taosMemoryCalloc(1, sizeof(SCtgJob));
if (NULL == *job) { if (NULL == *job) {
ctgError("calloc %d failed", (int32_t)sizeof(SCtgJob)); ctgError("failed to calloc, size:%d, reqId:0x%" PRIx64, (int32_t)sizeof(SCtgJob), reqId);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -275,52 +275,52 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
pJob->indexNum = indexNum; pJob->indexNum = indexNum;
pJob->userNum = userNum; pJob->userNum = userNum;
pJob->dbInfoNum = dbInfoNum; pJob->dbInfoNum = dbInfoNum;
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask)); pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
if (NULL == pJob->pTasks) { if (NULL == pJob->pTasks) {
ctgError("taosArrayInit %d tasks failed", taskNum); ctgError("taosArrayInit %d tasks failed", *taskNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
int32_t taskIdx = 0; int32_t taskIdx = 0;
for (int32_t i = 0; i < dbVgNum; ++i) { for (int32_t i = 0; i < dbVgNum; ++i) {
char *dbFName = taosArrayGet(pReq->pDbVgroup, i); char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
} }
for (int32_t i = 0; i < dbCfgNum; ++i) { for (int32_t i = 0; i < dbCfgNum; ++i) {
char *dbFName = taosArrayGet(pReq->pDbCfg, i); char* dbFName = taosArrayGet(pReq->pDbCfg, i);
CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName)); CTG_ERR_JRET(ctgInitGetDbCfgTask(pJob, taskIdx++, dbFName));
} }
for (int32_t i = 0; i < dbInfoNum; ++i) { for (int32_t i = 0; i < dbInfoNum; ++i) {
char *dbFName = taosArrayGet(pReq->pDbInfo, i); char* dbFName = taosArrayGet(pReq->pDbInfo, i);
CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName)); CTG_ERR_JRET(ctgInitGetDbInfoTask(pJob, taskIdx++, dbFName));
} }
for (int32_t i = 0; i < tbMetaNum; ++i) { for (int32_t i = 0; i < tbMetaNum; ++i) {
SName *name = taosArrayGet(pReq->pTableMeta, i); SName* name = taosArrayGet(pReq->pTableMeta, i);
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
} }
for (int32_t i = 0; i < tbHashNum; ++i) { for (int32_t i = 0; i < tbHashNum; ++i) {
SName *name = taosArrayGet(pReq->pTableHash, i); SName* name = taosArrayGet(pReq->pTableHash, i);
CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name)); CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name));
} }
for (int32_t i = 0; i < indexNum; ++i) { for (int32_t i = 0; i < indexNum; ++i) {
char *indexName = taosArrayGet(pReq->pIndex, i); char* indexName = taosArrayGet(pReq->pIndex, i);
CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName)); CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName));
} }
for (int32_t i = 0; i < udfNum; ++i) { for (int32_t i = 0; i < udfNum; ++i) {
char *udfName = taosArrayGet(pReq->pUdf, i); char* udfName = taosArrayGet(pReq->pUdf, i);
CTG_ERR_JRET(ctgInitGetUdfTask(pJob, taskIdx++, udfName)); CTG_ERR_JRET(ctgInitGetUdfTask(pJob, taskIdx++, udfName));
} }
for (int32_t i = 0; i < userNum; ++i) { for (int32_t i = 0; i < userNum; ++i) {
SUserAuthInfo *user = taosArrayGet(pReq->pUser, i); SUserAuthInfo* user = taosArrayGet(pReq->pUser, i);
CTG_ERR_JRET(ctgInitGetUserTask(pJob, taskIdx++, user)); CTG_ERR_JRET(ctgInitGetUserTask(pJob, taskIdx++, user));
} }
@ -328,22 +328,8 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++)); CTG_ERR_JRET(ctgInitGetQnodeTask(pJob, taskIdx++));
} }
pJob->refId = taosAddRef(gCtgMgmt.jobPool, pJob);
if (pJob->refId < 0) {
ctgError("add job to ref failed, error: %s", tstrerror(terrno));
CTG_ERR_JRET(terrno);
}
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
qDebug("QID:%" PRIx64 ", job %" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, taskNum);
return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFreeClear(*job); taosMemoryFreeClear(*job);
CTG_RET(code); CTG_RET(code);
} }

View File

@ -1,7 +1,7 @@
#run tsim/user/pass_alter.sim run tsim/user/pass_alter.sim
#run tsim/user/basic1.sim run tsim/user/basic1.sim
#run tsim/user/privilege2.sim run tsim/user/privilege2.sim
#run tsim/user/user_len.sim run tsim/user/user_len.sim
run tsim/user/privilege1.sim run tsim/user/privilege1.sim
run tsim/user/pass_len.sim run tsim/user/pass_len.sim
run tsim/table/basic1.sim run tsim/table/basic1.sim