formate qid

This commit is contained in:
yihaoDeng 2024-09-29 17:56:11 +08:00
parent f8b46586ed
commit 57f7145878
10 changed files with 123 additions and 108 deletions

View File

@ -23,12 +23,12 @@
#include "tglobal.h" #include "tglobal.h"
#include "tmsgtype.h" #include "tmsgtype.h"
#define RAW_NULL_CHECK(c) \ #define RAW_NULL_CHECK(c) \
do { \ do { \
if (c == NULL) { \ if (c == NULL) { \
code = terrno; \ code = terrno; \
goto end; \ goto end; \
} \ } \
} while (0) } while (0)
#define RAW_FALSE_CHECK(c) \ #define RAW_FALSE_CHECK(c) \
@ -47,7 +47,7 @@
} \ } \
} while (0) } while (0)
#define LOG_ID_TAG "connId:0x%" PRIx64 ",qid:0x%" PRIx64 #define LOG_ID_TAG "connId:0x%" PRIx64 ",QID:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
#define TMQ_META_VERSION "1.0" #define TMQ_META_VERSION "1.0"
@ -1188,7 +1188,7 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
pCreateReq->ctb.suid = pTableMeta->uid; pCreateReq->ctb.suid = pTableMeta->uid;
SArray* pTagVals = NULL; SArray* pTagVals = NULL;
code = tTagToValArray((STag *)pCreateReq->ctb.pTag, &pTagVals); code = tTagToValArray((STag*)pCreateReq->ctb.pTag, &pTagVals);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
goto end; goto end;
@ -1206,18 +1206,19 @@ static int32_t taosCreateTable(TAOS* taos, void* meta, int32_t metaLen) {
if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) { if (strcmp(tag->name, tName) == 0 && tag->type != TSDB_DATA_TYPE_JSON) {
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
if (pTagVal) { if (pTagVal) {
if (pTagVal->cid != tag->colId){ if (pTagVal->cid != tag->colId) {
pTagVal->cid = tag->colId; pTagVal->cid = tag->colId;
rebuildTag = true; rebuildTag = true;
} }
} else { } else {
uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name, (int)taosArrayGetSize(pTagVals), i, tag->colId); uError("create tb invalid data %s, size:%d index:%d cid:%d", pCreateReq->name,
(int)taosArrayGetSize(pTagVals), i, tag->colId);
} }
} }
} }
} }
taosMemoryFreeClear(pTableMeta); taosMemoryFreeClear(pTableMeta);
if (rebuildTag){ if (rebuildTag) {
STag* ppTag = NULL; STag* ppTag = NULL;
code = tTagNew(pTagVals, 1, false, &ppTag); code = tTagNew(pTagVals, 1, false, &ppTag);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
@ -1815,7 +1816,7 @@ end:
static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) { static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
// find schema data info // find schema data info
int32_t code = 0; int32_t code = 0;
SVCreateTbReq pCreateReq = {0}; SVCreateTbReq pCreateReq = {0};
SDecoder decoderTmp = {0}; SDecoder decoderTmp = {0};
@ -1826,15 +1827,16 @@ static int32_t buildCreateTbMap(STaosxRsp* rsp, SHashObj* pHashObj) {
RAW_NULL_CHECK(lenTmp); RAW_NULL_CHECK(lenTmp);
tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); tDecoderInit(&decoderTmp, *dataTmp, *lenTmp);
RAW_RETURN_CHECK (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq)); RAW_RETURN_CHECK(tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq));
if (pCreateReq.type != TSDB_CHILD_TABLE) { if (pCreateReq.type != TSDB_CHILD_TABLE) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto end; goto end;
} }
if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL){ if (taosHashGet(pHashObj, pCreateReq.name, strlen(pCreateReq.name)) == NULL) {
RAW_RETURN_CHECK(taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq))); RAW_RETURN_CHECK(
} else{ taosHashPut(pHashObj, pCreateReq.name, strlen(pCreateReq.name), &pCreateReq, sizeof(SVCreateTbReq)));
} else {
tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE); tDestroySVCreateTbReq(&pCreateReq, TSDB_MSG_FLG_DECODE);
pCreateReq = (SVCreateTbReq){0}; pCreateReq = (SVCreateTbReq){0};
} }
@ -1927,7 +1929,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
// find schema data info // find schema data info
SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); SVCreateTbReq* pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName));
SVgroupInfo vg = {0}; SVgroupInfo vg = {0};
RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg)); RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vg));
if (pCreateReqDst) { // change stable name to get meta if (pCreateReqDst) { // change stable name to get meta
(void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName);
@ -1957,10 +1959,10 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
fields[i].bytes = pSW->pSchema[i].bytes; fields[i].bytes = pSW->pSchema[i].bytes;
tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name));
} }
void* rawData = getRawDataFromRes(pRetrieve); void* rawData = getRawDataFromRes(pRetrieve);
char err[ERR_MSG_LEN] = {0}; char err[ERR_MSG_LEN] = {0};
SVCreateTbReq* pCreateReqTmp = NULL; SVCreateTbReq* pCreateReqTmp = NULL;
if (pCreateReqDst){ if (pCreateReqDst) {
RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp)); RAW_RETURN_CHECK(cloneSVreateTbReq(pCreateReqDst, &pCreateReqTmp));
} }
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN); code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqTmp, fields, pSW->nCols, true, err, ERR_MSG_LEN);

