add debug info
This commit is contained in:
parent
963d56cfc1
commit
c84f9f8634
|
@ -346,7 +346,7 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
|
||||||
pCtx->inRedirect = true;
|
pCtx->inRedirect = true;
|
||||||
pCtx->periodMs = tsRedirectPeriod;
|
pCtx->periodMs = tsRedirectPeriod;
|
||||||
pCtx->startTs = taosGetTimestampMs();
|
pCtx->startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
if (pEpSet) {
|
if (pEpSet) {
|
||||||
pCtx->roundTotal = pEpSet->numOfEps;
|
pCtx->roundTotal = pEpSet->numOfEps;
|
||||||
|
@ -360,7 +360,7 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
|
||||||
|
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->totalTimes++;
|
pCtx->totalTimes++;
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
|
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
|
||||||
|
@ -378,8 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
|
||||||
int64_t nowTs = taosGetTimestampMs();
|
int64_t nowTs = taosGetTimestampMs();
|
||||||
int64_t lastTime = nowTs - pCtx->startTs;
|
int64_t lastTime = nowTs - pCtx->startTs;
|
||||||
if (lastTime > tsMaxRetryWaitTime) {
|
if (lastTime > tsMaxRetryWaitTime) {
|
||||||
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
||||||
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
||||||
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
|
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -398,8 +398,9 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet)
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal, pCtx->totalTimes, pTask->delayExecMs);
|
SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal,
|
||||||
|
pCtx->totalTimes, pTask->delayExecMs);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -415,7 +416,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL));
|
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL));
|
||||||
|
|
||||||
pTask->waitRetry = true;
|
pTask->waitRetry = true;
|
||||||
|
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
schRemoveTaskFromExecList(pJob, pTask);
|
schRemoveTaskFromExecList(pJob, pTask);
|
||||||
|
@ -435,8 +436,9 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
||||||
} else {
|
} else {
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
|
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
|
||||||
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port);
|
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse,
|
||||||
|
addr->epSet.numOfEps, pEp->fqdn, pEp->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
|
@ -1146,7 +1148,7 @@ void schHandleTimerEvent(void *param, void *tmrId) {
|
||||||
SSchTask *pTask = NULL;
|
SSchTask *pTask = NULL;
|
||||||
SSchJob *pJob = NULL;
|
SSchJob *pJob = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)) {
|
if (schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1160,15 +1162,15 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
if (pTask->delayExecMs > 0) {
|
if (pTask->delayExecMs > 0) {
|
||||||
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
|
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
|
||||||
if (NULL == param) {
|
if (NULL == param) {
|
||||||
SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam));
|
SCH_TASK_ELOG("taosMemoryMalloc %d failed", (int)sizeof(SSchTimerParam));
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
param->rId = pJob->refId;
|
param->rId = pJob->refId;
|
||||||
param->queryId = pJob->queryId;
|
param->queryId = pJob->queryId;
|
||||||
param->taskId = pTask->taskId;
|
param->taskId = pTask->taskId;
|
||||||
|
|
||||||
if (NULL == pTask->delayTimer) {
|
if (NULL == pTask->delayTimer) {
|
||||||
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
|
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
|
||||||
if (NULL == pTask->delayTimer) {
|
if (NULL == pTask->delayTimer) {
|
||||||
SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
|
SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
|
||||||
|
@ -1178,7 +1180,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void*)param, schMgmt.timer, &pTask->delayTimer);
|
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1035,12 +1035,14 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
|
||||||
addr.sin_family = AF_INET;
|
addr.sin_family = AF_INET;
|
||||||
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
|
addr.sin_addr.s_addr = cliGetIpFromFqdnCache(pThrd->fqdn2ipCache, conn->ip);
|
||||||
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
addr.sin_port = (uint16_t)htons((uint16_t)conn->port);
|
||||||
tTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
|
||||||
|
STraceId* trace = &(pMsg->msg.info.traceId);
|
||||||
|
tGTrace("%s conn %p try to connect to %s:%d", pTransInst->label, conn, conn->ip, conn->port);
|
||||||
|
|
||||||
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
|
tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
|
||||||
uv_err_name(ret));
|
uv_err_name(ret));
|
||||||
|
|
||||||
uv_timer_stop(conn->timer);
|
uv_timer_stop(conn->timer);
|
||||||
conn->timer->data = NULL;
|
conn->timer->data = NULL;
|
||||||
|
|
|
@ -1233,17 +1233,17 @@ int transReleaseSrvHandle(void* handle) {
|
||||||
m->msg = tmsg;
|
m->msg = tmsg;
|
||||||
m->type = Release;
|
m->type = Release;
|
||||||
|
|
||||||
tTrace("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
tDebug("%s conn %p start to release", transLabel(pThrd->pTransInst), exh->handle);
|
||||||
transAsyncSend(pThrd->asyncPool, &m->q);
|
transAsyncSend(pThrd->asyncPool, &m->q);
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
_return1:
|
_return1:
|
||||||
tTrace("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return -1;
|
||||||
_return2:
|
_return2:
|
||||||
tTrace("handle %p failed to send to release handle", exh);
|
tDebug("handle %p failed to send to release handle", exh);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
int transSendResponse(const STransMsg* msg) {
|
int transSendResponse(const STransMsg* msg) {
|
||||||
|
@ -1268,19 +1268,19 @@ int transSendResponse(const STransMsg* msg) {
|
||||||
m->type = Normal;
|
m->type = Normal;
|
||||||
|
|
||||||
STraceId* trace = (STraceId*)&msg->info.traceId;
|
STraceId* trace = (STraceId*)&msg->info.traceId;
|
||||||
tGTrace("conn %p start to send resp (1/2)", exh->handle);
|
tGDebug("conn %p start to send resp (1/2)", exh->handle);
|
||||||
transAsyncSend(pThrd->asyncPool, &m->q);
|
transAsyncSend(pThrd->asyncPool, &m->q);
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_return1:
|
_return1:
|
||||||
tTrace("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return -1;
|
||||||
_return2:
|
_return2:
|
||||||
tTrace("handle %p failed to send resp", exh);
|
tDebug("handle %p failed to send resp", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1302,19 +1302,19 @@ int transRegisterMsg(const STransMsg* msg) {
|
||||||
m->type = Register;
|
m->type = Register;
|
||||||
|
|
||||||
STrans* pTransInst = pThrd->pTransInst;
|
STrans* pTransInst = pThrd->pTransInst;
|
||||||
tTrace("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
tDebug("%s conn %p start to register brokenlink callback", transLabel(pTransInst), exh->handle);
|
||||||
transAsyncSend(pThrd->asyncPool, &m->q);
|
transAsyncSend(pThrd->asyncPool, &m->q);
|
||||||
|
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_return1:
|
_return1:
|
||||||
tTrace("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
transReleaseExHandle(transGetRefMgt(), refId);
|
transReleaseExHandle(transGetRefMgt(), refId);
|
||||||
return -1;
|
return -1;
|
||||||
_return2:
|
_return2:
|
||||||
tTrace("handle %p failed to register brokenlink", exh);
|
tDebug("handle %p failed to register brokenlink", exh);
|
||||||
rpcFreeCont(msg->pCont);
|
rpcFreeCont(msg->pCont);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue