add perf logs
This commit is contained in:
parent
28693ce285
commit
f7a8393c47
|
@ -228,6 +228,10 @@ typedef struct SOperatorInfo {
|
||||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||||
SOperatorFpSet fpSet;
|
SOperatorFpSet fpSet;
|
||||||
|
int64_t downstreamTime;
|
||||||
|
int64_t funcInitTime;
|
||||||
|
int64_t funcExecTime;
|
||||||
|
int64_t funcFinTime;
|
||||||
} SOperatorInfo;
|
} SOperatorInfo;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -517,7 +517,9 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
|
||||||
|
pOperator->funcExecTime += taosGetTimestampUs() - st;
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
|
@ -1035,7 +1037,10 @@ void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uin
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowEntryInfoOffset);
|
||||||
|
pOperator->funcInitTime += taosGetTimestampUs() - st;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
static void setExecutionContext(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId) {
|
||||||
|
@ -1252,7 +1257,9 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
||||||
} else {
|
} else {
|
||||||
while (hasRemainResults(pGroupResInfo)) {
|
while (hasRemainResults(pGroupResInfo)) {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
||||||
|
pOperator->funcFinTime += taosGetTimestampUs() - st;
|
||||||
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1577,6 +1584,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
while (1) {
|
while (1) {
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
|
pOperator->downstreamTime += taosGetTimestampUs() - st;
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
if (!hasValidBlock) {
|
if (!hasValidBlock) {
|
||||||
createDataBlockForEmptyInput(pOperator, &pBlock);
|
createDataBlockForEmptyInput(pOperator, &pBlock);
|
||||||
|
@ -1694,6 +1702,15 @@ void destroyOperatorInfo(SOperatorInfo* pOperator) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_AGG) {
|
||||||
|
double downstream = (double)pOperator->downstreamTime / 1000000;
|
||||||
|
double init = (double)pOperator->funcInitTime / 1000000;
|
||||||
|
double exec = (double)pOperator->funcExecTime / 1000000;
|
||||||
|
double fin = (double)pOperator->funcFinTime / 1000000;
|
||||||
|
qError("operator: %s, downstream time:%lf, init time:%lf, exec time:%lf, fin time:%lf",
|
||||||
|
pOperator->name, downstream, init, exec, fin);
|
||||||
|
}
|
||||||
|
|
||||||
if (pOperator->fpSet.closeFn != NULL) {
|
if (pOperator->fpSet.closeFn != NULL) {
|
||||||
pOperator->fpSet.closeFn(pOperator->info);
|
pOperator->fpSet.closeFn(pOperator->info);
|
||||||
}
|
}
|
||||||
|
@ -1910,6 +1927,10 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
||||||
pTaskInfo);
|
pTaskInfo);
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo,
|
||||||
optrDefaultBufFn, NULL);
|
optrDefaultBufFn, NULL);
|
||||||
|
pOperator->downstreamTime = 0;
|
||||||
|
pOperator->funcInitTime = 0;
|
||||||
|
pOperator->funcExecTime = 0;
|
||||||
|
pOperator->funcFinTime = 0;
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pTableScanInfo = downstream->info;
|
STableScanInfo* pTableScanInfo = downstream->info;
|
||||||
|
|
Loading…
Reference in New Issue