diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index fe95dc613e..8e3898fe2f 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -463,7 +463,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pInfo->pColMatchInfo = pColMatchColInfo; pOperator->name = "GroupSortOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT; - pOperator->blocking = true; + pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->exprSupp.pExprInfo = pExprInfo; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f365be906c..1a56eacda8 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4117,6 +4117,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SExprI goto _error; } + miaInfo->groupIntervalHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); + SIntervalAggOperatorInfo* iaInfo = &miaInfo->intervalAggOperatorInfo; iaInfo->win = pTaskInfo->window; diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index 7fffa84e0b..838071dbf1 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -613,7 +613,9 @@ int32_t udfdOpenClientRpc() { } int32_t udfdCloseClientRpc() { + fnInfo("udfd begin closing rpc"); rpcClose(global.clientRpc); + fnInfo("udfd finish closing rpc"); return 0; } @@ -937,9 +939,8 @@ int main(int argc, char *argv[]) { uv_thread_create(&mnodeConnectThread, udfdConnectMnodeThreadFunc, NULL); udfdRun(); - + removeListeningPipe(); - uv_thread_join(&mnodeConnectThread); udfdCloseClientRpc(); return 0;