diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index ea65db7779..022bd951b0 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -268,6 +268,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD terrno = TSDB_CODE_OUT_OF_MEMORY; goto _return; } + dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fEndPut = endPut; dispatcher->sink.fReset = resetDispatcher; @@ -275,12 +276,14 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fGetCacheSize = getCacheSize; + dispatcher->pManager = pManager; dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->status = DS_BUF_EMPTY; dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); taosThreadMutexInit(&dispatcher->mutex, NULL); + if (NULL == dispatcher->pDataBlocks) { taosMemoryFree(dispatcher); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -290,7 +293,6 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD return TSDB_CODE_SUCCESS; _return: - taosMemoryFree(pManager); return terrno; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 889a9b62e1..29f667cb66 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -537,7 +537,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, } if (handle) { - SDataSinkMgtCfg cfg = {.maxDataBlockNum = 5000, .maxDataBlockNumPerQuery = 500}; + SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50}; void* pSinkManager = NULL; code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); if (code != TSDB_CODE_SUCCESS) {