diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 0e22985059..fd0521af3b 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -93,6 +93,7 @@ extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in // query client extern int32_t tsQueryPolicy; +extern int32_t tsQueryRspPolicy; extern int32_t tsQuerySmaOptimize; extern int32_t tsQueryRsmaTolerance; extern bool tsQueryPlannerTrace; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 777fe1c4df..7d95e840e9 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -54,6 +54,9 @@ typedef enum { #define QUERY_POLICY_QNODE 3 #define QUERY_POLICY_CLIENT 4 +#define QUERY_RSP_POLICY_DELAY 0 +#define QUERY_RSP_POLICY_QUICK 1 + typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7b437e4769..940dd745c8 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1036,30 +1036,38 @@ static int32_t asyncExecSchQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaDat pRequest->type = pQuery->msgType; SArray* pMnodeList = taosArrayInit(4, sizeof(SQueryNodeLoad)); + SPlanContext cxt = {.queryId = pRequest->requestId, - .acctId = pRequest->pTscObj->acctId, - .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), - .pAstRoot = pQuery->pRoot, - .showRewrite = pQuery->showRewrite, - .pMsg = pRequest->msgBuf, - .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, - .pUser = pRequest->pTscObj->user, - .sysInfo = pRequest->pTscObj->sysInfo, - .allocatorId = pRequest->allocatorRefId}; - SQueryPlan* pDag = NULL; - int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); - if (code) { - tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), - pRequest->requestId); - } else { - pRequest->body.subplanNum = pDag->numOfSubplans; + .acctId = pRequest->pTscObj->acctId, + .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), + .pAstRoot = pQuery->pRoot, + .showRewrite = pQuery->showRewrite, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE, + .pUser = pRequest->pTscObj->user, + .sysInfo = pRequest->pTscObj->sysInfo, + .allocatorId = pRequest->allocatorRefId}; + + SAppInstInfo* pAppInfo = getAppInfo(pRequest); + SQueryPlan* pDag = NULL; + + int64_t st = taosGetTimestampUs(); + int32_t code = qCreateQueryPlan(&cxt, &pDag, pMnodeList); + if (code) { + tscError("0x%" PRIx64 " failed to create query plan, code:%s 0x%" PRIx64, pRequest->self, tstrerror(code), + pRequest->requestId); + } else { + pRequest->body.subplanNum = pDag->numOfSubplans; + } + + pRequest->metric.planEnd = taosGetTimestampUs(); + if (code == TSDB_CODE_SUCCESS) { + tscDebug("0x%" PRIx64 " create query plan success, elapsed time:%.2f ms, 0x%" PRIx64, pRequest->self, + (pRequest->metric.planEnd - st)/1000.0, pRequest->requestId); } - - pRequest->metric.planEnd = taosGetTimestampUs(); - - if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { - SArray* pNodeList = NULL; - buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); + if (TSDB_CODE_SUCCESS == code && !pRequest->validateOnly) { + SArray* pNodeList = NULL; + buildAsyncExecNodeList(pRequest, &pNodeList, pMnodeList, pResultMeta); SRequestConnInfo conn = {.pTrans = getAppInfo(pRequest)->pTransporter, .requestId = pRequest->requestId, diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index e69174476a..21a52a4b57 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -52,15 +52,15 @@ void printResult(TAOS_RES* pRes) { int32_t n = 0; char str[512] = {0}; while ((pRow = taos_fetch_row(pRes)) != NULL) { - int32_t* length = taos_fetch_lengths(pRes); - for (int32_t i = 0; i < numOfFields; ++i) { - printf("(%d):%d ", i, length[i]); - } - printf("\n"); - - int32_t code = taos_print_row(str, pRow, pFields, numOfFields); - printf("%s\n", str); - memset(str, 0, sizeof(str)); +// int32_t* length = taos_fetch_lengths(pRes); +// for(int32_t i = 0; i < numOfFields; ++i) { +// printf("(%d):%d " , i, length[i]); +// } +// printf("\n"); +// +// int32_t code = taos_print_row(str, pRow, pFields, numOfFields); +// printf("%s\n", str); +// memset(str, 0, sizeof(str)); } } @@ -102,17 +102,6 @@ void queryCallback(void* param, void* res, int32_t code) { taos_fetch_raw_block_a(res, fetchCallback, param); } -void queryCallback1(void* param, void* res, int32_t code) { - if (code != TSDB_CODE_SUCCESS) { - printf("failed to execute, reason:%s\n", taos_errstr(res)); - } - - taos_free_result(res); - - printf("exec query:\n"); - taos_query_a(param, "select * from tm1", queryCallback, param); -} - void createNewTable(TAOS* pConn, int32_t index) { char str[1024] = {0}; sprintf(str, "create table tu%d using st2 tags(%d)", index, index); @@ -123,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for (int32_t i = 0; i < 3280; i += 20) { + for(int32_t i = 0; i < 10000; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -141,10 +130,49 @@ void createNewTable(TAOS* pConn, int32_t index) { taos_free_result(p); } } + +void *queryThread(void *arg) { + TAOS* pConn = taos_connect("192.168.0.209", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + printf("failed to connect to db, reason:%s", taos_errstr(pConn)); + return NULL; + } + + int64_t el = 0; + + for (int32_t i = 0; i < 5000000; ++i) { + int64_t st = taosGetTimestampUs(); + TAOS_RES* pRes = taos_query(pConn, + "SELECT _wstart as ts,max(usage_user) FROM benchmarkcpu.host_49 WHERE ts >= 1451618560000 AND ts < 1451622160000 INTERVAL(1m) ;"); + if (taos_errno(pRes) != 0) { + printf("failed, reason:%s\n", taos_errstr(pRes)); + } else { + printResult(pRes); + } + + taos_free_result(pRes); + el += (taosGetTimestampUs() - st); + if (i % 1000 == 0 && i != 0) { + printf("total:%d, avg time:%.2fms\n", i, el/(double)(i*1000)); + } + } + + taos_close(pConn); + return NULL; +} + +static int32_t numOfThreads = 1; } // namespace int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); + if (argc > 1) { + numOfThreads = atoi(argv[1]); + } + + numOfThreads = TMAX(numOfThreads, 1); + printf("the runing threads is:%d", numOfThreads); + return RUN_ALL_TESTS(); } @@ -664,7 +692,6 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } -#endif TEST(testCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -676,13 +703,14 @@ TEST(testCase, projection_query_tables) { // } // taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc1"); + TAOS_RES* pRes = taos_query(pConn, "use benchmarkcpu"); taos_free_result(pRes); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); if (taos_errno(pRes) != 0) { printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); } + taos_free_result(pRes); pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)"); @@ -722,6 +750,16 @@ TEST(testCase, projection_query_tables) { taos_free_result(pRes); taos_close(pConn); } +#endif + +TEST(testCase, tsbs_perf_test) { + TdThread qid[20] = {0}; + + for(int32_t i = 0; i < numOfThreads; ++i) { + taosThreadCreate(&qid[i], NULL, queryThread, NULL); + } + getchar(); +} #if 0 TEST(testCase, projection_query_stables) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 88df54a9ea..56daa4075a 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -90,6 +90,7 @@ bool tsSmlDataFormat = false; // query int32_t tsQueryPolicy = 1; +int32_t tsQueryRspPolicy = 0; int32_t tsQuerySmaOptimize = 0; int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. bool tsQueryPlannerTrace = false; @@ -350,6 +351,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1; if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, 0) != 0) return -1; if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1; @@ -728,6 +730,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsMonitorPort = (uint16_t)cfgGetItem(pCfg, "monitorPort")->i32; tsMonitorMaxLogs = cfgGetItem(pCfg, "monitorMaxLogs")->i32; tsMonitorComp = cfgGetItem(pCfg, "monitorComp")->bval; + tsQueryRspPolicy = cfgGetItem(pCfg, "queryRspPolicy")->i32; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index d2b071ec61..849d3ef390 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -72,7 +72,7 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) { mndPostProcessQueryMsg(pMsg); } - dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); + dGTrace("msg:%p is freed, code:%s", pMsg, tstrerror(code)); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 03d4a975f2..09a46b59bb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -543,6 +543,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead pMTree->pLoadInfo = pBlockLoadInfo; pMTree->destroyLoadInfo = destroyLoadInfo; + ASSERT(pMTree->pLoadInfo != NULL); for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file struct SLDataIter *pIter = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index a7850f3114..6b706901ba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3395,19 +3395,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl goto _err; } - code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr); - if (code != TSDB_CODE_SUCCESS) { - goto _err; - } - - if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { - code = doOpenReaderImpl(pReader); + if (numOfTables > 0) { + code = tsdbTakeReadSnap(pReader->pTsdb, &pReader->pReadSnap, pReader->idStr); if (code != TSDB_CODE_SUCCESS) { - return code; + goto _err; } - } else { - STsdbReader* pPrevReader = pReader->innerReader[0]; - STsdbReader* pNextReader = pReader->innerReader[1]; + + if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) { + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + } else { + STsdbReader* pPrevReader = pReader->innerReader[0]; + STsdbReader* pNextReader = pReader->innerReader[1]; // we need only one row pPrevReader->capacity = 1; @@ -3422,19 +3423,20 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl pNextReader->pMemSchema = pReader->pMemSchema; pNextReader->pReadSnap = pReader->pReadSnap; - code = doOpenReaderImpl(pPrevReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doOpenReaderImpl(pPrevReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - code = doOpenReaderImpl(pNextReader); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doOpenReaderImpl(pNextReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - code = doOpenReaderImpl(pReader); - if (code != TSDB_CODE_SUCCESS) { - return code; + code = doOpenReaderImpl(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } } @@ -3513,6 +3515,10 @@ void tsdbReaderClose(STsdbReader* pReader) { taosMemoryFree(pLReader); } + if (pReader->innerReader[0] != 0) { + tsdbUntakeReadSnap(pReader->innerReader[0]->pTsdb, pReader->innerReader[0]->pReadSnap, pReader->idStr); + } + tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 " SMA-time:%.2f ms, fileBlocks:%" PRId64 diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d6e0de0eb1..fc996a6003 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -212,6 +212,7 @@ typedef struct SExprSupp { int32_t numOfExprs; // the number of scalar expression in group operator SqlFunctionCtx* pCtx; int32_t* rowEntryInfoOffset; // offset value for each row result cell info + SFilterInfo* pFilterInfo; } SExprSupp; typedef struct SOperatorInfo { @@ -926,7 +927,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); void doSetOperatorCompleted(SOperatorInfo* pOperator); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo); +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, SExprInfo* pPseudoExpr, int32_t numOfPseudoExpr, SSDataBlock* pBlock, const char* idStr); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8c1104f519..ea01b961b1 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1113,15 +1113,24 @@ void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numO static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoData* p, bool keep, int32_t status); -void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo) { +void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColMatchInfo, SFilterInfo* pFilterInfo) { if (pFilterNode == NULL || pBlock->info.rows == 0) { return; } - SFilterInfo* filter = NULL; + SFilterInfo* filter = pFilterInfo; + int64_t st = taosGetTimestampUs(); + +// pError("start filter"); // todo move to the initialization function - int32_t code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + int32_t code = 0; + bool needFree = false; + if (filter == NULL) { + needFree = true; + code = filterInitFromNode((SNode*)pFilterNode, &filter, 0); + } + SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; code = filterSetDataFromSlotId(filter, ¶m1); @@ -1130,7 +1139,10 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, const SArray* pColM // todo the keep seems never to be True?? bool keep = filterExecute(filter, pBlock, &p, NULL, param1.numOfCols, &status); - filterFreeInfo(filter); + + if (needFree) { + filterFreeInfo(filter); + } extractQualifiedTupleByFilterResult(pBlock, p, keep, status); @@ -2479,7 +2491,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - doFilter(pAggInfo->pCondition, pInfo->pRes, NULL); + doFilter(pAggInfo->pCondition, pInfo->pRes, NULL, NULL); if (!hasRemainResults(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -2873,7 +2885,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo); + doFilter(pInfo->pCondition, fillResult, pInfo->pColMatchColInfo, NULL); if (fillResult->info.rows > 0) { break; } @@ -3049,6 +3061,12 @@ void cleanupExprSupp(SExprSupp* pSupp) { destroyExprInfo(pSupp->pExprInfo, pSupp->numOfExprs); taosMemoryFreeClear(pSupp->pExprInfo); } + + if (pSupp->pFilterInfo != NULL) { + filterFreeInfo(pSupp->pFilterInfo); + pSupp->pFilterInfo = NULL; + } + taosMemoryFree(pSupp->rowEntryInfoOffset); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b0bb9e42b9..e62f72c8f8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -311,7 +311,7 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { SSDataBlock* pRes = pInfo->binfo.pRes; while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pRes, NULL); + doFilter(pInfo->pCondition, pRes, NULL, NULL); if (!hasRemainResults(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index b4aa6de0a8..53cfa6c27a 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -387,7 +387,7 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } if (pJoinInfo->pCondAfterMerge != NULL) { - doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL); + doFilter(pJoinInfo->pCondAfterMerge, pRes, NULL, NULL); } if (pRes->info.rows >= pOperator->resultInfo.threshold) { break; diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index e9e6fed66a..fe6ee4129a 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -315,7 +315,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } // do apply filter - doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL); + doFilter(pProjectInfo->pFilterNode, pFinalRes, NULL, NULL); // when apply the limit/offset for each group, pRes->info.rows may be 0, due to limit constraint. if (pFinalRes->info.rows > 0 || (pOperator->status == OP_EXEC_DONE)) { @@ -325,7 +325,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { } else { // do apply filter if (pRes->info.rows > 0) { - doFilter(pProjectInfo->pFilterNode, pRes, NULL); + doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); if (pRes->info.rows == 0) { continue; } @@ -518,7 +518,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { } } - doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL); + doFilter(pIndefInfo->pCondition, pInfo->pRes, NULL, NULL); size_t rows = pInfo->pRes->info.rows; if (rows > 0 || pOperator->status == OP_EXEC_DONE) { break; @@ -620,7 +620,7 @@ SSDataBlock* doGenerateSourceData(SOperatorInfo* pOperator) { } pRes->info.rows = 1; - doFilter(pProjectInfo->pFilterNode, pRes, NULL); + doFilter(pProjectInfo->pFilterNode, pRes, NULL, NULL); /*int32_t status = */ doIngroupLimitOffset(&pProjectInfo->limitInfo, 0, pRes, pOperator); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7ede06e6ca..c0c38e2b30 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -385,7 +385,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampUs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, pOperator->exprSupp.pFilterInfo); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; @@ -754,6 +754,11 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->dataBlockLoadFlag = pTableScanNode->dataRequired; pInfo->pResBlock = createResDataBlock(pDescNode); pInfo->pFilterNode = pTableScanNode->scan.node.pConditions; + + if (pInfo->pFilterNode != NULL) { + code = filterInitFromNode((SNode*)pInfo->pFilterNode, &pOperator->exprSupp.pFilterInfo, 0); + } + pInfo->scanFlag = MAIN_SCAN; pInfo->pColMatchInfo = pColList; pInfo->currentGroupId = -1; @@ -1123,7 +1128,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32 return NULL; } - doFilter(pInfo->pCondition, pResult, NULL); + doFilter(pInfo->pCondition, pResult, NULL, NULL); if (pResult->info.rows == 0) { continue; } @@ -1474,7 +1479,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock } if (filter) { - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); } blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); blockDataFreeRes((SSDataBlock*)pBlock); @@ -1896,7 +1901,7 @@ FETCH_NEXT_BLOCK: } } - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { @@ -2382,7 +2387,7 @@ static SSDataBlock* doFilterResult(SSysTableScanInfo* pInfo) { return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } - doFilter(pInfo->pCondition, pInfo->pRes, NULL); + doFilter(pInfo->pCondition, pInfo->pRes, NULL, NULL); return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes; } @@ -3455,7 +3460,7 @@ static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeSc if (pTableScanInfo->pFilterNode != NULL) { int64_t st = taosGetTimestampMs(); - doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo); + doFilter(pTableScanInfo->pFilterNode, pBlock, pTableScanInfo->pColMatchInfo, NULL); double el = (taosGetTimestampUs() - st) / 1000.0; pTableScanInfo->readRecorder.filterTime += el; diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index ed3247e10e..c651529c7a 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -216,7 +216,7 @@ SSDataBlock* doSort(SOperatorInfo* pOperator) { return NULL; } - doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo); + doFilter(pInfo->pCondition, pBlock, pInfo->pColMatchInfo, NULL); if (blockDataGetNumOfRows(pBlock) == 0) { continue; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 5be9918185..88af03775c 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1501,7 +1501,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { } doStreamFillImpl(pOperator); - doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo); + doFilter(pInfo->pCondition, pInfo->pRes, pInfo->pColMatchColInfo, NULL); pOperator->resultInfo.totalRows += pInfo->pRes->info.rows; if (pInfo->pRes->info.rows > 0) { break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index df16914cb8..c1478bc935 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1277,7 +1277,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -1315,7 +1315,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBlock, NULL); + doFilter(pInfo->pCondition, pBlock, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -2019,7 +2019,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -2062,7 +2062,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); while (1) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - doFilter(pInfo->pCondition, pBInfo->pRes, NULL); + doFilter(pInfo->pCondition, pBInfo->pRes, NULL, NULL); bool hasRemain = hasRemainResults(&pInfo->groupResInfo); if (!hasRemain) { @@ -5251,7 +5251,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { setInputDataBlock(pOperator, pSup->pCtx, pBlock, pIaInfo->inputOrder, scanFlag, true); doMergeAlignedIntervalAggImpl(pOperator, &pIaInfo->binfo.resultRowInfo, pBlock, pRes); - doFilter(pMiaInfo->pCondition, pRes, NULL); + doFilter(pMiaInfo->pCondition, pRes, NULL, NULL); if (pRes->info.rows >= pOperator->resultInfo.capacity) { break; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 626e78b4d3..36c6817595 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -123,6 +123,7 @@ typedef struct SQWTaskCtx { int32_t execId; int32_t level; + bool queryGotData; bool queryRsped; bool queryEnd; bool queryContinue; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9115c2d3aa..d2c16a1cb5 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -10,6 +10,7 @@ #include "tdatablock.h" #include "tmsg.h" #include "tname.h" +#include "tglobal.h" SQWorkerMgmt gQwMgmt = { .lock = 0, @@ -92,6 +93,19 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } +int32_t qwSendQueryRsp(QW_FPARAMS_DEF, int32_t msgType, SQWTaskCtx *ctx, int32_t rspCode, bool quickRsp) { + if ((!quickRsp) || QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy) { + if (!ctx->localExec) { + qwBuildAndSendQueryRsp(msgType, &ctx->ctrlConnInfo, rspCode, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, rspCode, tstrerror(rspCode)); + } + + ctx->queryRsped = true; + } + + return TSDB_CODE_SUCCESS; +} + int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { int32_t code = 0; bool qcontinue = true; @@ -144,7 +158,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (numOfResBlock == 0) { QW_TASK_DLOG("qExecTask end with empty res, useconds:%" PRIu64, useconds); } else { - QW_TASK_DLOG("qExecTask done", ""); + QW_TASK_DLOG("qExecTask done, useconds:%" PRIu64, useconds); } dsEndPut(sinkHandle, useconds); @@ -234,6 +248,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, int32_t code = 0; SOutputData output = {0}; + if (NULL == ctx->sinkHandle) { + return TSDB_CODE_SUCCESS; + } + *dataLen = 0; while (true) { @@ -407,6 +425,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } + if (ctx->rspCode) { + QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase)); + QW_ERR_JRET(ctx->rspCode); + } + if (!ctx->queryRsped) { QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); @@ -419,6 +442,11 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } + if (ctx->rspCode) { + QW_TASK_ELOG("task already failed cause of %s, phase:%s", tstrerror(ctx->rspCode), qwPhaseStr(phase)); + QW_ERR_JRET(ctx->rspCode); + } + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -499,21 +527,17 @@ _return: if (TSDB_CODE_SUCCESS == code && QW_PHASE_POST_QUERY == phase) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); + ctx->queryGotData = true; } - if (QW_PHASE_POST_QUERY == phase && ctx) { - if (!ctx->localExec) { - bool rsped = false; - SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); - qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); - if (!rsped) { - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - } + if (QW_PHASE_POST_QUERY == phase && ctx && !ctx->queryRsped) { + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + if (!rsped) { + qwSendQueryRsp(QW_FPARAMS(), input->msgType + 1, ctx, code, false); } - - ctx->queryRsped = true; } if (ctx) { @@ -551,6 +575,7 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); ctx->ctrlConnInfo = qwMsg->connInfo; + ctx->phase = -1; QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); @@ -604,6 +629,8 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } + qwSendQueryRsp(QW_FPARAMS(), qwMsg->msgType + 1, ctx, code, true); + ctx->level = plan->level; atomic_store_ptr(&ctx->taskHandle, pTaskInfo); atomic_store_ptr(&ctx->sinkHandle, sinkHandle); @@ -619,6 +646,31 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); + if (ctx != NULL && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + void *rsp = NULL; + int32_t dataLen = 0; + SOutputData sOutput = {0}; + QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); + + if (rsp) { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + } + + qwMsg->connInfo = ctx->dataConnInfo; + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + + qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code); + rsp = NULL; + + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, + tstrerror(code), dataLen); + } + } + QW_RET(TSDB_CODE_SUCCESS); } @@ -740,7 +792,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = true; // RC WARNING - if (QW_QUERY_RUNNING(ctx)) { + if (-1 == ctx->phase || false == ctx->queryGotData) { + QW_TASK_DLOG_E("task query unfinished"); + } else if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 2678a652a9..33bf50d853 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -913,7 +913,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schSetTaskCandidateAddrs(pJob, pTask)); if (SCH_IS_QUERY_JOB(pJob)) { - SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); +// SCH_ERR_RET(schEnsureHbConnection(pJob, pTask)); } SCH_RET(schBuildAndSendMsg(pJob, pTask, NULL, plan->msgType)); @@ -993,6 +993,12 @@ int32_t schLaunchTaskImpl(void *param) { SCH_ERR_JRET(schLaunchRemoteTask(pJob, pTask)); } +#if 0 + if (SCH_IS_QUERY_JOB(pJob)) { + SCH_ERR_JRET(schEnsureHbConnection(pJob, pTask)); + } +#endif + _return: if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 813a11c056..c4b45649a4 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -155,8 +155,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { return; } - SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", errCode:0x%x", *jobId, errCode); - + SCH_JOB_DLOG("start to free job 0x%" PRIx64 ", code:%s", *jobId, tstrerror(errCode)); schHandleJobDrop(pJob, errCode); schReleaseJob(*jobId); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 83323725be..359b9824cf 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -597,6 +597,7 @@ static int32_t specifyConnRef(SCliConn* conn, bool update, int64_t handle) { static void cliAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { SCliConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; + tDebug("%s conn %p alloc read buf", CONN_GET_INST_LABEL(conn), conn); transAllocBuffer(pBuf, buf); } static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { @@ -609,7 +610,7 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { if (nread > 0) { pBuf->len += nread; while (transReadComplete(pBuf)) { - tTrace("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); + tDebug("%s conn %p read complete", CONN_GET_INST_LABEL(conn), conn); if (pBuf->invalid) { cliHandleExcept(conn); break; diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index 42079ff005..f843c8559c 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -203,7 +203,7 @@ bool transReadComplete(SConnBuffer* connBuf) { } int transSetConnOption(uv_tcp_t* stream) { - uv_tcp_nodelay(stream, 1); + uv_tcp_nodelay(stream, 0); int ret = uv_tcp_keepalive(stream, 5, 60); return ret; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 57bf8a34b3..3616b4808f 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -332,7 +332,10 @@ void uvOnSendCb(uv_write_t* req, int status) { if (status == 0) { tTrace("conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { - SSvrMsg* msg = transQueuePop(&conn->srvMsgs); + SSvrMsg* msg = transQueuePop(&conn->srvMsgs); + STraceId* trace = &msg->msg.info.traceId; + tGDebug("conn %p write data out", conn); + destroySmsg(msg); // send cached data if (!transQueueEmpty(&conn->srvMsgs)) {