add get tb index async api
This commit is contained in:
parent
d4c7154cde
commit
001061fcd2
|
@ -2510,6 +2510,7 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
||||
int32_t tDeserializeSTableIndexRsp(void* buf, int32_t bufLen, STableIndexRsp* pRsp);
|
||||
void tFreeSTableIndexInfo(void *pInfo);
|
||||
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -67,6 +67,7 @@ typedef struct SCatalogReq {
|
|||
SArray *pUdf; // element is udf name
|
||||
SArray *pIndex; // element is index name
|
||||
SArray *pUser; // element is SUserAuthInfo
|
||||
SArray *pTableIndex; // element is SNAME
|
||||
bool qNodeRequired; // valid qnode
|
||||
bool forceUpdate;
|
||||
} SCatalogReq;
|
||||
|
@ -82,6 +83,7 @@ typedef struct SMetaData {
|
|||
SArray *pDbInfo; // pRes = SDbInfo*
|
||||
SArray *pTableMeta; // pRes = STableMeta*
|
||||
SArray *pTableHash; // pRes = SVgroupInfo*
|
||||
SArray *pTableIndex; // pRes = SArray<STableIndexInfo>*
|
||||
SArray *pUdfList; // pRes = SFuncInfo*
|
||||
SArray *pIndex; // pRes = SIndexInfo*
|
||||
SArray *pUser; // pRes = bool*
|
||||
|
@ -277,7 +279,7 @@ int32_t catalogGetDBCfg(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
|||
|
||||
int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* indexName, SIndexInfo* pInfo);
|
||||
|
||||
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes);
|
||||
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes);
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* funcName, SFuncInfo* pInfo);
|
||||
|
||||
|
|
|
@ -304,6 +304,7 @@ typedef struct SDownstreamSourceNode {
|
|||
typedef struct SExchangePhysiNode {
|
||||
SPhysiNode node;
|
||||
int32_t srcGroupId; // group id of datasource suplans
|
||||
bool singleChannel;
|
||||
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhysiNode;
|
||||
|
||||
|
|
|
@ -2491,6 +2491,15 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSTableIndexInfo(void* info) {
|
||||
if (NULL == info) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableIndexInfo *pInfo = (STableIndexInfo*)info;
|
||||
|
||||
taosMemoryFree(pInfo->expr);
|
||||
}
|
||||
|
||||
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
|
|
|
@ -67,6 +67,7 @@ typedef enum {
|
|||
CTG_TASK_GET_DB_INFO,
|
||||
CTG_TASK_GET_TB_META,
|
||||
CTG_TASK_GET_TB_HASH,
|
||||
CTG_TASK_GET_TB_INDEX,
|
||||
CTG_TASK_GET_INDEX,
|
||||
CTG_TASK_GET_UDF,
|
||||
CTG_TASK_GET_USER,
|
||||
|
@ -93,6 +94,10 @@ typedef struct SCtgTbMetaCtx {
|
|||
int32_t flag;
|
||||
} SCtgTbMetaCtx;
|
||||
|
||||
typedef struct SCtgTbIndexCtx {
|
||||
SName* pName;
|
||||
} SCtgTbIndexCtx;
|
||||
|
||||
typedef struct SCtgDbVgCtx {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
} SCtgDbVgCtx;
|
||||
|
@ -189,6 +194,7 @@ typedef struct SCtgJob {
|
|||
int32_t indexNum;
|
||||
int32_t userNum;
|
||||
int32_t dbInfoNum;
|
||||
int32_t tbIndexNum;
|
||||
} SCtgJob;
|
||||
|
||||
typedef struct SCtgMsgCtx {
|
||||
|
@ -490,7 +496,7 @@ int32_t ctgGetDBVgInfoFromMnode(CTG_PARAMS, SBuildUseDBInput *input, SUseDbOutpu
|
|||
int32_t ctgGetQnodeListFromMnode(CTG_PARAMS, SArray *out, SCtgTask* pTask);
|
||||
int32_t ctgGetDBCfgFromMnode(CTG_PARAMS, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName* name, SArray** out, SCtgTask* pTask);
|
||||
int32_t ctgGetUdfInfoFromMnode(CTG_PARAMS, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetUserDbAuthFromMnode(CTG_PARAMS, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbMetaFromMnodeImpl(CTG_PARAMS, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
|
||||
|
|
|
@ -1136,14 +1136,14 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps
|
|||
CTG_API_LEAVE(ctgGetIndexInfoFromMnode(CTG_PARAMS_LIST(), indexName, pInfo, NULL));
|
||||
}
|
||||
|
||||
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const char* tbFName, SArray** pRes) {
|
||||
int32_t catalogGetTableIndex(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pRes) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == tbFName || NULL == pRes) {
|
||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName || NULL == pRes) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), tbFName, pRes, NULL));
|
||||
CTG_API_LEAVE(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), (SName*)pTableName, pRes, NULL));
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -232,6 +232,35 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
||||
SCtgTask task = {0};
|
||||
|
||||
task.type = CTG_TASK_GET_TB_INDEX;
|
||||
task.taskId = taskIdx;
|
||||
task.pJob = pJob;
|
||||
|
||||
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbIndexCtx));
|
||||
if (NULL == task.taskCtx) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SCtgTbIndexCtx* ctx = task.taskCtx;
|
||||
ctx->pName = taosMemoryMalloc(sizeof(*name));
|
||||
if (NULL == ctx->pName) {
|
||||
taosMemoryFree(task.taskCtx);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(ctx->pName, name, sizeof(*name));
|
||||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:%" PRIx64 " the %d task type %s initialized, tbName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
|
||||
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
|
||||
if (dbNum > 0) {
|
||||
|
@ -329,8 +358,9 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
int32_t indexNum = (int32_t)taosArrayGetSize(pReq->pIndex);
|
||||
int32_t userNum = (int32_t)taosArrayGetSize(pReq->pUser);
|
||||
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
|
||||
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
||||
|
||||
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum;
|
||||
*taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dbCfgNum + indexNum + userNum + dbInfoNum + tbIndexNum;
|
||||
if (*taskNum <= 0) {
|
||||
ctgDebug("Empty input for job, no need to retrieve meta, reqId:0x%" PRIx64, reqId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -360,6 +390,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
pJob->indexNum = indexNum;
|
||||
pJob->userNum = userNum;
|
||||
pJob->dbInfoNum = dbInfoNum;
|
||||
pJob->tbIndexNum = tbIndexNum;
|
||||
|
||||
pJob->pTasks = taosArrayInit(*taskNum, sizeof(SCtgTask));
|
||||
|
||||
|
@ -398,6 +429,11 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
CTG_ERR_JRET(ctgInitGetTbHashTask(pJob, taskIdx++, name));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbIndexNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableIndex, i);
|
||||
CTG_ERR_JRET(ctgInitGetTbIndexTask(pJob, taskIdx++, name));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < indexNum; ++i) {
|
||||
char* indexName = taosArrayGet(pReq->pIndex, i);
|
||||
CTG_ERR_JRET(ctgInitGetIndexTask(pJob, taskIdx++, indexName));
|
||||
|
@ -479,6 +515,21 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pTableIndex) {
|
||||
pJob->jobRes.pTableIndex = taosArrayInit(pJob->tbIndexNum, sizeof(SMetaRes));
|
||||
if (NULL == pJob->jobRes.pTableIndex) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
}
|
||||
|
||||
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||
taosArrayPush(pJob->jobRes.pTableHash, &res);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
||||
SCtgJob* pJob = pTask->pJob;
|
||||
if (NULL == pJob->jobRes.pIndex) {
|
||||
|
@ -817,6 +868,20 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
||||
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||
|
||||
_return:
|
||||
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
@ -1056,13 +1121,24 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
|
||||
int32_t code = 0;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
void *pTrans = pTask->pJob->pTrans;
|
||||
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
|
||||
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
||||
|
||||
CTG_ERR_RET(ctgGetTbIndexFromMnode(CTG_PARAMS_LIST(), pCtx->pName, NULL, pTask));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgLaunchGetQnodeTask(SCtgTask *pTask) {
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
void *pTrans = pTask->pJob->pTrans;
|
||||
const SEpSet* pMgmtEps = &pTask->pJob->pMgmtEps;
|
||||
|
||||
CTG_ERR_RET(ctgGetQnodeListFromMnode(CTG_PARAMS_LIST(), NULL, pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1174,6 +1250,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
|||
{ctgLaunchGetDbInfoTask, ctgHandleGetDbInfoRsp, ctgDumpDbInfoRes},
|
||||
{ctgLaunchGetTbMetaTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetaRes},
|
||||
{ctgLaunchGetTbHashTask, ctgHandleGetTbHashRsp, ctgDumpTbHashRes},
|
||||
{ctgLaunchGetTbIndexTask, ctgHandleGetTbIndexRsp, ctgDumpTbIndexRes},
|
||||
{ctgLaunchGetIndexTask, ctgHandleGetIndexRsp, ctgDumpIndexRes},
|
||||
{ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes},
|
||||
{ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes},
|
||||
|
|
|
@ -427,11 +427,13 @@ int32_t ctgGetIndexInfoFromMnode(CTG_PARAMS, const char *indexName, SIndexInfo *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, const char *tbFName, SArray** out, SCtgTask* pTask) {
|
||||
int32_t ctgGetTbIndexFromMnode(CTG_PARAMS, SName *name, SArray** out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(name, tbFName);
|
||||
|
||||
ctgDebug("try to get tb index from mnode, tbFName:%s", tbFName);
|
||||
|
||||
|
|
|
@ -60,6 +60,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
|
|||
taosArrayDestroy(pData->pTableHash);
|
||||
pData->pTableHash = NULL;
|
||||
|
||||
taosArrayDestroy(pData->pTableIndex);
|
||||
pData->pTableIndex = NULL;
|
||||
|
||||
taosArrayDestroy(pData->pUdfList);
|
||||
pData->pUdfList = NULL;
|
||||
|
||||
|
@ -248,6 +251,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
|
|||
taosMemoryFreeClear(pCtx->out);
|
||||
break;
|
||||
}
|
||||
case TDMT_MND_GET_TABLE_INDEX: {
|
||||
SArray** pOut = (SArray**)pCtx->out;
|
||||
if (pOut) {
|
||||
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_MND_RETRIEVE_FUNC: {
|
||||
SFuncInfo* pOut = (SFuncInfo*)pCtx->out;
|
||||
taosMemoryFree(pOut->pCode);
|
||||
|
@ -344,6 +355,13 @@ void ctgFreeTask(SCtgTask* pTask) {
|
|||
taosMemoryFreeClear(pTask->res);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_TB_INDEX: {
|
||||
SCtgTbIndexCtx* taskCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
||||
taosMemoryFreeClear(taskCtx->pName);
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosArrayDestroyEx(pTask->res, tFreeSTableIndexInfo);
|
||||
break;
|
||||
}
|
||||
case CTG_TASK_GET_INDEX: {
|
||||
taosMemoryFreeClear(pTask->taskCtx);
|
||||
taosMemoryFreeClear(pTask->res);
|
||||
|
|
|
@ -32,15 +32,17 @@ extern "C" {
|
|||
#define EXPLAIN_PROJECTION_FORMAT "Projection"
|
||||
#define EXPLAIN_JOIN_FORMAT "%s"
|
||||
#define EXPLAIN_AGG_FORMAT "Aggragate"
|
||||
#define EXPLAIN_INDEF_ROWS_FORMAT "Indefinite Rows Function"
|
||||
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
|
||||
#define EXPLAIN_SORT_FORMAT "Sort"
|
||||
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
|
||||
#define EXPLAIN_FILL_FORMAT "Fill"
|
||||
#define EXPLAIN_SESSION_FORMAT "Session"
|
||||
#define EXPLAIN_STATE_WINDOW_FORMAT "StateWindow on Column %s"
|
||||
#define EXPLAIN_PARITION_FORMAT "Partition on Column %s"
|
||||
#define EXPLAIN_ORDER_FORMAT "Order: %s"
|
||||
#define EXPLAIN_FILTER_FORMAT "Filter: "
|
||||
#define EXPLAIN_FILL_FORMAT "Fill: %s"
|
||||
#define EXPLAIN_FILL_VALUE_FORMAT "Fill Values: "
|
||||
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
|
||||
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
|
||||
#define EXPLAIN_OUTPUT_FORMAT "Output: "
|
||||
|
@ -66,6 +68,8 @@ extern "C" {
|
|||
#define EXPLAIN_WIDTH_FORMAT "width=%d"
|
||||
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
|
||||
#define EXPLAIN_EXECINFO_FORMAT "cost=%.3f..%.3f rows=%" PRIu64
|
||||
#define EXPLAIN_MODE_FORMAT "mode=%s"
|
||||
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
|
||||
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
|
|
|
@ -179,6 +179,21 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
|
|||
pPhysiChildren = mergePhysiNode->node.pChildren;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
|
||||
SIndefRowsFuncPhysiNode *indefPhysiNode = (SIndefRowsFuncPhysiNode *)pNode;
|
||||
pPhysiChildren = indefPhysiNode->node.pChildren;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
|
||||
SMergeIntervalPhysiNode *intPhysiNode = (SMergeIntervalPhysiNode *)pNode;
|
||||
pPhysiChildren = intPhysiNode->window.node.pChildren;
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
|
||||
SFillPhysiNode *fillPhysiNode = (SFillPhysiNode *)pNode;
|
||||
pPhysiChildren = fillPhysiNode->node.pChildren;
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("not supported physical node type %d", pNode->type);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
|
@ -212,12 +227,15 @@ int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group
|
|||
SExplainRsp *rsp = NULL;
|
||||
for (int32_t i = 0; i < group->nodeNum; ++i) {
|
||||
rsp = taosArrayGet(group->nodeExecInfo, i);
|
||||
/*
|
||||
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
|
||||
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
|
||||
*/
|
||||
taosArrayPush(*pExecInfo, rsp->subplanInfo);
|
||||
}
|
||||
|
||||
++group->physiPlanExecIdx;
|
||||
|
@ -599,6 +617,42 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC: {
|
||||
SIndefRowsFuncPhysiNode *pIndefNode = (SIndefRowsFuncPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_INDEF_ROWS_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
if (pIndefNode->pVectorFuncs) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIndefNode->pVectorFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||
nodesGetOutputNumFromSlotList(pIndefNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIndefNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pIndefNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIndefNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
|
||||
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
|
||||
|
@ -607,7 +661,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, pExchNode->singleChannel ? 1 : group->nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
|
@ -750,6 +804,106 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL: {
|
||||
SMergeIntervalPhysiNode *pIntNode = (SMergeIntervalPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->window.pTspk));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||
nodesGetOutputNumFromSlotList(pIntNode->window.node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
uint8_t precision = getIntervalPrecision(pIntNode);
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT,
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, precision),
|
||||
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(precision),
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, precision),
|
||||
pIntNode->slidingUnit);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pIntNode->window.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE,
|
||||
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_FILL: {
|
||||
SFillPhysiNode *pFillNode = (SFillPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_FILL_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pFillNode->mode));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||
nodesGetOutputNumFromSlotList(pFillNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
if (pFillNode->pValues) {
|
||||
SNodeListNode *pValues = (SNodeListNode*)pFillNode->pValues;
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_VALUE_FORMAT);
|
||||
SNode* tNode = NULL;
|
||||
int32_t i = 0;
|
||||
FOREACH(tNode, pValues->pNodeList) {
|
||||
if (i) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
SValueNode* tValue = (SValueNode*)tNode;
|
||||
char *value = nodesGetStrValueFromNode(tValue);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_STRING_TYPE_FORMAT, value);
|
||||
taosMemoryFree(value);
|
||||
++i;
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pFillNode->timeRange.skey,
|
||||
pFillNode->timeRange.ekey);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pFillNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pFillNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
|
||||
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: {
|
||||
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);
|
||||
|
|
|
@ -1186,6 +1186,7 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pMerge->srcGroupId;
|
||||
pExchange->singleChannel = true;
|
||||
pExchange->node.pParent = (SPhysiNode*)pMerge;
|
||||
pExchange->node.pOutputDataBlockDesc = nodesCloneNode(pMerge->node.pOutputDataBlockDesc);
|
||||
if (NULL == pExchange->node.pOutputDataBlockDesc) {
|
||||
|
|
Loading…
Reference in New Issue