commit
7c1d1a2436
|
@ -930,6 +930,21 @@ typedef struct {
|
|||
char data[];
|
||||
} SRetrieveMetaTableRsp;
|
||||
|
||||
typedef struct SExplainExecInfo {
|
||||
uint64_t startupCost;
|
||||
uint64_t totalCost;
|
||||
uint64_t numOfRows;
|
||||
void *verboseInfo;
|
||||
} SExplainExecInfo;
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfPlans;
|
||||
SExplainExecInfo *subplanInfo;
|
||||
} SExplainRsp;
|
||||
|
||||
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
|
||||
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
|
||||
int32_t port;
|
||||
|
@ -1067,6 +1082,7 @@ typedef struct SSubQueryMsg {
|
|||
uint64_t taskId;
|
||||
int64_t refId;
|
||||
int8_t taskType;
|
||||
int8_t explain;
|
||||
uint32_t sqlLen; // the query sql,
|
||||
uint32_t phyLen;
|
||||
char msg[];
|
||||
|
|
|
@ -188,6 +188,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_SHOW_TABLES_FETCH, "vnode-show-tables-fetch", SVShowTablesFetchReq, SVShowTablesFetchRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_CONTINUE, "vnode-query-continue", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_QUERY_HEARTBEAT, "vnode-query-heartbeat", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_EXPLAIN, "vnode-explain", NULL, NULL)
|
||||
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
|
||||
|
|
|
@ -146,6 +146,7 @@ typedef struct {
|
|||
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
|
||||
|
||||
#define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t)))
|
||||
#define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP))
|
||||
|
||||
#define IS_VALID_TINYINT(_t) ((_t) > INT8_MIN && (_t) <= INT8_MAX)
|
||||
#define IS_VALID_SMALLINT(_t) ((_t) > INT16_MIN && (_t) <= INT16_MAX)
|
||||
|
|
|
@ -59,6 +59,7 @@ int32_t taosVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool
|
|||
#endif
|
||||
|
||||
int32_t taosVariantTypeSetType(SVariant *pVariant, char type);
|
||||
char * taosVariantGet(SVariant *pVar, int32_t type);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -17,8 +17,14 @@
|
|||
#include "tmsg.h"
|
||||
#include "plannodes.h"
|
||||
|
||||
typedef struct SExplainCtx SExplainCtx;
|
||||
|
||||
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs);
|
||||
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
|
||||
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
|
||||
void qExplainFreeCtx(SExplainCtx *pCtx);
|
||||
|
||||
|
||||
|
|
|
@ -174,6 +174,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
|
|||
|
||||
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -239,6 +239,8 @@ int32_t nodesListToString(const SNodeList* pList, bool format, char** pStr, int3
|
|||
int32_t nodesStringToList(const char* pStr, SNodeList** pList);
|
||||
|
||||
int32_t nodesNodeToSQL(SNode *pNode, char *buf, int32_t bufSize, int32_t *len);
|
||||
char *nodesGetNameFromColumnNode(SNode *pNode);
|
||||
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -334,6 +334,7 @@ typedef enum EExplainMode {
|
|||
typedef struct SExplainInfo {
|
||||
EExplainMode mode;
|
||||
bool verbose;
|
||||
double ratio;
|
||||
} SExplainInfo;
|
||||
|
||||
typedef struct SQueryPlan {
|
||||
|
|
|
@ -22,6 +22,7 @@ extern "C" {
|
|||
|
||||
#include "nodes.h"
|
||||
#include "tmsg.h"
|
||||
#include "tvariant.h"
|
||||
|
||||
#define TABLE_TOTAL_COL_NUM(pMeta) ((pMeta)->tableInfo.numOfColumns + (pMeta)->tableInfo.numOfTags)
|
||||
#define TABLE_META_SIZE(pMeta) (NULL == (pMeta) ? 0 : (sizeof(STableMeta) + TABLE_TOTAL_COL_NUM((pMeta)) * sizeof(SSchema)))
|
||||
|
@ -316,6 +317,8 @@ bool nodesIsTimelineQuery(const SNode* pQuery);
|
|||
|
||||
void* nodesGetValueFromNode(SValueNode *pNode);
|
||||
char* nodesGetStrValueFromNode(SValueNode *pNode);
|
||||
char *getFillModeString(EFillMode mode);
|
||||
void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t op
|
|||
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
|
||||
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
|
||||
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
|
||||
extern int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win);
|
||||
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
|
||||
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
|
||||
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
|
||||
extern void filterFreeInfo(SFilterInfo *info);
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
|
|||
* @param nodeList Qnode/Vnode address list, element is SQueryNodeAddr
|
||||
* @return
|
||||
*/
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan* pDag, int64_t* pJob, const char* sql, SQueryResult *pRes);
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql, int64_t startTs, SQueryResult *pRes);
|
||||
|
||||
/**
|
||||
* Process the query job, generated according to the query physical plan.
|
||||
|
|
|
@ -76,6 +76,7 @@ void taosWLockLatch(SRWLatch *pLatch);
|
|||
void taosWUnLockLatch(SRWLatch *pLatch);
|
||||
void taosRLockLatch(SRWLatch *pLatch);
|
||||
void taosRUnLockLatch(SRWLatch *pLatch);
|
||||
int32_t taosWTryLockLatch(SRWLatch *pLatch);
|
||||
|
||||
// copy on read
|
||||
#define taosCorBeginRead(x) \
|
||||
|
|
|
@ -254,7 +254,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
|
|||
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
|
||||
|
||||
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
|
||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, &res);
|
||||
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (pRequest->body.queryJob != 0) {
|
||||
schedulerFreeJob(pRequest->body.queryJob);
|
||||
|
|
|
@ -2769,6 +2769,48 @@ int32_t tDecodeSMqCMCommitOffsetReq(SCoder *decoder, SMqCMCommitOffsetReq *pReq)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp) {
|
||||
SCoder encoder = {0};
|
||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->numOfPlans) < 0) return -1;
|
||||
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
|
||||
SExplainExecInfo *info = &pRsp->subplanInfo[i];
|
||||
if (tEncodeU64(&encoder, info->startupCost) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, info->totalCost) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, info->numOfRows) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tCoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSExplainRsp(void* buf, int32_t bufLen, SExplainRsp* pRsp) {
|
||||
SCoder decoder = {0};
|
||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->numOfPlans) < 0) return -1;
|
||||
if (pRsp->numOfPlans > 0) {
|
||||
pRsp->subplanInfo = taosMemoryMalloc(pRsp->numOfPlans * sizeof(SExplainExecInfo));
|
||||
if (pRsp->subplanInfo == NULL) return -1;
|
||||
}
|
||||
for (int32_t i = 0; i < pRsp->numOfPlans; ++i) {
|
||||
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].startupCost) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].totalCost) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pRsp->subplanInfo[i].numOfRows) < 0) return -1;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tCoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
|
|
|
@ -1014,4 +1014,28 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
char * taosVariantGet(SVariant *pVar, int32_t type) {
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
return (char *)&pVar->i;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
return (char *)&pVar->d;
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
return (char *)pVar->pz;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
return (char *)pVar->ucs4;
|
||||
default:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,38 +26,54 @@ extern "C" {
|
|||
#define EXPLAIN_MAX_GROUP_NUM 100
|
||||
|
||||
//newline area
|
||||
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s columns=%d width=%d"
|
||||
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s columns=%d width=%d"
|
||||
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s columns=%d width=%d"
|
||||
#define EXPLAIN_PROJECTION_FORMAT "Projection columns=%d width=%d"
|
||||
#define EXPLAIN_JOIN_FORMAT "%s between %d tables width=%d"
|
||||
#define EXPLAIN_AGG_FORMAT "Aggragate functions=%d"
|
||||
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1 width=%d"
|
||||
#define EXPLAIN_SORT_FORMAT "Sort on %d Column(s) width=%d"
|
||||
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s functions=%d interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c width=%d"
|
||||
#define EXPLAIN_SESSION_FORMAT "Session gap=%" PRId64 " functions=%d width=%d"
|
||||
#define EXPLAIN_TAG_SCAN_FORMAT "Tag Scan on %s"
|
||||
#define EXPLAIN_TBL_SCAN_FORMAT "Table Scan on %s"
|
||||
#define EXPLAIN_SYSTBL_SCAN_FORMAT "System Table Scan on %s"
|
||||
#define EXPLAIN_PROJECTION_FORMAT "Projection"
|
||||
#define EXPLAIN_JOIN_FORMAT "%s"
|
||||
#define EXPLAIN_AGG_FORMAT "Aggragate"
|
||||
#define EXPLAIN_EXCHANGE_FORMAT "Data Exchange %d:1"
|
||||
#define EXPLAIN_SORT_FORMAT "Sort"
|
||||
#define EXPLAIN_INTERVAL_FORMAT "Interval on Column %s"
|
||||
#define EXPLAIN_SESSION_FORMAT "Session"
|
||||
#define EXPLAIN_ORDER_FORMAT "Order: %s"
|
||||
#define EXPLAIN_FILTER_FORMAT "Filter: "
|
||||
#define EXPLAIN_FILL_FORMAT "Fill: %s"
|
||||
#define EXPLAIN_ON_CONDITIONS_FORMAT "Join Cond: "
|
||||
#define EXPLAIN_TIMERANGE_FORMAT "Time Range: [%" PRId64 ", %" PRId64 "]"
|
||||
#define EXPLAIN_OUTPUT_FORMAT "Output: "
|
||||
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
||||
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
||||
|
||||
//append area
|
||||
#define EXPLAIN_GROUPS_FORMAT " groups=%d"
|
||||
#define EXPLAIN_WIDTH_FORMAT " width=%d"
|
||||
#define EXPLAIN_LOOPS_FORMAT " loops=%d"
|
||||
#define EXPLAIN_REVERSE_FORMAT " reverse=%d"
|
||||
#define EXPLAIN_LEFT_PARENTHESIS_FORMAT " ("
|
||||
#define EXPLAIN_RIGHT_PARENTHESIS_FORMAT ")"
|
||||
#define EXPLAIN_BLANK_FORMAT " "
|
||||
#define EXPLAIN_COST_FORMAT "cost=%.2f..%.2f"
|
||||
#define EXPLAIN_ROWS_FORMAT "rows=%" PRIu64
|
||||
#define EXPLAIN_COLUMNS_FORMAT "columns=%d"
|
||||
#define EXPLAIN_WIDTH_FORMAT "width=%d"
|
||||
#define EXPLAIN_GROUPS_FORMAT "groups=%d"
|
||||
#define EXPLAIN_WIDTH_FORMAT "width=%d"
|
||||
#define EXPLAIN_LOOPS_FORMAT "loops=%d"
|
||||
#define EXPLAIN_REVERSE_FORMAT "reverse=%d"
|
||||
#define EXPLAIN_FUNCTIONS_FORMAT "functions=%d"
|
||||
#define EXPLAIN_EXECINFO_FORMAT "cost=%" PRIu64 "..%" PRIu64 " rows=%" PRIu64
|
||||
|
||||
typedef struct SExplainGroup {
|
||||
int32_t nodeNum;
|
||||
int32_t physiPlanExecNum;
|
||||
int32_t physiPlanNum;
|
||||
int32_t physiPlanExecIdx;
|
||||
SRWLatch lock;
|
||||
SSubplan *plan;
|
||||
void *execInfo; //TODO
|
||||
SArray *nodeExecInfo; //Array<SExplainRsp>
|
||||
} SExplainGroup;
|
||||
|
||||
typedef struct SExplainResNode {
|
||||
SNodeList* pChildren;
|
||||
SPhysiNode* pNode;
|
||||
void* pExecInfo;
|
||||
SNodeList* pChildren;
|
||||
SPhysiNode* pNode;
|
||||
SArray* pExecInfo; // Array<SExplainExecInfo>
|
||||
} SExplainResNode;
|
||||
|
||||
typedef struct SQueryExplainRowInfo {
|
||||
|
@ -67,11 +83,21 @@ typedef struct SQueryExplainRowInfo {
|
|||
} SQueryExplainRowInfo;
|
||||
|
||||
typedef struct SExplainCtx {
|
||||
int32_t totalSize;
|
||||
bool verbose;
|
||||
char *tbuf;
|
||||
SArray *rows;
|
||||
SHashObj *groupHash;
|
||||
EExplainMode mode;
|
||||
double ratio;
|
||||
bool verbose;
|
||||
|
||||
SRWLatch lock;
|
||||
int32_t rootGroupId;
|
||||
int32_t dataSize;
|
||||
bool execDone;
|
||||
int64_t reqStartTs;
|
||||
int64_t jobStartTs;
|
||||
int64_t jobDoneTs;
|
||||
char *tbuf;
|
||||
SArray *rows;
|
||||
int32_t groupDoneNum;
|
||||
SHashObj *groupHash; // Hash<SExplainGroup>
|
||||
} SExplainCtx;
|
||||
|
||||
#define EXPLAIN_ORDER_STRING(_order) ((TSDB_ORDER_ASC == _order) ? "Ascending" : "Descending")
|
||||
|
|
|
@ -17,44 +17,61 @@
|
|||
#include "plannodes.h"
|
||||
#include "commandInt.h"
|
||||
|
||||
int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes);
|
||||
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level);
|
||||
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pRes);
|
||||
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level);
|
||||
|
||||
|
||||
void qFreeExplainResTree(SExplainResNode *res) {
|
||||
if (NULL == res) {
|
||||
void qExplainFreeResNode(SExplainResNode *resNode) {
|
||||
if (NULL == resNode) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(res->pExecInfo);
|
||||
taosMemoryFreeClear(resNode->pExecInfo);
|
||||
|
||||
SNode* node = NULL;
|
||||
FOREACH(node, res->pChildren) {
|
||||
qFreeExplainResTree((SExplainResNode *)node);
|
||||
FOREACH(node, resNode->pChildren) {
|
||||
qExplainFreeResNode((SExplainResNode *)node);
|
||||
}
|
||||
nodesClearList(res->pChildren);
|
||||
nodesClearList(resNode->pChildren);
|
||||
|
||||
taosMemoryFreeClear(res);
|
||||
taosMemoryFreeClear(resNode);
|
||||
}
|
||||
|
||||
void qFreeExplainCtx(void *ctx) {
|
||||
if (NULL == ctx) {
|
||||
void qExplainFreeCtx(SExplainCtx *pCtx) {
|
||||
if (NULL == pCtx) {
|
||||
return;
|
||||
}
|
||||
|
||||
SExplainCtx *pCtx = (SExplainCtx *)ctx;
|
||||
int32_t rowSize = taosArrayGetSize(pCtx->rows);
|
||||
for (int32_t i = 0; i < rowSize; ++i) {
|
||||
SQueryExplainRowInfo *row = taosArrayGet(pCtx->rows, i);
|
||||
taosMemoryFreeClear(row->buf);
|
||||
}
|
||||
|
||||
if (EXPLAIN_MODE_ANALYZE == pCtx->mode && pCtx->groupHash) {
|
||||
void *pIter = taosHashIterate(pCtx->groupHash, NULL);
|
||||
while (pIter) {
|
||||
SExplainGroup *group = (SExplainGroup *)pIter;
|
||||
if (NULL == group->nodeExecInfo) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t num = taosArrayGetSize(group->nodeExecInfo);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SExplainRsp *rsp = taosArrayGet(group->nodeExecInfo, i);
|
||||
taosMemoryFreeClear(rsp->subplanInfo);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(pCtx->groupHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
taosHashCleanup(pCtx->groupHash);
|
||||
taosArrayDestroy(pCtx->rows);
|
||||
taosMemoryFree(pCtx);
|
||||
}
|
||||
|
||||
int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) {
|
||||
int32_t qExplainInitCtx(SExplainCtx **pCtx, SHashObj *groupHash, bool verbose, double ratio, EExplainMode mode) {
|
||||
int32_t code = 0;
|
||||
SExplainCtx *ctx = taosMemoryCalloc(1, sizeof(SExplainCtx));
|
||||
if (NULL == ctx) {
|
||||
|
@ -74,7 +91,9 @@ int32_t qInitExplainCtx(void **pCtx, SHashObj *groupHash, bool verbose) {
|
|||
QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
ctx->mode = mode;
|
||||
ctx->verbose = verbose;
|
||||
ctx->ratio = ratio;
|
||||
ctx->tbuf = tbuf;
|
||||
ctx->rows = rows;
|
||||
ctx->groupHash = groupHash;
|
||||
|
@ -92,35 +111,7 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
|
||||
char *qFillModeString(EFillMode mode) {
|
||||
switch (mode) {
|
||||
case FILL_MODE_NONE:
|
||||
return "none";
|
||||
case FILL_MODE_VALUE:
|
||||
return "value";
|
||||
case FILL_MODE_PREV:
|
||||
return "prev";
|
||||
case FILL_MODE_NULL:
|
||||
return "null";
|
||||
case FILL_MODE_LINEAR:
|
||||
return "linear";
|
||||
case FILL_MODE_NEXT:
|
||||
return "next";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
char *qGetNameFromColumnNode(SNode *pNode) {
|
||||
if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) {
|
||||
return "NULL";
|
||||
}
|
||||
|
||||
return ((SColumnNode *)pNode)->colName;
|
||||
}
|
||||
|
||||
int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeList **pChildren) {
|
||||
int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNodeList **pChildren) {
|
||||
int32_t tlen = 0;
|
||||
SNodeList *pPhysiChildren = NULL;
|
||||
|
||||
|
@ -192,52 +183,119 @@ int32_t qGenerateExplainResChildren(SPhysiNode *pNode, void *pExecInfo, SNodeLis
|
|||
SNode* node = NULL;
|
||||
SExplainResNode *pResNode = NULL;
|
||||
FOREACH(node, pPhysiChildren) {
|
||||
QRY_ERR_RET(qGenerateExplainResNode((SPhysiNode *)node, pExecInfo, &pResNode));
|
||||
QRY_ERR_RET(qExplainGenerateResNode((SPhysiNode *)node, group, &pResNode));
|
||||
QRY_ERR_RET(nodesListAppend(*pChildren, pResNode));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qGenerateExplainResNode(SPhysiNode *pNode, void *pExecInfo, SExplainResNode **pRes) {
|
||||
int32_t qExplainGenerateResNodeExecInfo(SArray **pExecInfo, SExplainGroup *group) {
|
||||
*pExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainExecInfo));
|
||||
if (NULL == (*pExecInfo)) {
|
||||
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
SExplainRsp *rsp = NULL;
|
||||
for (int32_t i = 0; i < group->nodeNum; ++i) {
|
||||
rsp = taosArrayGet(group->nodeExecInfo, i);
|
||||
if (group->physiPlanExecIdx >= rsp->numOfPlans) {
|
||||
qError("physiPlanIdx %d exceed plan num %d", group->physiPlanExecIdx, rsp->numOfPlans);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
taosArrayPush(*pExecInfo, rsp->subplanInfo + group->physiPlanExecIdx);
|
||||
}
|
||||
|
||||
++group->physiPlanExecIdx;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateResNode(SPhysiNode *pNode, SExplainGroup *group, SExplainResNode **pResNode) {
|
||||
if (NULL == pNode) {
|
||||
*pRes = NULL;
|
||||
*pResNode = NULL;
|
||||
qError("physical node is NULL");
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
||||
SExplainResNode *res = taosMemoryCalloc(1, sizeof(SExplainResNode));
|
||||
if (NULL == res) {
|
||||
|
||||
SExplainResNode *resNode = taosMemoryCalloc(1, sizeof(SExplainResNode));
|
||||
if (NULL == resNode) {
|
||||
qError("calloc SPhysiNodeExplainRes failed");
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
res->pNode = pNode;
|
||||
res->pExecInfo = pExecInfo;
|
||||
QRY_ERR_JRET(qGenerateExplainResChildren(pNode, pExecInfo, &res->pChildren));
|
||||
resNode->pNode = pNode;
|
||||
|
||||
*pRes = res;
|
||||
if (group->nodeExecInfo) {
|
||||
QRY_ERR_JRET(qExplainGenerateResNodeExecInfo(&resNode->pExecInfo, group));
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qExplainGenerateResChildren(pNode, group, &resNode->pChildren));
|
||||
|
||||
++group->physiPlanNum;
|
||||
|
||||
*pResNode = resNode;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
qFreeExplainResTree(res);
|
||||
qExplainFreeResNode(resNode);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainBufAppendExecInfo(void *pExecInfo, char *tbuf, int32_t *len) {
|
||||
int32_t qExplainBufAppendExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
int32_t tlen = *len;
|
||||
int32_t nodeNum = taosArrayGetSize(pExecInfo);
|
||||
SExplainExecInfo maxExecInfo = {0};
|
||||
|
||||
EXPLAIN_ROW_APPEND("(exec info here)");
|
||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pExecInfo, i);
|
||||
if (execInfo->startupCost > maxExecInfo.startupCost) {
|
||||
maxExecInfo.startupCost = execInfo->startupCost;
|
||||
}
|
||||
if (execInfo->totalCost > maxExecInfo.totalCost) {
|
||||
maxExecInfo.totalCost = execInfo->totalCost;
|
||||
}
|
||||
if (execInfo->numOfRows > maxExecInfo.numOfRows) {
|
||||
maxExecInfo.numOfRows = execInfo->numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_EXECINFO_FORMAT, maxExecInfo.startupCost, maxExecInfo.totalCost, maxExecInfo.numOfRows);
|
||||
|
||||
*len = tlen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainBufAppendVerboseExecInfo(SArray *pExecInfo, char *tbuf, int32_t *len) {
|
||||
int32_t tlen = 0;
|
||||
bool gotVerbose = false;
|
||||
int32_t nodeNum = taosArrayGetSize(pExecInfo);
|
||||
SExplainExecInfo maxExecInfo = {0};
|
||||
|
||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||
SExplainExecInfo *execInfo = taosArrayGet(pExecInfo, i);
|
||||
if (execInfo->verboseInfo) {
|
||||
gotVerbose = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (gotVerbose) {
|
||||
EXPLAIN_ROW_APPEND("exec verbose info");
|
||||
}
|
||||
|
||||
*len = tlen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t level) {
|
||||
SQueryExplainRowInfo row = {0};
|
||||
row.buf = taosMemoryMalloc(len);
|
||||
|
@ -249,7 +307,7 @@ int32_t qExplainResAppendRow(SExplainCtx *ctx, char *tbuf, int32_t len, int32_t
|
|||
memcpy(row.buf, tbuf, len);
|
||||
row.level = level;
|
||||
row.len = len;
|
||||
ctx->totalSize += len;
|
||||
ctx->dataSize += len;
|
||||
|
||||
if (NULL == taosArrayPush(ctx->rows, &row)) {
|
||||
qError("taosArrayPush row to explain res rows failed");
|
||||
|
@ -275,39 +333,77 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
switch (pNode->type) {
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
|
||||
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname, pTagScanNode->pScanCols->length, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTagScanNode->pScanCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count);
|
||||
if (pTagScanNode->reverse) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pTagScanNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
if (tlen) {
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
|
||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:{
|
||||
STableScanPhysiNode *pTblScanNode = (STableScanPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname, pTblScanNode->scan.pScanCols->length, pTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_TBL_SCAN_FORMAT, pTblScanNode->scan.tableName.tname);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pTblScanNode->scan.pScanCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count);
|
||||
if (pTblScanNode->scan.reverse) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pTblScanNode->scan.node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
@ -327,18 +423,33 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:{
|
||||
SSystemTableScanPhysiNode *pSTblScanNode = (SSystemTableScanPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname, pSTblScanNode->scan.pScanCols->length, pSTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SYSTBL_SCAN_FORMAT, pSTblScanNode->scan.tableName.tname);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSTblScanNode->scan.pScanCols->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count);
|
||||
if (pSTblScanNode->scan.reverse) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pSTblScanNode->scan.node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
@ -355,14 +466,27 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:{
|
||||
SProjectPhysiNode *pPrjNode = (SProjectPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT, pPrjNode->pProjections->length, pPrjNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_PROJECTION_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pPrjNode->pProjections->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPrjNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pPrjNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPrjNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pPrjNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pPrjNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
|
@ -374,14 +498,27 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_JOIN:{
|
||||
SJoinPhysiNode *pJoinNode = (SJoinPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType), pJoinNode->pTargets->length, pJoinNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_JOIN_FORMAT, EXPLAIN_JOIN_STRING(pJoinNode->joinType));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pJoinNode->pTargets->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pJoinNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pJoinNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pJoinNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
|
@ -398,19 +535,31 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_AGG:{
|
||||
SAggPhysiNode *pAggNode = (SAggPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT, pAggNode->pAggFuncs->length);
|
||||
if (pAggNode->pGroupKeys) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pAggNode->pGroupKeys->length);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_AGG_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pAggNode->pAggFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
if (pAggNode->pGroupKeys) {
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pAggNode->pGroupKeys->length);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pAggNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pAggNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pAggNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pAggNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
|
@ -428,14 +577,25 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum, pExchNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_EXCHANGE_FORMAT, group->nodeNum);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pExchNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pExchNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pExchNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pExchNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pExchNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
|
@ -444,19 +604,33 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
}
|
||||
|
||||
QRY_ERR_RET(qAppendTaskExplainResRows(ctx, pExchNode->srcGroupId, level + 1));
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1));
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SORT:{
|
||||
SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT, pSortNode->pSortKeys->length, pSortNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pSortNode->pSortKeys->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pSortNode->node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSortNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pSortNode->node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pSortNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
|
@ -468,20 +642,34 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:{
|
||||
SIntervalPhysiNode *pIntNode = (SIntervalPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, qGetNameFromColumnNode(pIntNode->pTspk), pIntNode->window.pFuncs->length,
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision), pIntNode->intervalUnit,
|
||||
pIntNode->offset, getPrecisionUnit(pIntNode->precision),
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit,
|
||||
pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_INTERVAL_FORMAT, nodesGetNameFromColumnNode(pIntNode->pTspk));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
if (verbose) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pIntNode->window.node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIME_WINDOWS_FORMAT, INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->interval, pIntNode->intervalUnit, pIntNode->precision),
|
||||
pIntNode->intervalUnit, pIntNode->offset, getPrecisionUnit(pIntNode->precision),
|
||||
INVERAL_TIME_FROM_PRECISION_TO_UNIT(pIntNode->sliding, pIntNode->slidingUnit, pIntNode->precision), pIntNode->slidingUnit);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pIntNode->pFill) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, qFillModeString(pIntNode->pFill->mode));
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILL_FORMAT, getFillModeString(pIntNode->pFill->mode));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -496,18 +684,36 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:{
|
||||
SSessionWinodwPhysiNode *pIntNode = (SSessionWinodwPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT, pIntNode->gap, pIntNode->window.pFuncs->length, pIntNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
SSessionWinodwPhysiNode *pSessNode = (SSessionWinodwPhysiNode *)pNode;
|
||||
EXPLAIN_ROW_NEW(level, EXPLAIN_SESSION_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||
if (pResNode->pExecInfo) {
|
||||
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
}
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pSessNode->window.pFuncs->length);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSessNode->window.node.pOutputDataBlockDesc->totalRowSize);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||
|
||||
|
||||
if (verbose) {
|
||||
if (pIntNode->window.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pSessNode->window.node.pOutputDataBlockDesc->pSlots));
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSessNode->window.node.pOutputDataBlockDesc->outputRowSize);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_WINDOW_FORMAT, pSessNode->gap);
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
|
||||
if (pSessNode->window.node.pConditions) {
|
||||
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||
QRY_ERR_RET(nodesNodeToSQL(pIntNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
QRY_ERR_RET(nodesNodeToSQL(pSessNode->window.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||
EXPLAIN_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||
}
|
||||
|
@ -540,7 +746,7 @@ int32_t qExplainResNodeToRows(SExplainResNode *pResNode, SExplainCtx *ctx, int32
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) {
|
||||
int32_t qExplainAppendGroupResRows(void *pCtx, int32_t groupId, int32_t level) {
|
||||
SExplainResNode *node = NULL;
|
||||
int32_t code = 0;
|
||||
SExplainCtx *ctx = (SExplainCtx *)pCtx;
|
||||
|
@ -551,19 +757,24 @@ int32_t qAppendTaskExplainResRows(void *pCtx, int32_t groupId, int32_t level) {
|
|||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
QRY_ERR_RET(qGenerateExplainResNode(group->plan->pNode, group->execInfo, &node));
|
||||
QRY_ERR_RET(qExplainGenerateResNode(group->plan->pNode, group, &node));
|
||||
|
||||
if ((EXPLAIN_MODE_ANALYZE == ctx->mode) && (group->physiPlanNum != group->physiPlanExecNum)) {
|
||||
qError("physiPlanNum %d mismatch with physiExecNum %d in group %d", group->physiPlanNum, group->physiPlanExecNum, groupId);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qExplainResNodeToRows(node, ctx, level));
|
||||
|
||||
_return:
|
||||
|
||||
qFreeExplainResTree(node);
|
||||
qExplainFreeResNode(node);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||
int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||
SExplainCtx *pCtx = (SExplainCtx *)ctx;
|
||||
int32_t rowNum = taosArrayGetSize(pCtx->rows);
|
||||
if (rowNum <= 0) {
|
||||
|
@ -572,7 +783,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
}
|
||||
|
||||
int32_t colNum = 1;
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->totalSize;
|
||||
int32_t rspSize = sizeof(SRetrieveTableRsp) + sizeof(int32_t) * colNum + sizeof(int32_t) * rowNum + pCtx->dataSize;
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == rsp) {
|
||||
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
|
||||
|
@ -582,7 +793,7 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
rsp->completed = 1;
|
||||
rsp->numOfRows = htonl(rowNum);
|
||||
|
||||
*(int32_t *)rsp->data = htonl(pCtx->totalSize);
|
||||
*(int32_t *)rsp->data = htonl(pCtx->dataSize);
|
||||
|
||||
int32_t *offset = (int32_t *)((char *)rsp->data + sizeof(int32_t));
|
||||
char *data = (char *)(offset + rowNum);
|
||||
|
@ -604,13 +815,13 @@ int32_t qGetExplainRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
||||
|
||||
int32_t qExplainPrepareCtx(SQueryPlan *pDag, SExplainCtx **pCtx) {
|
||||
int32_t code = 0;
|
||||
SNodeListNode *plans = NULL;
|
||||
int32_t taskNum = 0;
|
||||
SExplainGroup *pGroup = NULL;
|
||||
void *pCtx = NULL;
|
||||
int32_t rootGroupId = 0;
|
||||
SExplainCtx *ctx = NULL;
|
||||
|
||||
if (pDag->numOfSubplans <= 0) {
|
||||
qError("invalid subplan num:%d", pDag->numOfSubplans);
|
||||
|
@ -629,7 +840,7 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
|||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qInitExplainCtx(&pCtx, groupHash, pDag->explainInfo.verbose));
|
||||
QRY_ERR_JRET(qExplainInitCtx(&ctx, groupHash, pDag->explainInfo.verbose, pDag->explainInfo.ratio, pDag->explainInfo.mode));
|
||||
|
||||
for (int32_t i = 0; i < levelNum; ++i) {
|
||||
plans = (SNodeListNode *)nodesListGetNode(pDag->pSubplans, i);
|
||||
|
@ -653,7 +864,10 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
|||
continue;
|
||||
}
|
||||
|
||||
SExplainGroup group = {.nodeNum = 1, .plan = plan, .execInfo = NULL};
|
||||
SExplainGroup group = {0};
|
||||
group.nodeNum = 1;
|
||||
group.plan = plan;
|
||||
|
||||
if (0 != taosHashPut(groupHash, &plan->id.groupId, sizeof(plan->id.groupId), &group, sizeof(group))) {
|
||||
qError("taosHashPut to explainGroupHash failed, taskIdx:%d", n);
|
||||
QRY_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -666,22 +880,130 @@ int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
|||
QRY_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
rootGroupId = plan->id.groupId;
|
||||
ctx->rootGroupId = plan->id.groupId;
|
||||
}
|
||||
|
||||
qDebug("level %d group handled, taskNum:%d", i, taskNum);
|
||||
}
|
||||
|
||||
QRY_ERR_JRET(qAppendTaskExplainResRows(pCtx, rootGroupId, 0));
|
||||
|
||||
QRY_ERR_JRET(qGetExplainRspFromCtx(pCtx, pRsp));
|
||||
*pCtx = ctx;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
qFreeExplainCtx(pCtx);
|
||||
qExplainFreeCtx(ctx);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
|
||||
|
||||
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp) {
|
||||
SExplainResNode *node = NULL;
|
||||
int32_t code = 0;
|
||||
bool groupDone = false;
|
||||
SExplainCtx *ctx = (SExplainCtx *)pCtx;
|
||||
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &groupId, sizeof(groupId));
|
||||
if (NULL == group) {
|
||||
qError("group %d not in groupHash", groupId);
|
||||
taosMemoryFreeClear(pRspMsg->subplanInfo);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
taosWLockLatch(&group->lock);
|
||||
if (NULL == group->nodeExecInfo) {
|
||||
group->nodeExecInfo = taosArrayInit(group->nodeNum, sizeof(SExplainRsp));
|
||||
if (NULL == group->nodeExecInfo) {
|
||||
qError("taosArrayInit %d explainExecInfo failed", group->nodeNum);
|
||||
taosMemoryFreeClear(pRspMsg->subplanInfo);
|
||||
taosWUnLockLatch(&group->lock);
|
||||
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
group->physiPlanExecNum = pRspMsg->numOfPlans;
|
||||
} else if (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum) {
|
||||
qError("group execInfo already full, size:%d, nodeNum:%d", (int32_t)taosArrayGetSize(group->nodeExecInfo), group->nodeNum);
|
||||
taosMemoryFreeClear(pRspMsg->subplanInfo);
|
||||
taosWUnLockLatch(&group->lock);
|
||||
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
if (group->physiPlanExecNum != pRspMsg->numOfPlans) {
|
||||
qError("physiPlanExecNum %d mismatch with others %d in group %d", pRspMsg->numOfPlans, group->physiPlanExecNum, groupId);
|
||||
taosMemoryFreeClear(pRspMsg->subplanInfo);
|
||||
taosWUnLockLatch(&group->lock);
|
||||
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
taosArrayPush(group->nodeExecInfo, pRspMsg);
|
||||
groupDone = (taosArrayGetSize(group->nodeExecInfo) >= group->nodeNum);
|
||||
|
||||
taosWUnLockLatch(&group->lock);
|
||||
|
||||
if (groupDone && (taosHashGetSize(pCtx->groupHash) == atomic_add_fetch_32(&pCtx->groupDoneNum, 1))) {
|
||||
if (atomic_load_8((int8_t *)&pCtx->execDone)) {
|
||||
if (0 == taosWTryLockLatch(&pCtx->lock)) {
|
||||
QRY_ERR_RET(qExplainGenerateRsp(pCtx, pRsp));
|
||||
// LEAVE LOCK THERE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp) {
|
||||
int32_t code = 0;
|
||||
SExplainCtx *pCtx = NULL;
|
||||
|
||||
QRY_ERR_RET(qExplainPrepareCtx(pDag, &pCtx));
|
||||
|
||||
QRY_ERR_JRET(qExplainGenerateRsp(pCtx, pRsp));
|
||||
|
||||
_return:
|
||||
|
||||
qExplainFreeCtx(pCtx);
|
||||
|
||||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs) {
|
||||
QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx));
|
||||
|
||||
(*pCtx)->reqStartTs = startTs;
|
||||
(*pCtx)->jobStartTs = taosGetTimestampMs();
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
int32_t code = 0;
|
||||
pCtx->jobDoneTs = taosGetTimestampMs();
|
||||
|
||||
atomic_store_8((int8_t *)&pCtx->execDone, true);
|
||||
|
||||
if (taosHashGetSize(pCtx->groupHash) == atomic_load_32(&pCtx->groupDoneNum)) {
|
||||
if (0 == taosWTryLockLatch(&pCtx->lock)) {
|
||||
QRY_ERR_RET(qExplainGenerateRsp(pCtx, pRsp));
|
||||
// LEAVE LOCK THERE
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ extern "C" {
|
|||
#include "thash.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tpagedbuf.h"
|
||||
#include "tmsg.h"
|
||||
|
||||
struct SColumnFilterElem;
|
||||
|
||||
|
@ -165,7 +166,7 @@ typedef struct STaskCostInfo {
|
|||
|
||||
typedef struct SOperatorCostInfo {
|
||||
uint64_t openCost;
|
||||
uint64_t execCost;
|
||||
uint64_t totalCost;
|
||||
} SOperatorCostInfo;
|
||||
|
||||
typedef struct SOrder {
|
||||
|
@ -238,6 +239,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result
|
|||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
|
||||
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
||||
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void **pOptrExplain);
|
||||
|
||||
typedef struct STaskIdInfo {
|
||||
uint64_t queryId; // this is also a request id
|
||||
|
@ -306,26 +308,27 @@ enum {
|
|||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
uint8_t operatorType;
|
||||
bool blockingOptr; // block operator or not
|
||||
uint8_t status; // denote if current operator is completed
|
||||
int32_t numOfOutput; // number of columns of the current operator results
|
||||
char* name; // name, used to show the query execution plan
|
||||
void* info; // extension attribution
|
||||
SExprInfo* pExpr;
|
||||
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
|
||||
SExecTaskInfo* pTaskInfo;
|
||||
SOperatorCostInfo cost;
|
||||
SResultInfo resultInfo;
|
||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||
__optr_fn_t getNextFn;
|
||||
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
|
||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||
__optr_close_fn_t closeFn;
|
||||
__optr_encode_fn_t encodeResultRow;
|
||||
__optr_decode_fn_t decodeResultRow;
|
||||
uint8_t operatorType;
|
||||
bool blockingOptr; // block operator or not
|
||||
uint8_t status; // denote if current operator is completed
|
||||
int32_t numOfOutput; // number of columns of the current operator results
|
||||
char* name; // name, used to show the query execution plan
|
||||
void* info; // extension attribution
|
||||
SExprInfo* pExpr;
|
||||
STaskRuntimeEnv* pRuntimeEnv; // todo remove it
|
||||
SExecTaskInfo* pTaskInfo;
|
||||
SOperatorCostInfo cost;
|
||||
SResultInfo resultInfo;
|
||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||
__optr_fn_t getNextFn;
|
||||
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
|
||||
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||
__optr_close_fn_t closeFn;
|
||||
__optr_encode_fn_t encodeResultRow;
|
||||
__optr_decode_fn_t decodeResultRow;
|
||||
__optr_get_explain_fn_t getExplainFn;
|
||||
} SOperatorInfo;
|
||||
|
||||
typedef struct {
|
||||
|
@ -722,6 +725,7 @@ int32_t getMaximumIdleDurationSec();
|
|||
void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type);
|
||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, EOPTR_EXEC_MODEL model);
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -229,3 +229,12 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) {
|
|||
queryCostStatis(pTaskInfo); // print the query cost summary
|
||||
doDestroyTask(pTaskInfo);
|
||||
}
|
||||
|
||||
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes) {
|
||||
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)tinfo;
|
||||
int32_t capacity = 0;
|
||||
|
||||
return getOperatorExplainExecInfo(pTaskInfo->pRoot, pRes, &capacity, resNum);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -1307,6 +1307,12 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
|
|||
pResult->info.rows = dest.numOfRows;
|
||||
taosArrayDestroy(pBlockList);
|
||||
}
|
||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
|
||||
for (int32_t i = 0; i < pSrcBlock->info.rows; ++i) {
|
||||
colDataAppend(pColInfoData, i, taosVariantGet(&pExpr[k].base.pParam[0].param, pExpr[k].base.pParam[0].type), TSDB_DATA_TYPE_NULL == pExpr[k].base.pParam[0].param.nType);
|
||||
}
|
||||
pResult->info.rows = pSrcBlock->info.rows;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -1938,7 +1944,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
|
|||
}
|
||||
}
|
||||
pCtx->resDataInfo.interBufSize = env.calcMemSize;
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) {
|
||||
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element.
|
||||
}
|
||||
|
||||
|
@ -7104,6 +7110,17 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
|||
|
||||
// pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||
// pExp->base.pParam[0].pCol = createColumn(pTargetNode->dataBlockId, pTargetNode->slotId, pType);
|
||||
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_VALUE) {
|
||||
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
||||
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
|
||||
|
||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||
pExp->base.numOfParams = 1;
|
||||
|
||||
SDataType* pType = &pValNode->node.resType;
|
||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pValNode->node.aliasName);
|
||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
||||
valueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -7669,3 +7686,42 @@ void releaseQueryBuf(size_t numOfTables) {
|
|||
// restore value is not enough buffer available
|
||||
atomic_add_fetch_64(&tsQueryBufferSizeBytes, t);
|
||||
}
|
||||
|
||||
int32_t getOperatorExplainExecInfo(SOperatorInfo *operatorInfo, SExplainExecInfo **pRes, int32_t *capacity, int32_t *resNum) {
|
||||
if (*resNum >= *capacity) {
|
||||
*capacity += 10;
|
||||
|
||||
*pRes = taosMemoryRealloc(*pRes, (*capacity) * sizeof(SExplainExecInfo));
|
||||
if (NULL == *pRes) {
|
||||
qError("malloc %d failed", (*capacity) * (int32_t)sizeof(SExplainExecInfo));
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
(*pRes)[*resNum].numOfRows = operatorInfo->resultInfo.totalRows;
|
||||
(*pRes)[*resNum].startupCost = operatorInfo->cost.openCost;
|
||||
(*pRes)[*resNum].totalCost = operatorInfo->cost.totalCost;
|
||||
|
||||
if (operatorInfo->getExplainFn) {
|
||||
int32_t code = (*operatorInfo->getExplainFn)(operatorInfo, &(*pRes)->verboseInfo);
|
||||
if (code) {
|
||||
qError("operator getExplainFn failed, error:%s", tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
++(*resNum);
|
||||
|
||||
int32_t code = 0;
|
||||
for (int32_t i = 0; i < operatorInfo->numOfDownstream; ++i) {
|
||||
code = getOperatorExplainExecInfo(operatorInfo->pDownstream[i], pRes, capacity, resNum);
|
||||
if (code) {
|
||||
taosMemoryFreeClear(*pRes);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ extern "C" {
|
|||
#define NODES_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define NODES_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1047,3 +1047,96 @@ int32_t nodesCollectFuncs(SSelectStmt* pSelect, FFuncClassifier classifier, SNod
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
char *getFillModeString(EFillMode mode) {
|
||||
switch (mode) {
|
||||
case FILL_MODE_NONE:
|
||||
return "none";
|
||||
case FILL_MODE_VALUE:
|
||||
return "value";
|
||||
case FILL_MODE_PREV:
|
||||
return "prev";
|
||||
case FILL_MODE_NULL:
|
||||
return "null";
|
||||
case FILL_MODE_LINEAR:
|
||||
return "linear";
|
||||
case FILL_MODE_NEXT:
|
||||
return "next";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
char *nodesGetNameFromColumnNode(SNode *pNode) {
|
||||
if (NULL == pNode || QUERY_NODE_COLUMN != pNode->type) {
|
||||
return "NULL";
|
||||
}
|
||||
|
||||
return ((SColumnNode *)pNode)->colName;
|
||||
}
|
||||
|
||||
int32_t nodesGetOutputNumFromSlotList(SNodeList* pSlots) {
|
||||
if (NULL == pSlots || pSlots->length <= 0) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
SNode* pNode = NULL;
|
||||
int32_t num = 0;
|
||||
FOREACH(pNode, pSlots) {
|
||||
if (QUERY_NODE_SLOT_DESC != pNode->type) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SSlotDescNode *descNode = (SSlotDescNode *)pNode;
|
||||
if (descNode->output) {
|
||||
++num;
|
||||
}
|
||||
}
|
||||
|
||||
return num;
|
||||
}
|
||||
|
||||
|
||||
void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||
pVal->nType = pNode->node.resType.type;
|
||||
pVal->nLen = pNode->node.resType.bytes;
|
||||
switch (pNode->node.resType.type) {
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pVal->i = pNode->datum.b;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
pVal->i = pNode->datum.i;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
pVal->u = pNode->datum.u;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pVal->d = pNode->datum.d;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
pVal->pz = pNode->datum.p;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -438,8 +438,12 @@ static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
|
|||
|
||||
static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||
if (nodesIsUnaryOp(pOp)) {
|
||||
if (OP_TYPE_MINUS == pOp->opType && !IS_NUMERIC_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
if (OP_TYPE_MINUS == pOp->opType) {
|
||||
if (!IS_MATHABLE_TYPE(((SExprNode*)(pOp->pLeft))->resType.type)) {
|
||||
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pLeft))->aliasName);
|
||||
}
|
||||
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
|
||||
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
|
||||
}
|
||||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
@ -2338,46 +2342,6 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap,
|
|||
}
|
||||
}
|
||||
|
||||
static void valueNodeToVariant(const SValueNode* pNode, SVariant* pVal) {
|
||||
pVal->nType = pNode->node.resType.type;
|
||||
pVal->nLen = pNode->node.resType.bytes;
|
||||
switch (pNode->node.resType.type) {
|
||||
case TSDB_DATA_TYPE_NULL:
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
pVal->i = pNode->datum.b;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||
pVal->i = pNode->datum.i;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
pVal->u = pNode->datum.u;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
pVal->d = pNode->datum.d;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
case TSDB_DATA_TYPE_VARCHAR:
|
||||
case TSDB_DATA_TYPE_VARBINARY:
|
||||
pVal->pz = pNode->datum.p;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_JSON:
|
||||
case TSDB_DATA_TYPE_DECIMAL:
|
||||
case TSDB_DATA_TYPE_BLOB:
|
||||
// todo
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t addValToKVRow(STranslateContext* pCxt, SValueNode* pVal, const SSchema* pSchema, SKVRowBuilder* pBuilder) {
|
||||
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
|
||||
return pCxt->errCode;
|
||||
|
|
|
@ -107,15 +107,17 @@ typedef struct SQWTaskCtx {
|
|||
SRWLatch lock;
|
||||
int8_t phase;
|
||||
int8_t taskType;
|
||||
int8_t explain;
|
||||
|
||||
bool emptyRes;
|
||||
bool queryFetched;
|
||||
bool queryEnd;
|
||||
bool queryContinue;
|
||||
bool queryInQueue;
|
||||
int32_t rspCode;
|
||||
|
||||
SQWConnInfo connInfo;
|
||||
SQWConnInfo ctrlConnInfo;
|
||||
SQWConnInfo dataConnInfo;
|
||||
|
||||
int8_t events[QW_EVENT_MAX];
|
||||
|
||||
qTaskInfo_t taskHandle;
|
||||
|
|
|
@ -23,7 +23,7 @@ extern "C" {
|
|||
#include "qworkerInt.h"
|
||||
#include "dataSinkMgt.h"
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType);
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain);
|
||||
int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg);
|
||||
|
@ -38,6 +38,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple
|
|||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
|
||||
int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendQueryRsp(SQWConnInfo *pConn, int32_t code);
|
||||
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num);
|
||||
void qwFreeFetchRsp(void *msg);
|
||||
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
|
||||
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
|
||||
|
|
|
@ -432,8 +432,10 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
|
||||
|
||||
void qwFreeTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
tmsgReleaseHandle(ctx->connInfo.handle, TAOS_CONN_SERVER);
|
||||
ctx->connInfo.handle = NULL;
|
||||
tmsgReleaseHandle(ctx->ctrlConnInfo.handle, TAOS_CONN_SERVER);
|
||||
ctx->ctrlConnInfo.handle = NULL;
|
||||
|
||||
// NO need to release dataConnInfo
|
||||
|
||||
qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle);
|
||||
|
||||
|
@ -537,6 +539,29 @@ int32_t qwDropTask(QW_FPARAMS_DEF) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||
qTaskInfo_t *taskHandle = &ctx->taskHandle;
|
||||
|
||||
if (TASK_TYPE_TEMP == ctx->taskType) {
|
||||
if (ctx->explain) {
|
||||
SExplainExecInfo *execInfo = NULL;
|
||||
int32_t resNum = 0;
|
||||
QW_ERR_RET(qGetExplainExecInfo(ctx->taskHandle, &resNum, &execInfo));
|
||||
|
||||
SQWConnInfo connInfo = {0};
|
||||
connInfo.handle = ctx->ctrlConnInfo.handle;
|
||||
|
||||
QW_ERR_RET(qwBuildAndSendExplainRsp(&connInfo, execInfo, resNum));
|
||||
}
|
||||
|
||||
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
|
||||
int32_t code = 0;
|
||||
bool qcontinue = true;
|
||||
|
@ -562,10 +587,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
|
|||
QW_TASK_DLOG("qExecTask end with empty res, useconds:%"PRIu64, useconds);
|
||||
|
||||
dsEndPut(sinkHandle, useconds);
|
||||
|
||||
if (TASK_TYPE_TEMP == ctx->taskType) {
|
||||
qwFreeTaskHandle(QW_FPARAMS(), taskHandle);
|
||||
}
|
||||
|
||||
QW_ERR_RET(qwHandleTaskComplete(QW_FPARAMS(), ctx));
|
||||
|
||||
if (queryEnd) {
|
||||
*queryEnd = true;
|
||||
|
@ -658,19 +681,6 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void
|
|||
bool queryEnd = false;
|
||||
int32_t code = 0;
|
||||
|
||||
if (ctx->emptyRes) {
|
||||
QW_TASK_DLOG_E("query end with empty result");
|
||||
|
||||
qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED);
|
||||
QW_ERR_RET(qwMallocFetchRsp(len, &rsp));
|
||||
|
||||
*rspMsg = rsp;
|
||||
*dataLen = 0;
|
||||
pOutput->queryEnd = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
||||
|
||||
if (len < 0) {
|
||||
|
@ -760,12 +770,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
}
|
||||
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
dropConnection = &ctx->connInfo;
|
||||
dropConnection = &ctx->ctrlConnInfo;
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
dropConnection = NULL;
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
break;
|
||||
|
@ -798,12 +808,12 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
}
|
||||
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) {
|
||||
dropConnection = &ctx->connInfo;
|
||||
dropConnection = &ctx->ctrlConnInfo;
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
dropConnection = NULL;
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
}
|
||||
|
@ -863,17 +873,13 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
}
|
||||
|
||||
if (QW_PHASE_POST_QUERY == phase) {
|
||||
if (NULL == ctx->taskHandle && NULL == ctx->sinkHandle) {
|
||||
ctx->emptyRes = true;
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_READY)) {
|
||||
readyConnection = &ctx->connInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||
}
|
||||
#else
|
||||
connInfo.handle = ctx->connInfo.handle;
|
||||
connInfo.handle = ctx->ctrlConnInfo.handle;
|
||||
readyConnection = &connInfo;
|
||||
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_READY);
|
||||
|
@ -886,8 +892,8 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp
|
|||
QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
qwBuildAndSendDropRsp(&ctx->connInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->connInfo.handle, code, tstrerror(code));
|
||||
qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code);
|
||||
QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(qwDropTask(QW_FPARAMS()));
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
|
@ -931,7 +937,7 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
||||
int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain) {
|
||||
int32_t code = 0;
|
||||
bool queryRsped = false;
|
||||
struct SSubplan *plan = NULL;
|
||||
|
@ -947,9 +953,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType) {
|
|||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
||||
atomic_store_8(&ctx->taskType, taskType);
|
||||
atomic_store_8(&ctx->explain, explain);
|
||||
|
||||
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
|
||||
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
|
||||
atomic_store_ptr(&ctx->ctrlConnInfo.handle, qwMsg->connInfo.handle);
|
||||
atomic_store_ptr(&ctx->ctrlConnInfo.ahandle, qwMsg->connInfo.ahandle);
|
||||
|
||||
QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg);
|
||||
|
||||
|
@ -1011,8 +1018,8 @@ int32_t qwProcessReady(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
}
|
||||
|
||||
if (ctx->phase == QW_PHASE_PRE_QUERY) {
|
||||
ctx->connInfo.handle == qwMsg->connInfo.handle;
|
||||
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
|
||||
ctx->ctrlConnInfo.handle == qwMsg->connInfo.handle;
|
||||
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_READY);
|
||||
needRsp = false;
|
||||
QW_TASK_DLOG_E("ready msg will not rsp now");
|
||||
|
@ -1089,10 +1096,13 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
|
||||
if (rsp) {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t*)&ctx->queryEnd, true);
|
||||
}
|
||||
|
||||
qwMsg->connInfo = ctx->connInfo;
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||
|
||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
||||
|
@ -1113,7 +1123,7 @@ _return:
|
|||
qwFreeFetchRsp(rsp);
|
||||
rsp = NULL;
|
||||
|
||||
qwMsg->connInfo = ctx->connInfo;
|
||||
qwMsg->connInfo = ctx->dataConnInfo;
|
||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, 0, code);
|
||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), 0);
|
||||
}
|
||||
|
@ -1151,14 +1161,17 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
QW_ERR_JRET(qwGetResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||
|
||||
if (NULL == rsp) {
|
||||
atomic_store_ptr(&ctx->connInfo.handle, qwMsg->connInfo.handle);
|
||||
atomic_store_ptr(&ctx->connInfo.ahandle, qwMsg->connInfo.ahandle);
|
||||
atomic_store_ptr(&ctx->dataConnInfo.handle, qwMsg->connInfo.handle);
|
||||
atomic_store_ptr(&ctx->dataConnInfo.ahandle, qwMsg->connInfo.ahandle);
|
||||
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH);
|
||||
} else {
|
||||
bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd);
|
||||
|
||||
qwBuildFetchRsp(rsp, &sOutput, dataLen, qComplete);
|
||||
atomic_store_8((int8_t*)&ctx->queryEnd, qComplete);
|
||||
if (qComplete) {
|
||||
atomic_store_8((int8_t*)&ctx->queryEnd, true);
|
||||
}
|
||||
}
|
||||
|
||||
if ((!sOutput.queryEnd) && (DS_BUF_LOW == sOutput.bufStatus || DS_BUF_EMPTY == sOutput.bufStatus)) {
|
||||
|
@ -1236,8 +1249,8 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
}
|
||||
|
||||
if (!rsped) {
|
||||
ctx->connInfo.handle = qwMsg->connInfo.handle;
|
||||
ctx->connInfo.ahandle = qwMsg->connInfo.ahandle;
|
||||
ctx->ctrlConnInfo.handle = qwMsg->connInfo.handle;
|
||||
ctx->ctrlConnInfo.ahandle = qwMsg->connInfo.ahandle;
|
||||
|
||||
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
|
||||
}
|
||||
|
|
|
@ -85,6 +85,27 @@ int32_t qwBuildAndSendReadyRsp(SQWConnInfo *pConn, int32_t code) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendExplainRsp(SQWConnInfo *pConn, SExplainExecInfo *execInfo, int32_t num) {
|
||||
SExplainRsp rsp = {.numOfPlans = num, .subplanInfo = execInfo};
|
||||
|
||||
int32_t contLen = tSerializeSExplainRsp(NULL, 0, &rsp);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
tSerializeSExplainRsp(pRsp, contLen, &rsp);
|
||||
|
||||
SRpcMsg rpcRsp = {
|
||||
.msgType = TDMT_VND_EXPLAIN_RSP,
|
||||
.handle = pConn->handle,
|
||||
.ahandle = pConn->ahandle,
|
||||
.pCont = pRsp,
|
||||
.contLen = contLen,
|
||||
.code = 0,
|
||||
};
|
||||
|
||||
tmsgSendRsp(&rpcRsp);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *pStatus, int32_t code) {
|
||||
int32_t contLen = tSerializeSSchedulerHbRsp(NULL, 0, pStatus);
|
||||
void *pRsp = rpcMallocCont(contLen);
|
||||
|
@ -327,7 +348,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
|
|||
QW_SCH_TASK_DLOG("processQuery start, node:%p, handle:%p, sql:%s", node, pMsg->handle, sql);
|
||||
taosMemoryFreeClear(sql);
|
||||
|
||||
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType));
|
||||
QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType, msg->explain));
|
||||
|
||||
QW_SCH_TASK_DLOG("processQuery end, node:%p", node);
|
||||
|
||||
|
|
|
@ -75,6 +75,8 @@ static FORCE_INLINE _getDoubleValue_fn_t getVectorDoubleValueFn(int32_t srcType)
|
|||
p = getVectorDoubleValue_FLOAT;
|
||||
} else if (srcType == TSDB_DATA_TYPE_DOUBLE) {
|
||||
p = getVectorDoubleValue_DOUBLE;
|
||||
} else if (srcType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
p = getVectorDoubleValue_BIGINT;
|
||||
} else {
|
||||
assert(0);
|
||||
}
|
||||
|
|
|
@ -3314,7 +3314,8 @@ bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t
|
|||
|
||||
|
||||
|
||||
int32_t filterGetTimeRange(SFilterInfo *info, STimeWindow *win) {
|
||||
int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict) {
|
||||
SFilterInfo *info = NULL;
|
||||
SFilterRange ra = {0};
|
||||
SFilterRangeCtx *prev = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||
SFilterRangeCtx *tmpc = filterInitRangeCtx(TSDB_DATA_TYPE_TIMESTAMP, FLT_OPTION_TIMESTAMP);
|
||||
|
|
|
@ -10,7 +10,8 @@
|
|||
|
||||
int32_t scalarGetOperatorParamNum(EOperatorType type) {
|
||||
if (OP_TYPE_IS_NULL == type || OP_TYPE_IS_NOT_NULL == type || OP_TYPE_IS_TRUE == type || OP_TYPE_IS_NOT_TRUE == type
|
||||
|| OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type) {
|
||||
|| OP_TYPE_IS_FALSE == type || OP_TYPE_IS_NOT_FALSE == type || OP_TYPE_IS_UNKNOWN == type || OP_TYPE_IS_NOT_UNKNOWN == type
|
||||
|| OP_TYPE_MINUS == type) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
|
|
|
@ -773,6 +773,32 @@ void vectorMathRemainder(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam
|
|||
doReleaseVec(pRightCol, rightConvert);
|
||||
}
|
||||
|
||||
void vectorMathMinus(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) {
|
||||
SColumnInfoData *pOutputCol = pOut->columnData;
|
||||
|
||||
pOut->numOfRows = pLeft->numOfRows;
|
||||
|
||||
int32_t i = ((_ord) == TSDB_ORDER_ASC)? 0 : (pLeft->numOfRows - 1);
|
||||
int32_t step = ((_ord) == TSDB_ORDER_ASC)? 1 : -1;
|
||||
|
||||
int32_t leftConvert = 0;
|
||||
SColumnInfoData *pLeftCol = doVectorConvert(pLeft, &leftConvert);
|
||||
|
||||
_getDoubleValue_fn_t getVectorDoubleValueFnLeft = getVectorDoubleValueFn(pLeftCol->info.type);
|
||||
|
||||
double *output = (double *)pOutputCol->pData;
|
||||
for (; i < pLeft->numOfRows && i >= 0; i += step, output += 1) {
|
||||
*output = - getVectorDoubleValueFnLeft(pLeftCol->pData, i);
|
||||
}
|
||||
|
||||
pOutputCol->hasNull = pLeftCol->hasNull;
|
||||
if (pOutputCol->hasNull) {
|
||||
memcpy(pOutputCol->nullbitmap, pLeftCol->nullbitmap, BitmapLen(pLeft->numOfRows));
|
||||
}
|
||||
|
||||
doReleaseVec(pLeftCol, leftConvert);
|
||||
}
|
||||
|
||||
void vectorConcat(SScalarParam* pLeft, SScalarParam* pRight, void *out, int32_t _ord) {
|
||||
#if 0
|
||||
int32_t len = pLeft->bytes + pRight->bytes;
|
||||
|
@ -1102,6 +1128,8 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) {
|
|||
return vectorMathDivide;
|
||||
case OP_TYPE_MOD:
|
||||
return vectorMathRemainder;
|
||||
case OP_TYPE_MINUS:
|
||||
return vectorMathMinus;
|
||||
case OP_TYPE_GREATER_THAN:
|
||||
return vectorGreater;
|
||||
case OP_TYPE_GREATER_EQUAL:
|
||||
|
@ -1140,4 +1168,4 @@ _bin_scalar_fn_t getBinScalarOperatorFn(int32_t binFunctionId) {
|
|||
assert(0);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -234,15 +234,16 @@ TEST(timerangeTest, greater) {
|
|||
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
|
||||
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_THAN, TSDB_DATA_TYPE_BOOL, pcol, pval);
|
||||
|
||||
SFilterInfo *filter = NULL;
|
||||
int32_t code = filterInitFromNode(opNode1, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||
ASSERT_EQ(code, 0);
|
||||
//SFilterInfo *filter = NULL;
|
||||
//int32_t code = filterInitFromNode(opNode1, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||
//ASSERT_EQ(code, 0);
|
||||
STimeWindow win = {0};
|
||||
code = filterGetTimeRange(filter, &win);
|
||||
bool isStrict = false;
|
||||
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(win.skey, tsmall);
|
||||
ASSERT_EQ(win.ekey, INT64_MAX);
|
||||
filterFreeInfo(filter);
|
||||
//filterFreeInfo(filter);
|
||||
nodesDestroyNode(opNode1);
|
||||
}
|
||||
|
||||
|
@ -263,15 +264,16 @@ TEST(timerangeTest, greater_and_lower) {
|
|||
|
||||
flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
|
||||
|
||||
SFilterInfo *filter = NULL;
|
||||
int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||
ASSERT_EQ(code, 0);
|
||||
//SFilterInfo *filter = NULL;
|
||||
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
|
||||
//ASSERT_EQ(code, 0);
|
||||
STimeWindow win = {0};
|
||||
code = filterGetTimeRange(filter, &win);
|
||||
bool isStrict = false;
|
||||
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(win.skey, tsmall);
|
||||
ASSERT_EQ(win.ekey, tbig);
|
||||
filterFreeInfo(filter);
|
||||
//filterFreeInfo(filter);
|
||||
nodesDestroyNode(logicNode);
|
||||
}
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
#include "scheduler.h"
|
||||
#include "thash.h"
|
||||
#include "trpc.h"
|
||||
#include "command.h"
|
||||
|
||||
#define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000
|
||||
#define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000
|
||||
|
@ -165,6 +166,7 @@ typedef struct SSchJob {
|
|||
SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask*
|
||||
SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask*
|
||||
|
||||
SExplainCtx *explainCtx;
|
||||
int8_t status;
|
||||
SQueryNodeAddr resNode;
|
||||
tsem_t rspSem;
|
||||
|
@ -211,6 +213,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_JOB_NEED_FETCH(_job) SCH_IS_QUERY_JOB(_job)
|
||||
#define SCH_IS_WAIT_ALL_JOB(_job) (!SCH_IS_QUERY_JOB(_job))
|
||||
#define SCH_IS_NEED_DROP_JOB(_job) (SCH_IS_QUERY_JOB(_job))
|
||||
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
|
||||
|
||||
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
|
||||
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
|
||||
|
@ -251,6 +254,8 @@ int32_t schFetchFromRemote(SSchJob *pJob);
|
|||
int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
|
||||
int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId);
|
||||
int32_t schCloneSMsgSendInfo(void *src, void **dst);
|
||||
int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob);
|
||||
void schFreeJobImpl(void *job);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
|
|
@ -67,6 +67,81 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schInitJob(SSchJob **pSchJob, SQueryPlan *pDag, void *transport, SArray *pNodeList, const char *sql,
|
||||
int64_t startTs, bool syncSchedule) {
|
||||
int32_t code = 0;
|
||||
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
||||
if (NULL == pJob) {
|
||||
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||
pJob->attr.syncSchedule = syncSchedule;
|
||||
pJob->transport = transport;
|
||||
pJob->sql = sql;
|
||||
|
||||
if (pNodeList != NULL) {
|
||||
pJob->nodeList = taosArrayDup(pNodeList);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_JRET(qExecExplainBegin(pDag, &pJob->explainCtx, startTs));
|
||||
}
|
||||
|
||||
pJob->execTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->execTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pJob->succTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->succTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pJob->failTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->failTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
tsem_init(&pJob->rspSem, 0, 0);
|
||||
|
||||
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
|
||||
if (refId < 0) {
|
||||
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
if (NULL == schAcquireJob(refId)) {
|
||||
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
|
||||
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
pJob->refId = refId;
|
||||
|
||||
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
|
||||
|
||||
pJob->status = JOB_TASK_STATUS_NOT_START;
|
||||
|
||||
*pSchJob = pJob;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
schFreeJobImpl(pJob);
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
||||
void schFreeRpcCtx(SRpcCtx *pCtx) {
|
||||
if (NULL == pCtx) {
|
||||
return;
|
||||
|
@ -124,6 +199,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m
|
|||
int32_t reqMsgType = msgType - 1;
|
||||
switch (msgType) {
|
||||
case TDMT_SCH_LINK_BROKEN:
|
||||
case TDMT_VND_EXPLAIN_RSP:
|
||||
return TSDB_CODE_SUCCESS;
|
||||
case TDMT_VND_QUERY_RSP: // query_rsp may be processed later than ready_rsp
|
||||
if (lastMsgType != reqMsgType && -1 != lastMsgType && TDMT_VND_FETCH != lastMsgType) {
|
||||
|
@ -968,6 +1044,19 @@ _return:
|
|||
SCH_RET(schProcessOnTaskFailure(pJob, pJob->fetchTask, code));
|
||||
}
|
||||
|
||||
int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp) {
|
||||
SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed);
|
||||
|
||||
atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows));
|
||||
atomic_store_ptr(&pJob->resData, pRsp);
|
||||
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED);
|
||||
|
||||
schProcessOnDataFetched(pJob);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, char *msg, int32_t msgSize,
|
||||
int32_t rspCode) {
|
||||
|
@ -986,7 +1075,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
case TDMT_VND_CREATE_TABLE_RSP: {
|
||||
SVCreateTbBatchRsp batchRsp = {0};
|
||||
if (msg) {
|
||||
tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp);
|
||||
SCH_ERR_JRET(tDeserializeSVCreateTbBatchRsp(msg, msgSize, &batchRsp));
|
||||
if (batchRsp.rspList) {
|
||||
int32_t num = taosArrayGetSize(batchRsp.rspList);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
|
@ -1024,7 +1113,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
case TDMT_VND_QUERY_RSP: {
|
||||
SQueryTableRsp rsp = {0};
|
||||
if (msg) {
|
||||
tDeserializeSQueryTableRsp(msg, msgSize, &rsp);
|
||||
SCH_ERR_JRET(tDeserializeSQueryTableRsp(msg, msgSize, &rsp));
|
||||
SCH_ERR_JRET(rsp.code);
|
||||
}
|
||||
|
||||
|
@ -1050,6 +1139,36 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_EXPLAIN_RSP: {
|
||||
SCH_ERR_JRET(rspCode);
|
||||
if (NULL == msg) {
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (!SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_TASK_ELOG("invalid msg received for none explain query, msg type:%s", TMSG_INFO(msgType));
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (pJob->resData) {
|
||||
SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
SExplainRsp rsp = {0};
|
||||
if (tDeserializeSExplainRsp(msg, msgSize, &rsp)) {
|
||||
taosMemoryFree(rsp.subplanInfo);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SRetrieveTableRsp *pRsp = NULL;
|
||||
SCH_ERR_JRET(qExplainUpdateExecInfo(pJob->explainCtx, &rsp, pTask->plan->id.groupId, &pRsp));
|
||||
|
||||
if (pRsp) {
|
||||
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_FETCH_RSP: {
|
||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||
|
||||
|
@ -1058,6 +1177,24 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
|||
SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
if (rsp->completed) {
|
||||
SRetrieveTableRsp *pRsp = NULL;
|
||||
SCH_ERR_JRET(qExecExplainEnd(pJob->explainCtx, &pRsp));
|
||||
if (pRsp) {
|
||||
SCH_ERR_JRET(schProcessOnExplainDone(pJob, pTask, pRsp));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
atomic_val_compare_exchange_32(&pJob->remoteFetch, 1, 0);
|
||||
|
||||
SCH_ERR_JRET(schFetchFromRemote(pJob));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pJob->resData) {
|
||||
SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData);
|
||||
taosMemoryFreeClear(rsp);
|
||||
|
@ -1158,6 +1295,10 @@ int32_t schHandleReadyCallback(void *param, const SDataBuf *pMsg, int32_t code)
|
|||
return schHandleCallback(param, pMsg, TDMT_VND_RES_READY_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleExplainCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
return schHandleCallback(param, pMsg, TDMT_VND_EXPLAIN_RSP, code);
|
||||
}
|
||||
|
||||
int32_t schHandleDropCallback(void *param, const SDataBuf *pMsg, int32_t code) {
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
qDebug("QID:%" PRIx64 ",TID:%" PRIx64 " drop task rsp received, code:%x", pParam->queryId, pParam->taskId, code);
|
||||
|
@ -1246,6 +1387,9 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
case TDMT_VND_RES_READY:
|
||||
*fp = schHandleReadyCallback;
|
||||
break;
|
||||
case TDMT_VND_EXPLAIN:
|
||||
*fp = schHandleExplainCallback;
|
||||
break;
|
||||
case TDMT_VND_FETCH:
|
||||
*fp = schHandleFetchCallback;
|
||||
break;
|
||||
|
@ -1266,6 +1410,43 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schGenerateTaskCallBackAHandle(SSchJob *pJob, SSchTask *pTask, int32_t msgType, SMsgSendInfo **pMsgSendInfo) {
|
||||
int32_t code = 0;
|
||||
SMsgSendInfo *msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == msgSendInfo) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
|
||||
if (NULL == param) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
__async_send_cb_fn_t fp = NULL;
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
|
||||
|
||||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->transport = pJob->transport;
|
||||
|
||||
msgSendInfo->param = param;
|
||||
msgSendInfo->fp = fp;
|
||||
|
||||
*pMsgSendInfo = msgSendInfo;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFree(param);
|
||||
taosMemoryFree(msgSendInfo);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
void schFreeRpcCtxVal(const void *arg) {
|
||||
if (NULL == arg) {
|
||||
return;
|
||||
|
@ -1352,8 +1533,8 @@ _return:
|
|||
|
||||
int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||
int32_t code = 0;
|
||||
SSchTaskCallbackParam *param = NULL;
|
||||
SMsgSendInfo *pMsgSendInfo = NULL;
|
||||
SMsgSendInfo *pReadyMsgSendInfo = NULL;
|
||||
SMsgSendInfo *pExplainMsgSendInfo = NULL;
|
||||
|
||||
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pCtx->args) {
|
||||
|
@ -1361,31 +1542,18 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
|
||||
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_RES_READY, &pReadyMsgSendInfo));
|
||||
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, TDMT_VND_EXPLAIN, &pExplainMsgSendInfo));
|
||||
|
||||
int32_t msgType = TDMT_VND_RES_READY_RSP;
|
||||
SRpcCtxVal ctxVal = {.val = pReadyMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
|
||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
|
||||
if (NULL == param) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
int32_t msgType = TDMT_VND_RES_READY_RSP;
|
||||
__async_send_cb_fn_t fp = NULL;
|
||||
SCH_ERR_JRET(schGetCallbackFp(TDMT_VND_RES_READY, &fp));
|
||||
|
||||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->transport = pJob->transport;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
SRpcCtxVal ctxVal = {.val = pMsgSendInfo, .clone = schCloneSMsgSendInfo, .freeFunc = schFreeRpcCtxVal};
|
||||
msgType = TDMT_VND_EXPLAIN_RSP;
|
||||
ctxVal.val = pExplainMsgSendInfo;
|
||||
if (taosHashPut(pCtx->args, &msgType, sizeof(msgType), &ctxVal, sizeof(ctxVal))) {
|
||||
SCH_TASK_ELOG("taosHashPut msg %d to rpcCtx failed", msgType);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -1398,8 +1566,16 @@ int32_t schMakeQueryRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
|||
_return:
|
||||
|
||||
taosHashCleanup(pCtx->args);
|
||||
taosMemoryFreeClear(param);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
|
||||
if (pReadyMsgSendInfo) {
|
||||
taosMemoryFreeClear(pReadyMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pReadyMsgSendInfo);
|
||||
}
|
||||
|
||||
if (pExplainMsgSendInfo) {
|
||||
taosMemoryFreeClear(pExplainMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pExplainMsgSendInfo);
|
||||
}
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
@ -1582,32 +1758,13 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
|
|||
|
||||
SSchTrans *trans = (SSchTrans *)transport;
|
||||
|
||||
SMsgSendInfo *pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (NULL == pMsgSendInfo) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SMsgSendInfo));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
SMsgSendInfo *pMsgSendInfo = NULL;
|
||||
SCH_ERR_JRET(schGenerateTaskCallBackAHandle(pJob, pTask, msgType, &pMsgSendInfo));
|
||||
|
||||
SSchTaskCallbackParam *param = taosMemoryCalloc(1, sizeof(SSchTaskCallbackParam));
|
||||
if (NULL == param) {
|
||||
SCH_TASK_ELOG("calloc %d failed", (int32_t)sizeof(SSchTaskCallbackParam));
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
__async_send_cb_fn_t fp = NULL;
|
||||
SCH_ERR_JRET(schGetCallbackFp(msgType, &fp));
|
||||
|
||||
param->queryId = pJob->queryId;
|
||||
param->refId = pJob->refId;
|
||||
param->taskId = SCH_TASK_ID(pTask);
|
||||
param->transport = trans->transInst;
|
||||
|
||||
pMsgSendInfo->param = param;
|
||||
pMsgSendInfo->msgInfo.pData = msg;
|
||||
pMsgSendInfo->msgInfo.len = msgSize;
|
||||
pMsgSendInfo->msgInfo.handle = trans->transHandle;
|
||||
pMsgSendInfo->msgType = msgType;
|
||||
pMsgSendInfo->fp = fp;
|
||||
|
||||
qDebug("start to send %s msg to node[%d,%s,%d], refId:%" PRIx64 "instance:%p, handle:%p", TMSG_INFO(msgType),
|
||||
ntohl(((SMsgHead *)msg)->vgId), epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, pJob->refId,
|
||||
|
@ -1624,8 +1781,11 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, void *transport, SEpSet
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
if (pMsgSendInfo) {
|
||||
taosMemoryFreeClear(pMsgSendInfo->param);
|
||||
taosMemoryFreeClear(pMsgSendInfo);
|
||||
}
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1767,6 +1927,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
pMsg->taskId = htobe64(pTask->taskId);
|
||||
pMsg->refId = htobe64(pJob->refId);
|
||||
pMsg->taskType = TASK_TYPE_TEMP;
|
||||
pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||
pMsg->phyLen = htonl(pTask->msgLen);
|
||||
pMsg->sqlLen = htonl(len);
|
||||
|
||||
|
@ -2083,6 +2244,8 @@ void schFreeJobImpl(void *job) {
|
|||
taosArrayDestroy(pJob->levels);
|
||||
taosArrayDestroy(pJob->nodeList);
|
||||
|
||||
qExplainFreeCtx(pJob->explainCtx);
|
||||
|
||||
taosMemoryFreeClear(pJob->resData);
|
||||
taosMemoryFreeClear(pJob);
|
||||
|
||||
|
@ -2090,70 +2253,17 @@ void schFreeJobImpl(void *job) {
|
|||
}
|
||||
|
||||
static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan *pDag, int64_t *job, const char *sql,
|
||||
bool syncSchedule) {
|
||||
int64_t startTs, bool syncSchedule) {
|
||||
qDebug("QID:0x%" PRIx64 " job started", pDag->queryId);
|
||||
|
||||
if (pNodeList == NULL || (pNodeList && taosArrayGetSize(pNodeList) <= 0)) {
|
||||
if (pNodeList == NULL || taosArrayGetSize(pNodeList) <= 0) {
|
||||
qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pDag->queryId);
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
|
||||
if (NULL == pJob) {
|
||||
qError("QID:%" PRIx64 " calloc %d failed", pDag->queryId, (int32_t)sizeof(SSchJob));
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
SSchJob *pJob = NULL;
|
||||
SCH_ERR_JRET(schInitJob(&pJob, pDag, transport, pNodeList, sql, startTs, syncSchedule));
|
||||
|
||||
pJob->attr.explainMode = pDag->explainInfo.mode;
|
||||
pJob->attr.syncSchedule = syncSchedule;
|
||||
pJob->transport = transport;
|
||||
pJob->sql = sql;
|
||||
|
||||
if (pNodeList != NULL) {
|
||||
pJob->nodeList = taosArrayDup(pNodeList);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateAndBuildJob(pDag, pJob));
|
||||
|
||||
pJob->execTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->execTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d execTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pJob->succTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->succTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d succTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pJob->failTasks =
|
||||
taosHashInit(pDag->numOfSubplans, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == pJob->failTasks) {
|
||||
SCH_JOB_ELOG("taosHashInit %d failTasks failed", pDag->numOfSubplans);
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
tsem_init(&pJob->rspSem, 0, 0);
|
||||
|
||||
int64_t refId = taosAddRef(schMgmt.jobRef, pJob);
|
||||
if (refId < 0) {
|
||||
SCH_JOB_ELOG("taosAddRef job failed, error:%s", tstrerror(terrno));
|
||||
SCH_ERR_JRET(terrno);
|
||||
}
|
||||
|
||||
if (NULL == schAcquireJob(refId)) {
|
||||
SCH_JOB_ELOG("schAcquireJob job failed, refId:%" PRIx64, refId);
|
||||
SCH_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
||||
pJob->refId = refId;
|
||||
|
||||
SCH_JOB_DLOG("job refId:%" PRIx64, pJob->refId);
|
||||
|
||||
pJob->status = JOB_TASK_STATUS_NOT_START;
|
||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||
|
||||
*job = pJob->refId;
|
||||
|
@ -2266,7 +2376,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
|||
}
|
||||
|
||||
int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, int64_t *pJob, const char *sql,
|
||||
SQueryResult *pRes) {
|
||||
int64_t startTs, SQueryResult *pRes) {
|
||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
@ -2274,7 +2384,7 @@ int32_t schedulerExecJob(void *transport, SArray *nodeList, SQueryPlan *pDag, in
|
|||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, nodeList, pDag, pJob, sql, true));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, true));
|
||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, sql, startTs, true));
|
||||
}
|
||||
|
||||
SSchJob *job = schAcquireJob(*pJob);
|
||||
|
@ -2292,7 +2402,11 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryPlan *pD
|
|||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, false));
|
||||
if (EXPLAIN_MODE_STATIC == pDag->explainInfo.mode) {
|
||||
SCH_ERR_RET(schExecStaticExplain(transport, pNodeList, pDag, pJob, sql, false));
|
||||
} else {
|
||||
SCH_ERR_RET(schExecJobImpl(transport, pNodeList, pDag, pJob, sql, 0, false));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -951,7 +951,7 @@ TEST(insertTest, normalCase) {
|
|||
taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId);
|
||||
|
||||
SQueryResult res = {0};
|
||||
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", &res);
|
||||
code = schedulerExecJob(mockPointer, qnodeList, &dag, &insertJobRefId, "insert into tb values(now,1)", 0, &res);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(res.numOfRows, 20);
|
||||
|
||||
|
|
|
@ -53,6 +53,21 @@ void taosWLockLatch(SRWLatch *pLatch) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t taosWTryLockLatch(SRWLatch *pLatch) {
|
||||
SRWLatch oLatch, nLatch;
|
||||
oLatch = atomic_load_32(pLatch);
|
||||
if (oLatch) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
nLatch = oLatch | TD_RWLATCH_WRITE_FLAG;
|
||||
if (atomic_val_compare_exchange_32(pLatch, oLatch, nLatch) == oLatch) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
void taosWUnLockLatch(SRWLatch *pLatch) { atomic_store_32(pLatch, 0); }
|
||||
|
||||
void taosRLockLatch(SRWLatch *pLatch) {
|
||||
|
|
Loading…
Reference in New Issue