From c449b7208acdec8656f03af90fce064192b57b9d Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Thu, 23 Jun 2022 16:52:06 +0800 Subject: [PATCH] fix: set correct operator blocking and add closing rpc trace to udfd --- source/libs/executor/src/sortoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 2 ++ source/libs/function/src/udfd.c | 3 ++- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 4b5ad7b123..81cf8d6527 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -460,7 +460,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "GroupSortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; - pOperator->blocking = true; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->exprSupp.pExprInfo = pExprInfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 19f0fd4ea7..7725a939d6 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4117,6 +4117,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI goto _error; } + miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; iaInfo->win = pTaskInfo->window; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 7fffa84e0b..f8713e4b47 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -613,7 +613,9 @@ int32_t udfdOpenClientRpc() { } int32_t udfdCloseClientRpc() { + fnInfo("udfd begin closing rpc"); rpcClose(global.clientRpc); + fnInfo("udfd finish closing rpc"); return 0; } @@ -937,7 +939,6 @@ int main(int argc, char *argv[]) { uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); udfdRun(); - removeListeningPipe(); uv_thread_join(&mnodeConnectThread); udfdCloseClientRpc();