From 460036a0d49194c04472ecd076dcc655856b7809 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 6 Apr 2022 17:50:03 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/src/qworker.c | 14 ++--- source/libs/scalar/src/filter.c | 44 +++++++++++++--- .../libs/scalar/test/filter/filterTests.cpp | 52 +++++++++++++++++++ source/libs/scheduler/src/scheduler.c | 38 ++++++++++---- 4 files changed, 123 insertions(+), 25 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a83fcc26f2..f0f04a8a9b 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -977,10 +977,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } - QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + //QW_ERR_JRET(qwBuildAndSendQueryRsp(&qwMsg->connInfo, code)); + //QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - queryRsped = true; + //queryRsped = true; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); @@ -994,10 +994,10 @@ _return: input.code = code; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - if (!queryRsped) { - qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); - } + //if (!queryRsped) { + // qwBuildAndSendQueryRsp(&qwMsg->connInfo, code); + // QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); + //} QW_RET(TSDB_CODE_SUCCESS); } diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 393f143e5f..16236fc05b 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -687,11 +687,15 @@ int32_t filterGetRangeRes(void* h, SFilterRange *ra) { SFilterRangeNode* r = ctx->rs; while (r) { - FILTER_COPY_RA(ra, &r->ra); + if (num) { + ra->e = r->ra.e; + ra->eflag = r->ra.eflag; + } else { + FILTER_COPY_RA(ra, &r->ra); + } ++num; r = r->next; - ++ra; } if (num == 0) { @@ -3314,8 +3318,7 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t -int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { - SFilterInfo *info = NULL; +int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *isStrict) { SFilterRange ra = {0}; SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP); @@ -3369,13 +3372,14 @@ int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { *win = TSWINDOW_INITIALIZER; } else { filterGetRangeNum(prev, &num); - if (num > 1) { - qError("only one time range accepted, num:%d", num); - FLT_ERR_JRET(TSDB_CODE_QRY_INVALID_TIME_CONDITION); - } FLT_CHK_JMP(num < 1); + if (num > 1) { + *isStrict = false; + qDebug("more than one time range, num:%d", num); + } + SFilterRange tra; filterGetRangeRes(prev, &tra); win->skey = tra.s; @@ -3401,6 +3405,30 @@ _return: } +int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) { + SFilterInfo *info = NULL; + int32_t code = 0; + + *isStrict = true; + + FLT_ERR_RET(filterInitFromNode(pNode, &info, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP)); + + if (info->scalarMode) { + *win = TSWINDOW_INITIALIZER; + *isStrict = false; + goto _return; + } + + FLT_ERR_JRET(filterGetTimeRangeImpl(info, win, isStrict)); + +_return: + + filterFreeInfo(info); + + FLT_RET(code); +} + + int32_t filterConverNcharColumns(SFilterInfo* info, int32_t rows, bool *gotNchar) { if (FILTER_EMPTY_RES(info) || FILTER_ALL_RES(info)) { return TSDB_CODE_SUCCESS; diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index 1e6978d889..26ef5dbd44 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -241,6 +241,7 @@ TEST(timerangeTest, greater) { bool isStrict = false; int32_t code = filterGetTimeRange(opNode1, &win, &isStrict); ASSERT_EQ(code, 0); + ASSERT_EQ(isStrict, true); ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.ekey, INT64_MAX); //filterFreeInfo(filter); @@ -270,6 +271,7 @@ TEST(timerangeTest, greater_and_lower) { STimeWindow win = {0}; bool isStrict = false; int32_t code = filterGetTimeRange(logicNode, &win, &isStrict); + ASSERT_EQ(isStrict, true); ASSERT_EQ(code, 0); ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.ekey, tbig); @@ -277,6 +279,56 @@ TEST(timerangeTest, greater_and_lower) { nodesDestroyNode(logicNode); } +TEST(timerangeTest, greater_and_lower_not_strict) { + SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL; + bool eRes[5] = {false, false, true, true, true}; + SScalarParam res = {0}; + int64_t tsmall1 = 222, tbig1 = 333; + int64_t tsmall2 = 444, tbig2 = 555; + SNode *list[2] = {0}; + + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall1); + flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig1); + flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + list[0] = opNode1; + list[1] = opNode2; + + flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_AND, list, 2); + + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall2); + flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig2); + flttMakeOpNode(&opNode2, OP_TYPE_LOWER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval); + list[0] = opNode1; + list[1] = opNode2; + + flttMakeLogicNode(&logicNode2, LOGIC_COND_TYPE_AND, list, 2); + + list[0] = logicNode1; + list[1] = logicNode2; + flttMakeLogicNode(&logicNode1, LOGIC_COND_TYPE_OR, list, 2); + + //SFilterInfo *filter = NULL; + //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); + //ASSERT_EQ(code, 0); + STimeWindow win = {0}; + bool isStrict = false; + int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict); + ASSERT_EQ(isStrict, false); + ASSERT_EQ(code, 0); + ASSERT_EQ(win.skey, tsmall1); + ASSERT_EQ(win.ekey, tbig2); + //filterFreeInfo(filter); + nodesDestroyNode(logicNode1); +} + + + TEST(columnTest, smallint_column_greater_double_value) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5]= {1, 2, 3, 4, 5}; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 1675f91330..a2c767ac6c 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1235,6 +1235,22 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +int32_t schGetTaskFromTaskList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask) { + int32_t s = taosHashGetSize(pTaskList); + if (s <= 0) { + return TSDB_CODE_SUCCESS; + } + + SSchTask **task = taosHashGet(pTaskList, &taskId, sizeof(taskId)); + if (NULL == task || NULL == (*task)) { + return TSDB_CODE_SUCCESS; + } + + *pTask = *task; + + return TSDB_CODE_SUCCESS; +} + int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode) { int32_t code = 0; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; @@ -1247,19 +1263,21 @@ int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } - int32_t s = taosHashGetSize(pJob->execTasks); - if (s <= 0) { - SCH_JOB_ELOG("empty execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + schGetTaskFromTaskList(pJob->execTasks, pParam->taskId, &pTask); + if (NULL == pTask) { + if (TDMT_VND_EXPLAIN_RSP == msgType) { + schGetTaskFromTaskList(pJob->succTasks, pParam->taskId, &pTask); + } else { + SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } + } + + if (NULL == pTask) { + SCH_JOB_ELOG("task not found in execList & succList, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); - if (NULL == task || NULL == (*task)) { - SCH_JOB_ELOG("task not found in execTask list, refId:%" PRIx64 ", taskId:%" PRIx64, pParam->refId, pParam->taskId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); - } - - pTask = *task; SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); SCH_SET_TASK_HANDLE(pTask, pMsg->handle);