refactor(query): tune sink param.

This commit is contained in:
Haojun Liao 2024-05-18 09:54:22 +08:00
parent 43bec6c00f
commit 937ded9f70
2 changed files with 4 additions and 2 deletions

View File

@ -268,6 +268,7 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _return; goto _return;
} }
dispatcher->sink.fPut = putDataBlock; dispatcher->sink.fPut = putDataBlock;
dispatcher->sink.fEndPut = endPut; dispatcher->sink.fEndPut = endPut;
dispatcher->sink.fReset = resetDispatcher; dispatcher->sink.fReset = resetDispatcher;
@ -275,12 +276,14 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
dispatcher->sink.fGetData = getDataBlock; dispatcher->sink.fGetData = getDataBlock;
dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->sink.fDestroy = destroyDataSinker;
dispatcher->sink.fGetCacheSize = getCacheSize; dispatcher->sink.fGetCacheSize = getCacheSize;
dispatcher->pManager = pManager; dispatcher->pManager = pManager;
dispatcher->pSchema = pDataSink->pInputDataBlockDesc; dispatcher->pSchema = pDataSink->pInputDataBlockDesc;
dispatcher->status = DS_BUF_EMPTY; dispatcher->status = DS_BUF_EMPTY;
dispatcher->queryEnd = false; dispatcher->queryEnd = false;
dispatcher->pDataBlocks = taosOpenQueue(); dispatcher->pDataBlocks = taosOpenQueue();
taosThreadMutexInit(&dispatcher->mutex, NULL); taosThreadMutexInit(&dispatcher->mutex, NULL);
if (NULL == dispatcher->pDataBlocks) { if (NULL == dispatcher->pDataBlocks) {
taosMemoryFree(dispatcher); taosMemoryFree(dispatcher);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -290,7 +293,6 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pD
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_return: _return:
taosMemoryFree(pManager); taosMemoryFree(pManager);
return terrno; return terrno;
} }

View File

@ -537,7 +537,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
} }
if (handle) { if (handle) {
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 5000, .maxDataBlockNumPerQuery = 500}; SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
void* pSinkManager = NULL; void* pSinkManager = NULL;
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager); code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {