diff --git a/source/libs/executor/inc/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h similarity index 82% rename from source/libs/executor/inc/dataSinkMgt.h rename to include/libs/executor/dataSinkMgt.h index 6c0cda3d0a..df166ade40 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -24,11 +24,9 @@ extern "C" { #include "executor.h" #include "executorimpl.h" -#define DS_CAPACITY_ENOUGH 1 -#define DS_DATA_FULL 2 -#define DS_NEED_SCHEDULE 3 -#define DS_QUERY_END 4 -#define DS_IN_PROCESS 5 +#define DS_BUF_LOW 1 +#define DS_BUF_FULL 2 +#define DS_BUF_EMPTY 3 struct SDataSink; struct SSDataBlock; @@ -45,11 +43,16 @@ typedef struct SInputData { SHashObj* pTableRetrieveTsMap; } SInputData; -typedef struct SOutPutData { +typedef struct SOutputData { int32_t numOfRows; int8_t compressed; char* pData; -} SOutPutData; + bool queryEnd; + bool needSchedule; + int32_t bufStatus; + int64_t useconds; + int8_t precision; +} SOutputData; /** * Create a subplan's datasinker handle for all later operations. @@ -65,20 +68,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH * @param pRes * @return error code */ -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -/** - * - * @param handle - */ -void dsEndPut(DataSinkHandle handle); +void dsEndPut(DataSinkHandle handle, int64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle - * @return data length + * @param pLen data length */ -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); /** * Get data, the caller needs to allocate data memory. @@ -87,7 +86,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); * @param pStatus output * @return error code */ -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput); /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 1bbf5494dd..69727626af 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -31,10 +31,10 @@ typedef struct SDataSinkManager { pthread_mutex_t mutex; } SDataSinkManager; -typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); -typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); -typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef struct SDataSinkHandle { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 5fa35edb06..954ed6daf1 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -42,6 +42,8 @@ typedef struct SDataDispatchHandle { STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t status; + bool queryEnd; + int64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -124,7 +126,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_DATA_FULL; + int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); + int32_t status = (0 == blockNums ? DS_BUF_EMPTY : + (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -137,7 +141,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { return status; } -static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { @@ -145,38 +149,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pStatus = updateStatus(pDispatcher); + *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle) { +static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_QUERY_END; + pDispatcher->queryEnd = true; + pDispatcher->useconds = useconds; pthread_mutex_unlock(&pDispatcher->mutex); } -static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { +static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS; - return 0; + *pQueryEnd = pDispatcher->queryEnd; + *pLen = 0; + return; } SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); - return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } -static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->compressed = pEntry->compressed; tfree(pDispatcher->nextOutput.pData); // todo persistent - *pStatus = updateStatus(pDispatcher); + pOutput->bufStatus = updateStatus(pDispatcher); + pthread_mutex_lock(&pDispatcher->mutex); + pOutput->queryEnd = pDispatcher->queryEnd; + pOutput->needSchedule = false; + pOutput->useconds = pDispatcher->useconds; + pOutput->precision = pDispatcher->schema.precision; + pthread_mutex_unlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } @@ -205,7 +217,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; dispatcher->schema = pDataSink->schema; - dispatcher->status = DS_CAPACITY_ENOUGH; + dispatcher->status = DS_BUF_EMPTY; + dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index d9b122fbdd..80d99f96c6 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -32,24 +32,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH return TSDB_CODE_FAILED; } -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); + return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle) { +void dsEndPut(DataSinkHandle handle, int64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fEndPut(pHandleImpl); + return pHandleImpl->fEndPut(pHandleImpl, useconds); } -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) { +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetLen(pHandleImpl, pStatus); + pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); } -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); + return pHandleImpl->fGetData(pHandleImpl, pOutput); } void dsScheduleProcess(void* ahandle, void* pItem) {