fix: fix memory issues

This commit is contained in:
dapan1121 2023-08-11 17:39:41 +08:00
parent ce1b294c51
commit c9656d3f77
29 changed files with 357 additions and 86 deletions

View File

@ -949,6 +949,11 @@ typedef struct STimeWindow {
TSKEY ekey;
} STimeWindow;
typedef struct SQueryHint {
bool withHint;
bool batchScan;
} SQueryHint;
typedef struct {
int32_t tsOffset; // offset value in current msg body, NOTE: ts list is compressed
int32_t tsLen; // total length of ts comp block
@ -967,12 +972,18 @@ typedef struct {
int64_t offset;
} SInterval;
typedef struct {
int32_t code;
typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} STbVerInfo;
typedef struct {
int32_t code;
int64_t affectedRows;
SArray* tbVerInfo; // STbVerInfo
} SQueryTableRsp;
int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);

View File

@ -361,6 +361,7 @@
#define TK_NK_HEX 603 // hex number 0x123
#define TK_NK_OCT 604 // oct number
#define TK_NK_BIN 605 // bin format data 0b111
#define TK_NK_HINT 606
#define TK_NK_NIL 65535

View File

@ -156,7 +156,7 @@ int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId,
* @return
*/
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion);
int32_t* tversion, int32_t idx);
/**
* The main task execution function, including query on both table and multiple tables,

View File

@ -451,6 +451,10 @@ typedef struct SHashJoinPhysiNode {
SNode* pFilterConditions;
SNodeList* pTargets;
SQueryStat inputStat[2];
SNode* pPrimKeyCond;
SNode* pColEqCond;
SNode* pTagEqCond;
} SHashJoinPhysiNode;
typedef struct SGroupCachePhysiNode {

View File

@ -90,11 +90,6 @@ typedef struct SExecResult {
void* res;
} SExecResult;
typedef struct STbVerInfo {
char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t sversion;
int32_t tversion;
} STbVerInfo;
#pragma pack(push, 1)
typedef struct SCTableMeta {

View File

@ -5818,11 +5818,18 @@ int32_t tSerializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pRsp
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->code) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbFName) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->sversion) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->tversion) < 0) return -1;
if (tEncodeI64(&encoder, pRsp->affectedRows) < 0) return -1;
int32_t tbNum = taosArrayGetSize(pRsp->tbVerInfo);
if (tEncodeI32(&encoder, tbNum) < 0) return -1;
if (tbNum > 0) {
for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo *pVer = taosArrayGet(pRsp->tbVerInfo, i);
if (tEncodeCStr(&encoder, pVer->tbFName) < 0) return -1;
if (tEncodeI32(&encoder, pVer->sversion) < 0) return -1;
if (tEncodeI32(&encoder, pVer->tversion) < 0) return -1;
}
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -5838,11 +5845,19 @@ int32_t tDeserializeSQueryTableRsp(void *buf, int32_t bufLen, SQueryTableRsp *pR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->code) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbFName) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->sversion) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->tversion) < 0) return -1;
if (tDecodeI64(&decoder, &pRsp->affectedRows) < 0) return -1;
int32_t tbNum = 0;
if (tDecodeI32(&decoder, &tbNum) < 0) return -1;
if (tbNum > 0) {
pRsp->tbVerInfo = taosArrayInit(tbNum, sizeof(STbVerInfo));
if (NULL == pRsp->tbVerInfo) return -1;
STbVerInfo tbVer;
if (tDecodeCStrTo(&decoder, tbVer.tbFName) < 0) return -1;
if (tDecodeI32(&decoder, &tbVer.sversion) < 0) return -1;
if (tDecodeI32(&decoder, &tbVer.tversion) < 0) return -1;
if (NULL == taosArrayPush(pRsp->tbVerInfo, &tbVer)) return -1;
}
tEndDecode(&decoder);
tDecoderClear(&decoder);

View File

@ -67,6 +67,8 @@ extern "C" {
#define EXPLAIN_EVENT_FORMAT "Event"
#define EXPLAIN_EVENT_START_FORMAT "Start Cond: "
#define EXPLAIN_EVENT_END_FORMAT "End Cond: "
#define EXPLAIN_GROUP_CACHE_FORMAT "Group Cache"
#define EXPLAIN_DYN_QRY_CTRL_FORMAT "Dynamic Query Control for %s"
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
@ -96,6 +98,12 @@ extern "C" {
#define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64
#define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64
#define EXPLAIN_PARTITIONS_FORMAT "partitions=%d"
#define EXPLAIN_GLOBAL_GROUP_FORMAT "global_group=%d"
#define EXPLAIN_GROUP_BY_UID_FORMAT "group_by_uid=%d"
#define EXPLAIN_BATCH_SCAN_FORMAT "batch_scan=%d"
#define EXPLAIN_VGROUP_SLOT_FORMAT "vgroup_slot=%d,%d"
#define EXPLAIN_UID_SLOT_FORMAT "uid_slot=%d,%d"
#define EXPLAIN_SRC_SCAN_FORMAT "src_scan=%d,%d"
#define COMMAND_RESET_LOG "resetLog"
#define COMMAND_SCHEDULE_POLICY "schedulePolicy"

View File

@ -24,6 +24,17 @@
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level, bool singleChannel);
char *qExplainGetDynQryCtrlType(EDynQueryType type) {
switch (type) {
case DYN_QTYPE_STB_HASH:
return "STable Join";
default:
break;
}
return "unknown task";
}
void qExplainFreeResNode(SExplainResNode *resNode) {
if (NULL == resNode) {
return;
@ -1522,6 +1533,168 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_JOIN:{
SHashJoinPhysiNode *pJoinNode = (SHashJoinPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pJoinNode->pTargets->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pJoinNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pJoinNode->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pJoinNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pJoinNode->node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pJoinNode->node.pConditions || pJoinNode->pFilterConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
if (pJoinNode->node.pConditions) {
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
if (pJoinNode->pFilterConditions) {
if (pJoinNode->node.pConditions) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->pFilterConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
bool conditionsGot = false;
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ON_CONDITIONS_FORMAT);
if (pJoinNode->pPrimKeyCond) {
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pPrimKeyCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
conditionsGot = true;
}
if (pJoinNode->pColEqCond) {
if (conditionsGot) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pColEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
conditionsGot = true;
}
if (pJoinNode->pTagEqCond) {
if (conditionsGot) {
EXPLAIN_ROW_APPEND(" AND ");
}
QRY_ERR_RET(
nodesNodeToSQL(pJoinNode->pTagEqCond, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
conditionsGot = true;
}
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE:{
SGroupCachePhysiNode *pGroupCache = (SGroupCachePhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_GROUP_CACHE_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_GLOBAL_GROUP_FORMAT, pGroupCache->globalGrp);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_GROUP_BY_UID_FORMAT, pGroupCache->grpByUid);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_BATCH_SCAN_FORMAT, pGroupCache->batchFetch);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pGroupCache->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pGroupCache->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pGroupCache->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pGroupCache->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pGroupCache->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pGroupCache->node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pGroupCache->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pGroupCache->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
case QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL:{
SDynQueryCtrlPhysiNode *pDyn = (SDynQueryCtrlPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_DYN_QRY_CTRL_FORMAT, qExplainGetDynQryCtrlType(pDyn->qType));
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_BATCH_SCAN_FORMAT, pDyn->stbJoin.batchFetch);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_VGROUP_SLOT_FORMAT, pDyn->stbJoin.vgSlot[0], pDyn->stbJoin.vgSlot[1]);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_UID_SLOT_FORMAT, pDyn->stbJoin.uidSlot[0], pDyn->stbJoin.uidSlot[1]);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_SRC_SCAN_FORMAT, pDyn->stbJoin.srcScan[0], pDyn->stbJoin.srcScan[1]);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDyn->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pDyn->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pDyn->node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDyn->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pDyn->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pDyn->node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
if (pDyn->node.pConditions) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
QRY_ERR_RET(nodesNodeToSQL(pDyn->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
}
}
break;
}
default:
qError("not supported physical node type %d", pNode->type);
return TSDB_CODE_APP_ERROR;

View File

@ -641,7 +641,7 @@ typedef struct SStreamFillOperatorInfo {
SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode);
int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNode, const char* dbName, SExecTaskInfo* pTaskInfo);
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo);
void cleanupQueriedTableScanInfo(void* p);
void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock);
void cleanupBasicInfo(SOptrBasicInfo* pInfo);

View File

@ -84,6 +84,7 @@ struct SExecTaskInfo {
int32_t qbufQuota; // total available buffer (in KB) during execution query
int64_t version; // used for stream to record wal version, why not move to sschemainfo
SStreamTaskInfo streamInfo;
SArray* schemaInfos;
SSchemaInfo schemaInfo;
const char* sql; // query sql string
jmp_buf env; // jump to this position when error happens.

View File

@ -330,7 +330,8 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
SSchemaInfo* pSchemaInfo = taosArrayGetLast(pTaskInfo->schemaInfos);
SSchemaWrapper* pWrapper = pSchemaInfo->sw;
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);

View File

@ -357,6 +357,7 @@ static int32_t buildBatchTableScanOperatorParam(SOperatorParam** ppRes, int32_t
if (code) {
return code;
}
taosArrayDestroy(pUidList);
*(SArray**)p = NULL;
}
@ -532,9 +533,11 @@ static FORCE_INLINE int32_t addToJoinVgroupHash(SSHashObj* pHash, void* pKey, in
return TSDB_CODE_OUT_OF_MEMORY;
}
if (NULL == taosArrayPush(pArray, pVal)) {
taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
if (tSimpleHashPut(pHash, pKey, keySize, &pArray, POINTER_BYTES)) {
taosArrayDestroy(pArray);
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
@ -748,6 +751,8 @@ static void seqJoinLaunchNewRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppR
}
*ppRes = NULL;
setOperatorCompleted(pOperator);
return;
}
@ -760,20 +765,32 @@ SSDataBlock* seqStableJoin(SOperatorInfo* pOperator) {
return pRes;
}
int64_t st = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
if (!pStbJoin->ctx.prev.joinBuild) {
buildStbJoinTableList(pOperator);
if (pStbJoin->execInfo.prevBlkRows <= 0) {
pOperator->status = OP_EXEC_DONE;
return NULL;
setOperatorCompleted(pOperator);
goto _return;
}
}
seqJoinContinueCurrRetrieve(pOperator, &pRes);
if (pRes) {
return pRes;
goto _return;
}
seqJoinLaunchNewRetrieve(pOperator, &pRes);
_return:
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
return pRes;
}

View File

@ -820,6 +820,7 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
code = addSingleExchangeSource(pOperator, pBasicParam);
}
freeOperatorParam(pOperator->pOperatorGetParam, OP_GET_PARAM);
pOperator->pOperatorGetParam = NULL;
return TSDB_CODE_SUCCESS;

View File

@ -474,28 +474,30 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI
}
int32_t qGetQueryTableSchemaVersion(qTaskInfo_t tinfo, char* dbName, char* tableName, int32_t* sversion,
int32_t* tversion) {
int32_t* tversion, int32_t idx) {
ASSERT(tinfo != NULL && dbName != NULL && tableName != NULL);
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
if (pTaskInfo->schemaInfo.sw == NULL) {
return TSDB_CODE_SUCCESS;
if (taosArrayGetSize(pTaskInfo->schemaInfos) <= idx) {
return -1;
}
*sversion = pTaskInfo->schemaInfo.sw->version;
*tversion = pTaskInfo->schemaInfo.tversion;
if (pTaskInfo->schemaInfo.dbname) {
strcpy(dbName, pTaskInfo->schemaInfo.dbname);
SSchemaInfo* pSchemaInfo = taosArrayGet(pTaskInfo->schemaInfos, idx);
*sversion = pSchemaInfo->sw->version;
*tversion = pSchemaInfo->tversion;
if (pSchemaInfo->dbname) {
strcpy(dbName, pSchemaInfo->dbname);
} else {
dbName[0] = 0;
}
if (pTaskInfo->schemaInfo.tablename) {
strcpy(tableName, pTaskInfo->schemaInfo.tablename);
if (pSchemaInfo->tablename) {
strcpy(tableName, pSchemaInfo->tablename);
} else {
tableName[0] = 0;
}
return 0;
return TSDB_CODE_SUCCESS;
}
bool qIsDynamicExecTask(qTaskInfo_t tinfo) {

View File

@ -801,6 +801,9 @@ static int32_t addNewGroupData(struct SOperatorInfo* pOperator, SOperatorParam*
return TSDB_CODE_OUT_OF_MEMORY;
}
taosWUnLockLatch(&pCtx->grpLock);
taosArrayDestroy(pParam->pChildren);
pParam->pChildren = NULL;
}
return TSDB_CODE_SUCCESS;
@ -1275,6 +1278,11 @@ static int32_t initGroupCacheDownstreamCtx(SOperatorInfo* pOperator) {
static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperatorParam* pParam) {
SSDataBlock* pBlock = NULL;
int64_t st = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
int32_t code = getBlkFromGroupCache(pOperator, &pBlock, pParam);
if (TSDB_CODE_SUCCESS != code) {
@ -1282,6 +1290,10 @@ static SSDataBlock* groupCacheGetNext(struct SOperatorInfo* pOperator, SOperator
T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code);
}
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
return pBlock;
}

View File

@ -735,8 +735,7 @@ static int32_t launchBlockHashJoin(struct SOperatorInfo* pOperator, SSDataBlock*
}
static void setHJoinDone(struct SOperatorInfo* pOperator) {
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
pOperator->status = OP_EXEC_DONE;
setOperatorCompleted(pOperator);
SHJoinOperatorInfo* pInfo = pOperator->info;
destroyHJoinKeyHash(&pInfo->pKeyHash);

View File

@ -652,7 +652,7 @@ static int32_t mergeJoinJoinDownstreamTsRanges(SOperatorInfo* pOperator, int64_t
}
static void setMergeJoinDone(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE;
setOperatorCompleted(pOperator);
freeOperatorParam(pOperator->pDownstreamGetParams[0], OP_GET_PARAM);
freeOperatorParam(pOperator->pDownstreamGetParams[1], OP_GET_PARAM);
pOperator->pDownstreamGetParams[0] = NULL;
@ -794,6 +794,11 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
}
}
int64_t st = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
}
SSDataBlock* pRes = pJoinInfo->pRes;
blockDataCleanup(pRes);
@ -814,6 +819,10 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
break;
}
}
if (pOperator->cost.openCost == 0) {
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
}
if (pRes->info.rows > 0) {
pJoinInfo->resRows += pRes->info.rows;

View File

@ -369,7 +369,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
}
}
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
//pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTableListInfo, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;

View File

@ -57,6 +57,8 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
pTaskInfo->id.taskId = taskId;
pTaskInfo->id.str = taosMemoryMalloc(64);
buildTaskId(taskId, queryId, pTaskInfo->id.str);
pTaskInfo->schemaInfos = taosArrayInit(1, sizeof(SSchemaInfo));
return pTaskInfo;
}
@ -107,7 +109,9 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
}
}
void cleanupQueriedTableScanInfo(SSchemaInfo* pSchemaInfo) {
void cleanupQueriedTableScanInfo(void* p) {
SSchemaInfo* pSchemaInfo = p;
taosMemoryFreeClear(pSchemaInfo->dbname);
taosMemoryFreeClear(pSchemaInfo->tablename);
tDeleteSchemaWrapper(pSchemaInfo->sw);
@ -133,14 +137,14 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
return terrno;
}
SSchemaInfo* pSchemaInfo = &pTaskInfo->schemaInfo;
SSchemaInfo schemaInfo = {0};
pSchemaInfo->tablename = taosStrdup(mr.me.name);
pSchemaInfo->dbname = taosStrdup(dbName);
schemaInfo.tablename = taosStrdup(mr.me.name);
schemaInfo.dbname = taosStrdup(dbName);
if (mr.me.type == TSDB_SUPER_TABLE) {
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else if (mr.me.type == TSDB_CHILD_TABLE) {
tDecoderClear(&mr.coder);
@ -148,18 +152,23 @@ int32_t initQueriedTableSchemaInfo(SReadHandle* pHandle, SScanPhysiNode* pScanNo
code = pAPI->metaReaderFn.getEntryGetUidCache(&mr, suid);
if (code != TSDB_CODE_SUCCESS) {
pAPI->metaReaderFn.clearReader(&mr);
taosMemoryFree(schemaInfo.tablename);
taosMemoryFree(schemaInfo.dbname);
return terrno;
}
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
pSchemaInfo->tversion = mr.me.stbEntry.schemaTag.version;
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.stbEntry.schemaRow);
schemaInfo.tversion = mr.me.stbEntry.schemaTag.version;
} else {
pSchemaInfo->sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
schemaInfo.sw = tCloneSSchemaWrapper(&mr.me.ntbEntry.schemaRow);
}
pAPI->metaReaderFn.clearReader(&mr);
pSchemaInfo->qsw = extractQueriedColumnSchema(pScanNode);
schemaInfo.qsw = extractQueriedColumnSchema(pScanNode);
taosArrayPush(pTaskInfo->schemaInfos, &schemaInfo);
return TSDB_CODE_SUCCESS;
}
@ -212,7 +221,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
destroyOperator(pTaskInfo->pRoot);
pTaskInfo->pRoot = NULL;
cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo);
taosArrayDestroyEx(pTaskInfo->schemaInfos, cleanupQueriedTableScanInfo);
cleanupStreamInfo(&pTaskInfo->streamInfo);
if (!pTaskInfo->localFetch.localExec) {

View File

@ -933,7 +933,7 @@ static int32_t sysTableUserTagsFillOneTableTags(const SSysTableScanInfo* pInfo,
} else {
int32_t bufSize = IS_VAR_DATA_TYPE(tagType) ? (tagLen + VARSTR_HEADER_SIZE)
: (3 + DBL_MANT_DIG - DBL_MIN_EXP + VARSTR_HEADER_SIZE);
tagVarChar = taosMemoryMalloc(bufSize);
tagVarChar = taosMemoryCalloc(1, bufSize + 1);
int32_t len = -1;
convertTagDataToStr(varDataVal(tagVarChar), tagType, tagData, tagLen, &len);
varDataSetLen(tagVarChar, len);

View File

@ -1261,6 +1261,10 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyList(pPhyNode->pOnRight);
nodesDestroyNode(pPhyNode->pFilterConditions);
nodesDestroyList(pPhyNode->pTargets);
nodesDestroyNode(pPhyNode->pPrimKeyCond);
nodesDestroyNode(pPhyNode->pColEqCond);
nodesDestroyNode(pPhyNode->pTagEqCond);
break;
}
case QUERY_NODE_PHYSICAL_PLAN_HASH_AGG: {

View File

@ -1009,10 +1009,10 @@ join_type(A) ::= INNER.
/************************************************ query_specification *************************************************/
query_specification(A) ::=
SELECT set_quantifier_opt(B) select_list(C) from_clause_opt(D)
SELECT hint_opt(M) set_quantifier_opt(B) select_list(C) from_clause_opt(D)
where_clause_opt(E) partition_by_clause_opt(F) range_opt(J) every_opt(K)
fill_opt(L) twindow_clause_opt(G) group_by_clause_opt(H) having_clause_opt(I). {
A = createSelectStmt(pCxt, B, C, D);
A = createSelectStmt(pCxt, B, C, D, M);
A = addWhereClause(pCxt, A, E);
A = addPartitionByClause(pCxt, A, F);
A = addWindowClauseClause(pCxt, A, G);
@ -1023,6 +1023,11 @@ query_specification(A) ::=
A = addFillClause(pCxt, A, L);
}
%type hint_opt { SQueryHint }
%destructor hint_opt { }
hint_opt(A) ::= . { A.withHint = false; }
hint_opt(A) ::= NO_BATCH_SCAN. { A.withHint = true; A.batchScan = false; }
%type set_quantifier_opt { bool }
%destructor set_quantifier_opt { }
set_quantifier_opt(A) ::= . { A = false; }

