From a39ec185dc4716a1db5d5612f32b0d1359498531 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 23 Dec 2021 16:10:46 +0800 Subject: [PATCH 1/7] [td-11818] enable some unittest cases. --- source/client/test/clientTests.cpp | 184 ++++++++++++++--------------- 1 file changed, 92 insertions(+), 92 deletions(-) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 7a1e8ca0c3..2938d0180a 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -49,101 +49,101 @@ int main(int argc, char** argv) { TEST(testCase, driverInit_Test) { taos_init(); } -// TEST(testCase, connect_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -// taos_close(pConn); -//} -// -// TEST(testCase, create_user_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + TEST(testCase, connect_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -// TEST(testCase, create_account_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + taos_close(pConn); +} + + TEST(testCase, create_user_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create user abc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + + TEST(testCase, create_account_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + + TEST(testCase, drop_account_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + + TEST(testCase, show_user_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "create account aabc pass 'abc'"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -// TEST(testCase, drop_account_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + + TAOS_RES* pRes = taos_query(pConn, "show users"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} + + TEST(testCase, drop_user_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "drop user abc"); + if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + printf("failed to create user, reason:%s\n", taos_errstr(pRes)); + } + + taos_free_result(pRes); + taos_close(pConn); +} + + TEST(testCase, show_db_Test) { + TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); // assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop account aabc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -// TEST(testCase, show_user_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show users"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} -// -// TEST(testCase, drop_user_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "drop user abc"); -// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { -// printf("failed to create user, reason:%s\n", taos_errstr(pRes)); -// } -// -// taos_free_result(pRes); -// taos_close(pConn); -//} -// -// TEST(testCase, show_db_Test) { -// TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); -//// assert(pConn != NULL); -// -// TAOS_RES* pRes = taos_query(pConn, "show databases"); -// TAOS_ROW pRow = NULL; -// -// TAOS_FIELD* pFields = taos_fetch_fields(pRes); -// int32_t numOfFields = taos_num_fields(pRes); -// -// char str[512] = {0}; -// while((pRow = taos_fetch_row(pRes)) != NULL) { -// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); -// printf("%s\n", str); -// } -// -// taos_close(pConn); -//} + + TAOS_RES* pRes = taos_query(pConn, "show databases"); + TAOS_ROW pRow = NULL; + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + int32_t numOfFields = taos_num_fields(pRes); + + char str[512] = {0}; + while((pRow = taos_fetch_row(pRes)) != NULL) { + int32_t code = taos_print_row(str, pRow, pFields, numOfFields); + printf("%s\n", str); + } + + taos_close(pConn); +} TEST(testCase, create_db_Test) { TAOS* pConn = taos_connect("ubuntu", "root", "taosdata", NULL, 0); From af96f5b5030a5f50f33d28cda39ff58547b63698 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 23 Dec 2021 20:01:51 +0800 Subject: [PATCH 2/7] feature/qnode --- include/common/taosmsg.h | 4 - source/libs/qworker/src/qworker.c | 12 ++ source/libs/scheduler/inc/schedulerInt.h | 29 +++- source/libs/scheduler/src/scheduler.c | 168 +++++++++++++++++++---- 4 files changed, 172 insertions(+), 41 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index ec51a67808..8fd1ca9e9f 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -587,10 +587,6 @@ typedef struct { typedef struct { int32_t code; - union { - uint64_t qhandle; - uint64_t qId; - }; // query handle } SQueryTableRsp; // todo: the show handle should be replaced with id diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 628077a020..6955da8a8c 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -553,7 +553,19 @@ _return: int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { + SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 3fab91edac..f3de499dcd 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -31,17 +31,24 @@ extern "C" { #define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA +enum { + SCH_READ = 1, + SCH_WRITE, +}; + typedef struct SSchedulerMgmt { uint64_t taskId; uint64_t schedulerId; SSchedulerCfg cfg; - SHashObj *Jobs; // key: queryId, value: SQueryJob* + SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; typedef struct SQueryTask { uint64_t taskId; // task id + SQueryLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree + int32_t msgLen; // msg length int8_t status; // task status SEpAddr execAddr; // task actual executed node address SQueryProfileSummary summary; // task execution summary @@ -51,10 +58,13 @@ typedef struct SQueryTask { } SQueryTask; typedef struct SQueryLevel { - int32_t level; - int8_t status; - int32_t taskNum; - SArray *subTasks; // Element is SQueryTask + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + SArray *subTasks; // Element is SQueryTask } SQueryLevel; typedef struct SQueryJob { @@ -63,8 +73,8 @@ typedef struct SQueryJob { int32_t levelIdx; int8_t status; SQueryProfileSummary summary; - SEpSet dataSrcEps; - SEpAddr resEp; + SEpSet dataSrcEps; + SEpAddr resEp; void *transport; SArray *qnodeList; tsem_t rspSem; @@ -74,6 +84,7 @@ typedef struct SQueryJob { SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* + SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* SArray *levels; // Element is SQueryLevel, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. @@ -82,6 +93,7 @@ typedef struct SQueryJob { #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE #define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) +#define SCH_TASK_NEED_WAIT_ALL(type) (task->plan->type == QUERY_TYPE_MODIFY) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) @@ -91,6 +103,9 @@ typedef struct SQueryJob { #define SCH_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { qError(__VA_ARGS__); terrno = _code; return _code; } } while (0) #define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) +#define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) +#define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) + extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 83507c8dd7..a2fbdbe924 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -160,11 +160,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SQueryLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; + SQueryLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; for (int32_t i = 0; i < levelNum; ++i) { - level.level = i; + if (NULL == taosArrayPush(job->levels, &level)) { + qError("taosArrayPush failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + pLevel = taosArrayGet(job->levels, i); + + pLevel->level = i; levelPlans = taosArrayGetP(dag->pSubplans, i); if (NULL == levelPlans) { qError("no level plans for level %d", i); @@ -177,10 +185,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - level.taskNum = levelPlanNum; + pLevel->taskNum = levelPlanNum; - level.subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); - if (NULL == level.subTasks) { + pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + if (NULL == pLevel->subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -191,9 +199,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; + task.level = pLevel; task.status = JOB_TASK_STATUS_NOT_START; - void *p = taosArrayPush(level.subTasks, &task); + void *p = taosArrayPush(pLevel->subTasks, &task); if (NULL == p) { qError("taosArrayPush failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -205,10 +214,6 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { } } - if (NULL == taosArrayPush(job->levels, &level)) { - qError("taosArrayPush failed"); - SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } } SCH_ERR_JRET(schBuildTaskRalation(job, planToTask)); @@ -220,8 +225,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { return TSDB_CODE_SUCCESS; _return: - if (level.subTasks) { - taosArrayDestroy(level.subTasks); + if (pLevel->subTasks) { + taosArrayDestroy(pLevel->subTasks); } if (planToTask) { @@ -273,7 +278,23 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + if (0 != taosHashPut(job->succTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { + qError("taosHashPut failed"); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + *moved = true; + + return TSDB_CODE_SUCCESS; +} + +int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { + if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { + qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); + return TSDB_CODE_SUCCESS; + } + + if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -289,14 +310,23 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { void *msg = NULL; switch (msgType) { + case TSDB_MSG_TYPE_SUBMIT: { + if (NULL == task->msg || task->msgLen <= 0) { + qError("submit msg is NULL"); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + + msgSize = task->msgLen; + msg = task->msg; + break; + } case TSDB_MSG_TYPE_QUERY: { if (NULL == task->msg) { qError("query msg is NULL"); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - int32_t len = strlen(task->msg); - msgSize = sizeof(SSubQueryMsg) + len + 1; + msgSize = sizeof(SSubQueryMsg) + task->msgLen; msg = calloc(1, msgSize); if (NULL == msg) { qError("calloc %d failed", msgSize); @@ -308,11 +338,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); - pMsg->contentLen = htonl(len); - memcpy(pMsg->msg, task->msg, len); - pMsg->msg[len] = 0; + pMsg->contentLen = htonl(task->msgLen); + memcpy(pMsg->msg, task->msg, task->msgLen); break; - } + } case TSDB_MSG_TYPE_RES_READY: { msgSize = sizeof(SResReadyMsg); msg = calloc(1, msgSize); @@ -322,6 +351,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SResReadyMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -335,6 +365,21 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } SResFetchMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); + pMsg->queryId = htobe64(job->queryId); + pMsg->taskId = htobe64(task->taskId); + break; + } + case TSDB_MSG_TYPE_DROP_TASK:{ + msgSize = sizeof(STaskDropMsg); + msg = calloc(1, msgSize); + if (NULL == msg) { + qError("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + STaskDropMsg *pMsg = msg; + pMsg->schedulerId = htobe64(schMgmt.schedulerId); pMsg->queryId = htobe64(job->queryId); pMsg->taskId = htobe64(task->taskId); break; @@ -345,6 +390,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { } //TODO SEND MSG + //taosAsyncExec(rpcSendRequest(void * shandle, const SEpSet * pEpSet, SRpcMsg * pMsg, int64_t * pRid), p, &code); return TSDB_CODE_SUCCESS; } @@ -425,8 +471,29 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); - job->resEp.port = task->execAddr.port; + int32_t taskDone = 0; + + if (SCH_TASK_NEED_WAIT_ALL(task)) { + SCH_LOCK(SCH_WRITE, &task->level->lock); + task->level->taskFailed++; + taskDone = task->level->taskSucceed + task->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &task->level->lock); + + if (taskDone < task->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + return TSDB_CODE_SUCCESS; + } + + if (task->level->taskFailed > 0) { + job->status = JOB_TASK_STATUS_FAILED; + SCH_ERR_RET(schProcessOnJobFailure(job)); + + return TSDB_CODE_SUCCESS; + } + } else { + strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.port; + } SCH_ERR_RET(schProcessOnJobSuccess(job)); @@ -457,10 +524,30 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { bool needRetry = false; + bool moved = false; + int32_t taskDone = 0; SCH_ERR_RET(schTaskCheckAndSetRetry(job, task, errCode, &needRetry)); if (!needRetry) { SCH_TASK_ERR_LOG("task failed[%x], no more retry", errCode); + + SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); + if (!moved) { + SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); + return TSDB_CODE_SUCCESS; + } + + if (SCH_TASK_NEED_WAIT_ALL(task)) { + SCH_LOCK(SCH_WRITE, &task->level->lock); + task->level->taskFailed++; + taskDone = task->level->taskSucceed + task->level->taskFailed; + SCH_UNLOCK(SCH_WRITE, &task->level->lock); + + if (taskDone < task->level->taskNum) { + qDebug("wait all tasks, done:%d, all:%d", taskDone, task->level->taskNum); + return TSDB_CODE_SUCCESS; + } + } job->status = JOB_TASK_STATUS_FAILED; SCH_ERR_RET(schProcessOnJobFailure(job)); @@ -522,8 +609,7 @@ _return: int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SSubplan *plan = task->plan; - int32_t len = 0; - SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &len)); + SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); if (plan->execEpSet.numOfEps <= 0) { SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); } @@ -532,8 +618,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } + + int32_t msgType = (plan->type == QUERY_TYPE_MODIFY) ? TSDB_MSG_TYPE_SUBMIT : TSDB_MSG_TYPE_QUERY; - SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); + SCH_ERR_RET(schAsyncSendMsg(job, task, msgType)); SCH_ERR_RET(schPushTaskToExecList(job, task)); @@ -554,6 +642,16 @@ int32_t schLaunchJob(SQueryJob *job) { return TSDB_CODE_SUCCESS; } +void schDropJobAllTasks(SQueryJob *job) { + void *pIter = taosHashIterate(job->succTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + schAsyncSendMsg(job, task, int32_t msgType); + + pIter = taosHashIterate(schStatus->tasksHash, pIter); + } +} int32_t schedulerInit(SSchedulerCfg *cfg) { if (cfg) { @@ -562,8 +660,8 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } - schMgmt.Jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); - if (NULL == schMgmt.Jobs) { + schMgmt.jobs = taosHashInit(schMgmt.cfg.maxJobNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == schMgmt.jobs) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); } @@ -605,9 +703,15 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->failTasks = taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == job->failTasks) { + qError("taosHashInit %d failed", pDag->numOfSubplans); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + tsem_init(&job->rspSem, 0, 0); - if (0 != taosHashPut(schMgmt.Jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { + if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -659,6 +763,8 @@ _return: int32_t scheduleCancelJob(void *pJob) { //TODO + //TODO MOVE ALL TASKS FROM EXEC LIST TO FAIL LIST + return TSDB_CODE_SUCCESS; } @@ -670,7 +776,7 @@ void scheduleFreeJob(void *pJob) { SQueryJob *job = pJob; if (job->status > 0) { - if (0 != taosHashRemove(schMgmt.Jobs, &job->queryId, sizeof(job->queryId))) { + if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { qError("remove job:%"PRIx64"from mgmt failed", job->queryId); // maybe already freed return; } @@ -678,15 +784,17 @@ void scheduleFreeJob(void *pJob) { if (job->status == JOB_TASK_STATUS_EXECUTING) { scheduleCancelJob(pJob); } + + schDropJobAllTasks(job); } //TODO free job } void schedulerDestroy(void) { - if (schMgmt.Jobs) { - taosHashCleanup(schMgmt.Jobs); //TODO - schMgmt.Jobs = NULL; + if (schMgmt.jobs) { + taosHashCleanup(schMgmt.jobs); //TODO + schMgmt.jobs = NULL; } } From fddd11ed470e043f85eb198d40ce6295256decda Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 24 Dec 2021 09:41:09 +0800 Subject: [PATCH 3/7] feature/qnode --- include/common/taosmsg.h | 12 ++ include/libs/qworker/qworker.h | 12 +- source/dnode/vnode/impl/CMakeLists.txt | 3 +- source/dnode/vnode/impl/inc/vnodeDef.h | 2 + source/dnode/vnode/impl/inc/vnodeQuery.h | 31 +++++ source/dnode/vnode/impl/src/vnodeInt.c | 10 -- source/dnode/vnode/impl/src/vnodeMain.c | 5 + source/dnode/vnode/impl/src/vnodeQuery.c | 35 ++++++ source/dnode/vnode/meta/inc/metaQuery.h | 6 +- source/libs/qworker/src/qworker.c | 149 +++++++++++++++++++---- source/libs/scheduler/inc/schedulerInt.h | 25 ++-- source/libs/scheduler/src/scheduler.c | 13 +- 12 files changed, 246 insertions(+), 57 deletions(-) create mode 100644 source/dnode/vnode/impl/inc/vnodeQuery.h create mode 100644 source/dnode/vnode/impl/src/vnodeQuery.c diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8fd1ca9e9f..8cafa39d8c 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -1121,6 +1121,10 @@ typedef struct SResReadyMsg { uint64_t taskId; } SResReadyMsg; +typedef struct SResReadyRsp { + int32_t code; +} SResReadyRsp; + typedef struct SResFetchMsg { uint64_t schedulerId; uint64_t queryId; @@ -1149,12 +1153,20 @@ typedef struct STaskCancelMsg { uint64_t taskId; } STaskCancelMsg; +typedef struct STaskCancelRsp { + int32_t code; +} STaskCancelRsp; + typedef struct STaskDropMsg { uint64_t schedulerId; uint64_t queryId; uint64_t taskId; } STaskDropMsg; +typedef struct STaskDropRsp { + int32_t code; +} STaskDropRsp; + #pragma pack(pop) diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 8e36178497..83047a44de 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -42,17 +42,17 @@ typedef struct { int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt); -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp); +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp); +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); void qWorkerDestroy(void **qWorkerMgmt); diff --git a/source/dnode/vnode/impl/CMakeLists.txt b/source/dnode/vnode/impl/CMakeLists.txt index 6972605afd..9e892bc4c4 100644 --- a/source/dnode/vnode/impl/CMakeLists.txt +++ b/source/dnode/vnode/impl/CMakeLists.txt @@ -15,9 +15,10 @@ target_link_libraries( PUBLIC wal PUBLIC sync PUBLIC cjson + PUBLIC qworker ) # test if(${BUILD_TEST}) add_subdirectory(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/dnode/vnode/impl/inc/vnodeDef.h b/source/dnode/vnode/impl/inc/vnodeDef.h index 605557d4ea..c5a57b02a6 100644 --- a/source/dnode/vnode/impl/inc/vnodeDef.h +++ b/source/dnode/vnode/impl/inc/vnodeDef.h @@ -34,6 +34,7 @@ #include "vnodeRequest.h" #include "vnodeStateMgr.h" #include "vnodeSync.h" +#include "vnodeQuery.h" #ifdef __cplusplus extern "C" { @@ -72,6 +73,7 @@ struct SVnode { SVnodeSync* pSync; SVnodeFS* pFs; tsem_t canCommit; + void* pQuery; }; int vnodeScheduleTask(SVnodeTask* task); diff --git a/source/dnode/vnode/impl/inc/vnodeQuery.h b/source/dnode/vnode/impl/inc/vnodeQuery.h new file mode 100644 index 0000000000..59bab42f62 --- /dev/null +++ b/source/dnode/vnode/impl/inc/vnodeQuery.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VNODE_READ_H_ +#define _TD_VNODE_READ_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "vnodeInt.h" +#include "qworker.h" + +int vnodeQueryOpen(SVnode *pVnode); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VNODE_READ_H_*/ diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 5deaffe6d2..65185f4a16 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -24,16 +24,6 @@ int32_t vnodeSync(SVnode *pVnode) { return 0; } int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } -int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("query message is processed"); - return 0; -} - -int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - vInfo("fetch message is processed"); - return 0; -} - int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("sync message is processed"); return 0; diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index c98f3e0800..2b0363c97f 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -127,6 +127,11 @@ static int vnodeOpenImpl(SVnode *pVnode) { return -1; } + // Open Query + if (vnodeQueryOpen(pVnode)) { + return -1; + } + // TODO return 0; } diff --git a/source/dnode/vnode/impl/src/vnodeQuery.c b/source/dnode/vnode/impl/src/vnodeQuery.c new file mode 100644 index 0000000000..31481bf7c4 --- /dev/null +++ b/source/dnode/vnode/impl/src/vnodeQuery.c @@ -0,0 +1,35 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeDef.h" +#include "vnodeQuery.h" + +int vnodeQueryOpen(SVnode *pVnode) { + return qWorkerInit(NULL, &pVnode->pQuery); +} + +int vnodeProcessQueryReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("query message is processed"); + qWorkerProcessQueryMsg(pVnode, pVnode->pQuery, pMsg); + return 0; +} + +int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { + vInfo("fetch message is processed"); + qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); + return 0; +} + + diff --git a/source/dnode/vnode/meta/inc/metaQuery.h b/source/dnode/vnode/meta/inc/metaQuery.h index 110df8dd45..ca3b68b415 100644 --- a/source/dnode/vnode/meta/inc/metaQuery.h +++ b/source/dnode/vnode/meta/inc/metaQuery.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_META_QUERY_H_ -#define _TD_META_QUERY_H_ +#ifndef _VNODE_QUERY_H_ +#define _VNODE_QUERY_H_ #ifdef __cplusplus extern "C" { @@ -24,4 +24,4 @@ extern "C" { } #endif -#endif /*_TD_META_QUERY_H_*/ \ No newline at end of file +#endif /*_VNODE_QUERY_H_*/ \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 6955da8a8c..37d3e655c2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -558,6 +558,7 @@ int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { SRpcMsg rpcRsp = { .handle = pMsg->handle, + .ahandle = pMsg->ahandle, .pCont = pRsp, .contLen = sizeof(*pRsp), .code = code, @@ -569,24 +570,105 @@ int32_t qwBuildAndSendQueryRsp(SRpcMsg *pMsg, int32_t code) { } int32_t qwBuildAndSendReadyRsp(SRpcMsg *pMsg, int32_t code) { + SResReadyRsp *pRsp = (SResReadyRsp *)rpcMallocCont(sizeof(SResReadyRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { + int32_t size = 0; + + if (sStatus) { + size = sizeof(SSchedulerStatusRsp) + sizeof(sStatus->status[0]) * sStatus->num; + } else { + size = sizeof(SSchedulerStatusRsp); + } + + SSchedulerStatusRsp *pRsp = (SSchedulerStatusRsp *)rpcMallocCont(size); + if (sStatus) { + memcpy(pRsp, sStatus, size); + } else { + pRsp->num = 0; + } + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = size, + .code = 0, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } int32_t qwBuildAndSendFetchRsp(SRpcMsg *pMsg, void *data) { + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); + memset(pRsp, 0, sizeof(SRetrieveTableRsp)); + //TODO fill msg + pRsp->completed = true; + + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = 0, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg) { +int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { + STaskCancelRsp *pRsp = (STaskCancelRsp *)rpcMallocCont(sizeof(STaskCancelRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } -int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg) { +int32_t qwBuildAndSendDropRsp(SRpcMsg *pMsg, int32_t code) { + STaskDropRsp *pRsp = (STaskDropRsp *)rpcMallocCont(sizeof(STaskDropRsp)); + pRsp->code = code; + SRpcMsg rpcRsp = { + .handle = pMsg->handle, + .ahandle = pMsg->ahandle, + .pCont = pRsp, + .contLen = sizeof(*pRsp), + .code = code, + }; + + rpcSendResponse(&rpcRsp); + + return TSDB_CODE_SUCCESS; } @@ -724,8 +806,10 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu SQWorkerSchStatus *sch = NULL; SQWorkerTaskStatus *task = NULL; int32_t code = 0; + int32_t needRsp = true; + void *data = NULL; - QW_ERR_RET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); + QW_ERR_JRET(qwAcquireScheduler(QW_READ, mgmt, schedulerId, &sch)); QW_ERR_JRET(qwAcquireTask(QW_READ, sch, queryId, taskId, &task)); QW_LOCK(QW_READ, &task->lock); @@ -736,7 +820,7 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu } if (QW_GOT_RES_DATA(res->data)) { - QW_ERR_JRET(qwBuildAndSendFetchRsp(pMsg, res->data)); + data = res->data; if (QW_LOW_RES_DATA(res->data)) { if (task->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { //TODO add query back to queue @@ -749,6 +833,8 @@ int32_t qwHandleFetch(SQWorkerResCache *res, SQWorkerMgmt *mgmt, uint64_t schedu } //TODO SET FLAG FOR QUERY TO SEND RSP WHEN RES READY + + needRsp = false; } _return: @@ -758,9 +844,12 @@ _return: if (sch) { qwReleaseTask(QW_READ, sch); + qwReleaseScheduler(QW_READ, mgmt); } - qwReleaseScheduler(QW_READ, mgmt); + if (needRsp) { + qwBuildAndSendFetchRsp(pMsg, res->data); + } QW_RET(code); } @@ -844,13 +933,14 @@ int32_t qWorkerInit(SQWorkerCfg *cfg, void **qWorkerMgmt) { return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg **rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SSubQueryMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid query msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -863,7 +953,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp QW_ERR_JRET(qwCheckTaskCancelDrop(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId, &needStop)); if (needStop) { qWarn("task need stop"); - QW_ERR_RET(TSDB_CODE_QRY_TASK_CANCELLED); + QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } code = qStringToSubplan(msg->msg, &plan); @@ -922,13 +1012,14 @@ _return: QW_RET(code); } -int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp){ - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg){ + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } SResReadyMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -937,27 +1028,31 @@ int32_t qWorkerProcessReadyMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRp return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; SSchTasksStatusMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task status msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SSchedulerStatusRsp *sStatus = NULL; - QW_ERR_RET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + QW_ERR_JRET(qwGetSchTasksStatus(qWorkerMgmt, msg->schedulerId, &sStatus)); + +_return: QW_ERR_RET(qwBuildAndSendStatusRsp(pMsg, sStatus)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } @@ -983,36 +1078,44 @@ _return: QW_RET(code); } -int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; STaskCancelMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task cancel msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + QW_ERR_JRET(qwCancelTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); - QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg)); +_return: + + QW_ERR_RET(qwBuildAndSendCancelRsp(pMsg, code)); return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SRpcMsg *rsp) { - if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg || NULL == rsp) { +int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { return TSDB_CODE_QRY_INVALID_INPUT; } + int32_t code = 0; STaskDropMsg *msg = pMsg->pCont; if (NULL == msg || pMsg->contLen <= sizeof(*msg)) { + qError("invalid task drop msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - QW_ERR_RET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); + QW_ERR_JRET(qwCancelDropTask(qWorkerMgmt, msg->schedulerId, msg->queryId, msg->taskId)); - QW_ERR_RET(qwBuildAndSendDropRsp(pMsg)); +_return: + + QW_ERR_RET(qwBuildAndSendDropRsp(pMsg, code)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index f3de499dcd..bc7bc44350 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,6 +43,17 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; +typedef struct SQueryLevel { + int32_t level; + int8_t status; + SRWLatch lock; + int32_t taskFailed; + int32_t taskSucceed; + int32_t taskNum; + SArray *subTasks; // Element is SQueryTask +} SQueryLevel; + + typedef struct SQueryTask { uint64_t taskId; // task id SQueryLevel *level; // level @@ -57,16 +68,6 @@ typedef struct SQueryTask { SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; -typedef struct SQueryLevel { - int32_t level; - int8_t status; - SRWLatch lock; - int32_t taskFailed; - int32_t taskSucceed; - int32_t taskNum; - SArray *subTasks; // Element is SQueryTask -} SQueryLevel; - typedef struct SQueryJob { uint64_t queryId; int32_t levelNum; @@ -92,8 +93,8 @@ typedef struct SQueryJob { #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE -#define SCH_IS_DATA_SRC_TASK(task) (task->plan->type == QUERY_TYPE_SCAN) -#define SCH_TASK_NEED_WAIT_ALL(type) (task->plan->type == QUERY_TYPE_MODIFY) +#define SCH_IS_DATA_SRC_TASK(task) ((task)->plan->type == QUERY_TYPE_SCAN) +#define SCH_TASK_NEED_WAIT_ALL(task) ((task)->plan->type == QUERY_TYPE_MODIFY) #define SCH_JOB_ERR_LOG(param, ...) qError("QID:%"PRIx64 param, job->queryId, __VA_ARGS__) #define SCH_TASK_ERR_LOG(param, ...) qError("QID:%"PRIx64",TID:%"PRIx64 param, job->queryId, task->taskId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a2fbdbe924..4185b3176c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -647,9 +647,18 @@ void schDropJobAllTasks(SQueryJob *job) { while (pIter) { SQueryTask *task = *(SQueryTask **)pIter; - schAsyncSendMsg(job, task, int32_t msgType); + schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); - pIter = taosHashIterate(schStatus->tasksHash, pIter); + pIter = taosHashIterate(job->succTasks, pIter); + } + + pIter = taosHashIterate(job->failTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); + + pIter = taosHashIterate(job->succTasks, pIter); } } From 4426e88098566b8ffaadb7f75960b6d7f9b768b1 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 18:24:24 -0800 Subject: [PATCH 4/7] minor changes --- source/dnode/mgmt/impl/test/db/db.cpp | 73 ++++++++++++++------------- 1 file changed, 38 insertions(+), 35 deletions(-) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 378f46aa4d..823f58e3a8 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -162,51 +162,52 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { CheckBinary("ms", 3); // precision CheckInt8(0); // update - // restart - test.Restart(); + // // restart + // test.Restart(); - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + // test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); + // CHECK_META("show databases", 17); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 1); + // test.SendShowRetrieveMsg(); + // EXPECT_EQ(test.GetShowRows(), 1); - CheckBinary("d1", TSDB_DB_NAME_LEN - 1); - CheckTimestamp(); - CheckInt16(2); // vgroups - CheckInt16(1); // replica - CheckInt16(2); // quorum - CheckInt16(10); // days - CheckBinary("300,400,500", 24); // days - CheckInt32(16); // cache - CheckInt32(12); // blocks - CheckInt32(100); // minrows - CheckInt32(4096); // maxrows - CheckInt8(2); // wallevel - CheckInt32(4000); // fsync - CheckInt8(2); // comp - CheckInt8(1); // cachelast - CheckBinary("ms", 3); // precision - CheckInt8(0); // update + // CheckBinary("d1", TSDB_DB_NAME_LEN - 1); + // CheckTimestamp(); + // CheckInt16(2); // vgroups + // CheckInt16(1); // replica + // CheckInt16(2); // quorum + // CheckInt16(10); // days + // CheckBinary("300,400,500", 24); // days + // CheckInt32(16); // cache + // CheckInt32(12); // blocks + // CheckInt32(100); // minrows + // CheckInt32(4096); // maxrows + // CheckInt8(2); // wallevel + // CheckInt32(4000); // fsync + // CheckInt8(2); // comp + // CheckInt8(1); // cachelast + // CheckBinary("ms", 3); // precision + // CheckInt8(0); // update - { - int32_t contLen = sizeof(SDropDbMsg); + // { + // int32_t contLen = sizeof(SDropDbMsg); - SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen); - strcpy(pReq->db, "1.d1"); + // SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen); + // strcpy(pReq->db, "1.d1"); - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DB, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - } + // SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DB, pReq, contLen); + // ASSERT_NE(pMsg, nullptr); + // ASSERT_EQ(pMsg->code, 0); + // } - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + // test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); + // CHECK_META("show databases", 17); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 0); + // test.SendShowRetrieveMsg(); + // EXPECT_EQ(test.GetShowRows(), 0); } +#if 0 TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -298,3 +299,5 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { } } } + +#endif \ No newline at end of file From a698284093c7cf603b2ac2d3939854f2ff81412b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 18:27:59 -0800 Subject: [PATCH 5/7] minor changes --- source/dnode/mgmt/impl/test/db/db.cpp | 73 +++++++++++++-------------- 1 file changed, 35 insertions(+), 38 deletions(-) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 823f58e3a8..378f46aa4d 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -162,52 +162,51 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { CheckBinary("ms", 3); // precision CheckInt8(0); // update - // // restart - // test.Restart(); + // restart + test.Restart(); - // test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - // CHECK_META("show databases", 17); + test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); + CHECK_META("show databases", 17); - // test.SendShowRetrieveMsg(); - // EXPECT_EQ(test.GetShowRows(), 1); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 1); - // CheckBinary("d1", TSDB_DB_NAME_LEN - 1); - // CheckTimestamp(); - // CheckInt16(2); // vgroups - // CheckInt16(1); // replica - // CheckInt16(2); // quorum - // CheckInt16(10); // days - // CheckBinary("300,400,500", 24); // days - // CheckInt32(16); // cache - // CheckInt32(12); // blocks - // CheckInt32(100); // minrows - // CheckInt32(4096); // maxrows - // CheckInt8(2); // wallevel - // CheckInt32(4000); // fsync - // CheckInt8(2); // comp - // CheckInt8(1); // cachelast - // CheckBinary("ms", 3); // precision - // CheckInt8(0); // update + CheckBinary("d1", TSDB_DB_NAME_LEN - 1); + CheckTimestamp(); + CheckInt16(2); // vgroups + CheckInt16(1); // replica + CheckInt16(2); // quorum + CheckInt16(10); // days + CheckBinary("300,400,500", 24); // days + CheckInt32(16); // cache + CheckInt32(12); // blocks + CheckInt32(100); // minrows + CheckInt32(4096); // maxrows + CheckInt8(2); // wallevel + CheckInt32(4000); // fsync + CheckInt8(2); // comp + CheckInt8(1); // cachelast + CheckBinary("ms", 3); // precision + CheckInt8(0); // update - // { - // int32_t contLen = sizeof(SDropDbMsg); + { + int32_t contLen = sizeof(SDropDbMsg); - // SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen); - // strcpy(pReq->db, "1.d1"); + SDropDbMsg* pReq = (SDropDbMsg*)rpcMallocCont(contLen); + strcpy(pReq->db, "1.d1"); - // SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DB, pReq, contLen); - // ASSERT_NE(pMsg, nullptr); - // ASSERT_EQ(pMsg->code, 0); - // } + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DB, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, 0); + } - // test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - // CHECK_META("show databases", 17); + test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); + CHECK_META("show databases", 17); - // test.SendShowRetrieveMsg(); - // EXPECT_EQ(test.GetShowRows(), 0); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 0); } -#if 0 TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -299,5 +298,3 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { } } } - -#endif \ No newline at end of file From 067116330bfb66c156f9f68ef6f17fcfec0e8574 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 24 Dec 2021 10:54:15 +0800 Subject: [PATCH 6/7] [td-11818] merge 3.0 and fix a bug. --- source/client/src/clientImpl.c | 18 +++++++++++++----- source/client/src/clientMsgHandler.c | 6 +++--- source/libs/parser/src/astValidate.c | 2 -- 3 files changed, 16 insertions(+), 10 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 992d93f39b..f00e7bd578 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -113,6 +113,13 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, return taosConnectImpl(ip, user, &secretEncrypt[0], db, port, NULL, NULL, *pInst); } +static bool supportedQueryType(int32_t type) { + return (type == TSDB_MSG_TYPE_CREATE_USER || type == TSDB_MSG_TYPE_SHOW || type == TSDB_MSG_TYPE_DROP_USER || + type == TSDB_MSG_TYPE_DROP_ACCT || type == TSDB_MSG_TYPE_CREATE_DB || type == TSDB_MSG_TYPE_CREATE_ACCT || + type == TSDB_MSG_TYPE_CREATE_TABLE || type == TSDB_MSG_TYPE_CREATE_STB || type == TSDB_MSG_TYPE_USE_DB || + type == TSDB_MSG_TYPE_DROP_DB || type == TSDB_MSG_TYPE_DROP_STB); +} + TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { STscObj *pTscObj = (STscObj *)taos; if (sqlLen > (size_t) tsMaxSQLStringLen) { @@ -145,16 +152,17 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { tscDebugL("0x%"PRIx64" SQL: %s", pRequest->requestId, pRequest->sqlstr); SParseContext cxt = { - .ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)}, - .pSql = pRequest->sqlstr, + .ctx = {.requestId = pRequest->requestId, .acctId = pTscObj->acctId, .db = getConnectionDB(pTscObj)}, + .pSql = pRequest->sqlstr, .sqlLen = sqlLen, - .pMsg = pRequest->msgBuf, + .pMsg = pRequest->msgBuf, .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE }; + SQueryNode* pQuery = NULL; int32_t code = qParseQuerySql(&cxt, &pQuery); if (qIsDclQuery(pQuery)) { - SDclStmtInfo* pDcl = (SDclStmtInfo*)pQuery; + SDclStmtInfo* pDcl = (SDclStmtInfo*) pQuery; pRequest->type = pDcl->msgType; pRequest->body.requestMsg = (SReqMsgInfo){.pMsg = pDcl->pMsg, .len = pDcl->msgLen}; @@ -215,7 +223,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { int initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet) { pEpSet->version = 0; - // init mgmt ip set + // init mnode ip set SEpSet *mgmtEpSet = &(pEpSet->epSet); mgmtEpSet->numOfEps = 0; mgmtEpSet->inUse = 0; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 2fc5e84bff..548ea3d725 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -110,9 +110,9 @@ SRequestMsgBody buildRequestMsgImpl(SRequestObj *pRequest) { assert(pRequest != NULL); SRequestMsgBody body = { .requestObjRefId = pRequest->self, - .msgInfo = pRequest->body.requestMsg, - .msgType = pRequest->type, - .requestId = pRequest->requestId, + .msgInfo = pRequest->body.requestMsg, + .msgType = pRequest->type, + .requestId = pRequest->requestId, }; return body; } diff --git a/source/libs/parser/src/astValidate.c b/source/libs/parser/src/astValidate.c index 5d64323332..cd11607326 100644 --- a/source/libs/parser/src/astValidate.c +++ b/source/libs/parser/src/astValidate.c @@ -4457,8 +4457,6 @@ int32_t qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseBasicCtx* pCtx, SDclStm return TSDB_CODE_TSC_INVALID_OPERATION; } - strncpy(pCreateMsg->db, token.z, token.n); - pDcl->pMsg = (char*)pCreateMsg; pDcl->msgLen = sizeof(SCreateDbMsg); pDcl->msgType = (pInfo->type == TSDB_SQL_CREATE_DB)? TSDB_MSG_TYPE_CREATE_DB:TSDB_MSG_TYPE_ALTER_DB; From b54291201eded090dc3763eb4c421bef648ab49d Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 19:27:36 -0800 Subject: [PATCH 7/7] add test case --- source/dnode/mgmt/impl/test/db/db.cpp | 42 +++++++++++++++------------ source/dnode/mnode/impl/src/mndDb.c | 10 +++++++ tests/script/general/db/basic1.sim | 4 +-- tests/script/jenkins/basic.txt | 4 +++ 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mgmt/impl/test/db/db.cpp index 378f46aa4d..79a7477cea 100644 --- a/source/dnode/mgmt/impl/test/db/db.cpp +++ b/source/dnode/mgmt/impl/test/db/db.cpp @@ -27,24 +27,25 @@ Testbase DndTestDb::test; TEST_F(DndTestDb, 01_ShowDb) { test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + CHECK_META("show databases", 18); CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); CHECK_SCHEMA(1, TSDB_DATA_TYPE_TIMESTAMP, 8, "create_time"); CHECK_SCHEMA(2, TSDB_DATA_TYPE_SMALLINT, 2, "vgroups"); - CHECK_SCHEMA(3, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); - CHECK_SCHEMA(4, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); - CHECK_SCHEMA(5, TSDB_DATA_TYPE_SMALLINT, 2, "days"); - CHECK_SCHEMA(6, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); - CHECK_SCHEMA(7, TSDB_DATA_TYPE_INT, 4, "cache"); - CHECK_SCHEMA(8, TSDB_DATA_TYPE_INT, 4, "blocks"); - CHECK_SCHEMA(9, TSDB_DATA_TYPE_INT, 4, "minrows"); - CHECK_SCHEMA(10, TSDB_DATA_TYPE_INT, 4, "maxrows"); - CHECK_SCHEMA(11, TSDB_DATA_TYPE_TINYINT, 1, "wallevel"); - CHECK_SCHEMA(12, TSDB_DATA_TYPE_INT, 4, "fsync"); - CHECK_SCHEMA(13, TSDB_DATA_TYPE_TINYINT, 1, "comp"); - CHECK_SCHEMA(14, TSDB_DATA_TYPE_TINYINT, 1, "cachelast"); - CHECK_SCHEMA(15, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision"); - CHECK_SCHEMA(16, TSDB_DATA_TYPE_TINYINT, 1, "update"); + CHECK_SCHEMA(3, TSDB_DATA_TYPE_INT, 4, "ntables"); + CHECK_SCHEMA(4, TSDB_DATA_TYPE_SMALLINT, 2, "replica"); + CHECK_SCHEMA(5, TSDB_DATA_TYPE_SMALLINT, 2, "quorum"); + CHECK_SCHEMA(6, TSDB_DATA_TYPE_SMALLINT, 2, "days"); + CHECK_SCHEMA(7, TSDB_DATA_TYPE_BINARY, 24 + VARSTR_HEADER_SIZE, "keep0,keep1,keep2"); + CHECK_SCHEMA(8, TSDB_DATA_TYPE_INT, 4, "cache"); + CHECK_SCHEMA(9, TSDB_DATA_TYPE_INT, 4, "blocks"); + CHECK_SCHEMA(10, TSDB_DATA_TYPE_INT, 4, "minrows"); + CHECK_SCHEMA(11, TSDB_DATA_TYPE_INT, 4, "maxrows"); + CHECK_SCHEMA(12, TSDB_DATA_TYPE_TINYINT, 1, "wallevel"); + CHECK_SCHEMA(13, TSDB_DATA_TYPE_INT, 4, "fsync"); + CHECK_SCHEMA(14, TSDB_DATA_TYPE_TINYINT, 1, "comp"); + CHECK_SCHEMA(15, TSDB_DATA_TYPE_TINYINT, 1, "cachelast"); + CHECK_SCHEMA(16, TSDB_DATA_TYPE_BINARY, 3 + VARSTR_HEADER_SIZE, "precision"); + CHECK_SCHEMA(17, TSDB_DATA_TYPE_TINYINT, 1, "update"); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 0); @@ -82,13 +83,14 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { } test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + CHECK_META("show databases", 18); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); CheckInt16(2); // vgroups + CheckInt32(0); // ntables CheckInt16(1); // replica CheckInt16(1); // quorum CheckInt16(10); // days @@ -147,6 +149,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); CheckInt16(2); // vgroups + CheckInt32(0); CheckInt16(1); // replica CheckInt16(2); // quorum CheckInt16(10); // days @@ -166,7 +169,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { test.Restart(); test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + CHECK_META("show databases", 18); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); @@ -174,6 +177,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { CheckBinary("d1", TSDB_DB_NAME_LEN - 1); CheckTimestamp(); CheckInt16(2); // vgroups + CheckInt32(0); CheckInt16(1); // replica CheckInt16(2); // quorum CheckInt16(10); // days @@ -201,7 +205,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { } test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + CHECK_META("show databases", 18); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 0); @@ -239,7 +243,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { } test.SendShowMetaMsg(TSDB_MGMT_TABLE_DB, ""); - CHECK_META("show databases", 17); + CHECK_META("show databases", 18); test.SendShowRetrieveMsg(); EXPECT_EQ(test.GetShowRows(), 1); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index dd474d85f3..7c12714a46 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -876,6 +876,12 @@ static int32_t mndGetDbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMe pSchema[cols].bytes = htonl(pShow->bytes[cols]); cols++; + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "ntables"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + pShow->bytes[cols] = 2; pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; strcpy(pSchema[cols].name, "replica"); @@ -1017,6 +1023,10 @@ static int32_t mndRetrieveDbs(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int3 *(int16_t *)pWrite = pDb->cfg.numOfVgroups; cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = 0; // todo + cols++; + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; *(int16_t *)pWrite = pDb->cfg.replications; cols++; diff --git a/tests/script/general/db/basic1.sim b/tests/script/general/db/basic1.sim index 9ec1aabe98..dc62c33842 100644 --- a/tests/script/general/db/basic1.sim +++ b/tests/script/general/db/basic1.sim @@ -15,7 +15,7 @@ if $data00 != d1 then return -1 endi -if $data02 != 0 then +if $data02 != 2 then return -1 endi @@ -51,7 +51,7 @@ if $data00 != d4 then return -1 endi -if $data02 != 0 then +if $data02 != 2 then return -1 endi diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 2408faefa0..1cc15f731d 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -1,7 +1,11 @@ #======================b1-start=============== +# ---- user ./test.sh -f general/user/basic1.sim +# ---- db +./test.sh -f general/db/basic1.sim + #======================b1-end===============