From 5a72ca804b7432f3b4783c4fc1cb6f1e740747b2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 14 May 2022 23:03:02 +0800 Subject: [PATCH 1/2] fix(query): set the correct row index during generating join results. --- source/libs/executor/src/executorimpl.c | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2cbfb9b344..d08345a02f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5500,18 +5500,21 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { int32_t blockId = pExprInfo->base.pParam[0].pCol->dataBlockId; int32_t slotId = pExprInfo->base.pParam[0].pCol->slotId; + int32_t rowIndex = -1; SColumnInfoData* pSrc = NULL; if (pJoinInfo->pLeft->info.blockId == blockId) { pSrc = taosArrayGet(pJoinInfo->pLeft->pDataBlock, slotId); + rowIndex = pJoinInfo->leftPos; } else { pSrc = taosArrayGet(pJoinInfo->pRight->pDataBlock, slotId); + rowIndex = pJoinInfo->rightPos; } - if (colDataIsNull_s(pSrc, pJoinInfo->leftPos)) { + if (colDataIsNull_s(pSrc, rowIndex)) { colDataAppendNULL(pDst, nrows); } else { - char* p = colDataGetData(pSrc, pJoinInfo->leftPos); + char* p = colDataGetData(pSrc, rowIndex); colDataAppend(pDst, nrows, p, false); } } From 692ae29c6cdaada1191fcd7d3dd66482ccc3aa80 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 14 May 2022 23:39:10 +0800 Subject: [PATCH 2/2] fix: clean the udf handles cache when pipe error --- source/libs/function/src/tudf.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index f2a0b4ec2c..3b83531e9f 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -825,6 +825,11 @@ void onUdfcPipeClose(uv_handle_t *handle) { taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn); taosMemoryFree((uv_pipe_t *) handle); + + //clear the udf handles cache + uv_mutex_lock(&gUdfdProxy.udfStubsMutex); + taosArrayClear(gUdfdProxy.udfStubs); + uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); } int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { @@ -1140,7 +1145,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { return code; } -void udfClientAsyncCb(uv_async_t *async) { +void udfcAsyncTaskCb(uv_async_t *async) { SUdfcProxy *udfc = async->data; QUEUE wq; @@ -1204,7 +1209,7 @@ void constructUdfService(void *argsThread) { SUdfcProxy *udfc = (SUdfcProxy *)argsThread; uv_loop_init(&udfc->uvLoop); - uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfClientAsyncCb); + uv_async_init(&udfc->uvLoop, &udfc->loopTaskAync, udfcAsyncTaskCb); udfc->loopTaskAync.data = udfc; uv_async_init(&udfc->uvLoop, &udfc->loopStopAsync, udfStopAsyncCb); udfc->loopStopAsync.data = udfc; @@ -1472,6 +1477,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, return code; } +//TODO: when to teardown udf. teardown udf is not called int32_t doTeardownUdf(UdfcFuncHandle handle) { fnInfo("tear down udf. udf func handle: %p", handle);