enh: adjust log format
This commit is contained in:
parent
765fcba2f5
commit
e97b5cf4c1
|
@ -94,7 +94,7 @@ static int32_t registerRequest(SRequestObj *pRequest, STscObj *pTscObj) {
|
||||||
int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
|
int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
|
||||||
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
|
||||||
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
|
||||||
", current:%d, app current:%d, total:%d,QID:0x%" PRIx64,
|
", current:%d, app current:%d, total:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -255,7 +255,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
int32_t reqType = SLOW_LOG_TYPE_OTHERS;
|
int32_t reqType = SLOW_LOG_TYPE_OTHERS;
|
||||||
|
|
||||||
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
|
||||||
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ",QID:0x%" PRIx64
|
tscDebug("0x%" PRIx64 " free Request from connObj:0x%" PRIx64 ", QID:0x%" PRIx64
|
||||||
" elapsed:%.2f ms, "
|
" elapsed:%.2f ms, "
|
||||||
"current:%d, app current:%d",
|
"current:%d, app current:%d",
|
||||||
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000.0, num, currentInst);
|
||||||
|
@ -299,7 +299,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
||||||
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
|
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
|
||||||
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
||||||
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
|
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
|
||||||
taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
|
taosPrintSlowLog("PID:%d, Conn:%u, QID:0x%" PRIx64 ", Start:%" PRId64 "us, Duration:%" PRId64 "us, SQL:%s",
|
||||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
||||||
pRequest->sqlstr);
|
pRequest->sqlstr);
|
||||||
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
|
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
|
||||||
|
|
|
@ -245,7 +245,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
||||||
int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
|
int32_t err = taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
|
||||||
sizeof((*pRequest)->self));
|
sizeof((*pRequest)->self));
|
||||||
if (err) {
|
if (err) {
|
||||||
tscError("%" PRId64 " failed to add to request container,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
|
tscError("%" PRId64 " failed to add to request container, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s",
|
||||||
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
|
(*pRequest)->self, (*pRequest)->requestId, pTscObj->id, sql);
|
||||||
destroyRequest(*pRequest);
|
destroyRequest(*pRequest);
|
||||||
*pRequest = NULL;
|
*pRequest = NULL;
|
||||||
|
@ -256,7 +256,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
||||||
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
|
if (tsQueryUseNodeAllocator && !qIsInsertValuesSql((*pRequest)->sqlstr, (*pRequest)->sqlLen)) {
|
||||||
if (TSDB_CODE_SUCCESS !=
|
if (TSDB_CODE_SUCCESS !=
|
||||||
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
|
nodesCreateAllocator((*pRequest)->requestId, tsQueryNodeChunkSize, &((*pRequest)->allocatorRefId))) {
|
||||||
tscError("%" PRId64 " failed to create node allocator,QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
|
tscError("%" PRId64 " failed to create node allocator, QID:0x%" PRIx64 ", conn:%" PRId64 ", %s", (*pRequest)->self,
|
||||||
(*pRequest)->requestId, pTscObj->id, sql);
|
(*pRequest)->requestId, pTscObj->id, sql);
|
||||||
destroyRequest(*pRequest);
|
destroyRequest(*pRequest);
|
||||||
*pRequest = NULL;
|
*pRequest = NULL;
|
||||||
|
@ -264,7 +264,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebugL("0x%" PRIx64 " SQL: %s,QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
|
tscDebugL("0x%" PRIx64 " SQL:%s, QID:0x%" PRIx64, (*pRequest)->self, (*pRequest)->sqlstr, (*pRequest)->requestId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -381,10 +381,10 @@ void asyncExecLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
|
||||||
|
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
@ -1030,7 +1030,7 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
tscError("0x%" PRIx64 ", invalid exec result for request type %d,QID:0x%" PRIx64, pRequest->self, pRequest->type,
|
tscError("0x%" PRIx64 ", invalid exec result for request type %d, QID:0x%" PRIx64, pRequest->self, pRequest->type,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
code = TSDB_CODE_APP_ERROR;
|
code = TSDB_CODE_APP_ERROR;
|
||||||
}
|
}
|
||||||
|
@ -1075,7 +1075,7 @@ void returnToUser(SRequestObj* pRequest) {
|
||||||
(void)releaseRequest(pRequest->relation.userRefId);
|
(void)releaseRequest(pRequest->relation.userRefId);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 ", user ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->relation.userRefId, pRequest->requestId);
|
pRequest->relation.userRefId, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1146,7 +1146,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
||||||
SSDataBlock* pBlock = NULL;
|
SSDataBlock* pBlock = NULL;
|
||||||
pRequest->code = createResultBlock(res, rowNum, &pBlock);
|
pRequest->code = createResultBlock(res, rowNum, &pBlock);
|
||||||
if (TSDB_CODE_SUCCESS != pRequest->code) {
|
if (TSDB_CODE_SUCCESS != pRequest->code) {
|
||||||
tscError("0x%" PRIx64 ", create result block failed,QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
|
tscError("0x%" PRIx64 ", create result block failed, QID:0x%" PRIx64 " %s", pRequest->self, pRequest->requestId,
|
||||||
tstrerror(pRequest->code));
|
tstrerror(pRequest->code));
|
||||||
returnToUser(pRequest);
|
returnToUser(pRequest);
|
||||||
return;
|
return;
|
||||||
|
@ -1157,7 +1157,7 @@ void postSubQueryFetchCb(void* param, TAOS_RES* res, int32_t rowNum) {
|
||||||
continuePostSubQuery(pNextReq, pBlock);
|
continuePostSubQuery(pNextReq, pBlock);
|
||||||
(void)releaseRequest(pRequest->relation.nextRefId);
|
(void)releaseRequest(pRequest->relation.nextRefId);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->relation.nextRefId, pRequest->requestId);
|
pRequest->relation.nextRefId, pRequest->requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1176,7 +1176,7 @@ void handlePostSubQuery(SSqlCallbackWrapper* pWrapper) {
|
||||||
continuePostSubQuery(pNextReq, NULL);
|
continuePostSubQuery(pNextReq, NULL);
|
||||||
(void)releaseRequest(pRequest->relation.nextRefId);
|
(void)releaseRequest(pRequest->relation.nextRefId);
|
||||||
} else {
|
} else {
|
||||||
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there,QID:0x%" PRIx64, pRequest->self,
|
tscError("0x%" PRIx64 ", next req ref 0x%" PRIx64 " is not there, QID:0x%" PRIx64, pRequest->self,
|
||||||
pRequest->relation.nextRefId, pRequest->requestId);
|
pRequest->relation.nextRefId, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1208,14 +1208,14 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pResult);
|
taosMemoryFree(pResult);
|
||||||
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscDebug("0x%" PRIx64 " enter scheduler exec cb, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d,QID:0x%" PRIx64, pRequest->self,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%s, tryCount:%d, QID:0x%" PRIx64, pRequest->self,
|
||||||
tstrerror(code), pRequest->retry, pRequest->requestId);
|
tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
|
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
|
||||||
tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
tscError("0x%" PRIx64 " remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||||
}
|
}
|
||||||
restartAsyncQuery(pRequest, code);
|
restartAsyncQuery(pRequest, code);
|
||||||
return;
|
return;
|
||||||
|
@ -1224,7 +1224,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) {
|
||||||
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
|
tscDebug("schedulerExecCb request type %s", TMSG_INFO(pRequest->type));
|
||||||
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
||||||
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
|
if (TSDB_CODE_SUCCESS != removeMeta(pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type))) {
|
||||||
tscError("0x%" PRIx64 " remove meta failed,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
tscError("0x%" PRIx64 " remove meta failed, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1322,7 +1322,7 @@ void launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, bool keepQuery, void
|
||||||
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
if (NEED_CLIENT_RM_TBLMETA_REQ(pRequest->type) && NULL == pRequest->body.resInfo.execRes.res) {
|
||||||
int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
int ret = removeMeta(pRequest->pTscObj, pRequest->targetTableList, IS_VIEW_REQUEST(pRequest->type));
|
||||||
if (TSDB_CODE_SUCCESS != ret) {
|
if (TSDB_CODE_SUCCESS != ret) {
|
||||||
tscError("0x%" PRIx64 " remove meta failed,code:%d,QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
|
tscError("0x%" PRIx64 " remove meta failed,code:%d, QID:0x%" PRIx64, pRequest->self, ret, pRequest->requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1659,7 +1659,7 @@ int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __ta
|
||||||
*pTscObj = NULL;
|
*pTscObj = NULL;
|
||||||
return terrno;
|
return terrno;
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p,QID:0x%" PRIx64, (*pTscObj)->id,
|
tscDebug("0x%" PRIx64 " connection is opening, connId:%u, dnodeConn:%p, QID:0x%" PRIx64, (*pTscObj)->id,
|
||||||
(*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
|
(*pTscObj)->connId, (*pTscObj)->pAppInfo->pTransporter, pRequest->requestId);
|
||||||
destroyRequest(pRequest);
|
destroyRequest(pRequest);
|
||||||
}
|
}
|
||||||
|
@ -1789,7 +1789,7 @@ int32_t doProcessMsgFromServerImpl(SRpcMsg* pMsg, SEpSet* pEpSet) {
|
||||||
char tbuf[40] = {0};
|
char tbuf[40] = {0};
|
||||||
TRACE_TO_STR(trace, tbuf);
|
TRACE_TO_STR(trace, tbuf);
|
||||||
|
|
||||||
tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s,QID:%s", pMsg->info.handle,
|
tscDebug("processMsgFromServer handle %p, message: %s, size:%d, code: %s, QID:%s", pMsg->info.handle,
|
||||||
TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
|
TMSG_INFO(pMsg->msgType), pMsg->contLen, tstrerror(pMsg->code), tbuf);
|
||||||
|
|
||||||
if (pSendInfo->requestObjRefId != 0) {
|
if (pSendInfo->requestObjRefId != 0) {
|
||||||
|
@ -2000,7 +2000,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
pRequest->self, pResInfo->numOfRows, pResInfo->totalRows, pResInfo->completed, pRequest->requestId);
|
||||||
|
|
||||||
STscObj* pTscObj = pRequest->pTscObj;
|
STscObj* pTscObj = pRequest->pTscObj;
|
||||||
|
@ -3024,7 +3024,7 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||||
|
|
||||||
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
|
||||||
|
|
||||||
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
tscDebug("0x%" PRIx64 " enter scheduler fetch cb, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
|
||||||
pResultInfo->pData = pResult;
|
pResultInfo->pData = pResult;
|
||||||
|
@ -3047,10 +3047,10 @@ static void fetchCallback(void* pResult, void* param, int32_t code) {
|
||||||
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
|
setQueryResultFromRsp(pResultInfo, (const SRetrieveTableRsp*)pResultInfo->pData, pResultInfo->convertUcs4);
|
||||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||||
pResultInfo->numOfRows = 0;
|
pResultInfo->numOfRows = 0;
|
||||||
tscError("0x%" PRIx64 " fetch results failed, code:%s,QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
|
tscError("0x%" PRIx64 " fetch results failed, code:%s, QID:0x%" PRIx64, pRequest->self, tstrerror(pRequest->code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
} else {
|
} else {
|
||||||
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " fetch results, numOfRows:%" PRId64 " total Rows:%" PRId64 ", complete:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
pRequest->self, pResultInfo->numOfRows, pResultInfo->totalRows, pResultInfo->completed,
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
|
|
||||||
|
|
|
@ -1157,7 +1157,7 @@ static void doAsyncQueryFromAnalyse(SMetaData *pResultMeta, void *param, int32_t
|
||||||
SRequestObj *pRequest = pWrapper->pRequest;
|
SRequestObj *pRequest = pWrapper->pRequest;
|
||||||
SQuery *pQuery = pRequest->pQuery;
|
SQuery *pQuery = pRequest->pQuery;
|
||||||
|
|
||||||
qDebug("0x%" PRIx64 " start to semantic analysis,QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
qDebug("0x%" PRIx64 " start to semantic analysis, QID:0x%" PRIx64, pRequest->self, pRequest->requestId);
|
||||||
|
|
||||||
int64_t analyseStart = taosGetTimestampUs();
|
int64_t analyseStart = taosGetTimestampUs();
|
||||||
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
|
pRequest->metric.ctgCostUs = analyseStart - pRequest->metric.ctgStart;
|
||||||
|
@ -1276,14 +1276,14 @@ void handleQueryAnslyseRes(SSqlCallbackWrapper *pWrapper, SMetaData *pResultMeta
|
||||||
pRequest->pQuery = NULL;
|
pRequest->pQuery = NULL;
|
||||||
|
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
restartAsyncQuery(pRequest, code);
|
restartAsyncQuery(pRequest, code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// return to app directly
|
// return to app directly
|
||||||
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app,QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
tscError("0x%" PRIx64 " error occurs, code:%s, return to user app, QID:0x%" PRIx64, pRequest->self, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
pRequest->code = code;
|
pRequest->code = code;
|
||||||
returnToUser(pRequest);
|
returnToUser(pRequest);
|
||||||
|
@ -1333,7 +1333,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
||||||
SQuery *pQuery = pRequest->pQuery;
|
SQuery *pQuery = pRequest->pQuery;
|
||||||
|
|
||||||
pRequest->metric.ctgCostUs += taosGetTimestampUs() - pRequest->metric.ctgStart;
|
pRequest->metric.ctgCostUs += taosGetTimestampUs() - pRequest->metric.ctgStart;
|
||||||
qDebug("0x%" PRIx64 " start to continue parse,QID:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
|
qDebug("0x%" PRIx64 " start to continue parse, QID:0x%" PRIx64 ", code:%s", pRequest->self, pRequest->requestId,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1346,7 +1346,7 @@ static void doAsyncQueryFromParse(SMetaData *pResultMeta, void *param, int32_t c
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscError("0x%" PRIx64 " error happens, code:%d - %s,QID:0x%" PRIx64, pWrapper->pRequest->self, code,
|
tscError("0x%" PRIx64 " error happens, code:%d - %s, QID:0x%" PRIx64, pWrapper->pRequest->self, code,
|
||||||
tstrerror(code), pWrapper->pRequest->requestId);
|
tstrerror(code), pWrapper->pRequest->requestId);
|
||||||
destorySqlCallbackWrapper(pWrapper);
|
destorySqlCallbackWrapper(pWrapper);
|
||||||
pRequest->pWrapper = NULL;
|
pRequest->pWrapper = NULL;
|
||||||
|
@ -1363,7 +1363,7 @@ void continueInsertFromCsv(SSqlCallbackWrapper *pWrapper, SRequestObj *pRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscError("0x%" PRIx64 " error happens, code:%d - %s,QID:0x%" PRIx64, pWrapper->pRequest->self, code,
|
tscError("0x%" PRIx64 " error happens, code:%d - %s, QID:0x%" PRIx64, pWrapper->pRequest->self, code,
|
||||||
tstrerror(code), pWrapper->pRequest->requestId);
|
tstrerror(code), pWrapper->pRequest->requestId);
|
||||||
destorySqlCallbackWrapper(pWrapper);
|
destorySqlCallbackWrapper(pWrapper);
|
||||||
pRequest->pWrapper = NULL;
|
pRequest->pWrapper = NULL;
|
||||||
|
@ -1484,7 +1484,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != code) {
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscError("0x%" PRIx64 " error happens, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
tscError("0x%" PRIx64 " error happens, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
destorySqlCallbackWrapper(pWrapper);
|
destorySqlCallbackWrapper(pWrapper);
|
||||||
pRequest->pWrapper = NULL;
|
pRequest->pWrapper = NULL;
|
||||||
|
@ -1492,11 +1492,11 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
pRequest->pQuery = NULL;
|
pRequest->pQuery = NULL;
|
||||||
|
|
||||||
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
if (NEED_CLIENT_HANDLE_ERROR(code)) {
|
||||||
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d,QID:0x%" PRIx64,
|
tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, QID:0x%" PRIx64,
|
||||||
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId);
|
||||||
code = refreshMeta(pRequest->pTscObj, pRequest);
|
code = refreshMeta(pRequest->pTscObj, pRequest);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s,QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
tscWarn("0x%" PRIx64 " refresh meta failed, code:%d - %s, QID:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
}
|
}
|
||||||
pRequest->prevCode = code;
|
pRequest->prevCode = code;
|
||||||
|
|
|
@ -47,7 +47,7 @@
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define LOG_ID_TAG "connId:0x%" PRIx64 ",QID:0x%" PRIx64
|
#define LOG_ID_TAG "connId:0x%" PRIx64 ", QID:0x%" PRIx64
|
||||||
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
|
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
|
||||||
|
|
||||||
#define TMQ_META_VERSION "1.0"
|
#define TMQ_META_VERSION "1.0"
|
||||||
|
|
|
@ -1474,7 +1474,7 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
||||||
pParam = NULL;
|
pParam = NULL;
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode, QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
|
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
@ -2102,7 +2102,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
|
||||||
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
|
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s), QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
|
||||||
|
|
||||||
pRspWrapper->tmqRspType = rspType;
|
pRspWrapper->tmqRspType = rspType;
|
||||||
pRspWrapper->pollRsp.reqId = requestId;
|
pRspWrapper->pollRsp.reqId = requestId;
|
||||||
|
@ -2123,7 +2123,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosFreeQitem(pRspWrapper);
|
taosFreeQitem(pRspWrapper);
|
||||||
tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
|
tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code);
|
||||||
} else {
|
} else {
|
||||||
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d,QID:0x%" PRIx64,
|
tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d(%s), vgId:%d, total in queue:%d, QID:0x%" PRIx64,
|
||||||
tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
|
tmq ? tmq->consumerId : 0, rspType, tmqMsgTypeStr[rspType], vgId, taosQueueItemSize(tmq->mqueue), requestId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2297,7 +2297,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
||||||
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
|
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
|
||||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
|
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
|
||||||
tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
|
tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, QID:0x%" PRIx64, pTmq->consumerId,
|
||||||
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
||||||
TSDB_CHECK_CODE(code, lino, END);
|
TSDB_CHECK_CODE(code, lino, END);
|
||||||
|
|
||||||
|
@ -2523,7 +2523,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &pollRspWrapper->rspOffset);
|
||||||
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
if (pollRspWrapper->dataRsp.blockNum == 0) {
|
||||||
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64
|
||||||
", total:%" PRId64 ",QID:0x%" PRIx64,
|
", total:%" PRId64 ", QID:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId);
|
||||||
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
pVg->emptyBlockReceiveTs = taosGetTimestampMs();
|
||||||
} else {
|
} else {
|
||||||
|
@ -2551,7 +2551,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
|
||||||
", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64,
|
", vg total:%" PRId64 ", total:%" PRId64 ", QID:0x%" PRIx64,
|
||||||
tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
tmq->consumerId, pVg->vgId, buf, pRspObj->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows,
|
||||||
pollRspWrapper->reqId);
|
pollRspWrapper->reqId);
|
||||||
}
|
}
|
||||||
|
@ -3551,7 +3551,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
||||||
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
|
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset);
|
||||||
|
|
||||||
tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId,
|
tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, QID:0x%" PRIx64, tmq->consumerId,
|
||||||
pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
|
||||||
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
|
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, NULL, sendInfo);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -83,12 +83,12 @@ extern "C" {
|
||||||
}\
|
}\
|
||||||
}
|
}
|
||||||
|
|
||||||
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGFatal(param, ...) {if (dDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dFatal(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGError(param, ...) {if (dDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dError(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGWarn(param, ...) {if (dDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dWarn(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGInfo(param, ...) {if (dDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dInfo(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGDebug(param, ...) {if (dDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dDebug(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define dGTrace(param, ...) {if (dDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); dTrace(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -42,12 +42,12 @@ extern "C" {
|
||||||
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND DEBUG ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
#define mDebug(...) { if (mDebugFlag & DEBUG_DEBUG) { taosPrintLog("MND DEBUG ", DEBUG_DEBUG, mDebugFlag, __VA_ARGS__); }}
|
||||||
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND TRACE ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
#define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND TRACE ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }}
|
||||||
|
|
||||||
#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGFatal(param, ...) { if (mDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mFatal(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGError(param, ...) { if (mDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mError(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGWarn(param, ...) { if (mDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mWarn (param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGInfo(param, ...) { if (mDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); mInfo (param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGDebug(param, ...) { if (mDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mDebug(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ",QID:%s", __VA_ARGS__, buf);}}
|
#define mGTrace(param, ...) { if (mDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); mTrace(param ", QID:%s", __VA_ARGS__, buf);}}
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
|
||||||
|
|
|
@ -32,12 +32,12 @@ extern "C" {
|
||||||
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND DEBUG ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND DEBUG ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND TRACE ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND TRACE ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
#define vGTrace(param, ...) do { if (vDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGTrace(param, ...) do { if (vDebugFlag & DEBUG_TRACE) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vTrace(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define vGFatal(param, ...) do { if (vDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGFatal(param, ...) do { if (vDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vFatal(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define vGError(param, ...) do { if (vDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGError(param, ...) do { if (vDebugFlag & DEBUG_ERROR) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vError(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define vGWarn(param, ...) do { if (vDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGWarn(param, ...) do { if (vDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vWarn(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define vGInfo(param, ...) do { if (vDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGInfo(param, ...) do { if (vDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vInfo(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define vGDebug(param, ...) do { if (vDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define vGDebug(param, ...) do { if (vDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); vDebug(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -202,7 +202,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
||||||
dataRsp.blockNum = 0;
|
dataRsp.blockNum = 0;
|
||||||
char buf[TSDB_OFFSET_LEN] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
|
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.reqOffset);
|
||||||
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s,QID:0x%" PRIx64, req.consumerId, vgId, buf,
|
tqInfo("tqPushEmptyDataRsp to consumer:0x%" PRIx64 " vgId:%d, offset:%s, QID:0x%" PRIx64, req.consumerId, vgId, buf,
|
||||||
req.reqId);
|
req.reqId);
|
||||||
|
|
||||||
code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
code = tqSendDataRsp(pHandle, pHandle->msg, &req, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||||
|
@ -225,7 +225,7 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq*
|
||||||
(void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
|
(void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset));
|
||||||
(void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
|
(void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset));
|
||||||
|
|
||||||
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64,
|
tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s, QID:0x%" PRIx64,
|
||||||
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId);
|
||||||
|
|
||||||
return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
|
return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever);
|
||||||
|
@ -477,7 +477,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
char buf[TSDB_OFFSET_LEN] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
|
(void)tFormatOffset(buf, TSDB_OFFSET_LEN, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s,QID:0x%" PRIx64,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, QID:0x%" PRIx64,
|
||||||
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
|
|
|
@ -222,12 +222,12 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
|
||||||
while (offset <= appliedVer) {
|
while (offset <= appliedVer) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset) < 0) {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64
|
||||||
", no more log to return,QID:0x%" PRIx64 " 0x%" PRIx64,
|
", no more log to return, QID:0x%" PRIx64 " 0x%" PRIx64,
|
||||||
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
|
pHandle->consumerId, pHandle->epoch, vgId, offset, reqId, id);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s,QID:0x%" PRIx64 " 0x%" PRIx64,
|
tqDebug("vgId:%d, consumer:0x%" PRIx64 " taosx get msg ver %" PRId64 ", type: %s, QID:0x%" PRIx64 " 0x%" PRIx64,
|
||||||
vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
|
vgId, pHandle->consumerId, offset, TMSG_INFO(pHandle->pWalReader->pHead->head.msgType), reqId, id);
|
||||||
|
|
||||||
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
if (pHandle->pWalReader->pHead->head.msgType == TDMT_VND_SUBMIT) {
|
||||||
|
|
|
@ -190,10 +190,10 @@ end:
|
||||||
char buf[TSDB_OFFSET_LEN] = {0};
|
char buf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
|
tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset);
|
||||||
if (code != 0){
|
if (code != 0){
|
||||||
tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " error msg:%s, line:%d",
|
tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " error msg:%s, line:%d",
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " success",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s, QID:0x%" PRIx64 " success",
|
||||||
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -461,7 +461,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueue
|
// enqueue
|
||||||
tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
|
tqDebug("s-task:%s (vgId:%d level:%d) recv retrieve req from task:0x%x(vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
|
||||||
pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);
|
pTask->pMeta->vgId, pTask->info.taskLevel, req.srcTaskId, req.srcNodeId, req.reqId);
|
||||||
|
|
||||||
// if task is in ck status, set current ck failed
|
// if task is in ck status, set current ck failed
|
||||||
|
|
|
@ -917,7 +917,7 @@ int32_t catalogGetHandle(int64_t clusterId, SCatalog** catalogHandle) {
|
||||||
if (ctg && (*ctg)) {
|
if (ctg && (*ctg)) {
|
||||||
*catalogHandle = *ctg;
|
*catalogHandle = *ctg;
|
||||||
CTG_STAT_HIT_INC(CTG_CI_CLUSTER, 1);
|
CTG_STAT_HIT_INC(CTG_CI_CLUSTER, 1);
|
||||||
qDebug("got catalog handle from cache, clusterId:0x%" PRIx64 ", CTG:%p", clusterId, *ctg);
|
qDebug("CTG:%p, get catalog handle from cache, clusterId:0x%" PRIx64, *ctg, clusterId);
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -873,7 +873,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
|
|
||||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||||
if (NULL == *job) {
|
if (NULL == *job) {
|
||||||
ctgError("failed to calloc, size:%d,QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
|
ctgError("failed to calloc, size:%d, QID:0x%" PRIx64, (int32_t)sizeof(SCtgJob), pConn->requestId);
|
||||||
CTG_ERR_RET(terrno);
|
CTG_ERR_RET(terrno);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -280,14 +280,14 @@ int32_t ctgAcquireTbMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha
|
||||||
|
|
||||||
CTG_LOCK(CTG_READ, &pCache->metaLock);
|
CTG_LOCK(CTG_READ, &pCache->metaLock);
|
||||||
if (NULL == pCache->pMeta) {
|
if (NULL == pCache->pMeta) {
|
||||||
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
|
ctgDebug("tb:%s meta not in cache, dbFName:%s", tbName, dbFName);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pDb = dbCache;
|
*pDb = dbCache;
|
||||||
*pTb = pCache;
|
*pTb = pCache;
|
||||||
|
|
||||||
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
|
ctgDebug("tb:%s meta get in cache, dbFName:%s", tbName, dbFName);
|
||||||
|
|
||||||
CTG_META_HIT_INC(pCache->pMeta->tableType);
|
CTG_META_HIT_INC(pCache->pMeta->tableType);
|
||||||
|
|
||||||
|
@ -338,14 +338,14 @@ int32_t ctgAcquireVgMetaFromCache(SCatalog *pCtg, const char *dbFName, const cha
|
||||||
|
|
||||||
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
||||||
if (NULL == tbCache->pMeta) {
|
if (NULL == tbCache->pMeta) {
|
||||||
ctgDebug("tb %s meta not in cache, dbFName:%s", tbName, dbFName);
|
ctgDebug("tb:%s meta not in cache, dbFName:%s", tbName, dbFName);
|
||||||
CTG_META_NHIT_INC();
|
CTG_META_NHIT_INC();
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
*pTb = tbCache;
|
*pTb = tbCache;
|
||||||
|
|
||||||
ctgDebug("tb %s meta got in cache, dbFName:%s", tbName, dbFName);
|
ctgDebug("tb:%s meta get in cache, dbFName:%s", tbName, dbFName);
|
||||||
|
|
||||||
CTG_META_HIT_INC(tbCache->pMeta->tableType);
|
CTG_META_HIT_INC(tbCache->pMeta->tableType);
|
||||||
|
|
||||||
|
@ -557,7 +557,7 @@ int32_t ctgCopyTbMeta(SCatalog *pCtg, SCtgTbMetaCtx *ctx, SCtgDBCache **pDb, SCt
|
||||||
(*pTableMeta)->schemaExt = NULL;
|
(*pTableMeta)->schemaExt = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgDebug("get tb %s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
|
ctgDebug("get tb:%s meta from cache, type:%d, dbFName:%s", ctx->pName->tname, tbMeta->tableType, dbFName);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -632,7 +632,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
|
||||||
|
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
|
|
||||||
ctgDebug("get tb %s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
|
ctgDebug("get tb:%s meta from cache, dbFName:%s", ctx->pName->tname, dbFName);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -669,7 +669,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
|
||||||
*sver = tbMeta->sversion;
|
*sver = tbMeta->sversion;
|
||||||
*tver = tbMeta->tversion;
|
*tver = tbMeta->tversion;
|
||||||
|
|
||||||
ctgDebug("get tb %s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
|
ctgDebug("get tb:%s ver from cache, dbFName:%s, tbType:%d, sver:%d, tver:%d, suid:0x%" PRIx64, pTableName->tname,
|
||||||
dbFName, *tbType, *sver, *tver, *suid);
|
dbFName, *tbType, *sver, *tver, *suid);
|
||||||
|
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
|
@ -711,7 +711,7 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
|
||||||
|
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
|
|
||||||
ctgDebug("get tb %s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
|
ctgDebug("get tb:%s sver %d tver %d from cache, type:%d, dbFName:%s", pTableName->tname, *sver, *tver, *tbType,
|
||||||
dbFName);
|
dbFName);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -729,7 +729,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog *pCtg, char *dbFName, char *tbName, int3
|
||||||
*tbType = tbCache->pMeta->tableType;
|
*tbType = tbCache->pMeta->tableType;
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
|
|
||||||
ctgDebug("get tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
|
ctgDebug("get tb:%s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -3517,7 +3517,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
|
||||||
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
||||||
taosHashRelease(dbCache->tbCache, pCache);
|
taosHashRelease(dbCache->tbCache, pCache);
|
||||||
|
|
||||||
ctgDebug("get tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);
|
ctgDebug("get tb:%s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);
|
||||||
|
|
||||||
res.pRes = pTableMeta;
|
res.pRes = pTableMeta;
|
||||||
if (NULL == taosArrayPush(ctx->pResList, &res)) {
|
if (NULL == taosArrayPush(ctx->pResList, &res)) {
|
||||||
|
@ -3542,7 +3542,7 @@ int32_t ctgGetTbMetasFromCache(SCatalog *pCtg, SRequestConnInfo *pConn, SCtgTbMe
|
||||||
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
CTG_UNLOCK(CTG_READ, &pCache->metaLock);
|
||||||
taosHashRelease(dbCache->tbCache, pCache);
|
taosHashRelease(dbCache->tbCache, pCache);
|
||||||
|
|
||||||
ctgDebug("get tb %s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);
|
ctgDebug("get tb:%s meta from cache, type:%d, dbFName:%s", pName->tname, pTableMeta->tableType, dbFName);
|
||||||
|
|
||||||
res.pRes = pTableMeta;
|
res.pRes = pTableMeta;
|
||||||
if (NULL == taosArrayPush(ctx->pResList, &res)) {
|
if (NULL == taosArrayPush(ctx->pResList, &res)) {
|
||||||
|
|
|
@ -562,7 +562,7 @@ int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob,
|
||||||
CTG_ERR_JRET(code);
|
CTG_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
ctgDebug("ctg req msg sent,QID:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
|
ctgDebug("ctg req msg sent, QID:0x%" PRIx64 ", msg type:%d, %s", pJob->queryId, msgType, TMSG_INFO(msgType));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
|
@ -1293,7 +1293,7 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d
|
||||||
|
|
||||||
*pVgroup = *vgInfo;
|
*pVgroup = *vgInfo;
|
||||||
|
|
||||||
ctgDebug("get tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
|
ctgDebug("get tb:%s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
|
||||||
vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
|
vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
|
||||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
||||||
|
|
||||||
|
@ -1437,7 +1437,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SEpSet* pMgmgEpSet, SCtgTaskR
|
||||||
|
|
||||||
*pNewVg = *vgInfo;
|
*pNewVg = *vgInfo;
|
||||||
|
|
||||||
ctgDebug("get tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
|
ctgDebug("get tb:%s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
|
||||||
vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
|
vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
|
||||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
||||||
|
|
||||||
|
@ -1496,7 +1496,7 @@ int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFNam
|
||||||
|
|
||||||
vgId[i] = vgInfo->vgId;
|
vgId[i] = vgInfo->vgId;
|
||||||
|
|
||||||
ctgDebug("get tb %s vgId:%d", tbFullName, vgInfo->vgId);
|
ctgDebug("get tb:%s vgId:%d", tbFullName, vgInfo->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
|
|
|
@ -20,12 +20,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define nodesFatal(...) qFatal("NODES: " __VA_ARGS__)
|
#define nodesFatal(...) qFatal("NODES:" __VA_ARGS__)
|
||||||
#define nodesError(...) qError("NODES: " __VA_ARGS__)
|
#define nodesError(...) qError("NODES:" __VA_ARGS__)
|
||||||
#define nodesWarn(...) qWarn("NODES: " __VA_ARGS__)
|
#define nodesWarn(...) qWarn ("NODES:" __VA_ARGS__)
|
||||||
#define nodesInfo(...) qInfo("NODES: " __VA_ARGS__)
|
#define nodesInfo(...) qInfo ("NODES:" __VA_ARGS__)
|
||||||
#define nodesDebug(...) qDebug("NODES: " __VA_ARGS__)
|
#define nodesDebug(...) qDebug("NODES:" __VA_ARGS__)
|
||||||
#define nodesTrace(...) qTrace("NODES: " __VA_ARGS__)
|
#define nodesTrace(...) qTrace("NODES:" __VA_ARGS__)
|
||||||
|
|
||||||
#define NODES_ERR_RET(c) \
|
#define NODES_ERR_RET(c) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -26,12 +26,12 @@ extern "C" {
|
||||||
#include "parToken.h"
|
#include "parToken.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
|
||||||
#define parserFatal(param, ...) qFatal("PARSER: " param, ##__VA_ARGS__)
|
#define parserFatal(param, ...) qFatal("PARSER:" param, ##__VA_ARGS__)
|
||||||
#define parserError(param, ...) qError("PARSER: " param, ##__VA_ARGS__)
|
#define parserError(param, ...) qError("PARSER:" param, ##__VA_ARGS__)
|
||||||
#define parserWarn(param, ...) qWarn("PARSER: " param, ##__VA_ARGS__)
|
#define parserWarn(param, ...) qWarn ("PARSER:" param, ##__VA_ARGS__)
|
||||||
#define parserInfo(param, ...) qInfo("PARSER: " param, ##__VA_ARGS__)
|
#define parserInfo(param, ...) qInfo ("PARSER:" param, ##__VA_ARGS__)
|
||||||
#define parserDebug(param, ...) qDebug("PARSER: " param, ##__VA_ARGS__)
|
#define parserDebug(param, ...) qDebug("PARSER:" param, ##__VA_ARGS__)
|
||||||
#define parserTrace(param, ...) qTrace("PARSER: " param, ##__VA_ARGS__)
|
#define parserTrace(param, ...) qTrace("PARSER:" param, ##__VA_ARGS__)
|
||||||
|
|
||||||
#define ROWTS_PSEUDO_COLUMN_NAME "_rowts"
|
#define ROWTS_PSEUDO_COLUMN_NAME "_rowts"
|
||||||
#define C0_PSEUDO_COLUMN_NAME "_c0"
|
#define C0_PSEUDO_COLUMN_NAME "_c0"
|
||||||
|
|
|
@ -3194,14 +3194,14 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpSt
|
||||||
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
||||||
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pQuery->pRoot;
|
||||||
if (pCxt->missCache) {
|
if (pCxt->missCache) {
|
||||||
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables will insert before cache miss", pCxt->pComCxt->requestId,
|
||||||
pStmt->totalRowsNum, pStmt->totalTbNum);
|
pStmt->totalRowsNum, pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
||||||
return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
|
return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables will insert", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
|
||||||
pStmt->totalTbNum);
|
pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
||||||
|
|
|
@ -414,29 +414,29 @@ extern SQueryMgmt gQueryMgmt;
|
||||||
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__)
|
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_TASK_ELOG(param, ...) \
|
#define QW_TASK_ELOG(param, ...) \
|
||||||
qError("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
qError("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_WLOG(param, ...) \
|
#define QW_TASK_WLOG(param, ...) \
|
||||||
qWarn("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
qWarn("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_DLOG(param, ...) \
|
#define QW_TASK_DLOG(param, ...) \
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
qDebug("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_TASK_DLOGL(param, ...) \
|
#define QW_TASK_DLOGL(param, ...) \
|
||||||
qDebugL("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
qDebugL("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_TASK_ELOG_E(param) \
|
#define QW_TASK_ELOG_E(param) \
|
||||||
qError("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
|
qError("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId)
|
||||||
#define QW_TASK_WLOG_E(param) \
|
#define QW_TASK_WLOG_E(param) \
|
||||||
qWarn("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
|
qWarn("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId)
|
||||||
#define QW_TASK_DLOG_E(param) \
|
#define QW_TASK_DLOG_E(param) \
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
|
qDebug("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, qId, sId, cId, tId, eId)
|
||||||
|
|
||||||
#define QW_SCH_TASK_ELOG(param, ...) \
|
#define QW_SCH_TASK_ELOG(param, ...) \
|
||||||
qError("QW:%p SID:%" PRId64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
|
qError("QW:%p, SID:%" PRId64 ", QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, mgmt, sId, \
|
||||||
qId, cId, tId, eId, __VA_ARGS__)
|
qId, cId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_SCH_TASK_WLOG(param, ...) \
|
#define QW_SCH_TASK_WLOG(param, ...) \
|
||||||
qWarn("QW:%p SID:%" PRId64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
|
qWarn("QW:%p, SID:%" PRId64 ", QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, mgmt, sId, qId, \
|
||||||
cId, tId, eId, __VA_ARGS__)
|
cId, tId, eId, __VA_ARGS__)
|
||||||
#define QW_SCH_TASK_DLOG(param, ...) \
|
#define QW_SCH_TASK_DLOG(param, ...) \
|
||||||
qDebug("QW:%p SID:%" PRId64 ",QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
|
qDebug("QW:%p, SID:%" PRId64 ", QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, mgmt, sId, \
|
||||||
qId, cId, tId, eId, __VA_ARGS__)
|
qId, cId, tId, eId, __VA_ARGS__)
|
||||||
|
|
||||||
#define QW_LOCK_DEBUG(...) \
|
#define QW_LOCK_DEBUG(...) \
|
||||||
|
|
|
@ -192,7 +192,7 @@ void qwDbgDumpJobsInfo(void) {
|
||||||
int32_t sessionIdx = 0;
|
int32_t sessionIdx = 0;
|
||||||
SQWSessionInfo* pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, NULL);
|
SQWSessionInfo* pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, NULL);
|
||||||
while (NULL != pSession) {
|
while (NULL != pSession) {
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d the %dth remain session",
|
qDebug("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d the %dth remain session",
|
||||||
pSession->qId, pSession->sId, pSession->cId, pSession->tId, pSession->eId, sessionIdx++);
|
pSession->qId, pSession->sId, pSession->cId, pSession->tId, pSession->eId, sessionIdx++);
|
||||||
|
|
||||||
pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, pSession);
|
pSession = (SQWSessionInfo*)taosHashIterate(pJob->pSessions, pSession);
|
||||||
|
|
|
@ -462,23 +462,23 @@ extern SSchedulerMgmt schMgmt;
|
||||||
(_task)->profile.endTs = us; \
|
(_task)->profile.endTs = us; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
|
#define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 ", SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
|
||||||
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
|
#define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 ", SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
|
||||||
|
|
||||||
#define SCH_TASK_ELOG(param, ...) \
|
#define SCH_TASK_ELOG(param, ...) \
|
||||||
qError("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
qError("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
||||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||||
#define SCH_TASK_DLOG(param, ...) \
|
#define SCH_TASK_DLOG(param, ...) \
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
qDebug("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
||||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||||
#define SCH_TASK_TLOG(param, ...) \
|
#define SCH_TASK_TLOG(param, ...) \
|
||||||
qTrace("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
qTrace("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
||||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||||
#define SCH_TASK_DLOGL(param, ...) \
|
#define SCH_TASK_DLOGL(param, ...) \
|
||||||
qDebugL("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
qDebugL("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
||||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||||
#define SCH_TASK_WLOG(param, ...) \
|
#define SCH_TASK_WLOG(param, ...) \
|
||||||
qWarn("QID:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
qWarn("QID:0x%" PRIx64 ", SID:%" PRId64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
|
||||||
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
|
||||||
|
|
||||||
#define SCH_SET_ERRNO(_err) \
|
#define SCH_SET_ERRNO(_err) \
|
||||||
|
|
|
@ -1246,7 +1246,7 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_
|
||||||
|
|
||||||
(void)schAcquireJob(rId, &pJob);
|
(void)schAcquireJob(rId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
|
qWarn("QID:0x%" PRIx64 ", TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -511,7 +511,7 @@ _return:
|
||||||
|
|
||||||
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x",
|
qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " drop task rsp received, code:0x%x",
|
||||||
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
|
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
|
||||||
// called if drop task rsp received code
|
// called if drop task rsp received code
|
||||||
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
|
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT, 0); // ignore error
|
||||||
|
@ -528,7 +528,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
|
|
||||||
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||||
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x",
|
qDebug("QID:0x%" PRIx64 ", SID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " task notify rsp received, code:0x%x",
|
||||||
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
|
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
|
||||||
if (pMsg) {
|
if (pMsg) {
|
||||||
taosMemoryFreeClear(pMsg->pData);
|
taosMemoryFreeClear(pMsg->pData);
|
||||||
|
|
|
@ -1014,7 +1014,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId,
|
qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", EID:%d task status in server: %s", pStatus->queryId,
|
||||||
pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
|
pStatus->clientId, pStatus->taskId, pStatus->execId, jobTaskStatusStr(pStatus->status));
|
||||||
|
|
||||||
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
|
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
|
||||||
|
@ -1061,13 +1061,13 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
|
qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg",
|
||||||
localRsp->qId, localRsp->cId, localRsp->tId);
|
localRsp->qId, localRsp->cId, localRsp->tId);
|
||||||
|
|
||||||
pJob = NULL;
|
pJob = NULL;
|
||||||
(void)schAcquireJob(localRsp->rId, &pJob);
|
(void)schAcquireJob(localRsp->rId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64,
|
qWarn("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 " job no exist, may be dropped, refId:0x%" PRIx64,
|
||||||
localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
|
localRsp->qId, localRsp->cId, localRsp->tId, localRsp->rId);
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
||||||
}
|
}
|
||||||
|
@ -1087,7 +1087,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
||||||
|
|
||||||
(void)schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
|
qDebug("QID:0x%" PRIx64 ", CID:0x%" PRIx64 ", TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x",
|
||||||
localRsp->qId, localRsp->cId, localRsp->tId, code);
|
localRsp->qId, localRsp->cId, localRsp->tId, code);
|
||||||
|
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
|
|
|
@ -471,7 +471,7 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&pInfo->checkInfoLock);
|
streamMutexUnlock(&pInfo->checkInfoLock);
|
||||||
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x,QID:%" PRIx64 " discarded", id, taskId,
|
stError("s-task:%s unexpected check rsp msg, invalid downstream task:0x%x, QID:%" PRIx64 " discarded", id, taskId,
|
||||||
reqId);
|
reqId);
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,7 +134,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
||||||
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
|
stError("s-task:%s (child %d) failed to send retrieve req to task:0x%x (vgId:%d) QID:0x%" PRIx64 " code:%s",
|
||||||
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
|
pTask->id.idStr, pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId, tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d),QID:0x%" PRIx64, pTask->id.idStr,
|
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), QID:0x%" PRIx64, pTask->id.idStr,
|
||||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,12 +32,12 @@ extern "C" {
|
||||||
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }}
|
#define tTrace(...) { if (rpcDebugFlag & DEBUG_TRACE) { taosPrintLog("RPC TRACE ", DEBUG_TRACE, rpcDebugFlag, __VA_ARGS__); }}
|
||||||
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } }
|
#define tDump(x, y) { if (rpcDebugFlag & DEBUG_DUMP) { taosDumpData((unsigned char *)x, y); } }
|
||||||
|
|
||||||
#define tGTrace(param, ...) do { if (rpcDebugFlag & DEBUG_TRACE){char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ",QID:%s", __VA_ARGS__, buf);}} while(0)
|
#define tGTrace(param, ...) do { if (rpcDebugFlag & DEBUG_TRACE){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tTrace(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define tGFatal(param, ...) do {if (rpcDebugFlag & DEBUG_FATAL){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tFatal(param ",QID:%s", __VA_ARGS__, buf); }} while (0)
|
#define tGFatal(param, ...) do {if (rpcDebugFlag & DEBUG_FATAL) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tFatal(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define tGError(param, ...) do { if (rpcDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tError(param ",QID:%s", __VA_ARGS__, buf);} } while(0)
|
#define tGError(param, ...) do { if (rpcDebugFlag & DEBUG_ERROR){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tError(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define tGWarn(param, ...) do { if (rpcDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tWarn(param ",QID:%s", __VA_ARGS__, buf); }} while(0)
|
#define tGWarn(param, ...) do { if (rpcDebugFlag & DEBUG_WARN) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tWarn(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define tGInfo(param, ...) do { if (rpcDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tInfo(param ",QID:%s", __VA_ARGS__, buf); }} while(0)
|
#define tGInfo(param, ...) do { if (rpcDebugFlag & DEBUG_INFO) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tInfo(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
#define tGDebug(param,...) do {if (rpcDebugFlag & DEBUG_DEBUG){ char buf[40] = {0}; TRACE_TO_STR(trace, buf); tDebug(param ",QID:%s", __VA_ARGS__, buf); }} while(0)
|
#define tGDebug(param,...) do {if (rpcDebugFlag & DEBUG_DEBUG) { char buf[40] = {0}; TRACE_TO_STR(trace, buf); tDebug(param ", QID:%s", __VA_ARGS__, buf);}} while(0)
|
||||||
|
|
||||||
|
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
|
@ -3765,7 +3765,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
||||||
|
|
||||||
if (pConn->heap != NULL) {
|
if (pConn->heap != NULL) {
|
||||||
p = pConn->heap;
|
p = pConn->heap;
|
||||||
tTrace("conn %p add to heap cache for key:%s,status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
|
tTrace("conn %p add to heap cache for key:%s, status:%d, refCnt:%d, add direct", pConn, pConn->dstAddr,
|
||||||
pConn->inHeap, pConn->reqRefCnt);
|
pConn->inHeap, pConn->reqRefCnt);
|
||||||
} else {
|
} else {
|
||||||
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
|
code = getOrCreateHeap(pConnHeapCacahe, pConn->dstAddr, &p);
|
||||||
|
@ -3781,7 +3781,7 @@ static int32_t addConnToHeapCache(SHashObj* pConnHeapCacahe, SCliConn* pConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
code = transHeapInsert(p, pConn);
|
code = transHeapInsert(p, pConn);
|
||||||
tTrace("conn %p add to heap cache for key:%s,status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
|
tTrace("conn %p add to heap cache for key:%s, status:%d, refCnt:%d", pConn, pConn->dstAddr, pConn->inHeap,
|
||||||
pConn->reqRefCnt);
|
pConn->reqRefCnt);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue