feature/qnode
This commit is contained in:
parent
abd60582d4
commit
6f12bc1093
|
@ -22,7 +22,7 @@ typedef struct SExplainCtx SExplainCtx;
|
|||
int32_t qExecCommand(SNode* pStmt, SRetrieveTableRsp** pRsp);
|
||||
|
||||
int32_t qExecStaticExplain(SQueryPlan *pDag, SRetrieveTableRsp **pRsp);
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs);
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs);
|
||||
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp);
|
||||
int32_t qExplainUpdateExecInfo(SExplainCtx *pCtx, SExplainRsp *pRspMsg, int32_t groupId, SRetrieveTableRsp **pRsp);
|
||||
void qExplainFreeCtx(SExplainCtx *pCtx);
|
||||
|
|
|
@ -84,10 +84,10 @@ typedef struct {
|
|||
} SClientHbMgr;
|
||||
|
||||
typedef struct SQueryExecMetric {
|
||||
int64_t start; // start timestamp
|
||||
int64_t parsed; // start to parse
|
||||
int64_t send; // start to send to server
|
||||
int64_t rsp; // receive response from server
|
||||
int64_t start; // start timestamp, us
|
||||
int64_t parsed; // start to parse, us
|
||||
int64_t send; // start to send to server, us
|
||||
int64_t rsp; // receive response from server, us
|
||||
} SQueryExecMetric;
|
||||
|
||||
typedef struct SInstanceSummary {
|
||||
|
|
|
@ -65,10 +65,10 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
int32_t currentInst = atomic_sub_fetch_64(&pActivity->currentRequests, 1);
|
||||
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
|
||||
|
||||
int64_t duration = taosGetTimestampMs() - pRequest->metric.start;
|
||||
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
|
||||
" ms, current:%d, app current:%d",
|
||||
pRequest->self, pTscObj->id, pRequest->requestId, duration, num, currentInst);
|
||||
pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst);
|
||||
taosReleaseRef(clientConnRefPool, pTscObj->id);
|
||||
}
|
||||
|
||||
|
@ -151,7 +151,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
|
|||
|
||||
pRequest->pDb = getDbOfConnection(pObj);
|
||||
pRequest->requestId = generateRequestId();
|
||||
pRequest->metric.start = taosGetTimestampMs();
|
||||
pRequest->metric.start = taosGetTimestampUs();
|
||||
|
||||
pRequest->type = type;
|
||||
pRequest->pTscObj = pObj;
|
||||
|
|
|
@ -435,11 +435,11 @@ static int32_t hbCreateThread() {
|
|||
taosThreadAttrInit(&thAttr);
|
||||
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
// if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
|
||||
// terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
// return -1;
|
||||
// }
|
||||
// taosThreadAttrDestroy(&thAttr);
|
||||
if (taosThreadCreate(&clientHbMgr.thread, &thAttr, hbThreadFunc, NULL) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
taosThreadAttrDestroy(&thAttr);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -500,8 +500,6 @@ SAppHbMgr *appHbMgrInit(SAppInstInfo *pAppInstInfo, char *key) {
|
|||
}
|
||||
|
||||
void appHbMgrCleanup(void) {
|
||||
taosThreadMutexLock(&clientHbMgr.lock);
|
||||
|
||||
int sz = taosArrayGetSize(clientHbMgr.appHbMgrs);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
SAppHbMgr *pTarget = taosArrayGetP(clientHbMgr.appHbMgrs, i);
|
||||
|
@ -510,8 +508,6 @@ void appHbMgrCleanup(void) {
|
|||
taosHashCleanup(pTarget->connInfo);
|
||||
pTarget->connInfo = NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
|
@ -532,7 +528,6 @@ int hbMgrInit() {
|
|||
}
|
||||
|
||||
void hbMgrCleanUp() {
|
||||
#if 0
|
||||
hbStopThread();
|
||||
|
||||
// destroy all appHbMgr
|
||||
|
@ -545,7 +540,6 @@ void hbMgrCleanUp() {
|
|||
taosThreadMutexUnlock(&clientHbMgr.lock);
|
||||
|
||||
clientHbMgr.appHbMgrs = NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
int hbRegisterConnImpl(SAppHbMgr *pAppHbMgr, SClientHbKey connKey, SHbConnInfo *info) {
|
||||
|
|
|
@ -520,7 +520,7 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
assert(pRequest->self == pSendInfo->requestObjRefId);
|
||||
|
||||
pRequest->metric.rsp = taosGetTimestampMs();
|
||||
pRequest->metric.rsp = taosGetTimestampUs();
|
||||
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
if (pEpSet) {
|
||||
|
@ -536,10 +536,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
|
|||
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
|
||||
if (pMsg->code == TSDB_CODE_SUCCESS) {
|
||||
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
|
||||
} else {
|
||||
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed, pRequest->requestId);
|
||||
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
|
||||
}
|
||||
|
||||
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
|
||||
|
|
|
@ -44,6 +44,9 @@ extern "C" {
|
|||
#define EXPLAIN_OUTPUT_FORMAT "Output: "
|
||||
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
||||
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
||||
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
|
||||
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
|
||||
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
|
||||
|
||||
//append area
|
||||
#define EXPLAIN_LEFT_PARENTHESIS_FORMAT " ("
|
||||
|
@ -108,16 +111,19 @@ typedef struct SExplainCtx {
|
|||
#define EXPLAIN_ROW_NEW(level, ...) \
|
||||
do { \
|
||||
if (isVerboseLine) { \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s", (level) * 2 + 3, ""); \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, "%*s", (level) * 2 + 3, ""); \
|
||||
} else { \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, "%*s%s", (level) * 2, "", "-> "); \
|
||||
tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, "%*s%s", (level) * 2, "", "-> "); \
|
||||
} \
|
||||
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__); \
|
||||
tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__); \
|
||||
} while (0)
|
||||
|
||||
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - tlen, __VA_ARGS__)
|
||||
#define EXPLAIN_ROW_APPEND(...) tlen += snprintf(tbuf + VARSTR_HEADER_SIZE + tlen, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE - tlen, __VA_ARGS__)
|
||||
#define EXPLAIN_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; isVerboseLine = true; } while (0)
|
||||
|
||||
#define EXPLAIN_SUM_ROW_NEW(...) tlen = snprintf(tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE - VARSTR_HEADER_SIZE, __VA_ARGS__)
|
||||
#define EXPLAIN_SUM_ROW_END() do { varDataSetLen(tbuf, tlen); tlen += VARSTR_HEADER_SIZE; } while (0)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -894,9 +894,33 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExplainAppendPlanRows(SExplainCtx *pCtx) {
|
||||
if (EXPLAIN_MODE_ANALYZE != pCtx->mode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tlen = 0;
|
||||
char *tbuf = pCtx->tbuf;
|
||||
|
||||
EXPLAIN_SUM_ROW_NEW(EXPLAIN_RATIO_TIME_FORMAT, pCtx->ratio);
|
||||
EXPLAIN_SUM_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
|
||||
|
||||
EXPLAIN_SUM_ROW_NEW(EXPLAIN_PLANNING_TIME_FORMAT, (double)(pCtx->jobStartTs - pCtx->reqStartTs) / 1000.0);
|
||||
EXPLAIN_SUM_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
|
||||
|
||||
EXPLAIN_SUM_ROW_NEW(EXPLAIN_EXEC_TIME_FORMAT, (double)(pCtx->jobDoneTs - pCtx->jobStartTs) / 1000.0);
|
||||
EXPLAIN_SUM_ROW_END();
|
||||
QRY_ERR_RET(qExplainResAppendRow(pCtx, tbuf, tlen, 0));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExplainGenerateRsp(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(pCtx, pCtx->rootGroupId, 0));
|
||||
|
||||
QRY_ERR_RET(qExplainAppendPlanRows(pCtx));
|
||||
|
||||
QRY_ERR_RET(qExplainGetRspFromCtx(pCtx, pRsp));
|
||||
|
||||
|
@ -977,18 +1001,18 @@ _return:
|
|||
QRY_RET(code);
|
||||
}
|
||||
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int32_t startTs) {
|
||||
int32_t qExecExplainBegin(SQueryPlan *pDag, SExplainCtx **pCtx, int64_t startTs) {
|
||||
QRY_ERR_RET(qExplainPrepareCtx(pDag, pCtx));
|
||||
|
||||
(*pCtx)->reqStartTs = startTs;
|
||||
(*pCtx)->jobStartTs = taosGetTimestampMs();
|
||||
(*pCtx)->jobStartTs = taosGetTimestampUs();
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t qExecExplainEnd(SExplainCtx *pCtx, SRetrieveTableRsp **pRsp) {
|
||||
int32_t code = 0;
|
||||
pCtx->jobDoneTs = taosGetTimestampMs();
|
||||
pCtx->jobDoneTs = taosGetTimestampUs();
|
||||
|
||||
atomic_store_8((int8_t *)&pCtx->execDone, true);
|
||||
|
||||
|
|
|
@ -207,7 +207,9 @@ SNode* releaseRawExprNode(SAstCreateContext* pCxt, SNode* pNode) {
|
|||
CHECK_RAW_EXPR_NODE(pNode);
|
||||
SRawExprNode* pRawExpr = (SRawExprNode*)pNode;
|
||||
SNode* pExpr = pRawExpr->pNode;
|
||||
strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, pRawExpr->n);
|
||||
if (nodesIsExprNode(pExpr)) {
|
||||
strncpy(((SExprNode*)pExpr)->aliasName, pRawExpr->p, pRawExpr->n);
|
||||
}
|
||||
taosMemoryFreeClear(pNode);
|
||||
return pExpr;
|
||||
}
|
||||
|
@ -498,8 +500,9 @@ SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* p
|
|||
if (NULL == pNode || !pCxt->valid) {
|
||||
return pNode;
|
||||
}
|
||||
uint32_t maxLen = sizeof(((SExprNode*)pNode)->aliasName);
|
||||
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, pAlias->n > maxLen ? maxLen : pAlias->n);
|
||||
int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n);
|
||||
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len);
|
||||
((SExprNode*)pNode)->aliasName[len] = '\0';
|
||||
return pNode;
|
||||
}
|
||||
|
||||
|
|
|
@ -99,4 +99,4 @@ sql explain analyze verbose true select * from (select min(f1),count(*) a from s
|
|||
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
|
||||
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue