diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 5fee0ad27a..84a85e54ef 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -949,6 +949,11 @@ typedef struct STimeWindow { TSKEY ekey; } STimeWindow; +typedef struct SQueryHint { + bool withHint; + bool batchScan; +} SQueryHint; + typedef struct { int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed int32_t tsLen; // total length of ts comp block @@ -967,12 +972,18 @@ typedef struct { int64_t offset; } SInterval; -typedef struct { - int32_t code; + +typedef struct STbVerInfo { char tbFName[TSDB_TABLE_FNAME_LEN]; int32_t sversion; int32_t tversion; +} STbVerInfo; + + +typedef struct { + int32_t code; int64_t affectedRows; + SArray* tbVerInfo; // STbVerInfo } SQueryTableRsp; int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp); diff --git a/include/common/ttokendef.h b/include/common/ttokendef.h index 8a6b7b5020..43e59346f9 100644 --- a/include/common/ttokendef.h +++ b/include/common/ttokendef.h @@ -361,6 +361,7 @@ #define TK_NK_HEX 603 // hex number 0x123 #define TK_NK_OCT 604 // oct number #define TK_NK_BIN 605 // bin format data 0b111 +#define TK_NK_HINT 606 #define TK_NK_NIL 65535 diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a2f54faba5..580863939f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -156,7 +156,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, * @return */ int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion); + int32_t* tversion, int32_t idx); /** * The main task execution function, including query on both table and multiple tables, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 90072b6053..d4058767bb 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -451,6 +451,10 @@ typedef struct SHashJoinPhysiNode { SNode* pFilterConditions; SNodeList* pTargets; SQueryStat inputStat[2]; + + SNode* pPrimKeyCond; + SNode* pColEqCond; + SNode* pTagEqCond; } SHashJoinPhysiNode; typedef struct SGroupCachePhysiNode { diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 8d1114c76a..71b8badb13 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -90,11 +90,6 @@ typedef struct SExecResult { void* res; } SExecResult; -typedef struct STbVerInfo { - char tbFName[TSDB_TABLE_FNAME_LEN]; - int32_t sversion; - int32_t tversion; -} STbVerInfo; #pragma pack(push, 1) typedef struct SCTableMeta { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6098b26408..7d80e114fb 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5818,11 +5818,18 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp if (tStartEncode(&encoder) < 0) return -1; if (tEncodeI32(&encoder, pRsp->code) < 0) return -1; - if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1; - if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1; if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1; - + int32_t tbNum = taosArrayGetSize(pRsp->tbVerInfo); + if (tEncodeI32(&encoder, tbNum) < 0) return -1; + if (tbNum > 0) { + for (int32_t i = 0; i < tbNum; ++i) { + STbVerInfo *pVer = taosArrayGet(pRsp->tbVerInfo, i); + if (tEncodeCStr(&encoder, pVer->tbFName) < 0) return -1; + if (tEncodeI32(&encoder, pVer->sversion) < 0) return -1; + if (tEncodeI32(&encoder, pVer->tversion) < 0) return -1; + } + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5838,11 +5845,19 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR if (tStartDecode(&decoder) < 0) return -1; if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1; - if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1; - if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1; if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1; - + int32_t tbNum = 0; + if (tDecodeI32(&decoder, &tbNum) < 0) return -1; + if (tbNum > 0) { + pRsp->tbVerInfo = taosArrayInit(tbNum, sizeof(STbVerInfo)); + if (NULL == pRsp->tbVerInfo) return -1; + STbVerInfo tbVer; + if (tDecodeCStrTo(&decoder, tbVer.tbFName) < 0) return -1; + if (tDecodeI32(&decoder, &tbVer.sversion) < 0) return -1; + if (tDecodeI32(&decoder, &tbVer.tversion) < 0) return -1; + if (NULL == taosArrayPush(pRsp->tbVerInfo, &tbVer)) return -1; + } + tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/libs/command/inc/commandInt.h b/source/libs/command/inc/commandInt.h index 2a7aeb0060..996891c77a 100644 --- a/source/libs/command/inc/commandInt.h +++ b/source/libs/command/inc/commandInt.h @@ -67,6 +67,8 @@ extern "C" { #define EXPLAIN_EVENT_FORMAT "Event" #define EXPLAIN_EVENT_START_FORMAT "Start Cond: " #define EXPLAIN_EVENT_END_FORMAT "End Cond: " +#define EXPLAIN_GROUP_CACHE_FORMAT "Group Cache" +#define EXPLAIN_DYN_QRY_CTRL_FORMAT "Dynamic Query Control for %s" #define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms" #define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms" @@ -96,6 +98,12 @@ extern "C" { #define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64 #define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64 #define EXPLAIN_PARTITIONS_FORMAT "partitions=%d" +#define EXPLAIN_GLOBAL_GROUP_FORMAT "global_group=%d" +#define EXPLAIN_GROUP_BY_UID_FORMAT "group_by_uid=%d" +#define EXPLAIN_BATCH_SCAN_FORMAT "batch_scan=%d" +#define EXPLAIN_VGROUP_SLOT_FORMAT "vgroup_slot=%d,%d" +#define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d" +#define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d" #define COMMAND_RESET_LOG "resetLog" #define COMMAND_SCHEDULE_POLICY "schedulePolicy" diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 5e13e6890d..a7f503d87e 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -24,6 +24,17 @@ int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes); int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel); +char *qExplainGetDynQryCtrlType(EDynQueryType type) { + switch (type) { + case DYN_QTYPE_STB_HASH: + return "STable Join"; + default: + break; + } + + return "unknown task"; +} + void qExplainFreeResNode(SExplainResNode *resNode) { if (NULL == resNode) { return; @@ -1522,6 +1533,168 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i } break; } + case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:{ + SHashJoinPhysiNode *pJoinNode = (SHashJoinPhysiNode *)pNode; + EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType)); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pJoinNode->pTargets->length); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pJoinNode->node.inputTsOrder)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pJoinNode->node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_APPEND_LIMIT(pJoinNode->node.pLimit); + EXPLAIN_ROW_APPEND_SLIMIT(pJoinNode->node.pSlimit); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pJoinNode->node.pConditions || pJoinNode->pFilterConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + if (pJoinNode->node.pConditions) { + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + } + if (pJoinNode->pFilterConditions) { + if (pJoinNode->node.pConditions) { + EXPLAIN_ROW_APPEND(" AND "); + } + QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFilterConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + } + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + + bool conditionsGot = false; + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT); + if (pJoinNode->pPrimKeyCond) { + QRY_ERR_RET( + nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + conditionsGot = true; + } + if (pJoinNode->pColEqCond) { + if (conditionsGot) { + EXPLAIN_ROW_APPEND(" AND "); + } + QRY_ERR_RET( + nodesNodeToSQL(pJoinNode->pColEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + conditionsGot = true; + } + if (pJoinNode->pTagEqCond) { + if (conditionsGot) { + EXPLAIN_ROW_APPEND(" AND "); + } + QRY_ERR_RET( + nodesNodeToSQL(pJoinNode->pTagEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + conditionsGot = true; + } + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:{ + SGroupCachePhysiNode *pGroupCache = (SGroupCachePhysiNode *)pNode; + EXPLAIN_ROW_NEW(level, EXPLAIN_GROUP_CACHE_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + EXPLAIN_ROW_APPEND(EXPLAIN_GLOBAL_GROUP_FORMAT, pGroupCache->globalGrp); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_GROUP_BY_UID_FORMAT, pGroupCache->grpByUid); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_BATCH_SCAN_FORMAT, pGroupCache->batchFetch); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pGroupCache->node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pGroupCache->node.inputTsOrder)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pGroupCache->node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pGroupCache->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_APPEND_LIMIT(pGroupCache->node.pLimit); + EXPLAIN_ROW_APPEND_SLIMIT(pGroupCache->node.pSlimit); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pGroupCache->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pGroupCache->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + } + break; + } + case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:{ + SDynQueryCtrlPhysiNode *pDyn = (SDynQueryCtrlPhysiNode *)pNode; + EXPLAIN_ROW_NEW(level, EXPLAIN_DYN_QRY_CTRL_FORMAT, qExplainGetDynQryCtrlType(pDyn->qType)); + EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); + if (pResNode->pExecInfo) { + QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + } + EXPLAIN_ROW_APPEND(EXPLAIN_BATCH_SCAN_FORMAT, pDyn->stbJoin.batchFetch); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_VGROUP_SLOT_FORMAT, pDyn->stbJoin.vgSlot[0], pDyn->stbJoin.vgSlot[1]); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_UID_SLOT_FORMAT, pDyn->stbJoin.uidSlot[0], pDyn->stbJoin.uidSlot[1]); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_SRC_SCAN_FORMAT, pDyn->stbJoin.srcScan[0], pDyn->stbJoin.srcScan[1]); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDyn->node.pOutputDataBlockDesc->totalRowSize); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pDyn->node.inputTsOrder)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); + + if (verbose) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, + nodesGetOutputNumFromSlotList(pDyn->node.pOutputDataBlockDesc->pSlots)); + EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); + EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDyn->node.pOutputDataBlockDesc->outputRowSize); + EXPLAIN_ROW_APPEND_LIMIT(pDyn->node.pLimit); + EXPLAIN_ROW_APPEND_SLIMIT(pDyn->node.pSlimit); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + + if (pDyn->node.pConditions) { + EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); + QRY_ERR_RET(nodesNodeToSQL(pDyn->node.pConditions, tbuf + VARSTR_HEADER_SIZE, + TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); + EXPLAIN_ROW_END(); + QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); + } + } + break; + } default: qError("not supported physical node type %d", pNode->type); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 401b532a69..85d64e0716 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -641,7 +641,7 @@ typedef struct SStreamFillOperatorInfo { SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode); int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo); -void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo); +void cleanupQueriedTableScanInfo(void* p); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 9623c7c93c..9029adbdde 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -84,6 +84,7 @@ struct SExecTaskInfo { int32_t qbufQuota; // total available buffer (in KB) during execution query int64_t version; // used for stream to record wal version, why not move to sschemainfo SStreamTaskInfo streamInfo; + SArray* schemaInfos; SSchemaInfo schemaInfo; const char* sql; // query sql string jmp_buf env; // jump to this position when error happens. diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 52d0068f7d..294aabfeb1 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -330,7 +330,8 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask return TSDB_CODE_OUT_OF_MEMORY; } - SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw; + SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos); + SSchemaWrapper* pWrapper = pSchemaInfo->sw; for (int32_t i = 0; i < numOfCols; ++i) { SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i); diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index fa332222dd..f041eb31a8 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -357,6 +357,7 @@ static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t if (code) { return code; } + taosArrayDestroy(pUidList); *(SArray**)p = NULL; } @@ -532,9 +533,11 @@ static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, in return TSDB_CODE_OUT_OF_MEMORY; } if (NULL == taosArrayPush(pArray, pVal)) { + taosArrayDestroy(pArray); return TSDB_CODE_OUT_OF_MEMORY; } if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) { + taosArrayDestroy(pArray); return TSDB_CODE_OUT_OF_MEMORY; } return TSDB_CODE_SUCCESS; @@ -748,6 +751,8 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR } *ppRes = NULL; + setOperatorCompleted(pOperator); + return; } @@ -760,20 +765,32 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) { return pRes; } + int64_t st = 0; + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } + if (!pStbJoin->ctx.prev.joinBuild) { buildStbJoinTableList(pOperator); if (pStbJoin->execInfo.prevBlkRows <= 0) { - pOperator->status = OP_EXEC_DONE; - return NULL; + setOperatorCompleted(pOperator); + goto _return; } } seqJoinContinueCurrRetrieve(pOperator, &pRes); if (pRes) { - return pRes; + goto _return; } seqJoinLaunchNewRetrieve(pOperator, &pRes); + +_return: + + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + } + return pRes; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index c1bd01d2a3..6b21151f93 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -820,6 +820,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { code = addSingleExchangeSource(pOperator, pBasicParam); } + freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM); pOperator->pOperatorGetParam = NULL; return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9bb24e7a7d..4ec134f382 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -474,28 +474,30 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI } int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion, - int32_t* tversion) { + int32_t* tversion, int32_t idx) { ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - if (pTaskInfo->schemaInfo.sw == NULL) { - return TSDB_CODE_SUCCESS; + if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) { + return -1; } - *sversion = pTaskInfo->schemaInfo.sw->version; - *tversion = pTaskInfo->schemaInfo.tversion; - if (pTaskInfo->schemaInfo.dbname) { - strcpy(dbName, pTaskInfo->schemaInfo.dbname); + SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx); + + *sversion = pSchemaInfo->sw->version; + *tversion = pSchemaInfo->tversion; + if (pSchemaInfo->dbname) { + strcpy(dbName, pSchemaInfo->dbname); } else { dbName[0] = 0; } - if (pTaskInfo->schemaInfo.tablename) { - strcpy(tableName, pTaskInfo->schemaInfo.tablename); + if (pSchemaInfo->tablename) { + strcpy(tableName, pSchemaInfo->tablename); } else { tableName[0] = 0; } - return 0; + return TSDB_CODE_SUCCESS; } bool qIsDynamicExecTask(qTaskInfo_t tinfo) { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 6bbc1cb327..c19b567acd 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -801,6 +801,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam* return TSDB_CODE_OUT_OF_MEMORY; } taosWUnLockLatch(&pCtx->grpLock); + + taosArrayDestroy(pParam->pChildren); + pParam->pChildren = NULL; } return TSDB_CODE_SUCCESS; @@ -1275,6 +1278,11 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) { static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { SSDataBlock* pBlock = NULL; + int64_t st = 0; + + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam); if (TSDB_CODE_SUCCESS != code) { @@ -1282,6 +1290,10 @@ static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperator T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); } + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + } + return pBlock; } diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index 77588e02f2..c26ad2dddc 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -735,8 +735,7 @@ static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock* } static void setHJoinDone(struct SOperatorInfo* pOperator) { - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - pOperator->status = OP_EXEC_DONE; + setOperatorCompleted(pOperator); SHJoinOperatorInfo* pInfo = pOperator->info; destroyHJoinKeyHash(&pInfo->pKeyHash); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 16e0c8c1d2..ede5e798d9 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -652,7 +652,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t } static void setMergeJoinDone(SOperatorInfo* pOperator) { - pOperator->status = OP_EXEC_DONE; + setOperatorCompleted(pOperator); freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM); freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM); pOperator->pDownstreamGetParams[0] = NULL; @@ -794,6 +794,11 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { } } + int64_t st = 0; + if (pOperator->cost.openCost == 0) { + st = taosGetTimestampUs(); + } + SSDataBlock* pRes = pJoinInfo->pRes; blockDataCleanup(pRes); @@ -814,6 +819,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { break; } } + + if (pOperator->cost.openCost == 0) { + pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; + } if (pRes->info.rows > 0) { pJoinInfo->resRows += pRes->info.rows; diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index cb01d2be32..b32dd1990f 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -369,7 +369,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR } } - pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); + //pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c index 96fe1bf2f9..642a3e35ce 100644 --- a/source/libs/executor/src/querytask.c +++ b/source/libs/executor/src/querytask.c @@ -57,6 +57,8 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP pTaskInfo->id.taskId = taskId; pTaskInfo->id.str = taosMemoryMalloc(64); buildTaskId(taskId, queryId, pTaskInfo->id.str); + pTaskInfo->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo)); + return pTaskInfo; } @@ -107,7 +109,9 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand } } -void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) { +void cleanupQueriedTableScanInfo(void* p) { + SSchemaInfo* pSchemaInfo = p; + taosMemoryFreeClear(pSchemaInfo->dbname); taosMemoryFreeClear(pSchemaInfo->tablename); tDeleteSchemaWrapper(pSchemaInfo->sw); @@ -133,14 +137,14 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo return terrno; } - SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo; + SSchemaInfo schemaInfo = {0}; - pSchemaInfo->tablename = taosStrdup(mr.me.name); - pSchemaInfo->dbname = taosStrdup(dbName); + schemaInfo.tablename = taosStrdup(mr.me.name); + schemaInfo.dbname = taosStrdup(dbName); if (mr.me.type == TSDB_SUPER_TABLE) { - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; + schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else if (mr.me.type == TSDB_CHILD_TABLE) { tDecoderClear(&mr.coder); @@ -148,18 +152,23 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid); if (code != TSDB_CODE_SUCCESS) { pAPI->metaReaderFn.clearReader(&mr); + taosMemoryFree(schemaInfo.tablename); + taosMemoryFree(schemaInfo.dbname); return terrno; } - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); - pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version; + schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow); + schemaInfo.tversion = mr.me.stbEntry.schemaTag.version; } else { - pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); + schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow); } pAPI->metaReaderFn.clearReader(&mr); - pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode); + schemaInfo.qsw = extractQueriedColumnSchema(pScanNode); + + taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo); + return TSDB_CODE_SUCCESS; } @@ -212,7 +221,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) { destroyOperator(pTaskInfo->pRoot); pTaskInfo->pRoot = NULL; - cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo); + taosArrayDestroyEx(pTaskInfo->schemaInfos, cleanupQueriedTableScanInfo); cleanupStreamInfo(&pTaskInfo->streamInfo); if (!pTaskInfo->localFetch.localExec) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 026ddc9bf4..dfaacecc49 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -933,7 +933,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo, } else { int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE) : (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE); - tagVarChar = taosMemoryMalloc(bufSize); + tagVarChar = taosMemoryCalloc(1, bufSize + 1); int32_t len = -1; convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len); varDataSetLen(tagVarChar, len); diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index e2e26b4737..a53eb22df8 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -1261,6 +1261,10 @@ void nodesDestroyNode(SNode* pNode) { nodesDestroyList(pPhyNode->pOnRight); nodesDestroyNode(pPhyNode->pFilterConditions); nodesDestroyList(pPhyNode->pTargets); + + nodesDestroyNode(pPhyNode->pPrimKeyCond); + nodesDestroyNode(pPhyNode->pColEqCond); + nodesDestroyNode(pPhyNode->pTagEqCond); break; } case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: { diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 6c3f589159..ae577cd4ee 100755 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -1009,10 +1009,10 @@ join_type(A) ::= INNER. /************************************************ query_specification *************************************************/ query_specification(A) ::= - SELECT set_quantifier_opt(B) select_list(C) from_clause_opt(D) + SELECT hint_opt(M) set_quantifier_opt(B) select_list(C) from_clause_opt(D) where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K) fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). { - A = createSelectStmt(pCxt, B, C, D); + A = createSelectStmt(pCxt, B, C, D, M); A = addWhereClause(pCxt, A, E); A = addPartitionByClause(pCxt, A, F); A = addWindowClauseClause(pCxt, A, G); @@ -1023,6 +1023,11 @@ query_specification(A) ::= A = addFillClause(pCxt, A, L); } +%type hint_opt { SQueryHint } +%destructor hint_opt { } +hint_opt(A) ::= . { A.withHint = false; } +hint_opt(A) ::= NO_BATCH_SCAN. { A.withHint = true; A.batchScan = false; } + %type set_quantifier_opt { bool } %destructor set_quantifier_opt { } set_quantifier_opt(A) ::= . { A = false; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index ca7ac1a0b6..42af7180a8 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -398,10 +398,14 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) { *tokenId = TK_NK_SLASH; return 1; } + bool isHint = false; + if (z[2] == '+') { + isHint = TK_NK_HINT; + } for (i = 3; z[i] && (z[i] != '/' || z[i - 1] != '*'); i++) { } if (z[i]) i++; - *tokenId = TK_NK_COMMENT; + *tokenId = isHint ? TK_NK_HINT : TK_NK_COMMENT; return i; } case '%': { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index dfce643408..de2e967f58 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -921,15 +921,12 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil pJoin->joinType = pJoinLogicNode->joinType; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; - SNode* pPrimKeyCond = NULL; - SNode* pColEqCond = NULL; - SNode* pTagEqCond = NULL; - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pPrimKeyCond); + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond); if (TSDB_CODE_SUCCESS == code) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pColEqCond); + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond); } if (TSDB_CODE_SUCCESS == code) { - code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pTagEqCond); + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond); } if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) { code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions); @@ -941,7 +938,7 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin); } if (TSDB_CODE_SUCCESS == code) { - code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pPrimKeyCond, pColEqCond, pTagEqCond, pJoin); + code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin); } if (TSDB_CODE_SUCCESS == code) { code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin); @@ -950,10 +947,6 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc); } - nodesDestroyNode(pPrimKeyCond); - nodesDestroyNode(pColEqCond); - nodesDestroyNode(pTagEqCond); - if (TSDB_CODE_SUCCESS == code) { *pPhyNode = (SPhysiNode*)pJoin; } else { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index e086d89d0e..8797a8cf6b 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -144,7 +144,7 @@ typedef struct SQWTaskCtx { SArray *explainRes; void *taskHandle; void *sinkHandle; - STbVerInfo tbInfo; + SArray *tbInfo; // STbVerInfo } SQWTaskCtx; typedef struct SQWSchStatus { diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 09a3af295c..d28667d247 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -128,11 +128,11 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) { QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, " "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " - "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d", + "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbNum:%d, events:%d,%d,%d,%d,%d", ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType, ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, - ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName, - ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], + ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, (int32_t)taosArrayGetSize(ctx->tbInfo), + ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]); pIter = taosHashIterate(mgmt->ctxHash, pIter); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index ec4fd8f9ce..0cbcf44ed4 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -63,17 +63,11 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c } int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { - STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL; int64_t affectedRows = ctx ? ctx->affectedRows : 0; SQueryTableRsp rsp = {0}; rsp.code = code; rsp.affectedRows = affectedRows; - - if (tbInfo) { - strcpy(rsp.tbFName, tbInfo->tbFName); - rsp.sversion = tbInfo->sversion; - rsp.tversion = tbInfo->tversion; - } + rsp.tbVerInfo = ctx->tbInfo; int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp); if (msgSize < 0) { diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 13a1a096b0..4c54c3548d 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -312,6 +312,8 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { ctx->sinkHandle = NULL; qDebug("sink handle destroyed"); } + + taosArrayDestroy(ctx->tbInfo); } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { @@ -462,15 +464,29 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) { } void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) { - char dbFName[TSDB_DB_FNAME_LEN] = {0}; - char tbName[TSDB_TABLE_NAME_LEN] = {0}; + char dbFName[TSDB_DB_FNAME_LEN]; + char tbName[TSDB_TABLE_NAME_LEN]; + STbVerInfo tbInfo; + int32_t i = 0; - qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion); + while (true) { + if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) { + break; + } - if (dbFName[0] && tbName[0]) { - sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName); - } else { - ctx->tbInfo.tbFName[0] = 0; + if (dbFName[0] && tbName[0]) { + sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName); + } else { + tbInfo.tbFName[0] = 0; + } + + if (NULL == ctx->tbInfo) { + ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo)); + } + + taosArrayPush(ctx->tbInfo, &tbInfo); + + i++; } } diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 78e0807775..d2ed26d405 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -588,7 +588,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { } int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) { - if (rsp->tbFName[0]) { + if (rsp->tbVerInfo) { SCH_LOCK(SCH_WRITE, &pJob->resLock); if (NULL == pJob->execRes.res) { @@ -599,12 +599,9 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) { } } - STbVerInfo tbInfo; - strcpy(tbInfo.tbFName, rsp->tbFName); - tbInfo.sversion = rsp->sversion; - tbInfo.tversion = rsp->tversion; - - taosArrayPush((SArray *)pJob->execRes.res, &tbInfo); + taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo)); + taosArrayDestroy(rsp->tbVerInfo); + pJob->execRes.msgType = TDMT_SCH_QUERY; SCH_UNLOCK(SCH_WRITE, &pJob->resLock);