[td-11818]update log.
This commit is contained in:
parent
ac2609594d
commit
16cd734b72
|
@ -417,7 +417,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(STsdb* tsdb, STsdbQueryCond* pCond,
|
|||
pReadHandle->currentLoadExternalRows = pCond->loadExternalRows;
|
||||
|
||||
char buf[128] = {0};
|
||||
snprintf(buf, tListLen(buf), "0x%"PRIx64" 0x%"PRIx64, taskId, qId);
|
||||
snprintf(buf, tListLen(buf), "TID:0x%"PRIx64" QID:0x%"PRIx64, taskId, qId);
|
||||
pReadHandle->idStr = strdup(buf);
|
||||
|
||||
if (tsdbInitReadH(&pReadHandle->rhelper, (STsdb*)tsdb) != 0) {
|
||||
|
|
|
@ -378,8 +378,10 @@ typedef struct SExchangeInfo {
|
|||
SSDataBlock *pResult;
|
||||
int32_t current;
|
||||
uint64_t rowsOfCurrentSource;
|
||||
uint64_t bytes; // total load bytes from remote
|
||||
uint64_t totalRows;
|
||||
|
||||
uint64_t totalSize; // total load bytes from remote
|
||||
uint64_t totalRows; // total number of rows
|
||||
uint64_t totalElapsed;// total elapsed time
|
||||
} SExchangeInfo;
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
|
|
|
@ -154,7 +154,7 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
}
|
||||
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
qDebug("%s it is already killed, abort", GET_TASKID(pTaskInfo));
|
||||
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -163,12 +163,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
publishQueryAbortEvent(pTaskInfo, ret);
|
||||
pTaskInfo->code = ret;
|
||||
qDebug("%s query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||
tstrerror(pTaskInfo->code));
|
||||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
qDebug("%s query task is launched", GET_TASKID(pTaskInfo));
|
||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||
|
||||
bool newgroup = false;
|
||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
|
@ -177,7 +177,9 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
st = taosGetTimestampUs();
|
||||
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
||||
|
||||
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
||||
uint64_t el = (taosGetTimestampUs() - st);
|
||||
pTaskInfo->cost.elapsedTime += el;
|
||||
|
||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (NULL == *pRes) {
|
||||
|
@ -187,8 +189,8 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
|||
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
|
||||
pTaskInfo->totalRows += current;
|
||||
|
||||
qDebug("%s task paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0);
|
||||
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
|
||||
|
||||
atomic_store_64(&pTaskInfo->owner, 0);
|
||||
return pTaskInfo->code;
|
||||
|
|
|
@ -5146,6 +5146,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
|
||||
size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources);
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
qDebug("%s all %"PRIzu" source(s) are exhausted, total rows:%"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5166,6 +5168,7 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
epSet.port[0] = pSource->addr.epAddr[0].port;
|
||||
tstrncpy(epSet.fqdn[0], pSource->addr.epAddr[0].fqdn, tListLen(epSet.fqdn[0]));
|
||||
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, epSet.fqdn[0], pSource->taskId, pExchangeInfo->current, totalSources);
|
||||
|
||||
|
@ -5202,6 +5205,11 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
pExchangeInfo->current += 1;
|
||||
|
||||
if (pExchangeInfo->current >= totalSources) {
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
|
||||
qDebug("%s all %"PRIzu" sources are exhausted, total rows: %"PRIu64" bytes:%"PRIu64", elapsed:%.2f ms", pTaskInfo->id.idstr, totalSources,
|
||||
pExchangeInfo->totalRows, pExchangeInfo->totalSize, pExchangeInfo->totalElapsed/1000.0);
|
||||
return NULL;
|
||||
} else {
|
||||
continue;
|
||||
|
@ -5228,21 +5236,24 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
|
|||
pRes->info.numOfCols = pOperator->numOfOutput;
|
||||
pRes->info.rows = pRsp->numOfRows;
|
||||
|
||||
int64_t el = taosGetTimestampUs() - startTs;
|
||||
|
||||
pExchangeInfo->totalRows += pRsp->numOfRows;
|
||||
pExchangeInfo->bytes += pRsp->compLen;
|
||||
pExchangeInfo->totalSize += pRsp->compLen;
|
||||
pExchangeInfo->rowsOfCurrentSource += pRsp->numOfRows;
|
||||
pExchangeInfo->totalElapsed += el;
|
||||
|
||||
if (pRsp->completed == 1) {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64
|
||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->bytes,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->rowsOfCurrentSource, pExchangeInfo->totalRows, pExchangeInfo->totalSize,
|
||||
pExchangeInfo->current + 1, totalSources);
|
||||
|
||||
pExchangeInfo->rowsOfCurrentSource = 0;
|
||||
pExchangeInfo->current += 1;
|
||||
} else {
|
||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->bytes);
|
||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pExchangeInfo->totalRows, pExchangeInfo->totalSize);
|
||||
}
|
||||
|
||||
return pExchangeInfo->pResult;
|
||||
|
@ -5289,7 +5300,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
|
|||
SRpcInit rpcInit;
|
||||
memset(&rpcInit, 0, sizeof(rpcInit));
|
||||
rpcInit.localPort = 0;
|
||||
rpcInit.label = "TSC";
|
||||
rpcInit.label = "EX";
|
||||
rpcInit.numOfThreads = 1;
|
||||
rpcInit.cfp = processRspMsg;
|
||||
rpcInit.sessions = tsMaxConnections;
|
||||
|
|
|
@ -467,11 +467,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
|||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||
|
||||
while (true) {
|
||||
QW_TASK_DLOG("start to execTask in executor, loopIdx:%d", i++);
|
||||
QW_TASK_DLOG("start to execTask, loopIdx:%d", i++);
|
||||
|
||||
code = qExecTask(*taskHandle, &pRes, &useconds);
|
||||
if (code) {
|
||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
||||
QW_TASK_ELOG("qExecTask failed, code:%s", tstrerror(code));
|
||||
QW_ERR_JRET(code);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue