From 16cd734b7274a3da830175b45668545810ac46df Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 25 Jan 2022 14:13:11 +0800 Subject: [PATCH] [td-11818]update log. --- source/dnode/vnode/src/tsdb/tsdbRead.c | 2 +- source/libs/executor/inc/executorimpl.h | 6 ++++-- source/libs/executor/src/executorMain.c | 14 ++++++++------ source/libs/executor/src/executorimpl.c | 19 +++++++++++++++---- source/libs/qworker/src/qworker.c | 4 ++-- 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c3aad88582..372a5820b7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -417,7 +417,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond, pReadHandle->currentLoadExternalRows = pCond->loadExternalRows; char buf[128] = {0}; - snprintf(buf, tListLen(buf), "0x%"PRIx64" 0x%"PRIx64, taskId, qId); + snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId); pReadHandle->idStr = strdup(buf); if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 8af07eb537..f36071f46c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -378,8 +378,10 @@ typedef struct SExchangeInfo { SSDataBlock *pResult; int32_t current; uint64_t rowsOfCurrentSource; - uint64_t bytes; // total load bytes from remote - uint64_t totalRows; + + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed;// total elapsed time } SExchangeInfo; typedef struct STableScanInfo { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index f615a4672f..5082f7661c 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -154,7 +154,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { } if (isTaskKilled(pTaskInfo)) { - qDebug("%s it is already killed, abort", GET_TASKID(pTaskInfo)); + qDebug("%s already killed, abort", GET_TASKID(pTaskInfo)); return TSDB_CODE_SUCCESS; } @@ -163,12 +163,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { if (ret != TSDB_CODE_SUCCESS) { publishQueryAbortEvent(pTaskInfo, ret); pTaskInfo->code = ret; - qDebug("%s query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), + qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); return pTaskInfo->code; } - qDebug("%s query task is launched", GET_TASKID(pTaskInfo)); + qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -177,7 +177,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { st = taosGetTimestampUs(); *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); - pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); + uint64_t el = (taosGetTimestampUs() - st); + pTaskInfo->cost.elapsedTime += el; + publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); if (NULL == *pRes) { @@ -187,8 +189,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0; pTaskInfo->totalRows += current; - qDebug("%s task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", - GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0); + qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms", + GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0); atomic_store_64(&pTaskInfo->owner, 0); return pTaskInfo->code; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d405fecf87..db6d5557d8 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5146,6 +5146,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); if (pExchangeInfo->current >= totalSources) { + qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); return NULL; } @@ -5166,6 +5168,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { epSet.port[0] = pSource->addr.epAddr[0].port; tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0])); + int64_t startTs = taosGetTimestampUs(); qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources); @@ -5202,6 +5205,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pExchangeInfo->current += 1; if (pExchangeInfo->current >= totalSources) { + int64_t el = taosGetTimestampUs() - startTs; + pExchangeInfo->totalElapsed += el; + + qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources, + pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0); return NULL; } else { continue; @@ -5228,21 +5236,24 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { pRes->info.numOfCols = pOperator->numOfOutput; pRes->info.rows = pRsp->numOfRows; + int64_t el = taosGetTimestampUs() - startTs; + pExchangeInfo->totalRows += pRsp->numOfRows; - pExchangeInfo->bytes += pRsp->compLen; + pExchangeInfo->totalSize += pRsp->compLen; pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows; + pExchangeInfo->totalElapsed += el; if (pRsp->completed == 1) { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->current + 1, totalSources); pExchangeInfo->rowsOfCurrentSource = 0; pExchangeInfo->current += 1; } else { qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize); } return pExchangeInfo->pResult; @@ -5289,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = 0; - rpcInit.label = "TSC"; + rpcInit.label = "EX"; rpcInit.numOfThreads = 1; rpcInit.cfp = processRspMsg; rpcInit.sessions = tsMaxConnections; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index ad8700100b..af422a6976 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -467,11 +467,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { DataSinkHandle sinkHandle = ctx->sinkHandle; while (true) { - QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++); + QW_TASK_DLOG("start to execTask, loopIdx:%d", i++); code = qExecTask(*taskHandle, &pRes, &useconds); if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); + QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code)); QW_ERR_JRET(code); }