View File

@ -84,7 +84,6 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); code = qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
if (code) { if (code) {
return code; return code;
} }
} }
@ -363,7 +362,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
} }
pRspHead->vgId = htonl(req.upstreamNodeId); pRspHead->vgId = htonl(req.upstreamNodeId);
if(pRspHead->vgId == 0) { if (pRspHead->vgId == 0) {
tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId); tqError("vgId:%d invalid dispatch msg from upstream to task:0x%x", pMeta->vgId, req.taskId);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
@ -460,7 +459,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (code != TSDB_CODE_SUCCESS) { // return error not send rsp manually if (code != TSDB_CODE_SUCCESS) { // return error not send rsp manually
tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId, tqError("s-task:0x%x vgId:%d failed to process retrieve request from 0x%x, code:%s", req.dstTaskId, req.dstNodeId,
req.srcTaskId, tstrerror(code)); req.srcTaskId, tstrerror(code));
} else { // send rsp manually only on success. } else { // send rsp manually only on success.
SRpcMsg rsp = {.info = pMsg->info, .code = 0}; SRpcMsg rsp = {.info = pMsg->info, .code = 0};
streamTaskSendRetrieveRsp(&req, &rsp); streamTaskSendRetrieveRsp(&req, &rsp);
} }
@ -515,7 +514,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqDebug("tq task:0x%x (vgId:%d) recv check rsp(qid:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId, tqDebug("tq task:0x%x (vgId:%d) recv check rsp(QID:0x%" PRIx64 ") from 0x%x (vgId:%d) status %d", rsp.upstreamTaskId,
rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status); rsp.upstreamNodeId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.status);
if (!isLeader) { if (!isLeader) {
@ -1272,7 +1271,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId < req.checkpointId) { if (pTask->chkInfo.checkpointId < req.checkpointId) {
tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%"PRId64, tqFatal("s-task:%s vgId:%d invalid consensus-checkpointId:%" PRId64 ", greater than existed checkpointId:%" PRId64,
pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId); pTask->id.idStr, vgId, req.checkpointId, pTask->chkInfo.checkpointId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);

View File

@ -72,7 +72,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname); ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -94,7 +94,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
ctx->pNames = param; ctx->pNames = param;
ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes)); ctx->pResList = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
if (NULL == ctx->pResList) { if (NULL == ctx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbMetaNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbMetaNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -105,7 +105,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -133,7 +133,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName); ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -161,7 +161,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName); ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -189,7 +189,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), dbFName); ctgTaskTypeStr(task.type), dbFName);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -223,7 +223,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tableName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname); ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -245,7 +245,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
ctx->pNames = param; ctx->pNames = param;
ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes)); ctx->pResList = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
if (NULL == ctx->pResList) { if (NULL == ctx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbHashNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbHashNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -256,7 +256,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -275,7 +275,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -293,7 +293,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -320,7 +320,7 @@ int32_t ctgInitGetIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, indexFName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name); ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -348,7 +348,7 @@ int32_t ctgInitGetUdfTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, udfName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name); ctgTaskTypeStr(task.type), name);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -376,7 +376,7 @@ int32_t ctgInitGetUserTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, user:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), user->user); ctgTaskTypeStr(task.type), user->user);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -394,7 +394,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -426,7 +426,7 @@ int32_t ctgInitGetTbIndexTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname); ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -459,7 +459,7 @@ int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname); ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -492,7 +492,7 @@ int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), name->tname); ctgTaskTypeStr(task.type), name->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -514,7 +514,7 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
ctx->pNames = param; ctx->pNames = param;
ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes)); ctx->pResList = taosArrayInit(pJob->viewNum, sizeof(SMetaRes));
if (NULL == ctx->pResList) { if (NULL == ctx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->viewNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -525,7 +525,7 @@ int32_t ctgInitGetViewsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
} }
qDebug("qid:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx, qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%lu, viewNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->viewNum); ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->viewNum);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -546,7 +546,7 @@ int32_t ctgInitGetTbTSMATask(SCtgJob* pJob, int32_t taskId, void* param) {
pTaskCtx->pNames = param; pTaskCtx->pNames = param;
pTaskCtx->pResList = taosArrayInit(pJob->tbTsmaNum, sizeof(SMetaRes)); pTaskCtx->pResList = taosArrayInit(pJob->tbTsmaNum, sizeof(SMetaRes));
if (NULL == pTaskCtx->pResList) { if (NULL == pTaskCtx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbTsmaNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbTsmaNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -574,7 +574,7 @@ int32_t ctgInitGetTSMATask(SCtgJob* pJob, int32_t taskId, void* param) {
pTaskCtx->pNames = param; pTaskCtx->pNames = param;
pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes)); pTaskCtx->pResList = taosArrayInit(pJob->tsmaNum, sizeof(SMetaRes));
if (NULL == pTaskCtx->pResList) { if (NULL == pTaskCtx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tsmaNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tsmaNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -603,7 +603,7 @@ static int32_t ctgInitGetTbNamesTask(SCtgJob* pJob, int32_t taskId, void* param)
pTaskCtx->pNames = param; pTaskCtx->pNames = param;
pTaskCtx->pResList = taosArrayInit(pJob->tbNameNum, sizeof(SMetaRes)); pTaskCtx->pResList = taosArrayInit(pJob->tbNameNum, sizeof(SMetaRes));
if (NULL == pTaskCtx->pResList) { if (NULL == pTaskCtx->pResList) {
qError("qid:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbNameNum, qError("QID:0x%" PRIx64 " taosArrayInit %d SMetaRes %d failed", pJob->queryId, pJob->tbNameNum,
(int32_t)sizeof(SMetaRes)); (int32_t)sizeof(SMetaRes));
ctgFreeTask(&task, true); ctgFreeTask(&task, true);
CTG_ERR_RET(terrno); CTG_ERR_RET(terrno);
@ -1048,7 +1048,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
} }
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
qDebug("qid:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms", qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d, elapsed time:%.2f ms",
pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate, el); pJob->queryId, pJob->refId, taskNum, pReq->forceUpdate, el);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1450,16 +1450,17 @@ _return:
int32_t ctgCallUserCb(void* param) { int32_t ctgCallUserCb(void* param) {
SCtgJob* pJob = (SCtgJob*)param; SCtgJob* pJob = (SCtgJob*)param;
qDebug("qid:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); qDebug("QID:0x%" PRIx64 " ctg start to call user cb with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
(*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode); (*pJob->userFp)(&pJob->jobRes, pJob->userParam, pJob->jobResCode);
qDebug("qid:0x%" PRIx64 " ctg end to call user cb", pJob->queryId); qDebug("QID:0x%" PRIx64 " ctg end to call user cb", pJob->queryId);
int64_t refId = pJob->refId; int64_t refId = pJob->refId;
int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId); int32_t code = taosRemoveRef(gCtgMgmt.jobPool, refId);
if (code) { if (code) {
qError("qid:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId, tstrerror(code)); qError("QID:0x%" PRIx64 " remove ctg job %" PRId64 " from jobPool failed, error:%s", pJob->queryId, refId,
tstrerror(code));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1469,7 +1470,7 @@ void ctgUpdateJobErrCode(SCtgJob* pJob, int32_t errCode) {
if (!NEED_CLIENT_REFRESH_VG_ERROR(errCode) || errCode == TSDB_CODE_SUCCESS) return; if (!NEED_CLIENT_REFRESH_VG_ERROR(errCode) || errCode == TSDB_CODE_SUCCESS) return;
atomic_store_32(&pJob->jobResCode, errCode); atomic_store_32(&pJob->jobResCode, errCode);
qDebug("qid:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode)); qDebug("QID:0x%" PRIx64 " ctg job errCode updated to %s", pJob->queryId, tstrerror(errCode));
return; return;
} }
@ -1481,7 +1482,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
qDebug("qid:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode)); qDebug("QID:0x%" PRIx64 " task %d end with res %s", pJob->queryId, pTask->taskId, tstrerror(rspCode));
pTask->code = rspCode; pTask->code = rspCode;
pTask->status = CTG_TASK_DONE; pTask->status = CTG_TASK_DONE;
@ -1490,7 +1491,7 @@ int32_t ctgHandleTaskEnd(SCtgTask* pTask, int32_t rspCode) {
int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1); int32_t taskDone = atomic_add_fetch_32(&pJob->taskDone, 1);
if (taskDone < taosArrayGetSize(pJob->pTasks)) { if (taskDone < taosArrayGetSize(pJob->pTasks)) {
qDebug("qid:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone, qDebug("QID:0x%" PRIx64 " task done: %d, total: %d", pJob->queryId, taskDone,
(int32_t)taosArrayGetSize(pJob->pTasks)); (int32_t)taosArrayGetSize(pJob->pTasks));
ctgUpdateJobErrCode(pJob, rspCode); ctgUpdateJobErrCode(pJob, rspCode);
@ -4347,7 +4348,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) {
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
qDebug("qid:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId); qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask)); CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
pTask = taosArrayGet(pJob->pTasks, i); pTask = taosArrayGet(pJob->pTasks, i);
@ -4360,7 +4361,7 @@ int32_t ctgLaunchJob(SCtgJob* pJob) {
} }
if (taskNum <= 0) { if (taskNum <= 0) {
qDebug("qid:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode)); qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
CTG_ERR_RET(taosAsyncExec(ctgCallUserCb, pJob, NULL)); CTG_ERR_RET(taosAsyncExec(ctgCallUserCb, pJob, NULL));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH

View File

@ -47,7 +47,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
msgNum = 0; msgNum = 0;
} }
ctgDebug("qid:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId, ctgDebug("QID:0x%" PRIx64 " ctg got batch %d rsp %s", pJob->queryId, cbParam->batchId,
TMSG_INFO(cbParam->reqType + 1)); TMSG_INFO(cbParam->reqType + 1));
SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); SHashObj* pBatchs = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
@ -114,7 +114,7 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
pMsgCtx->pBatchs = pBatchs; pMsgCtx->pBatchs = pBatchs;
ctgDebug("qid:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId, ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s, pBatchs: %p", pJob->queryId, pTask->taskId,
pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs); pRsp->msgIdx, TMSG_INFO(taskMsg.msgType + 1), pBatchs);
(void)(*gCtgAsyncFps[pTask->type].handleRspFp)( (void)(*gCtgAsyncFps[pTask->type].handleRspFp)(
@ -454,7 +454,7 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} }
qDebug("qid:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId, qDebug("QID:0x%" PRIx64 " ctg task %d start to handle rsp %s", pJob->queryId, pTask->taskId,
TMSG_INFO(cbParam->reqType + 1)); TMSG_INFO(cbParam->reqType + 1));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
@ -702,7 +702,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (TDMT_VND_TABLE_CFG == msgType) { if (TDMT_VND_TABLE_CFG == msgType) {
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
pName = ctx->pName; pName = ctx->pName;
} else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) { } else if (TDMT_VND_TABLE_META == msgType || TDMT_VND_TABLE_NAME == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
@ -808,7 +808,7 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
SCtgBatch* pBatch = (SCtgBatch*)p; SCtgBatch* pBatch = (SCtgBatch*)p;
int32_t msgSize = 0; int32_t msgSize = 0;
ctgDebug("qid:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg, &msgSize));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
@ -1124,7 +1124,8 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
int32_t code = tNameExtractFullName(name, tbFName); int32_t code = tNameExtractFullName(name, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type,
name->dbname, name->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1450,7 +1451,8 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t code = tNameExtractFullName(pTableName, tbFName); int32_t code = tNameExtractFullName(pTableName, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type,
pTableName->dbname, pTableName->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1523,7 +1525,8 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
int32_t code = tNameExtractFullName(pTableName, tbFName); int32_t code = tNameExtractFullName(pTableName, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type,
pTableName->dbname, pTableName->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1632,10 +1635,11 @@ int32_t ctgGetViewInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName*
int32_t reqType = TDMT_MND_VIEW_META; int32_t reqType = TDMT_MND_VIEW_META;
SCtgTask* pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char fullName[TSDB_TABLE_FNAME_LEN]; char fullName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(pName, fullName); int32_t code = tNameExtractFullName(pName, fullName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type, pName->dbname, pName->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pName->type,
pName->dbname, pName->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1693,10 +1697,11 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
int32_t msgLen = 0; int32_t msgLen = 0;
SCtgTask* pTask = tReq ? tReq->pTask : NULL; SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(name, tbFName); int32_t code = tNameExtractFullName(name, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type, name->dbname, name->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), name->type,
name->dbname, name->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1757,7 +1762,8 @@ int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, c
char tbFName[TSDB_TABLE_FNAME_LEN]; char tbFName[TSDB_TABLE_FNAME_LEN];
int32_t code = tNameExtractFullName(pTbName, tbFName); int32_t code = tNameExtractFullName(pTbName, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type, pTbName->dbname, pTbName->tname); ctgError("tNameExtractFullName failed, code:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTbName->type,
pTbName->dbname, pTbName->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }

View File

@ -452,7 +452,8 @@ void ctgClearHandleMeta(SCatalog* pCtg, int64_t* pClearedSize, int64_t* pCleardN
code = taosHashRemove(dbCache->tbCache, key, len); code = taosHashRemove(dbCache->tbCache, key, len);
if (code) { if (code) {
qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len, tstrerror(code)); qError("taosHashRemove table cache failed, key:%s, len:%d, error:%s", (char*)key, (int32_t)len,
tstrerror(code));
} }
cacheSize = cacheSize =
@ -1096,7 +1097,7 @@ void ctgFreeJob(void* job) {
taosMemoryFree(job); taosMemoryFree(job);
qDebug("qid:0x%" PRIx64 ", ctg job 0x%" PRIx64 " freed", qid, rid); qDebug("QID:0x%" PRIx64 ", ctg job 0x%" PRIx64 " freed", qid, rid);
} }
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target) { int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target) {
@ -1241,7 +1242,8 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* d
char tbFullName[TSDB_TABLE_FNAME_LEN]; char tbFullName[TSDB_TABLE_FNAME_LEN];
code = tNameExtractFullName(pTableName, tbFullName); code = tNameExtractFullName(pTableName, tbFullName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type, pTableName->dbname, pTableName->tname); ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), pTableName->type,
pTableName->dbname, pTableName->tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -1704,7 +1706,8 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
} }
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo* pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) { int32_t ctgUpdateSendTargetInfo(SMsgSendInfo* pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) {
if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META || msgType == TDMT_VND_TABLE_NAME) { if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META ||
msgType == TDMT_VND_TABLE_NAME) {
pMsgSendInfo->target.type = TARGET_TYPE_VNODE; pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
pMsgSendInfo->target.vgId = vgId; pMsgSendInfo->target.vgId = vgId;
pMsgSendInfo->target.dbFName = taosStrdup(dbFName); pMsgSendInfo->target.dbFName = taosStrdup(dbFName);
@ -2010,7 +2013,8 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
code = tNameExtractFullName(&req->pRawReq->tbName, tbFName); code = tNameExtractFullName(&req->pRawReq->tbName, tbFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code),
req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
@ -2201,7 +2205,8 @@ int32_t ctgChkSetViewAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res)
} else { } else {
code = tNameExtractFullName(&req->pRawReq->tbName, viewFName); code = tNameExtractFullName(&req->pRawReq->tbName, viewFName);
if (code) { if (code) {
ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code), req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname); ctgError("tNameExtractFullName failed, error:%s, type:%d, dbName:%s, tname:%s", tstrerror(code),
req->pRawReq->tbName.type, req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
CTG_ERR_RET(code); CTG_ERR_RET(code);
} }
} }

View File

@ -37,7 +37,7 @@ static int32_t dumpQueryPlan(SQueryPlan* pPlan) {
char* pStr = NULL; char* pStr = NULL;
code = nodesNodeToString((SNode*)pPlan, false, &pStr, NULL); code = nodesNodeToString((SNode*)pPlan, false, &pStr, NULL);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
planDebugL("qid:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr); planDebugL("QID:0x%" PRIx64 " Query Plan, JsonPlan: %s", pPlan->queryId, pStr);
taosMemoryFree(pStr); taosMemoryFree(pStr);
} }
return code; return code;
@ -123,7 +123,7 @@ int32_t qContinuePlanPostQuery(void* pPostPlan) {
} }
int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) {
planDebug("qid:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId); planDebug("QID:0x%" PRIx64 " set subplan execution node, groupId:%d", subplan->id.queryId, groupId);
return setSubplanExecutionNode(subplan->pNode, groupId, pSource); return setSubplanExecutionNode(subplan->pNode, groupId, pSource);
} }
@ -143,7 +143,7 @@ static void clearSubplanExecutionNode(SPhysiNode* pNode) {
} }
void qClearSubplanExecutionNode(SSubplan* pSubplan) { void qClearSubplanExecutionNode(SSubplan* pSubplan) {
planDebug("qid:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId); planDebug("QID:0x%" PRIx64 " clear subplan execution node, groupId:%d", pSubplan->id.queryId, pSubplan->id.groupId);
clearSubplanExecutionNode(pSubplan->pNode); clearSubplanExecutionNode(pSubplan->pNode);
} }

View File

@ -728,7 +728,7 @@ void schFreeJobImpl(void *job) {
uint64_t queryId = pJob->queryId; uint64_t queryId = pJob->queryId;
int64_t refId = pJob->refId; int64_t refId = pJob->refId;
qDebug("qid:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); qDebug("QID:0x%" PRIx64 " begin to free sch job, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
schDropJobAllTasks(pJob); schDropJobAllTasks(pJob);
@ -786,7 +786,7 @@ void schFreeJobImpl(void *job) {
schCloseJobRef(); schCloseJobRef();
} }
qDebug("qid:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob);
} }
int32_t schJobFetchRows(SSchJob *pJob) { int32_t schJobFetchRows(SSchJob *pJob) {
@ -821,7 +821,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
int64_t refId = -1; int64_t refId = -1;
SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob));
if (NULL == pJob) { if (NULL == pJob) {
qError("qid:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob)); qError("QID:0x%" PRIx64 " calloc %d failed", pReq->pDag->queryId, (int32_t)sizeof(SSchJob));
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
@ -831,7 +831,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
if (pReq->sql) { if (pReq->sql) {
pJob->sql = taosStrdup(pReq->sql); pJob->sql = taosStrdup(pReq->sql);
if (NULL == pJob->sql) { if (NULL == pJob->sql) {
qError("qid:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql); qError("QID:0x%" PRIx64 " strdup sql %s failed", pReq->pDag->queryId, pReq->sql);
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
} }
@ -839,7 +839,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
if (pReq->allocatorRefId > 0) { if (pReq->allocatorRefId > 0) {
pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId); pJob->allocatorRefId = nodesMakeAllocatorWeakRef(pReq->allocatorRefId);
if (pJob->allocatorRefId <= 0) { if (pJob->allocatorRefId <= 0) {
qError("qid:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId); qError("QID:0x%" PRIx64 " nodesMakeAllocatorWeakRef failed", pReq->pDag->queryId);
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
} }
@ -851,11 +851,11 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) {
pJob->pWorkerCb = pReq->pWorkerCb; pJob->pWorkerCb = pReq->pWorkerCb;
if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) {
qDebug("qid:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId);
} else { } else {
pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL); pJob->nodeList = taosArrayDup(pReq->pNodeList, NULL);
if (NULL == pJob->nodeList) { if (NULL == pJob->nodeList) {
qError("qid:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId, qError("QID:0x%" PRIx64 " taosArrayDup failed, origNum:%d", pReq->pDag->queryId,
(int32_t)taosArrayGetSize(pReq->pNodeList)); (int32_t)taosArrayGetSize(pReq->pNodeList));
SCH_ERR_JRET(terrno); SCH_ERR_JRET(terrno);
} }
@ -918,7 +918,7 @@ _return:
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
int32_t code = 0; int32_t code = 0;
qDebug("qid:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId); qDebug("QID:0x%" PRIx64 " sch job refId 0x%" PRIx64 " started", pReq->pDag->queryId, pJob->refId);
SCH_ERR_RET(schLaunchJob(pJob)); SCH_ERR_RET(schLaunchJob(pJob));
@ -926,7 +926,7 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) {
SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob));
code = tsem_wait(&pJob->rspSem); code = tsem_wait(&pJob->rspSem);
if (code) { if (code) {
qError("qid:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code)); qError("QID:0x%" PRIx64 " tsem_wait sync rspSem failed, error:%s", pReq->pDag->queryId, tstrerror(code));
SCH_ERR_RET(code); SCH_ERR_RET(code);
} }
} }
@ -1191,7 +1191,7 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_
(void)schAcquireJob(rId, &pJob); (void)schAcquireJob(rId, &pJob);
if (NULL == pJob) { if (NULL == pJob) {
qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId); qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
} }

View File

@ -500,7 +500,7 @@ _return:
int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code); code);
// called if drop task rsp received code // called if drop task rsp received code
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error (void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
@ -513,7 +513,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) { int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param; SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x", pParam->queryId, pParam->taskId,
code); code);
if (pMsg) { if (pMsg) {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);

View File

@ -996,7 +996,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
int32_t code = 0; int32_t code = 0;
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
pStatus->execId, jobTaskStatusStr(pStatus->status)); pStatus->execId, jobTaskStatusStr(pStatus->status));
if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) { if (schProcessOnCbBegin(&pJob, &pTask, pStatus->queryId, pStatus->refId, pStatus->taskId)) {
@ -1043,12 +1043,12 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
continue; continue;
} }
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId); qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
pJob = NULL; pJob = NULL;
(void)schAcquireJob(localRsp->rId, &pJob); (void)schAcquireJob(localRsp->rId, &pJob);
if (NULL == pJob) { if (NULL == pJob) {
qWarn("qid:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId, qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
localRsp->tId, localRsp->rId); localRsp->tId, localRsp->rId);
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST); SCH_ERR_JRET(TSDB_CODE_QRY_JOB_NOT_EXIST);
} }
@ -1068,7 +1068,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
(void)schReleaseJob(pJob->refId); (void)schReleaseJob(pJob->refId);
qDebug("qid:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId, qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
localRsp->tId, code); localRsp->tId, code);
SCH_ERR_JRET(code); SCH_ERR_JRET(code);

