feat: insert from query

This commit is contained in:
dapan1121 2022-07-08 10:27:17 +08:00
parent 2722c5ee5f
commit b849252eed
5 changed files with 26 additions and 13 deletions

View File

@ -110,7 +110,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
pMsgSendInfo->param = pParam; pMsgSendInfo->param = pParam;
pMsgSendInfo->msgInfo.pData = pMsg; pMsgSendInfo->msgInfo.pData = pMsg;
pMsgSendInfo->msgInfo.len = sizeof(SSubmitReq); pMsgSendInfo->msgInfo.len = ntohl(pMsg->length);
pMsgSendInfo->msgType = TDMT_VND_SUBMIT; pMsgSendInfo->msgType = TDMT_VND_SUBMIT;
pMsgSendInfo->fp = inserterCallback; pMsgSendInfo->fp = inserterCallback;
@ -119,7 +119,7 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs
} }
SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { int32_t dataBlockToSubmit(SDataInserterHandle* pInserter, SSubmitReq** pReq) {
const SArray* pBlocks = pInserter->pDataBlocks; const SArray* pBlocks = pInserter->pDataBlocks;
const STSchema* pTSchema = pInserter->pSchema; const STSchema* pTSchema = pInserter->pSchema;
int64_t uid = pInserter->pNode->tableId; int64_t uid = pInserter->pNode->tableId;
@ -133,7 +133,7 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
// cal size // cal size
int32_t cap = sizeof(SSubmitReq); int32_t cap = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
// TODO min // TODO min
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
@ -144,15 +144,15 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
// assign data // assign data
// TODO // TODO
ret = rpcMallocCont(cap); ret = taosMemoryCalloc(1, cap);
ret->header.vgId = vgId; ret->header.vgId = htonl(vgId);
ret->version = htonl(pTSchema->version); ret->version = htonl(pTSchema->version);
ret->length = sizeof(SSubmitReq); ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(sz); ret->numOfBlocks = htonl(sz);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); SSDataBlock* pDataBlock = taosArrayGetP(pBlocks, i);
blkHead->numOfRows = htons(pDataBlock->info.rows); blkHead->numOfRows = htons(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version); blkHead->sversion = htonl(pTSchema->version);
@ -184,6 +184,12 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
} }
pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx); pColData = taosArrayGet(pDataBlock->pDataBlock, colIdx);
if (pColData->info.type != pColumn->type) {
qError("col type mis-match, schema type:%d, type in block:%d", pColumn->type, pColData->info.type);
terrno = TSDB_CODE_APP_ERROR;
return TSDB_CODE_APP_ERROR;
}
if (colDataIsNull_s(pColData, j)) { if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else { } else {
@ -204,16 +210,22 @@ SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) {
ret->length = htonl(ret->length); ret->length = htonl(ret->length);
return ret; *pReq = ret;
return TSDB_CODE_SUCCESS;
} }
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle;
taosArrayPush(pInserter->pDataBlocks, pInput->pData); taosArrayPush(pInserter->pDataBlocks, &pInput->pData);
SSubmitReq* pMsg = dataBlockToSubmit(pInserter); SSubmitReq* pMsg = NULL;
int32_t code = dataBlockToSubmit(pInserter, &pMsg);
if (code) {
return code;
}
int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet);
if (code) { if (code) {
return code; return code;
} }

View File

@ -6378,7 +6378,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) {
break; break;
case QUERY_NODE_INSERT_STMT: case QUERY_NODE_INSERT_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->msgType = TDMT_SCH_QUERY; pQuery->msgType = TDMT_VND_SUBMIT;
break; break;
case QUERY_NODE_VNODE_MODIF_STMT: case QUERY_NODE_VNODE_MODIF_STMT:
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;

View File

@ -1566,7 +1566,7 @@ static int32_t buildInsertSelectSubplan(SPhysiPlanContext* pCxt, SVnodeModifyLog
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink); code = createQueryInserter(pCxt, pModify, pSubplan, &pSubplan->pDataSink);
} }
pSubplan->msgType = TDMT_VND_SUBMIT; pSubplan->msgType = TDMT_SCH_MERGE_QUERY;
return code; return code;
} }

View File

@ -319,6 +319,7 @@ extern SSchedulerMgmt schMgmt;
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)

View File

@ -1001,7 +1001,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->execId = htonl(pTask->execId); pMsg->execId = htonl(pTask->execId);
pMsg->taskType = TASK_TYPE_TEMP; pMsg->taskType = TASK_TYPE_TEMP;
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
pMsg->needFetch = SCH_JOB_NEED_FETCH(pJob); pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
pMsg->phyLen = htonl(pTask->msgLen); pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len); pMsg->sqlLen = htonl(len);