From 0d4fb5bb80b8286ebf84ed62c14896675361510d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 6 Jul 2022 16:29:51 +0800 Subject: [PATCH] feat: support insert from query --- include/common/tdatablock.h | 2 + include/common/tmsg.h | 1 + include/libs/executor/dataSinkMgt.h | 4 + include/libs/executor/executor.h | 2 +- include/libs/qworker/qworker.h | 2 +- source/common/src/tdatablock.c | 73 +++++ source/dnode/mgmt/node_mgmt/src/dmTransport.c | 3 +- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tsdb/tsdbRead.c | 36 +++ source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/dataInserter.c | 304 ++++++++---------- source/libs/executor/src/dataSinkMgt.c | 2 + source/libs/executor/src/executorMain.c | 2 +- source/libs/executor/src/executorimpl.c | 14 +- source/libs/parser/src/parTranslater.c | 2 +- source/libs/qworker/inc/qwInt.h | 22 +- source/libs/qworker/inc/qwMsg.h | 2 +- source/libs/qworker/src/qwMsg.c | 12 +- source/libs/qworker/src/qworker.c | 47 ++- source/libs/scheduler/inc/schInt.h | 6 +- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schRemote.c | 1 + source/libs/scheduler/src/schTask.c | 12 +- 23 files changed, 330 insertions(+), 224 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 8b64287033..1c822c6c05 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -246,6 +246,8 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks char* buildCtbNameByGroupId(const char* stbName, uint64_t groupId); +SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId); + static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock); } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dedc06a2b9..aedf0680ee 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1510,6 +1510,7 @@ typedef struct SSubQueryMsg { int32_t execId; int8_t taskType; int8_t explain; + int8_t needFetch; uint32_t sqlLen; // the query sql, uint32_t phyLen; char msg[]; diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 957c40f21e..ca2c49bfb5 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -45,6 +45,10 @@ typedef struct SDeleterParam { SArray* pUidList; } SDeleterParam; +typedef struct SInserterParam { + SReadHandle* readHandle; +} SInserterParam; + typedef struct SDataSinkStat { uint64_t cachedSize; } SDataSinkStat; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 45fa94b3bf..d9add5275f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -155,7 +155,7 @@ int64_t qGetQueriedTableUid(qTaskInfo_t tinfo); */ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t tagCondLen, SArray* pTableIdList); -void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); +void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 36e9b3309c..8149b659de 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -74,7 +74,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); -int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); +int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index bca740e9ce..757235e5aa 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2105,3 +2105,76 @@ const char* blockDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRow ASSERT(pStart - pData == dataLen); return pStart; } + + +SSubmitReq* dataBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, int64_t uid, int64_t suid, int32_t vgId) { + SSubmitReq* ret = NULL; + int32_t sz = taosArrayGetSize(pBlocks); + + // cal size + int32_t cap = sizeof(SSubmitReq); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + int32_t rows = pDataBlock->info.rows; + // TODO min + int32_t rowSize = pDataBlock->info.rowSize; + int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema); + + cap += sizeof(SSubmitBlk) + rows * maxLen; + } + + // assign data + // TODO + ret = rpcMallocCont(cap); + ret->header.vgId = vgId; + ret->version = htonl(1); + ret->length = sizeof(SSubmitReq); + ret->numOfBlocks = htonl(sz); + + SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq)); + for (int32_t i = 0; i < sz; i++) { + SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); + + blkHead->numOfRows = htons(pDataBlock->info.rows); + blkHead->sversion = htonl(pTSchema->version); + // TODO + blkHead->suid = htobe64(suid); + blkHead->uid = htobe64(uid); + blkHead->schemaLen = htonl(0); + + int32_t rows = pDataBlock->info.rows; + int32_t dataLen = 0; + STSRow* rowData = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); + for (int32_t j = 0; j < rows; j++) { + SRowBuilder rb = {0}; + tdSRowInit(&rb, pTSchema->version); + tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen); + tdSRowResetBuf(&rb, rowData); + + for (int32_t k = 0; k < pTSchema->numOfCols; k++) { + const STColumn* pColumn = &pTSchema->columns[k]; + SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k); + if (colDataIsNull_s(pColData, j)) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NONE, NULL, false, pColumn->offset, k); + } else { + void* data = colDataGetData(pColData, j); + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, pColumn->offset, k); + } + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + blkHead->dataLen = htonl(dataLen); + + ret->length += sizeof(SSubmitBlk) + dataLen; + blkHead = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk) + dataLen); + } + + ret->length = htonl(ret->length); + + return ret; +} + + diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 7043991525..28df060494 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -88,7 +88,8 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { case TDMT_MND_SYSTABLE_RETRIEVE_RSP: case TDMT_DND_SYSTABLE_RETRIEVE_RSP: case TDMT_SCH_FETCH_RSP: - qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0); + case TDMT_VND_SUBMIT_RSP: + qWorkerProcessRspMsg(NULL, NULL, pRpc, 0); return; case TDMT_MND_STATUS_RSP: if (pEpSet != NULL) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 38cb3b70a6..9feb9ee649 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -137,6 +137,7 @@ int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDat SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); +int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t* suid); // tq diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 4604e3699f..01c1a47ab8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3777,3 +3777,39 @@ void tsdbCleanupReadHandle(tsdbReaderT queryHandle) { taosMemoryFree(pTsdbReadHandle->pSchema); taosMemoryFreeClear(pTsdbReadHandle); } + +int32_t tsdbGetTableSchema(SVnode* pVnode, int64_t uid, STSchema** pSchema, int64_t *suid) { + int32_t sversion = 1; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pVnode->pMeta, 0); + int32_t code = metaGetTableEntryByUid(&mr, uid); + if (code != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + metaReaderClear(&mr); + return terrno; + } + + *suid = 0; + + if (mr.me.type == TSDB_CHILD_TABLE) { + *suid = mr.me.ctbEntry.suid; + code = metaGetTableEntryByUid(&mr, *suid); + if (code != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + metaReaderClear(&mr); + return terrno; + } + sversion = mr.me.stbEntry.schemaRow.version; + } else { + ASSERT(mr.me.type == TSDB_NORMAL_TABLE); + sversion = mr.me.ntbEntry.schemaRow.version; + } + + metaReaderClear(&mr); + *pSchema = metaGetTbTSchema(pVnode->pMeta, uid, sversion); + + return TSDB_CODE_SUCCESS; +} + + diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 00f2e09e0c..51e4c4411d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -855,7 +855,7 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, const char* sql, EOPTR_EXEC_MODEL model); -int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo); +int32_t createDataSinkParam(SDataSinkNode *pNode, void **pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity, int32_t* resNum); diff --git a/source/libs/executor/src/dataInserter.c b/source/libs/executor/src/dataInserter.c index c424cb33fa..26c8d9ede6 100644 --- a/source/libs/executor/src/dataInserter.c +++ b/source/libs/executor/src/dataInserter.c @@ -24,195 +24,153 @@ extern SDataSinkStat gDataSinkStat; -typedef struct SDataInserterBuf { - int32_t useSize; - int32_t allocSize; - char* pData; -} SDataInserterBuf; - -typedef struct SDataCacheEntry { - int32_t dataLen; - int32_t numOfRows; - int32_t numOfCols; - int8_t compressed; - char data[]; -} SDataCacheEntry; +typedef struct SSubmitRes { + int64_t affectedRows; + int32_t code; + SSubmitRsp *pRsp; +} SSubmitRes; typedef struct SDataInserterHandle { SDataSinkHandle sink; SDataSinkManager* pManager; - SDataBlockDescNode* pSchema; - SDataDeleterNode* pDeleter; - SDeleterParam* pParam; - STaosQueue* pDataBlocks; - SDataInserterBuf nextOutput; + STSchema* pSchema; + SQueryInserterNode* pNode; + SSubmitRes submitRes; + SInserterParam* pParam; + SArray* pDataBlocks; int32_t status; bool queryEnd; uint64_t useconds; uint64_t cachedSize; TdThreadMutex mutex; + tsem_t ready; } SDataInserterHandle; -static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) { - if (tsCompressColData < 0 || 0 == pData->info.rows) { - return false; - } +typedef struct SSubmitRspParam { + SDataInserterHandle* pInserter; +} SSubmitRspParam; - for (int32_t col = 0; col < numOfCols; ++col) { - SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col); - int32_t colSize = pColRes->info.bytes * pData->info.rows; - if (NEEDTO_COMPRESS_QUERY(colSize)) { - return true; - } - } - - return false; -} - -static void toDataCacheEntry(SDataInserterHandle* pHandle, const SInputData* pInput, SDataInserterBuf* pBuf) { - int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots); - - SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData; - pEntry->compressed = 0; - pEntry->numOfRows = pInput->pData->info.rows; - pEntry->numOfCols = taosArrayGetSize(pInput->pData->pDataBlock); - pEntry->dataLen = sizeof(SDeleterRes); - - ASSERT(1 == pEntry->numOfRows); - ASSERT(1 == pEntry->numOfCols); - - pBuf->useSize = sizeof(SDataCacheEntry); - - SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pInput->pData->pDataBlock, 0); - - SDeleterRes* pRes = (SDeleterRes*)pEntry->data; - pRes->suid = pHandle->pParam->suid; - pRes->uidList = pHandle->pParam->pUidList; - pRes->skey = pHandle->pDeleter->deleteTimeRange.skey; - pRes->ekey = pHandle->pDeleter->deleteTimeRange.ekey; - pRes->affectedRows = *(int64_t*)pColRes->pData; - - pBuf->useSize += pEntry->dataLen; - - atomic_add_fetch_64(&pHandle->cachedSize, pEntry->dataLen); - atomic_add_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); -} - -static bool allocBuf(SDataInserterHandle* pDeleter, const SInputData* pInput, SDataInserterBuf* pBuf) { - uint32_t capacity = pDeleter->pManager->cfg.maxDataBlockNumPerQuery; - if (taosQueueItemSize(pDeleter->pDataBlocks) > capacity) { - qError("SinkNode queue is full, no capacity, max:%d, current:%d, no capacity", capacity, - taosQueueItemSize(pDeleter->pDataBlocks)); - return false; - } - - pBuf->allocSize = sizeof(SDataCacheEntry) + sizeof(SDeleterRes); - - pBuf->pData = taosMemoryMalloc(pBuf->allocSize); - if (pBuf->pData == NULL) { - qError("SinkNode failed to malloc memory, size:%d, code:%d", pBuf->allocSize, TAOS_SYSTEM_ERROR(errno)); - } - - return NULL != pBuf->pData; -} - -static int32_t updateStatus(SDataInserterHandle* pDeleter) { - taosThreadMutexLock(&pDeleter->mutex); - int32_t blockNums = taosQueueItemSize(pDeleter->pDataBlocks); +static int32_t updateStatus(SDataInserterHandle* pInserter) { + taosThreadMutexLock(&pInserter->mutex); + int32_t blockNums = taosQueueItemSize(pInserter->pDataBlocks); int32_t status = (0 == blockNums ? DS_BUF_EMPTY - : (blockNums < pDeleter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); - pDeleter->status = status; - taosThreadMutexUnlock(&pDeleter->mutex); + : (blockNums < pInserter->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL)); + pInserter->status = status; + taosThreadMutexUnlock(&pInserter->mutex); return status; } -static int32_t getStatus(SDataInserterHandle* pDeleter) { - taosThreadMutexLock(&pDeleter->mutex); - int32_t status = pDeleter->status; - taosThreadMutexUnlock(&pDeleter->mutex); +static int32_t getStatus(SDataInserterHandle* pInserter) { + taosThreadMutexLock(&pInserter->mutex); + int32_t status = pInserter->status; + taosThreadMutexUnlock(&pInserter->mutex); return status; } +int32_t inserterCallback(void* param, SDataBuf* pMsg, int32_t code) { + SSubmitRspParam* pParam = (SSubmitRspParam*)param; + SDataInserterHandle* pInserter = pParam->pInserter; + + pInserter->submitRes.code = code; + + if (code == TSDB_CODE_SUCCESS) { + pInserter->submitRes.pRsp = taosMemoryCalloc(1, sizeof(SSubmitRsp)); + SDecoder coder = {0}; + tDecoderInit(&coder, pMsg->pData, pMsg->len); + code = tDecodeSSubmitRsp(&coder, pInserter->submitRes.pRsp); + if (code) { + tFreeSSubmitRsp(pInserter->submitRes.pRsp); + pInserter->submitRes.code = code; + goto _return; + } + + if (pInserter->submitRes.pRsp->nBlocks > 0) { + for (int32_t i = 0; i < pInserter->submitRes.pRsp->nBlocks; ++i) { + SSubmitBlkRsp *blk = pInserter->submitRes.pRsp->pBlocks + i; + if (TSDB_CODE_SUCCESS != blk->code) { + code = blk->code; + tFreeSSubmitRsp(pInserter->submitRes.pRsp); + pInserter->submitRes.code = code; + goto _return; + } + } + } + + pInserter->submitRes.affectedRows += pInserter->submitRes.pRsp->affectedRows; + qDebug("submit rsp received, affectedRows:%d, total:%d", pInserter->submitRes.pRsp->affectedRows, pInserter->submitRes.affectedRows); + + tFreeSSubmitRsp(pInserter->submitRes.pRsp); + } + +_return: + + tsem_post(&pInserter->ready); + + taosMemoryFree(param); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t sendSubmitRequest(SDataInserterHandle* pInserter, SSubmitReq* pMsg, void* pTransporter, SEpSet* pEpset) { + // send the fetch remote task result reques + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (NULL == pMsgSendInfo) { + taosMemoryFreeClear(pMsg); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return terrno; + } + + SSubmitRspParam* pParam = taosMemoryCalloc(1, sizeof(SSubmitRspParam)); + pParam->pInserter = pInserter; + + pMsgSendInfo->param = pParam; + pMsgSendInfo->msgInfo.pData = pMsg; + pMsgSendInfo->msgInfo.len = sizeof(SSubmitReq); + pMsgSendInfo->msgType = TDMT_VND_SUBMIT; + pMsgSendInfo->fp = inserterCallback; + + int64_t transporterId = 0; + return asyncSendMsgToServer(pTransporter, pEpset, &transporterId, pMsgSendInfo); +} + + static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) { - SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; - SDataInserterBuf* pBuf = taosAllocateQitem(sizeof(SDataInserterBuf), DEF_QITEM); - if (NULL == pBuf || !allocBuf(pDeleter, pInput, pBuf)) { - return TSDB_CODE_QRY_OUT_OF_MEMORY; + SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; + taosArrayPush(pInserter->pDataBlocks, pInput->pData); + SSubmitReq* pMsg = dataBlockToSubmit(pInserter->pDataBlocks, pInserter->pSchema, pInserter->pNode->tableId, pInserter->pNode->suid, pInserter->pNode->vgId); + + int32_t code = sendSubmitRequest(pInserter, pMsg, pInserter->pParam->readHandle->pMsgCb->clientRpc, &pInserter->pNode->epSet); + if (code) { + return code; } - toDataCacheEntry(pDeleter, pInput, pBuf); - taosWriteQitem(pDeleter->pDataBlocks, pBuf); - *pContinue = (DS_BUF_LOW == updateStatus(pDeleter) ? true : false); + + tsem_wait(&pInserter->ready); + + if (pInserter->submitRes.code) { + return pInserter->submitRes.code; + } + + *pContinue = true; + return TSDB_CODE_SUCCESS; } static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { - SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; - taosThreadMutexLock(&pDeleter->mutex); - pDeleter->queryEnd = true; - pDeleter->useconds = useconds; - taosThreadMutexUnlock(&pDeleter->mutex); -} - -static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd) { - SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; - if (taosQueueEmpty(pDeleter->pDataBlocks)) { - *pQueryEnd = pDeleter->queryEnd; - *pLen = 0; - return; - } - - SDataInserterBuf* pBuf = NULL; - taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - memcpy(&pDeleter->nextOutput, pBuf, sizeof(SDataInserterBuf)); - taosFreeQitem(pBuf); - *pLen = ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->dataLen; - *pQueryEnd = pDeleter->queryEnd; - qDebug("got data len %d, row num %d in sink", *pLen, ((SDataCacheEntry*)(pDeleter->nextOutput.pData))->numOfRows); -} - -static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) { - SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; - if (NULL == pDeleter->nextOutput.pData) { - assert(pDeleter->queryEnd); - pOutput->useconds = pDeleter->useconds; - pOutput->precision = pDeleter->pSchema->precision; - pOutput->bufStatus = DS_BUF_EMPTY; - pOutput->queryEnd = pDeleter->queryEnd; - return TSDB_CODE_SUCCESS; - } - SDataCacheEntry* pEntry = (SDataCacheEntry*)(pDeleter->nextOutput.pData); - memcpy(pOutput->pData, pEntry->data, pEntry->dataLen); - pOutput->numOfRows = pEntry->numOfRows; - pOutput->numOfCols = pEntry->numOfCols; - pOutput->compressed = pEntry->compressed; - - atomic_sub_fetch_64(&pDeleter->cachedSize, pEntry->dataLen); - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pEntry->dataLen); - - taosMemoryFreeClear(pDeleter->nextOutput.pData); // todo persistent - pOutput->bufStatus = updateStatus(pDeleter); - taosThreadMutexLock(&pDeleter->mutex); - pOutput->queryEnd = pDeleter->queryEnd; - pOutput->useconds = pDeleter->useconds; - pOutput->precision = pDeleter->pSchema->precision; - taosThreadMutexUnlock(&pDeleter->mutex); - - return TSDB_CODE_SUCCESS; + SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; + taosThreadMutexLock(&pInserter->mutex); + pInserter->queryEnd = true; + pInserter->useconds = useconds; + taosThreadMutexUnlock(&pInserter->mutex); } static int32_t destroyDataSinker(SDataSinkHandle* pHandle) { - SDataInserterHandle* pDeleter = (SDataInserterHandle*)pHandle; - atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pDeleter->cachedSize); - taosMemoryFreeClear(pDeleter->nextOutput.pData); - while (!taosQueueEmpty(pDeleter->pDataBlocks)) { - SDataInserterBuf* pBuf = NULL; - taosReadQitem(pDeleter->pDataBlocks, (void**)&pBuf); - taosMemoryFreeClear(pBuf->pData); - taosFreeQitem(pBuf); - } - taosCloseQueue(pDeleter->pDataBlocks); - taosThreadMutexDestroy(&pDeleter->mutex); + SDataInserterHandle* pInserter = (SDataInserterHandle*)pHandle; + atomic_sub_fetch_64(&gDataSinkStat.cachedSize, pInserter->cachedSize); + taosArrayDestroy(pInserter->pDataBlocks); + taosMemoryFree(pInserter->pSchema); + taosThreadMutexDestroy(&pInserter->mutex); return TSDB_CODE_SUCCESS; } @@ -230,25 +188,39 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat return TSDB_CODE_QRY_OUT_OF_MEMORY; } - SDataDeleterNode* pDeleterNode = (SDataDeleterNode *)pDataSink; + SDataDeleterNode* pInserterNode = (SQueryInserterNode *)pDataSink; inserter->sink.fPut = putDataBlock; inserter->sink.fEndPut = endPut; - inserter->sink.fGetLen = getDataLength; - inserter->sink.fGetData = getDataBlock; + inserter->sink.fGetLen = NULL; + inserter->sink.fGetData = NULL; inserter->sink.fDestroy = destroyDataSinker; inserter->sink.fGetCacheSize = getCacheSize; inserter->pManager = pManager; - inserter->pDeleter = pDeleterNode; - inserter->pSchema = pDataSink->pInputDataBlockDesc; + inserter->pNode = pInserterNode; inserter->pParam = pParam; inserter->status = DS_BUF_EMPTY; inserter->queryEnd = false; - inserter->pDataBlocks = taosOpenQueue(); + + int64_t suid = 0; + int32_t code = tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid); + if (code) { + return code; + } + + if (pInserterNode->suid != suid) { + terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; + return terrno; + } + + inserter->pDataBlocks = taosArrayInit(1, POINTER_BYTES); taosThreadMutexInit(&inserter->mutex, NULL); if (NULL == inserter->pDataBlocks) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; return TSDB_CODE_QRY_OUT_OF_MEMORY; } + + tsem_init(&inserter->ready, 0, 0); + *pHandle = inserter; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 498171e88c..21f4ca24f1 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -40,6 +40,8 @@ int32_t dsCreateDataSinker(const SDataSinkNode *pDataSink, DataSinkHandle* pHand return createDataDispatcher(&gDataSinkManager, pDataSink, pHandle); case QUERY_NODE_PHYSICAL_PLAN_DELETE: return createDataDeleter(&gDataSinkManager, pDataSink, pHandle, pParam); + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: + return createDataInserter(&gDataSinkManager, pDataSink, pHandle, pParam); } return TSDB_CODE_FAILED; } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index ed78e4173a..f651069a3a 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -52,7 +52,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, if (handle) { void* pSinkParam = NULL; - code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo); + code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 25b61e15c3..0833cdd578 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1992,7 +1992,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) { taosMemoryFreeClear(pMsgBody); } -void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { +void qProcessRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { SMsgSendInfo* pSendInfo = (SMsgSendInfo*)pMsg->info.ahandle; assert(pMsg->info.ahandle != NULL); @@ -4687,10 +4687,20 @@ int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) { return TDB_CODE_SUCCESS; } -int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo) { +int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; switch (pNode->type) { + case QUERY_NODE_PHYSICAL_PLAN_QUERY_INSERT: { + SInserterParam* pInserterParam = taosMemoryCalloc(1, sizeof(SInserterParam)); + if (NULL == pInserterParam) { + return TSDB_CODE_OUT_OF_MEMORY; + } + pInserterParam->readHandle = readHandle; + + *pParam = pInserterParam; + break; + } case QUERY_NODE_PHYSICAL_PLAN_DELETE: { SDeleterParam* pDeleterParam = taosMemoryCalloc(1, sizeof(SDeleterParam)); if (NULL == pDeleterParam) { diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index c2beb8a743..8d99f145b3 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6241,7 +6241,7 @@ static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery) { break; case QUERY_NODE_INSERT_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; - pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->msgType = TDMT_SCH_QUERY; break; case QUERY_NODE_VNODE_MODIF_STMT: pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index eb10a2fdd6..4c92611a54 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -80,12 +80,19 @@ typedef struct SQWDebug { extern SQWDebug gQWDebug; +typedef struct SQWMsgInfo { + int8_t taskType; + int8_t explain; + int8_t needFetch; +} SQWMsgInfo; + typedef struct SQWMsg { void *node; int32_t code; int32_t msgType; char *msg; int32_t msgLen; + SQWMsgInfo msgInfo; SRpcHandleInfo connInfo; } SQWMsg; @@ -122,9 +129,11 @@ typedef struct SQWTaskCtx { int8_t phase; int8_t taskType; int8_t explain; + int8_t needFetch; int32_t queryType; int32_t execId; + bool queryRsped; bool queryFetched; bool queryEnd; bool queryContinue; @@ -161,7 +170,7 @@ typedef struct SQWMsgStat { uint64_t queryProcessed; uint64_t cqueryProcessed; uint64_t fetchProcessed; - uint64_t fetchRspProcessed; + uint64_t rspProcessed; uint64_t cancelProcessed; uint64_t dropProcessed; uint64_t hbProcessed; @@ -211,8 +220,8 @@ typedef struct SQWorkerMgmt { #define QW_STAT_GET(_item) atomic_load_64(&(_item)) #define QW_GET_EVENT(ctx, event) atomic_load_8(&(ctx)->events[event]) -#define QW_IS_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) -#define QW_IS_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) +#define QW_EVENT_RECEIVED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_RECEIVED) +#define QW_EVENT_PROCESSED(ctx, event) (QW_GET_EVENT(ctx, event) == QW_EVENT_PROCESSED) #define QW_SET_EVENT_RECEIVED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_RECEIVED) #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) @@ -221,13 +230,8 @@ typedef struct SQWorkerMgmt { #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) -#define QW_IS_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) +#define QW_QUERY_RUNNING(ctx) (QW_GET_PHASE(ctx) == QW_PHASE_PRE_QUERY || QW_GET_PHASE(ctx) == QW_PHASE_PRE_CQUERY) -#define QW_TASK_NOT_EXIST(code) (TSDB_CODE_QRY_SCH_NOT_EXIST == (code) || TSDB_CODE_QRY_TASK_NOT_EXIST == (code)) -#define QW_TASK_ALREADY_EXIST(code) (TSDB_CODE_QRY_TASK_ALREADY_EXIST == (code)) -#define QW_TASK_READY(status) \ - (status == JOB_TASK_STATUS_SUCC || status == JOB_TASK_STATUS_FAIL || status == JOB_TASK_STATUS_CANCELLED || \ - status == JOB_TASK_STATUS_PART_SUCC) #define QW_SET_QTID(id, qId, tId, eId) \ do { \ *(uint64_t *)(id) = (qId); \ diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 9e9d1f44cb..b2205a46f1 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -25,7 +25,7 @@ extern "C" { int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index cc4228f7c7..94b6ddd6a2 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -366,10 +366,14 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; + qwMsg.msgInfo.explain = msg->explain; + qwMsg.msgInfo.taskType = msg->taskType; + qwMsg.msgInfo.needFetch = msg->needFetch; + char * sql = strndup(msg->msg, msg->sqlLen); QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); - QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain, sql)); + QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); return TSDB_CODE_SUCCESS; @@ -447,14 +451,14 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int return TSDB_CODE_SUCCESS; } -int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { +int32_t qWorkerProcessRspMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { SQWorker * mgmt = (SQWorker *)qWorkerMgmt; if (mgmt) { qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE); - QW_STAT_INC(mgmt->stat.msgStat.fetchRspProcessed, 1); + QW_STAT_INC(mgmt->stat.msgStat.rspProcessed, 1); } - qProcessFetchRsp(NULL, pMsg, NULL); + qProcessRspMsg(NULL, pMsg, NULL); pMsg->pCont = NULL; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index b8a2f911bc..ea12ca55d4 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -123,11 +123,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { break; } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) { + if (ctx->needFetch && (!ctx->queryRsped) && execNum >= QW_DEFAULT_SHORT_RUN_TIMES) { break; } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { break; } @@ -293,7 +293,6 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *input, SQWPhaseOutput *output) { int32_t code = 0; SQWTaskCtx *ctx = NULL; - SRpcHandleInfo *cancelConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -314,13 +313,13 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu switch (phase) { case QW_PHASE_PRE_QUERY: { - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_ELOG("task already dropped at wrong phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_STATUS_ERROR); break; } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); @@ -334,29 +333,29 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_FETCH: { - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_TASK_WLOG("last fetch still not processed, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } - if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { + if (!ctx->queryRsped) { QW_TASK_ELOG("ready msg has not been processed, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); } break; } case QW_PHASE_PRE_CQUERY: { - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); //qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); @@ -385,11 +384,6 @@ _return: qwReleaseTaskCtx(mgmt, ctx); } - if (cancelConnection) { - qwBuildAndSendCancelRsp(cancelConnection, code); - QW_TASK_DLOG("cancel rsp send, handle:%p, code:%x - %s", cancelConnection->handle, code, tstrerror(code)); - } - if (code != TSDB_CODE_SUCCESS) { QW_TASK_ELOG("end to handle event at phase %s, code:%s", qwPhaseStr(phase), tstrerror(code)); } else { @@ -411,7 +405,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_LOCK(QW_WRITE, &ctx->lock); - if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } @@ -420,10 +414,10 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp connInfo = ctx->ctrlConnInfo; rspConnection = &connInfo; - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY); + ctx->queryRsped = true; } - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if (QW_PHASE_POST_FETCH == phase) { QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); @@ -512,7 +506,7 @@ _return: } -int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql) { +int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { int32_t code = 0; bool queryRsped = false; SSubplan *plan = NULL; @@ -525,8 +519,9 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - ctx->taskType = taskType; - ctx->explain = explain; + ctx->taskType = qwMsg->msgInfo.taskType; + ctx->explain = qwMsg->msgInfo.explain; + ctx->needFetch = qwMsg->msgInfo.needFetch; ctx->queryType = qwMsg->msgType; QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -596,7 +591,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, &queryEnd)); - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); @@ -633,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { break; } - if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwFreeFetchRsp(rsp); rsp = NULL; @@ -695,7 +690,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = true; // RC WARNING - if (QW_IS_QUERY_RUNNING(ctx)) { + if (QW_QUERY_RUNNING(ctx)) { atomic_store_8((int8_t *)&ctx->queryContinue, 1); } else if (0 == atomic_load_8((int8_t *)&ctx->queryInQueue)) { qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC); @@ -742,12 +737,12 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = true; - if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG_E("task already dropping"); QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } - if (QW_IS_QUERY_RUNNING(ctx)) { + if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); } else if (ctx->phase > 0) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 8e8652aab5..ff5ce037e7 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -296,8 +296,8 @@ extern SSchedulerMgmt schMgmt; #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) #define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) -#define SCH_IS_DATA_SRC_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) -#define SCH_IS_DATA_SRC_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) +#define SCH_IS_DATA_BIND_QRY_TASK(task) ((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) +#define SCH_IS_DATA_BIND_TASK(task) (((task)->plan->subplanType == SUBPLAN_TYPE_SCAN) || ((task)->plan->subplanType == SUBPLAN_TYPE_MODIFY)) #define SCH_IS_LEAF_TASK(_job, _task) (((_task)->level->level + 1) == (_job)->levelNum) #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) @@ -317,7 +317,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) -#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) +#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level)) #define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0) #define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 858f68e7ae..a305a127e0 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -247,7 +247,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { int32_t schAppendJobDataSrc(SSchJob *pJob, SSchTask *pTask) { - if (!SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (!SCH_IS_DATA_BIND_QRY_TASK(pTask)) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 03a247f828..263401d20e 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -981,6 +981,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->execId = htonl(pTask->execId); pMsg->taskType = TASK_TYPE_TEMP; pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); + pMsg->needFetch = SCH_JOB_NEED_FETCH(pJob); pMsg->phyLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e60006d75c..3d0889d258 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -274,7 +274,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { return TSDB_CODE_SUCCESS; } @@ -311,7 +311,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 pTask->lastMsgType = 0; memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pData) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } @@ -356,7 +356,7 @@ _return: int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode) { int32_t code = 0; - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { if (NULL == pData->pEpSet) { SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); SCH_ERR_JRET(rspCode); @@ -490,7 +490,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo return TSDB_CODE_SUCCESS; } - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { *needRetry = false; SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, @@ -526,7 +526,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { schDeregisterTaskHb(pJob, pTask); - if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); @@ -594,7 +594,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_IS_DATA_BIND_TASK(pTask)) { SCH_TASK_ELOG("no execNode specifed for data src task, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); }