From b849252eed8065fc4a11ed9e9c13770bf176dced Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 8 Jul 2022 10:27:17 +0800 Subject: [PATCH] feat: insert from query --- source/libs/executor/src/dataInserter.c | 32 +++++++++++++++------- source/libs/parser/src/parTranslater.c | 2 +- source/libs/planner/src/planPhysiCreater.c | 2 +- source/libs/scheduler/inc/schInt.h | 1 + source/libs/scheduler/src/schRemote.c | 2 +- 5 files changed, 26 insertions(+), 13 deletions(-) diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index edc6143b93..860ecd35b6 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -110,7 +110,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs pMsgSendInfo->param = pParam; pMsgSendInfo->msgInfo.pData = pMsg; - pMsgSendInfo->msgInfo.len = sizeof(SSubmitReq); + pMsgSendInfo->msgInfo.len = ntohl(pMsg->length); pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->fp = inserterCallback; @@ -119,7 +119,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs } -SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { +int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) { const SArray* pBlocks = pInserter->pDataBlocks; const STSchema* pTSchema = pInserter->pSchema; int64_t uid = pInserter->pNode->tableId; @@ -133,7 +133,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { // cal size int32_t cap = sizeof(SSubmitReq); for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i); int32_t rows = pDataBlock->info.rows; // TODO min int32_t rowSize = pDataBlock->info.rowSize; @@ -144,15 +144,15 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { // assign data // TODO - ret = rpcMallocCont(cap); - ret->header.vgId = vgId; + ret = taosMemoryCalloc(1, cap); + ret->header.vgId = htonl(vgId); ret->version = htonl(pTSchema->version); ret->length = sizeof(SSubmitReq); ret->numOfBlocks = htonl(sz); SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i); blkHead->numOfRows = htons(pDataBlock->info.rows); blkHead->sversion = htonl(pTSchema->version); @@ -184,6 +184,12 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { } pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx); + if (pColData->info.type != pColumn->type) { + qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type); + terrno = TSDB_CODE_APP_ERROR; + return TSDB_CODE_APP_ERROR; + } + if (colDataIsNull_s(pColData, j)) { tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); } else { @@ -204,16 +210,22 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { ret->length = htonl(ret->length); - return ret; + *pReq = ret; + + return TSDB_CODE_SUCCESS; } static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; - taosArrayPush(pInserter->pDataBlocks, pInput->pData); - SSubmitReq* pMsg = dataBlockToSubmit(pInserter); + taosArrayPush(pInserter->pDataBlocks, &pInput->pData); + SSubmitReq* pMsg = NULL; + int32_t code = dataBlockToSubmit(pInserter, &pMsg); + if (code) { + return code; + } - int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); + code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); if (code) { return code; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c05742b52c..0975fe7309 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6378,7 +6378,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { break; case QUERY_NODE_INSERT_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - pQuery->msgType = TDMT_SCH_QUERY; + pQuery->msgType = TDMT_VND_SUBMIT; break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 6d5d5e220b..18d69d21d8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1566,7 +1566,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog if (TSDB_CODE_SUCCESS == code) { code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink); } - pSubplan->msgType = TDMT_VND_SUBMIT; + pSubplan->msgType = TDMT_SCH_MERGE_QUERY; return code; } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 75e87e9eba..9018deaf13 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -319,6 +319,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) +#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 42f482ab76..f9c3d80e46 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1001,7 +1001,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->execId = htonl(pTask->execId); pMsg->taskType = TASK_TYPE_TEMP; pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); - pMsg->needFetch = SCH_JOB_NEED_FETCH(pJob); + pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask); pMsg->phyLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len);