From e1ed9317d4c34337b47ea23b769da8b9db649a3e Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 7 Jul 2022 08:53:23 +0800 Subject: [PATCH] feat: insert from query --- source/client/src/clientImpl.c | 2 +- source/libs/executor/inc/dataSinkInt.h | 3 ++- source/libs/executor/src/dataDeleter.c | 4 ++-- source/libs/executor/src/dataDispatcher.c | 4 ++-- source/libs/executor/src/dataInserter.c | 27 +++++++---------------- source/libs/scheduler/src/schRemote.c | 11 +++++++-- 6 files changed, 24 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 671f04089c..7dd5d68209 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -657,7 +657,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList } if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || - TDMT_VND_CREATE_TABLE == pRequest->type) { + TDMT_VND_CREATE_TABLE == pRequest->type || TDMT_SCH_MERGE_QUERY == pRequest->type) { pRequest->body.resInfo.numOfRows = res.numOfRows; schedulerFreeJob(&pRequest->body.queryJob, 0); diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index dead1aff73..9426c99a0f 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -34,7 +34,7 @@ typedef struct SDataSinkManager { typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); -typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); +typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); @@ -50,6 +50,7 @@ typedef struct SDataSinkHandle { int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam); +int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam); #ifdef __cplusplus } diff --git a/source/libs/executor/src/dataDeleter.c b/source/libs/executor/src/dataDeleter.c index 8c220134f0..3c56abbd15 100644 --- a/source/libs/executor/src/dataDeleter.c +++ b/source/libs/executor/src/dataDeleter.c @@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pDeleter->mutex); } -static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; if (taosQueueEmpty(pDeleter->pDataBlocks)) { *pQueryEnd = pDeleter->queryEnd; @@ -168,7 +168,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; *pQueryEnd = pDeleter->queryEnd; - qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); + qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 5ee222efb7..b8495faffd 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -156,7 +156,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pDispatcher->mutex); } -static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; if (taosQueueEmpty(pDispatcher->pDataBlocks)) { *pQueryEnd = pDispatcher->queryEnd; @@ -170,7 +170,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE taosFreeQitem(pBuf); *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; *pQueryEnd = pDispatcher->queryEnd; - qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); + qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); } static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 5f7ebde4c9..891bc0e4b9 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -50,24 +50,6 @@ typedef struct SSubmitRspParam { SDataInserterHandle* pInserter; } SSubmitRspParam; -static int32_t updateStatus(SDataInserterHandle* pInserter) { - taosThreadMutexLock(&pInserter->mutex); - int32_t blockNums = taosQueueItemSize(pInserter->pDataBlocks); - int32_t status = - (0 == blockNums ? DS_BUF_EMPTY - : (blockNums < pInserter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); - pInserter->status = status; - taosThreadMutexUnlock(&pInserter->mutex); - return status; -} - -static int32_t getStatus(SDataInserterHandle* pInserter) { - taosThreadMutexLock(&pInserter->mutex); - int32_t status = pInserter->status; - taosThreadMutexUnlock(&pInserter->mutex); - return status; -} - int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { SSubmitRspParam* pParam = (SSubmitRspParam*)param; SDataInserterHandle* pInserter = pParam->pInserter; @@ -236,6 +218,13 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { taosThreadMutexUnlock(&pInserter->mutex); } +static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) { + SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle; + *pLen = pDispatcher->submitRes.affectedRows; + qDebug("got total affectedRows %" PRId64 , *pLen); +} + + static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); @@ -262,7 +251,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink; inserter->sink.fPut = putDataBlock; inserter->sink.fEndPut = endPut; - inserter->sink.fGetLen = NULL; + inserter->sink.fGetLen = getDataLength; inserter->sink.fGetData = NULL; inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fGetCacheSize = getCacheSize; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 263401d20e..f84f51affa 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -236,16 +236,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa } case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { - SQueryTableRsp *rsp = (SQueryTableRsp *)msg; - SCH_ERR_JRET(rspCode); if (NULL == msg) { SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } + + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + rsp->code = ntohl(rsp->code); + rsp->sversion = ntohl(rsp->sversion); + rsp->tversion = ntohl(rsp->tversion); + rsp->affectedRows = be64toh(rsp->affectedRows); + SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); + taosMemoryFreeClear(msg); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));