diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5c65e95807..c424cb33fa 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -45,7 +45,7 @@ typedef struct SDataInserterHandle { SDataDeleterNode* pDeleter; SDeleterParam* pParam; STaosQueue* pDataBlocks; - SDataDeleterBuf nextOutput; + SDataInserterBuf nextOutput; int32_t status; bool queryEnd; uint64_t useconds; @@ -69,7 +69,7 @@ static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { return false; } -static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInput, SDataDeleterBuf* pBuf) { +static void toDataCacheEntry(SDataInserterHandle* pHandle, const SInputData* pInput, SDataInserterBuf* pBuf) { int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; @@ -98,7 +98,7 @@ static void toDataCacheEntry(SDataDeleterHandle* pHandle, const SInputData* pInp atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); } -static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDataDeleterBuf* pBuf) { +static bool allocBuf(SDataInserterHandle* pDeleter, const SInputData* pInput, SDataInserterBuf* pBuf) { uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery; if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) { qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, @@ -116,7 +116,7 @@ static bool allocBuf(SDataDeleterHandle* pDeleter, const SInputData* pInput, SDa return NULL != pBuf->pData; } -static int32_t updateStatus(SDataDeleterHandle* pDeleter) { +static int32_t updateStatus(SDataInserterHandle* pDeleter) { taosThreadMutexLock(&pDeleter->mutex); int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); int32_t status = @@ -127,7 +127,7 @@ static int32_t updateStatus(SDataDeleterHandle* pDeleter) { return status; } -static int32_t getStatus(SDataDeleterHandle* pDeleter) { +static int32_t getStatus(SDataInserterHandle* pDeleter) { taosThreadMutexLock(&pDeleter->mutex); int32_t status = pDeleter->status; taosThreadMutexUnlock(&pDeleter->mutex); @@ -135,8 +135,8 @@ static int32_t getStatus(SDataDeleterHandle* pDeleter) { } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { - SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; - SDataDeleterBuf* pBuf = taosAllocateQitem(sizeof(SDataDeleterBuf), DEF_QITEM); + SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; + SDataInserterBuf* pBuf = taosAllocateQitem(sizeof(SDataInserterBuf), DEF_QITEM); if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -147,7 +147,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { - SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; taosThreadMutexLock(&pDeleter->mutex); pDeleter->queryEnd = true; pDeleter->useconds = useconds; @@ -155,16 +155,16 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { } static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { - SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; if (taosQueueEmpty(pDeleter->pDataBlocks)) { *pQueryEnd = pDeleter->queryEnd; *pLen = 0; return; } - SDataDeleterBuf* pBuf = NULL; + SDataInserterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataDeleterBuf)); + memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataInserterBuf)); taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; *pQueryEnd = pDeleter->queryEnd; @@ -172,7 +172,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { - SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; if (NULL == pDeleter->nextOutput.pData) { assert(pDeleter->queryEnd); pOutput->useconds = pDeleter->useconds; @@ -202,11 +202,11 @@ static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { - SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; + SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); taosMemoryFreeClear(pDeleter->nextOutput.pData); while (!taosQueueEmpty(pDeleter->pDataBlocks)) { - SDataDeleterBuf* pBuf = NULL; + SDataInserterBuf* pBuf = NULL; taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); taosMemoryFreeClear(pBuf->pData); taosFreeQitem(pBuf); @@ -217,7 +217,7 @@ static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { } static int32_t getCacheSize(struct SDataSinkHandle* pHandle, uint64_t* size) { - SDataDeleterHandle* pDispatcher = (SDataDeleterHandle*)pHandle; + SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle; *size = atomic_load_64(&pDispatcher->cachedSize); return TSDB_CODE_SUCCESS;