View File

@ -108,7 +108,7 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
&pTask->outputInfo.fixedDispatcher.epSet); &pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask); streamTaskStartMonitorCheckRsp(pTask);
@ -171,14 +171,14 @@ void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SS
streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage);
SStreamTaskState pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(qid:0x%" PRIx64 stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(QID:0x%" PRIx64
") task:0x%x (vgId:%d), check_status:%d", ") task:0x%x (vgId:%d), check_status:%d",
pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pTask->id.idStr, pState.name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId,
pRsp->status); pRsp->status);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
} else { } else {
pRsp->status = TASK_DOWNSTREAM_NOT_READY; pRsp->status = TASK_DOWNSTREAM_NOT_READY;
stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(qid:0x%" PRIx64 stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(QID:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp check_status %d", ") from task:0x%x (vgId:%d), rsp check_status %d",
pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status);
} }
@ -259,7 +259,8 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
void* buf = rpcMallocCont(sizeof(SMsgHead) + len); void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
if (buf == NULL) { if (buf == NULL) {
stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__, tstrerror(code)); stError("s-task:0x%x vgId:%d failed prepare msg, %s at line:%d code:%s", taskId, pMeta->vgId, __func__, __LINE__,
tstrerror(code));
return terrno; return terrno;
} }
@ -332,7 +333,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processDownstreamReadyRsp(SStreamTask* pTask) { void processDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST; EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL); int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
if (code) { if (code) {
stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code));
} }
@ -354,7 +355,7 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr, stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus)); pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
if (code != 0) { // todo: handle error if (code != 0) { // todo: handle error
stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code));
} }
} }
@ -373,8 +374,9 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { int32_t addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
int32_t code = 0;; int32_t code = 0;
bool existed = false; ;
bool existed = false;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);