From 8a0da7bad296fbd3756c0de77edcec9c2d06aa02 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Tue, 11 Jan 2022 02:56:24 -0500 Subject: [PATCH] TD-12678 datasink interface adjust --- include/libs/executor/dataSinkMgt.h | 25 ++++++++------- source/libs/executor/inc/dataSinkInt.h | 8 ++--- source/libs/executor/src/dataDispatcher.c | 38 +++++++++++++++-------- source/libs/executor/src/dataSinkMgt.c | 16 +++++----- 4 files changed, 51 insertions(+), 36 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index b44cad1668..733e6d665d 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -23,11 +23,9 @@ extern "C" { #include "os.h" #include "executorimpl.h" -#define DS_CAPACITY_ENOUGH 1 -#define DS_DATA_FULL 2 -#define DS_NEED_SCHEDULE 3 -#define DS_QUERY_END 4 -#define DS_IN_PROCESS 5 +#define DS_BUF_LOW 1 +#define DS_BUF_FULL 2 +#define DS_BUF_EMPTY 3 struct SDataSink; struct SSDataBlock; @@ -46,11 +44,16 @@ typedef struct SInputData { SHashObj* pTableRetrieveTsMap; } SInputData; -typedef struct SOutPutData { +typedef struct SOutputData { int32_t numOfRows; int8_t compressed; char* pData; -} SOutPutData; + bool queryEnd; + bool needSchedule; + int32_t bufStatus; + int64_t useconds; + int8_t precision; +} SOutputData; /** * Create a subplan's datasinker handle for all later operations. @@ -66,16 +69,16 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH * @param pRes * @return error code */ -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus); +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -void dsEndPut(DataSinkHandle handle); +void dsEndPut(DataSinkHandle handle, int64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. * @param handle * @param pLen data length */ -void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, int32_t* pStatus); +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd); /** * Get data, the caller needs to allocate data memory. @@ -84,7 +87,7 @@ void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, int32_t* pStatus); * @param pStatus output * @return error code */ -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus); +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput); /** * After dsGetStatus returns DS_NEED_SCHEDULE, the caller need to put this into the work queue. diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 1bbf5494dd..69727626af 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -31,10 +31,10 @@ typedef struct SDataSinkManager { pthread_mutex_t mutex; } SDataSinkManager; -typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle); -typedef int32_t (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pStatus); -typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus); +typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); +typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef struct SDataSinkHandle { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 83f180cd1c..954ed6daf1 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -42,6 +42,8 @@ typedef struct SDataDispatchHandle { STaosQueue* pDataBlocks; SDataDispatchBuf nextOutput; int32_t status; + bool queryEnd; + int64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -124,8 +126,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, static int32_t updateStatus(SDataDispatchHandle* pDispatcher) { pthread_mutex_lock(&pDispatcher->mutex); - int32_t status = taosQueueSize(pDispatcher->pDataBlocks) < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery - ? DS_CAPACITY_ENOUGH : DS_DATA_FULL; + int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks); + int32_t status = (0 == blockNums ? DS_BUF_EMPTY : + (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); pDispatcher->status = status; pthread_mutex_unlock(&pDispatcher->mutex); return status; @@ -138,7 +141,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) { return status; } -static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, int32_t* pStatus) { +static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf)); if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) { @@ -146,38 +149,46 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, } toDataCacheEntry(pDispatcher, pInput, pBuf); taosWriteQitem(pDispatcher->pDataBlocks, pBuf); - *pStatus = updateStatus(pDispatcher); + *pContinue = (DS_BUF_LOW == updateStatus(pDispatcher) ? true : false); return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle) { +static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); - pDispatcher->status = DS_QUERY_END; + pDispatcher->queryEnd = true; + pDispatcher->useconds = useconds; pthread_mutex_unlock(&pDispatcher->mutex); } -static int32_t getDataLength(SDataSinkHandle* pHandle, int32_t* pStatus) { +static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { - *pStatus = (DS_QUERY_END == getStatus(pDispatcher) ? DS_QUERY_END : DS_IN_PROCESS); - return 0; + *pQueryEnd = pDispatcher->queryEnd; + *pLen = 0; + return; } SDataDispatchBuf* pBuf = NULL; taosReadQitem(pDispatcher->pDataBlocks, (void**)&pBuf); memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf)); taosFreeQitem(pBuf); - return ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; + *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; } -static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutPutData* pOutput, int32_t* pStatus) { +static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDispatcher->nextOutput.pData); memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); pOutput->numOfRows = pEntry->numOfRows; pOutput->compressed = pEntry->compressed; tfree(pDispatcher->nextOutput.pData); // todo persistent - *pStatus = updateStatus(pDispatcher); + pOutput->bufStatus = updateStatus(pDispatcher); + pthread_mutex_lock(&pDispatcher->mutex); + pOutput->queryEnd = pDispatcher->queryEnd; + pOutput->needSchedule = false; + pOutput->useconds = pDispatcher->useconds; + pOutput->precision = pDispatcher->schema.precision; + pthread_mutex_unlock(&pDispatcher->mutex); return TSDB_CODE_SUCCESS; } @@ -206,7 +217,8 @@ int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSink* pDataS dispatcher->sink.fDestroy = destroyDataSinker; dispatcher->pManager = pManager; dispatcher->schema = pDataSink->schema; - dispatcher->status = DS_CAPACITY_ENOUGH; + dispatcher->status = DS_BUF_EMPTY; + dispatcher->queryEnd = false; dispatcher->pDataBlocks = taosOpenQueue(); pthread_mutex_init(&dispatcher->mutex, NULL); if (NULL == dispatcher->pDataBlocks) { diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 01cc84c4f7..e3f0cd7eaa 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -31,24 +31,24 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH return TSDB_CODE_FAILED; } -int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, int32_t* pStatus) { +int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fPut(pHandleImpl, pInput, pStatus); + return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle) { +void dsEndPut(DataSinkHandle handle, int64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fEndPut(pHandleImpl); + return pHandleImpl->fEndPut(pHandleImpl, useconds); } -void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, int32_t* pStatus) { +void dsGetDataLength(DataSinkHandle handle, int32_t* pLen, bool* pQueryEnd) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - *pLen = pHandleImpl->fGetLen(pHandleImpl, pStatus); + pHandleImpl->fGetLen(pHandleImpl, pLen, pQueryEnd); } -int32_t dsGetDataBlock(DataSinkHandle handle, SOutPutData* pOutput, int32_t* pStatus) { +int32_t dsGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; - return pHandleImpl->fGetData(pHandleImpl, pOutput, pStatus); + return pHandleImpl->fGetData(pHandleImpl, pOutput); } void dsScheduleProcess(void* ahandle, void* pItem) {