feat: insert from query

This commit is contained in:
dapan1121 2022-07-07 08:53:23 +08:00
parent d625cc2e47
commit e1ed9317d4
6 changed files with 24 additions and 27 deletions

View File

@ -657,7 +657,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
} }
if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type ||
TDMT_VND_CREATE_TABLE == pRequest->type) { TDMT_VND_CREATE_TABLE == pRequest->type || TDMT_SCH_MERGE_QUERY == pRequest->type) {
pRequest->body.resInfo.numOfRows = res.numOfRows; pRequest->body.resInfo.numOfRows = res.numOfRows;
schedulerFreeJob(&pRequest->body.queryJob, 0); schedulerFreeJob(&pRequest->body.queryJob, 0);

View File

@ -34,7 +34,7 @@ typedef struct SDataSinkManager {
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd);
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size); typedef int32_t (*FGetCacheSize)(struct SDataSinkHandle* pHandle, uint64_t* size);
@ -50,6 +50,7 @@ typedef struct SDataSinkHandle {
int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle); int32_t createDataDispatcher(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle);
int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam); int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDataSink, DataSinkHandle* pHandle, void *pParam);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -154,7 +154,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDeleter->mutex); taosThreadMutexUnlock(&pDeleter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle; SDataDeleterHandle* pDeleter = (SDataDeleterHandle*)pHandle;
if (taosQueueEmpty(pDeleter->pDataBlocks)) { if (taosQueueEmpty(pDeleter->pDataBlocks)) {
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
@ -168,7 +168,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen;
*pQueryEnd = pDeleter->queryEnd; *pQueryEnd = pDeleter->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {

View File

@ -156,7 +156,7 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pDispatcher->mutex); taosThreadMutexUnlock(&pDispatcher->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
if (taosQueueEmpty(pDispatcher->pDataBlocks)) { if (taosQueueEmpty(pDispatcher->pDataBlocks)) {
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
@ -170,7 +170,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
taosFreeQitem(pBuf); taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen; *pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pQueryEnd = pDispatcher->queryEnd; *pQueryEnd = pDispatcher->queryEnd;
qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows); qDebug("got data len %" PRId64 ", row num %d in sink", *pLen, ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->numOfRows);
} }
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {

View File

@ -50,24 +50,6 @@ typedef struct SSubmitRspParam {
SDataInserterHandle* pInserter; SDataInserterHandle* pInserter;
} SSubmitRspParam; } SSubmitRspParam;
static int32_t updateStatus(SDataInserterHandle* pInserter) {
taosThreadMutexLock(&pInserter->mutex);
int32_t blockNums = taosQueueItemSize(pInserter->pDataBlocks);
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pInserter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pInserter->status = status;
taosThreadMutexUnlock(&pInserter->mutex);
return status;
}
static int32_t getStatus(SDataInserterHandle* pInserter) {
taosThreadMutexLock(&pInserter->mutex);
int32_t status = pInserter->status;
taosThreadMutexUnlock(&pInserter->mutex);
return status;
}
int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) {
SSubmitRspParam* pParam = (SSubmitRspParam*)param; SSubmitRspParam* pParam = (SSubmitRspParam*)param;
SDataInserterHandle* pInserter = pParam->pInserter; SDataInserterHandle* pInserter = pParam->pInserter;
@ -236,6 +218,13 @@ static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
taosThreadMutexUnlock(&pInserter->mutex); taosThreadMutexUnlock(&pInserter->mutex);
} }
static void getDataLength(SDataSinkHandle* pHandle, int64_t* pLen, bool* pQueryEnd) {
SDataInserterHandle* pDispatcher = (SDataInserterHandle*)pHandle;
*pLen = pDispatcher->submitRes.affectedRows;
qDebug("got total affectedRows %" PRId64 , *pLen);
}
static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { static int32_t destroyDataSinker(SDataSinkHandle* pHandle) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize);
@ -262,7 +251,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink; SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink;
inserter->sink.fPut = putDataBlock; inserter->sink.fPut = putDataBlock;
inserter->sink.fEndPut = endPut; inserter->sink.fEndPut = endPut;
inserter->sink.fGetLen = NULL; inserter->sink.fGetLen = getDataLength;
inserter->sink.fGetData = NULL; inserter->sink.fGetData = NULL;
inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fDestroy = destroyDataSinker;
inserter->sink.fGetCacheSize = getCacheSize; inserter->sink.fGetCacheSize = getCacheSize;

View File

@ -236,16 +236,23 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
} }
case TDMT_SCH_QUERY_RSP: case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: { case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
SCH_ERR_JRET(rspCode); SCH_ERR_JRET(rspCode);
if (NULL == msg) { if (NULL == msg) {
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
} }
SQueryTableRsp *rsp = (SQueryTableRsp *)msg;
rsp->code = ntohl(rsp->code);
rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion);
rsp->affectedRows = be64toh(rsp->affectedRows);
SCH_ERR_JRET(rsp->code); SCH_ERR_JRET(rsp->code);
SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp));
atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows);
taosMemoryFreeClear(msg); taosMemoryFreeClear(msg);
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));