From e2d8dfafe8b17272f9baee1b9fb102ff36770ce5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Jan 2022 19:32:29 +0800 Subject: [PATCH] feature/qnode --- include/libs/scheduler/scheduler.h | 8 + include/util/taoserror.h | 1 + source/libs/scheduler/src/scheduler.c | 123 +++++++-- source/libs/scheduler/test/schedulerTests.cpp | 249 +++++++++++++++++- source/util/src/terror.c | 1 + 5 files changed, 359 insertions(+), 23 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 1f369067d6..3262b9437c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -114,6 +114,14 @@ void schedulerDestroy(void); */ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks); +/** + * make one task info's multiple copies + * @param src + * @param dst SArray** + * @return + */ +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum); + void schedulerFreeTaskList(SArray *taskList); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a855e6d881..fc895069af 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -361,6 +361,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping") #define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation") #define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error") +#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired") diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c53926f8c1..ccf0c58ac0 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,12 +20,38 @@ static SSchedulerMgmt schMgmt = {0}; +uint64_t schGenTaskId(void) { + return atomic_add_fetch_64(&schMgmt.taskId, 1); +} + +uint64_t schGenUUID(void) { + static uint64_t hashId = 0; + static int32_t requestSerialId = 0; + + if (hashId == 0) { + char uid[64]; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + } + + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + + uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + return id; +} + int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); - pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + pTask->taskId = schGenTaskId(); pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->execAddrs) { SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); @@ -41,7 +67,7 @@ void schFreeTask(SSchTask* pTask) { } // TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE - //tfree(pTask->msg); + tfree(pTask->msg); if (pTask->children) { taosArrayDestroy(pTask->children); @@ -141,7 +167,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); break; default: @@ -541,12 +567,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b return TSDB_CODE_SUCCESS; } - - -// Note: no more error processing, handled in function internal -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); if (errCode) { atomic_store_32(&pJob->errCode, errCode); @@ -563,6 +586,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); +} + +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode)); +} + + // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; @@ -871,7 +905,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } pJob = *job; @@ -880,13 +914,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t s = taosHashGetSize(pJob->execTasks); if (s <= 0) { - qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("QID:%"PRIx64",TID:%"PRIx64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -1434,7 +1468,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(plan->id.queryId); - pMsg->taskId = htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1)); + pMsg->taskId = htobe64(schGenUUID()); pMsg->contentLen = htonl(msgLen); memcpy(pMsg->msg, msg, msgLen); @@ -1457,6 +1491,52 @@ _return: SCH_RET(code); } +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { + if (NULL == src || NULL == dst || copyNum <= 0) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + + *dst = taosArrayInit(copyNum, sizeof(STaskInfo)); + if (NULL == *dst) { + qError("taosArrayInit %d taskInfo failed", copyNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + int32_t msgSize = src->msg->contentLen + sizeof(*src->msg); + STaskInfo info = {0}; + + info.addr = src->addr; + + for (int32_t i = 0; i < copyNum; ++i) { + info.msg = malloc(msgSize); + if (NULL == info.msg) { + qError("malloc %d failed", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + memcpy(info.msg, src->msg, msgSize); + + info.msg->taskId = schGenUUID(); + + if (NULL == taosArrayPush(*dst, &info)) { + qError("taosArrayPush failed, idx:%d", i); + free(info.msg); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + return TSDB_CODE_SUCCESS; + +_return: + + schedulerFreeTaskList(*dst); + *dst = NULL; + + SCH_RET(code); +} + int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { @@ -1464,14 +1544,15 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { } int32_t code = 0; + atomic_add_fetch_32(&pJob->ref, 1); + int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - return TSDB_CODE_SCH_STATUS_ERROR; + atomic_sub_fetch_32(&pJob->ref, 1); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_add_fetch_32(&pJob->ref, 1); - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); atomic_sub_fetch_32(&pJob->ref, 1); @@ -1484,7 +1565,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (status == JOB_TASK_STATUS_FAILED) { + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { *pData = atomic_load_ptr(&pJob->res); atomic_store_ptr(&pJob->res, NULL); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); @@ -1500,7 +1581,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_FAILED) { + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { code = atomic_load_32(&pJob->errCode); SCH_ERR_JRET(code); } @@ -1547,6 +1628,7 @@ void scheduleFreeJob(void *job) { SSchJob *pJob = job; uint64_t queryId = pJob->queryId; + bool setJobFree = false; if (SCH_GET_JOB_STATUS(pJob) > 0) { if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { @@ -1554,8 +1636,6 @@ void scheduleFreeJob(void *job) { return; } - schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); - SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); while (true) { @@ -1563,6 +1643,11 @@ void scheduleFreeJob(void *job) { if (0 == ref) { break; } else if (ref > 0) { + if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) { + schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); + setJobFree = true; + } + usleep(1); } else { assert(0); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 5332c6fcd1..58159f2306 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -38,6 +38,20 @@ namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode); + +struct SSchJob *pInsertJob = NULL; +struct SSchJob *pQueryJob = NULL; + +uint64_t schtMergeTemplateId = 0x4; +uint64_t schtFetchTaskId = 0; +uint64_t schtQueryId = 1; + +bool schtTestStop = false; +bool schtTestDeadLoop = false; +int32_t schtTestMTRunSec = 60; +int32_t schtTestPrintNum = 1000; +int32_t schtStartFetch = 0; void schtInitLogFile() { @@ -57,7 +71,7 @@ void schtInitLogFile() { void schtBuildQueryDag(SQueryDag *dag) { - uint64_t qId = 0x0000000000000001; + uint64_t qId = schtQueryId; dag->queryId = qId; dag->numOfSubplans = 2; @@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; - mergePlan->id.templateId = 0x4444444444; + mergePlan->id.templateId = schtMergeTemplateId; mergePlan->id.subplanId = 0x5555555555; mergePlan->type = QUERY_TYPE_MERGE; mergePlan->level = 0; @@ -269,15 +283,225 @@ void *schtCreateFetchRspThread(void *param) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); rsp->completed = 1; rsp->numOfRows = 10; + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); assert(code == 0); } +void *schtFetchRspThread(void *aa) { + SDataBuf dataBuf = {0}; + SSchCallbackParam* param = NULL; + + while (!schtTestStop) { + if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) { + continue; + } + + usleep(1); + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + + param->queryId = schtQueryId; + param->taskId = schtFetchTaskId; + + int32_t code = 0; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + rsp->completed = 1; + rsp->numOfRows = 10; + + dataBuf.pData = rsp; + dataBuf.len = sizeof(*rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0); + + assert(code == 0 || code); + } +} + +void schtFreeQueryJob(int32_t freeThread) { + static uint32_t freeNum = 0; + SSchJob *job = atomic_load_ptr(&pQueryJob); + + if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { + scheduleFreeJob(job); + if (freeThread) { + if (++freeNum % schtTestPrintNum == 0) { + printf("FreeNum:%d\n", freeNum); + } + } + } +} + +void* schtRunJobThread(void *aa) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + + schtInitLogFile(); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + assert(code == 0); + + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + SSchJob *job = NULL; + SSchCallbackParam *param = NULL; + SHashObj *execTasks = NULL; + SDataBuf dataBuf = {0}; + uint32_t jobFinished = 0; + + while (!schtTestStop) { + schtBuildQueryDag(&dag); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); + assert(code == 0); + + execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + schtFetchTaskId = task->taskId - 1; + + taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); + pIter = taosHashIterate(job->execTasks, pIter); + } + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pQueryJob = job; + + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + atomic_store_32(&schtStartFetch, 1); + + void *data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + if (0 == code) { + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + assert(pRsp->completed == 1); + assert(pRsp->numOfRows == 10); + } + + data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + assert(data == (void*)NULL); + + schtFreeQueryJob(0); + + taosHashCleanup(execTasks); + + schtFreeQueryDag(&dag); + + if (++jobFinished % schtTestPrintNum == 0) { + printf("jobFinished:%d\n", jobFinished); + } + + ++schtQueryId; + } + + schedulerDestroy(); + +} + +void* schtFreeJobThread(void *aa) { + while (!schtTestStop) { + usleep(rand() % 2000); + schtFreeQueryJob(1); + } +} -struct SSchJob *pInsertJob = NULL; } TEST(queryTest, normalCase) { @@ -427,13 +651,30 @@ TEST(insertTest, normalCase) { } TEST(multiThread, forceFree) { + pthread_attr_t thattr; + pthread_attr_init(&thattr); - schtInitLogFile(); + pthread_t thread1, thread2, thread3; + pthread_create(&(thread1), &thattr, schtRunJobThread, NULL); + pthread_create(&(thread2), &thattr, schtFreeJobThread, NULL); + pthread_create(&(thread3), &thattr, schtFetchRspThread, NULL); + while (true) { + if (schtTestDeadLoop) { + sleep(1); + } else { + sleep(schtTestMTRunSec); + break; + } + } + + schtTestStop = true; + sleep(3); } int main(int argc, char** argv) { + srand(time(NULL)); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 144de08cd0..f520585315 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed")