diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 723402e639..b65189153e 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -89,9 +89,6 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { case TDMT_SCH_MERGE_FETCH: code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts); break; - case TDMT_SCH_FETCH_RSP: - code = qWorkerProcessFetchRsp(pQnode, pQnode->pQuery, pMsg, ts); - break; case TDMT_SCH_CANCEL_TASK: code = qWorkerProcessCancelMsg(pQnode, pQnode->pQuery, pMsg, ts); break; diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index 891bc0e4b9..0616f4b12f 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -118,7 +118,13 @@ static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMs } -SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) { +SSubmitReq* dataBlockToSubmit(SDataInserterHandle* pInserter) { + const SArray* pBlocks = pInserter->pDataBlocks; + const STSchema* pTSchema = pInserter->pSchema; + int64_t uid = pInserter->pNode->tableId; + int64_t suid = pInserter->pNode->stableId; + int32_t vgId = pInserter->pNode->vgId; + SSubmitReq* ret = NULL; int32_t sz = taosArrayGetSize(pBlocks); @@ -192,7 +198,7 @@ SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, i static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; taosArrayPush(pInserter->pDataBlocks, pInput->pData); - SSubmitReq* pMsg = dataBlockToSubmit(pInserter->pDataBlocks, pInserter->pSchema, pInserter->pNode->tableId, pInserter->pNode->suid, pInserter->pNode->vgId); + SSubmitReq* pMsg = dataBlockToSubmit(pInserter); int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); if (code) { @@ -248,7 +254,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat return TSDB_CODE_QRY_OUT_OF_MEMORY; } - SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink; + SQueryInserterNode* pInserterNode = (SQueryInserterNode *)pDataSink; inserter->sink.fPut = putDataBlock; inserter->sink.fEndPut = endPut; inserter->sink.fGetLen = getDataLength; @@ -267,7 +273,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat return code; } - if (pInserterNode->suid != suid) { + if (pInserterNode->stableId != suid) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; return terrno; } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 4f8ea00271..b36adbe5d4 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -39,6 +39,9 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) { if (TK_USING == t.type || TK_VALUES == t.type) { return true; } + if (0 == t.type) { + break; + } } while (pStr - pSql < length); return false; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 335a37fdb0..4db8d3dca7 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -247,7 +247,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, } int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) { - int32_t len = 0; + int64_t len = 0; bool queryEnd = false; int32_t code = 0; SOutputData output = {0}; @@ -255,7 +255,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes dsGetDataLength(ctx->sinkHandle, &len, &queryEnd); if (len <= 0 || len != sizeof(SDeleterRes)) { - QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len); + QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 959deda795..75e87e9eba 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -318,7 +318,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) #define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) -#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) +#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 8c52f2994d..42f482ab76 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -30,6 +30,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { case TDMT_SCH_EXPLAIN_RSP: return TSDB_CODE_SUCCESS; case TDMT_SCH_FETCH_RSP: + case TDMT_SCH_MERGE_FETCH_RSP: if (lastMsgType != reqMsgType) { SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType));