diff --git a/source/libs/executor/inc/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h similarity index 80% rename from source/libs/executor/inc/dataSinkMgt.h rename to include/libs/executor/dataSinkMgt.h index d13423b25d..a0819fcf85 100644 --- a/source/libs/executor/inc/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -21,13 +21,11 @@ extern "C" { #endif #include "os.h" -#include "executorimpl.h" +#include "thash.h" -#define DS_CAPACITY_ENOUGH 1 -#define DS_CAPACITY_FULL 2 -#define DS_NEED_SCHEDULE 3 -#define DS_END 4 -#define DS_IN_PROCESS 5 +#define DS_BUF_LOW 1 +#define DS_BUF_FULL 2 +#define DS_BUF_EMPTY 3 struct SDataSink; struct SSDataBlock; @@ -42,15 +40,20 @@ int32_t dsDataSinkMgtInit(SDataSinkMgtCfg *cfg); typedef void* DataSinkHandle; typedef struct SInputData { - const SSDataBlock* pData; + const struct SSDataBlock* pData; SHashObj* pTableRetrieveTsMap; } SInputData; -typedef struct SOutPutData { +typedef struct SOutputData { int32_t numOfRows; int8_t compressed; char* pData; -} SOutPutData; + bool queryEnd; + bool needSchedule; + int32_t bufStatus; + int64_t useconds; + int8_t precision; +} SOutputData; /** * Create a subplan's datasinker handle for all later operations. @@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH * @param pRes * @return error code */ -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -void dsEndPut(DataSinkHandle handle); +void dsEndPut(DataSinkHandle handle, int64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle - * @return data length + * @param pLen data length */ -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); /** * Get data, the caller needs to allocate data memory. @@ -84,7 +87,7 @@ int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus); * @param pStatus output * @return error code */ -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput); /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 1bbf5494dd..69727626af 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -31,10 +31,10 @@ typedef struct SDataSinkManager { pthread_mutex_t mutex; } SDataSinkManager; -typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); -typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); -typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef struct SDataSinkHandle { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 3d8e51d04d..e4b0557bff 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -19,6 +19,7 @@ #include "tcompression.h" #include "tglobal.h" #include "tqueue.h" +#include "executorimpl.h" #define DATA_META_LENGTH(tables) (sizeof(int32_t) + sizeof(STableIdInfo) * taosHashGetSize(tables) + sizeof(SRetrieveTableRsp)) @@ -42,6 +43,8 @@ typedef struct SDataDispatchHandle { STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t status; + bool queryEnd; + int64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -124,7 +127,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_CAPACITY_ENOUGH : DS_CAPACITY_FULL; + int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); + int32_t status = (0 == blockNums ? DS_BUF_EMPTY : + (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -137,7 +142,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { return status; } -static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { @@ -145,38 +150,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pStatus = updateStatus(pDispatcher); + *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle) { +static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_END; + pDispatcher->queryEnd = true; + pDispatcher->useconds = useconds; pthread_mutex_unlock(&pDispatcher->mutex); } -static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { +static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = getStatus(pDispatcher) ? DS_END : DS_IN_PROCESS; - return 0; + *pQueryEnd = pDispatcher->queryEnd; + *pLen = 0; + return; } SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); - return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } -static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->compressed = pEntry->compressed; tfree(pDispatcher->nextOutput.pData); // todo persistent - *pStatus = updateStatus(pDispatcher); + pOutput->bufStatus = updateStatus(pDispatcher); + pthread_mutex_lock(&pDispatcher->mutex); + pOutput->queryEnd = pDispatcher->queryEnd; + pOutput->needSchedule = false; + pOutput->useconds = pDispatcher->useconds; + pOutput->precision = pDispatcher->schema.precision; + pthread_mutex_unlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } @@ -205,7 +218,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; dispatcher->schema = pDataSink->schema; - dispatcher->status = DS_CAPACITY_ENOUGH; + dispatcher->status = DS_BUF_EMPTY; + dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 8a96c5d05f..e3f0cd7eaa 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH return TSDB_CODE_FAILED; } -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); + return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle) { +void dsEndPut(DataSinkHandle handle, int64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fEndPut(pHandleImpl); + return pHandleImpl->fEndPut(pHandleImpl, useconds); } -int32_t dsGetDataLength(DataSinkHandle handle, int32_t* pStatus) { +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetLen(pHandleImpl, pStatus); + pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); } -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); + return pHandleImpl->fGetData(pHandleImpl, pOutput); } void dsScheduleProcess(void* ahandle, void* pItem) {