fix: global data sink manager issue
This commit is contained in:
parent
b5bd8f7c23
commit
fe4e45647e
|
@ -224,6 +224,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
}
|
||||
taosCloseQueue(pDeleter->pDataBlocks);
|
||||
taosThreadMutexDestroy(&pDeleter->mutex);
|
||||
|
||||
taosMemoryFree(pDeleter->pManager);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -226,6 +226,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
}
|
||||
taosCloseQueue(pDispatcher->pDataBlocks);
|
||||
taosThreadMutexDestroy(&pDispatcher->mutex);
|
||||
taosMemoryFree(pDispatcher->pManager);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -395,6 +395,8 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
|
|||
taosMemoryFree(pInserter->pParam);
|
||||
taosHashCleanup(pInserter->pCols);
|
||||
taosThreadMutexDestroy(&pInserter->mutex);
|
||||
|
||||
taosMemoryFree(pInserter->pManager);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -18,12 +18,17 @@
|
|||
#include "planner.h"
|
||||
#include "tarray.h"
|
||||
|
||||
static SDataSinkManager gDataSinkManager = {0};
|
||||
SDataSinkStat gDataSinkStat = {0};
|
||||
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI) {
|
||||
gDataSinkManager.cfg = *cfg;
|
||||
gDataSinkManager.pAPI = pAPI;
|
||||
int32_t dsDataSinkMgtInit(SDataSinkMgtCfg* cfg, SStorageAPI* pAPI, void** ppSinkManager) {
|
||||
SDataSinkManager* pSinkManager = taosMemoryMalloc(sizeof(SDataSinkManager));
|
||||
if (NULL == pSinkManager) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pSinkManager->cfg = *cfg;
|
||||
pSinkManager->pAPI = pAPI;
|
||||
|
||||
*ppSinkManager = pSinkManager;
|
||||
return 0; // to avoid compiler eror
|
||||
}
|
||||
|
||||
|
@ -33,15 +38,16 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat* pStat) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t dsCreateDataSinker(const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
|
||||
int32_t dsCreateDataSinker(void* pSinkManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void* pParam, const char* id) {
|
||||
SDataSinkManager* pManager = pSinkManager;
|
||||
switch ((int)nodeType(pDataSink)) {
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
|
||||
return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle);
|
||||
return createDataDispatcher(pManager, pDataSink, pHandle);
|
||||
case QUERY_NODE_PHYSICAL_PLAN_DELETE: {
|
||||
return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam);
|
||||
return createDataDeleter(pManager, pDataSink, pHandle, pParam);
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: {
|
||||
return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam);
|
||||
return createDataInserter(pManager, pDataSink, pHandle, pParam);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -512,7 +512,8 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
}
|
||||
|
||||
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 500, .maxDataBlockNumPerQuery = 50};
|
||||
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI);
|
||||
void* pSinkManager = NULL;
|
||||
code = dsDataSinkMgtInit(&cfg, &(*pTask)->storageAPI, &pSinkManager);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to dsDataSinkMgtInit, code:%s, %s", tstrerror(code), (*pTask)->id.str);
|
||||
goto _error;
|
||||
|
@ -527,7 +528,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
|
|||
}
|
||||
|
||||
// pSinkParam has been freed during create sinker.
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
|
||||
code = dsCreateDataSinker(pSinkManager, pSubplan->pDataSink, handle, pSinkParam, (*pTask)->id.str);
|
||||
}
|
||||
|
||||
qDebug("subplan task create completed, TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, pSubplan->id.queryId);
|
||||
|
|
Loading…
Reference in New Issue