diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 6470f38ced..b44cad1668 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -24,9 +24,9 @@ extern "C" { #include "executorimpl.h" #define DS_CAPACITY_ENOUGH 1 -#define DS_CAPACITY_FULL 2 +#define DS_DATA_FULL 2 #define DS_NEED_SCHEDULE 3 -#define DS_END 4 +#define DS_QUERY_END 4 #define DS_IN_PROCESS 5 struct SDataSink; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d8e51d04d..83f180cd1c 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -124,7 +124,8 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; + int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery + ? DS_CAPACITY_ENOUGH : DS_DATA_FULL; pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -152,14 +153,14 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, static void endPut(struct SDataSinkHandle* pHandle) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_END; + pDispatcher->status = DS_QUERY_END; pthread_mutex_unlock(&pDispatcher->mutex); } static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; + *pStatus = (DS_QUERY_END == getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS); return 0; } SDataDispatchBuf* pBuf = NULL;