View File

@ -398,10 +398,14 @@ uint32_t tGetToken(const char* z, uint32_t* tokenId) {
*tokenId = TK_NK_SLASH;
return 1;
}
bool isHint = false;
if (z[2] == '+') {
isHint = TK_NK_HINT;
}
for (i = 3; z[i] && (z[i] != '/' || z[i - 1] != '*'); i++) {
}
if (z[i]) i++;
*tokenId = TK_NK_COMMENT;
*tokenId = isHint ? TK_NK_HINT : TK_NK_COMMENT;
return i;
}
case '%': {

View File

@ -921,15 +921,12 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
pJoin->joinType = pJoinLogicNode->joinType;
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
SNode* pPrimKeyCond = NULL;
SNode* pColEqCond = NULL;
SNode* pTagEqCond = NULL;
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pPrimKeyCond);
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond, &pJoin->pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pColEqCond);
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pColEqCond, &pJoin->pColEqCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pTagEqCond);
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pTagEqCond, &pJoin->pTagEqCond);
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pOtherOnCond) {
code = setNodeSlotId(pCxt, ((SPhysiNode*)pJoin)->pOutputDataBlockDesc->dataBlockId, -1, pJoinLogicNode->pOtherOnCond, &pJoin->pFilterConditions);
@ -941,7 +938,7 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
code = setConditionsSlotId(pCxt, (const SLogicNode*)pJoinLogicNode, (SPhysiNode*)pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pPrimKeyCond, pColEqCond, pTagEqCond, pJoin);
code = createHashJoinColList(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin->pPrimKeyCond, pJoin->pColEqCond, pJoin->pTagEqCond, pJoin);
}
if (TSDB_CODE_SUCCESS == code) {
code = sortHashJoinTargets(pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
@ -950,10 +947,6 @@ static int32_t createHashJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil
code = addDataBlockSlots(pCxt, pJoin->pTargets, pJoin->node.pOutputDataBlockDesc);
}
nodesDestroyNode(pPrimKeyCond);
nodesDestroyNode(pColEqCond);
nodesDestroyNode(pTagEqCond);
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pJoin;
} else {

View File

@ -144,7 +144,7 @@ typedef struct SQWTaskCtx {
SArray *explainRes;
void *taskHandle;
void *sinkHandle;
STbVerInfo tbInfo;
SArray *tbInfo; // STbVerInfo
} SQWTaskCtx;
typedef struct SQWSchStatus {

View File

@ -128,11 +128,11 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, "
"sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbNum:%d, events:%d,%d,%d,%d,%d",
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType,
ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, (int32_t)taosArrayGetSize(ctx->tbInfo),
ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);
pIter = taosHashIterate(mgmt->ctxHash, pIter);

View File

@ -63,17 +63,11 @@ int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t c
}
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) {
STbVerInfo *tbInfo = ctx ? &ctx->tbInfo : NULL;
int64_t affectedRows = ctx ? ctx->affectedRows : 0;
SQueryTableRsp rsp = {0};
rsp.code = code;
rsp.affectedRows = affectedRows;
if (tbInfo) {
strcpy(rsp.tbFName, tbInfo->tbFName);
rsp.sversion = tbInfo->sversion;
rsp.tversion = tbInfo->tversion;
}
rsp.tbVerInfo = ctx->tbInfo;
int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp);
if (msgSize < 0) {

View File

@ -312,6 +312,8 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
ctx->sinkHandle = NULL;
qDebug("sink handle destroyed");
}
taosArrayDestroy(ctx->tbInfo);
}
int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
@ -462,15 +464,29 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam) {
}
void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
char tbName[TSDB_TABLE_NAME_LEN] = {0};
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
STbVerInfo tbInfo;
int32_t i = 0;
qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &ctx->tbInfo.sversion, &ctx->tbInfo.tversion);
while (true) {
if (qGetQueryTableSchemaVersion(pTaskInfo, dbFName, tbName, &tbInfo.sversion, &tbInfo.tversion, i) < 0) {
break;
}
if (dbFName[0] && tbName[0]) {
sprintf(ctx->tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else {
ctx->tbInfo.tbFName[0] = 0;
if (dbFName[0] && tbName[0]) {
sprintf(tbInfo.tbFName, "%s.%s", dbFName, tbName);
} else {
tbInfo.tbFName[0] = 0;
}
if (NULL == ctx->tbInfo) {
ctx->tbInfo = taosArrayInit(1, sizeof(tbInfo));
}
taosArrayPush(ctx->tbInfo, &tbInfo);
i++;
}
}

View File

@ -588,7 +588,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
}
int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
if (rsp->tbFName[0]) {
if (rsp->tbVerInfo) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
@ -599,12 +599,9 @@ int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp) {
}
}
STbVerInfo tbInfo;
strcpy(tbInfo.tbFName, rsp->tbFName);
tbInfo.sversion = rsp->sversion;
tbInfo.tversion = rsp->tversion;
taosArrayPush((SArray *)pJob->execRes.res, &tbInfo);
taosArrayAddBatch((SArray *)pJob->execRes.res, taosArrayGet(rsp->tbVerInfo, 0), taosArrayGetSize(rsp->tbVerInfo));
taosArrayDestroy(rsp->tbVerInfo);
pJob->execRes.msgType = TDMT_SCH_QUERY;
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);