This commit is contained in:
Hongze Cheng 2024-08-15 16:35:37 +08:00
parent 6170cc1799
commit 7cd086c8f9
3 changed files with 140 additions and 138 deletions

View File

@ -1441,7 +1441,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
while ((code = syncAskEp(tmq)) != 0) { while ((code = syncAskEp(tmq)) != 0) {
if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { if (retryCnt++ > MAX_RETRY_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s",
tmq->consumerId, strerror(code)); tmq->consumerId, tstrerror(code));
if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) {
code = 0; code = 0;
} }

View File

@ -70,24 +70,24 @@
} while (0) } while (0)
static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *pMonitorParas) { static int32_t tSerializeSMonitorParas(SEncoder *encoder, const SMonitorParas *pMonitorParas) {
if (tEncodeI8(encoder, pMonitorParas->tsEnableMonitor) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(encoder, pMonitorParas->tsEnableMonitor));
if (tEncodeI32(encoder, pMonitorParas->tsMonitorInterval) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsMonitorInterval));
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogScope) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogScope));
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogMaxLen));
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThreshold));
if (tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(encoder, pMonitorParas->tsSlowLogThresholdTest));
if (tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStr(encoder, pMonitorParas->tsSlowLogExceptDb));
return 0; return 0;
} }
static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMonitorParas) { static int32_t tDeserializeSMonitorParas(SDecoder *decoder, SMonitorParas *pMonitorParas) {
if (tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(decoder, (int8_t *)&pMonitorParas->tsEnableMonitor));
if (tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsMonitorInterval));
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogScope));
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogMaxLen));
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThreshold));
if (tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(decoder, &pMonitorParas->tsSlowLogThresholdTest));
if (tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(decoder, pMonitorParas->tsSlowLogExceptDb));
return 0; return 0;
} }
@ -98,8 +98,7 @@ static int32_t tDecodeTableTSMAInfoRsp(SDecoder *pDecoder, STableTSMAInfoRsp *pR
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
} }
pIter->totalLen = htonl(pMsg->length); pIter->totalLen = htonl(pMsg->length);
@ -108,8 +107,7 @@ int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
pIter->len = 0; pIter->len = 0;
pIter->pMsg = pMsg; pIter->pMsg = pMsg;
if (pIter->totalLen <= sizeof(SSubmitReq)) { if (pIter->totalLen <= sizeof(SSubmitReq)) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
} }
return 0; return 0;
@ -130,9 +128,8 @@ int32_t tGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
} }
if (pIter->len > pIter->totalLen) { if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL; *pPBlock = NULL;
return -1; return terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
} }
if (pIter->len == pIter->totalLen) { if (pIter->len == pIter->totalLen) {
@ -193,46 +190,46 @@ int32_t tPrintFixedSchemaSubmitReq(SSubmitReq *pReq, STSchema *pTschema) {
#endif #endif
int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) { int32_t tEncodeSEpSet(SEncoder *pEncoder, const SEpSet *pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->inUse));
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pEp->numOfEps));
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU16(pEncoder, pEp->eps[i].port));
if (tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStrWithLen(pEncoder, pEp->eps[i].fqdn, TSDB_FQDN_LEN));
} }
return 0; return 0;
} }
int32_t tDecodeSEpSet(SDecoder *pDecoder, SEpSet *pEp) { int32_t tDecodeSEpSet(SDecoder *pDecoder, SEpSet *pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pEp->inUse));
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pEp->numOfEps));
for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) { for (int32_t i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU16(pDecoder, &pEp->eps[i].port));
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn));
} }
return 0; return 0;
} }
int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) { int32_t tEncodeSQueryNodeAddr(SEncoder *pEncoder, SQueryNodeAddr *pAddr) {
if (tEncodeI32(pEncoder, pAddr->nodeId) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pAddr->nodeId));
if (tEncodeSEpSet(pEncoder, &pAddr->epSet) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pAddr->epSet));
return 0; return 0;
} }
int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) { int32_t tEncodeSQueryNodeLoad(SEncoder *pEncoder, SQueryNodeLoad *pLoad) {
if (tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSQueryNodeAddr(pEncoder, &pLoad->addr));
if (tEncodeU64(pEncoder, pLoad->load) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pLoad->load));
return 0; return 0;
} }
int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) { int32_t tDecodeSQueryNodeAddr(SDecoder *pDecoder, SQueryNodeAddr *pAddr) {
if (tDecodeI32(pDecoder, &pAddr->nodeId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pAddr->nodeId));
if (tDecodeSEpSet(pDecoder, &pAddr->epSet) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pAddr->epSet));
return 0; return 0;
} }
int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) { int32_t tDecodeSQueryNodeLoad(SDecoder *pDecoder, SQueryNodeLoad *pLoad) {
if (tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSQueryNodeAddr(pDecoder, &pLoad->addr));
if (tDecodeU64(pDecoder, &pLoad->load) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pLoad->load));
return 0; return 0;
} }
@ -258,63 +255,63 @@ void *taosDecodeSEpSet(const void *buf, SEpSet *pEp) {
} }
static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) { static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pReq) {
if (tEncodeSClientHbKey(pEncoder, &pReq->connKey) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pReq->connKey));
if (pReq->connKey.connType == CONN_TYPE__QUERY) { if (pReq->connKey.connType == CONN_TYPE__QUERY) {
if (tEncodeI64(pEncoder, pReq->app.appId) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->app.appId));
if (tEncodeI32(pEncoder, pReq->app.pid) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->app.pid));
if (tEncodeCStr(pEncoder, pReq->app.name) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pReq->app.name));
if (tEncodeI64(pEncoder, pReq->app.startTime) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pReq->app.startTime));
if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertsReq) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfInsertsReq));
if (tEncodeU64(pEncoder, pReq->app.summary.numOfInsertRows) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfInsertRows));
if (tEncodeU64(pEncoder, pReq->app.summary.insertElapsedTime) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.insertElapsedTime));
if (tEncodeU64(pEncoder, pReq->app.summary.insertBytes) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.insertBytes));
if (tEncodeU64(pEncoder, pReq->app.summary.fetchBytes) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.fetchBytes));
if (tEncodeU64(pEncoder, pReq->app.summary.queryElapsedTime) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.queryElapsedTime));
if (tEncodeU64(pEncoder, pReq->app.summary.numOfSlowQueries) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.numOfSlowQueries));
if (tEncodeU64(pEncoder, pReq->app.summary.totalRequests) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.totalRequests));
if (tEncodeU64(pEncoder, pReq->app.summary.currentRequests) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pReq->app.summary.currentRequests));
int32_t queryNum = 0; int32_t queryNum = 0;
if (pReq->query) { if (pReq->query) {
queryNum = 1; queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum));
if (tEncodeU32(pEncoder, pReq->query->connId) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pReq->query->connId));
int32_t num = taosArrayGetSize(pReq->query->queryDesc); int32_t num = taosArrayGetSize(pReq->query->queryDesc);
if (tEncodeI32(pEncoder, num) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, num));
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i); SQueryDesc *desc = taosArrayGet(pReq->query->queryDesc, i);
if (tEncodeCStr(pEncoder, desc->sql) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->sql));
if (tEncodeU64(pEncoder, desc->queryId) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, desc->queryId));
if (tEncodeI64(pEncoder, desc->useconds) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->useconds));
if (tEncodeI64(pEncoder, desc->stime) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->stime));
if (tEncodeI64(pEncoder, desc->reqRid) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, desc->reqRid));
if (tEncodeI8(pEncoder, desc->stableQuery) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->stableQuery));
if (tEncodeI8(pEncoder, desc->isSubQuery) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(pEncoder, desc->isSubQuery));
if (tEncodeCStr(pEncoder, desc->fqdn) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, desc->fqdn));
if (tEncodeI32(pEncoder, desc->subPlanNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, desc->subPlanNum));
int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0; int32_t snum = desc->subDesc ? taosArrayGetSize(desc->subDesc) : 0;
if (tEncodeI32(pEncoder, snum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, snum));
for (int32_t m = 0; m < snum; ++m) { for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m); SQuerySubDesc *sDesc = taosArrayGet(desc->subDesc, m);
if (tEncodeI64(pEncoder, sDesc->tid) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI64(pEncoder, sDesc->tid));
if (tEncodeCStr(pEncoder, sDesc->status) < 0) return -1; TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, sDesc->status));
} }
} }
} else { } else {
if (tEncodeI32(pEncoder, queryNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum));
} }
} }
int32_t kvNum = taosHashGetSize(pReq->info); int32_t kvNum = taosHashGetSize(pReq->info);
if (tEncodeI32(pEncoder, kvNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, kvNum));
void *pIter = taosHashIterate(pReq->info, NULL); void *pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
SKv *kv = pIter; SKv *kv = pIter;
if (tEncodeSKv(pEncoder, kv) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSKv(pEncoder, kv));
pIter = taosHashIterate(pReq->info, pIter); pIter = taosHashIterate(pReq->info, pIter);
} }
@ -322,83 +319,88 @@ static int32_t tSerializeSClientHbReq(SEncoder *pEncoder, const SClientHbReq *pR
} }
static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) { static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq) {
if (tDecodeSClientHbKey(pDecoder, &pReq->connKey) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSClientHbKey(pDecoder, &pReq->connKey));
if (pReq->connKey.connType == CONN_TYPE__QUERY) { if (pReq->connKey.connType == CONN_TYPE__QUERY) {
if (tDecodeI64(pDecoder, &pReq->app.appId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->app.appId));
if (tDecodeI32(pDecoder, &pReq->app.pid) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->app.pid));
if (tDecodeCStrTo(pDecoder, pReq->app.name) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pReq->app.name));
if (tDecodeI64(pDecoder, &pReq->app.startTime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pReq->app.startTime));
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertsReq) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertsReq));
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertRows) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfInsertRows));
if (tDecodeU64(pDecoder, &pReq->app.summary.insertElapsedTime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.insertElapsedTime));
if (tDecodeU64(pDecoder, &pReq->app.summary.insertBytes) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.insertBytes));
if (tDecodeU64(pDecoder, &pReq->app.summary.fetchBytes) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.fetchBytes));
if (tDecodeU64(pDecoder, &pReq->app.summary.queryElapsedTime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.queryElapsedTime));
if (tDecodeU64(pDecoder, &pReq->app.summary.numOfSlowQueries) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.numOfSlowQueries));
if (tDecodeU64(pDecoder, &pReq->app.summary.totalRequests) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.totalRequests));
if (tDecodeU64(pDecoder, &pReq->app.summary.currentRequests) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pReq->app.summary.currentRequests));
int32_t queryNum = 0; int32_t queryNum = 0;
if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum));
if (queryNum) { if (queryNum) {
pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query)); pReq->query = taosMemoryCalloc(1, sizeof(*pReq->query));
if (NULL == pReq->query) return -1; if (NULL == pReq->query) return -1;
if (tDecodeU32(pDecoder, &pReq->query->connId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pReq->query->connId));
int32_t num = 0; int32_t num = 0;
if (tDecodeI32(pDecoder, &num) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &num));
if (num > 0) { if (num > 0) {
pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc)); pReq->query->queryDesc = taosArrayInit(num, sizeof(SQueryDesc));
if (NULL == pReq->query->queryDesc) return -1; if (NULL == pReq->query->queryDesc) return -1;
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SQueryDesc desc = {0}; SQueryDesc desc = {0};
if (tDecodeCStrTo(pDecoder, desc.sql) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, desc.sql));
if (tDecodeU64(pDecoder, &desc.queryId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &desc.queryId));
if (tDecodeI64(pDecoder, &desc.useconds) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.useconds));
if (tDecodeI64(pDecoder, &desc.stime) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.stime));
if (tDecodeI64(pDecoder, &desc.reqRid) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &desc.reqRid));
if (tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&desc.stableQuery));
if (tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(pDecoder, (int8_t *)&desc.isSubQuery));
if (tDecodeCStrTo(pDecoder, desc.fqdn) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, desc.fqdn));
if (tDecodeI32(pDecoder, &desc.subPlanNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &desc.subPlanNum));
int32_t snum = 0; int32_t snum = 0;
if (tDecodeI32(pDecoder, &snum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &snum));
if (snum > 0) { if (snum > 0) {
desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc)); desc.subDesc = taosArrayInit(snum, sizeof(SQuerySubDesc));
if (NULL == desc.subDesc) return -1; if (NULL == desc.subDesc) return -1;
for (int32_t m = 0; m < snum; ++m) { for (int32_t m = 0; m < snum; ++m) {
SQuerySubDesc sDesc = {0}; SQuerySubDesc sDesc = {0};
if (tDecodeI64(pDecoder, &sDesc.tid) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &sDesc.tid));
if (tDecodeCStrTo(pDecoder, sDesc.status) < 0) return -1; TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, sDesc.status));
if (!taosArrayPush(desc.subDesc, &sDesc)) return -1; if (!taosArrayPush(desc.subDesc, &sDesc)) {
return terrno;
}
} }
} }
ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc)); ASSERT(desc.subPlanNum == taosArrayGetSize(desc.subDesc));
if (!taosArrayPush(pReq->query->queryDesc, &desc)) return -1; if (!taosArrayPush(pReq->query->queryDesc, &desc)) {
return terrno;
}
} }
} }
} }
} }
int32_t kvNum = 0; int32_t kvNum = 0;
if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum));
if (pReq->info == NULL) { if (pReq->info == NULL) {
pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); pReq->info = taosHashInit(kvNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
} }
if (pReq->info == NULL) return -1; if (pReq->info == NULL) {
return terrno;
}
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv kv = {0}; SKv kv = {0};
if (tDecodeSKv(pDecoder, &kv) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv));
int32_t code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv)); int32_t code = taosHashPut(pReq->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
if (code) { if (code) {
terrno = code; return terrno = code;
return -1;
} }
} }
@ -406,75 +408,75 @@ static int32_t tDeserializeSClientHbReq(SDecoder *pDecoder, SClientHbReq *pReq)
} }
static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pRsp) { static int32_t tSerializeSClientHbRsp(SEncoder *pEncoder, const SClientHbRsp *pRsp) {
if (tEncodeSClientHbKey(pEncoder, &pRsp->connKey) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSClientHbKey(pEncoder, &pRsp->connKey));
if (tEncodeI32(pEncoder, pRsp->status) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->status));
int32_t queryNum = 0; int32_t queryNum = 0;
if (pRsp->query) { if (pRsp->query) {
queryNum = 1; queryNum = 1;
if (tEncodeI32(pEncoder, queryNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum));
if (tEncodeU32(pEncoder, pRsp->query->connId) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pRsp->query->connId));
if (tEncodeU64(pEncoder, pRsp->query->killRid) < 0) return -1; TAOS_CHECK_RETURN(tEncodeU64(pEncoder, pRsp->query->killRid));
if (tEncodeI32(pEncoder, pRsp->query->totalDnodes) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->query->totalDnodes));
if (tEncodeI32(pEncoder, pRsp->query->onlineDnodes) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pRsp->query->onlineDnodes));
if (tEncodeI8(pEncoder, pRsp->query->killConnection) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pRsp->query->killConnection));
if (tEncodeSEpSet(pEncoder, &pRsp->query->epSet) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSEpSet(pEncoder, &pRsp->query->epSet));
int32_t num = taosArrayGetSize(pRsp->query->pQnodeList); int32_t num = taosArrayGetSize(pRsp->query->pQnodeList);
if (tEncodeI32(pEncoder, num) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, num));
for (int32_t i = 0; i < num; ++i) { for (int32_t i = 0; i < num; ++i) {
SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i); SQueryNodeLoad *pLoad = taosArrayGet(pRsp->query->pQnodeList, i);
if (tEncodeSQueryNodeLoad(pEncoder, pLoad) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSQueryNodeLoad(pEncoder, pLoad));
} }
} else { } else {
if (tEncodeI32(pEncoder, queryNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, queryNum));
} }
int32_t kvNum = taosArrayGetSize(pRsp->info); int32_t kvNum = taosArrayGetSize(pRsp->info);
if (tEncodeI32(pEncoder, kvNum) < 0) return -1; TAOS_CHECK_RETURN(tEncodeI32(pEncoder, kvNum));
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv *kv = taosArrayGet(pRsp->info, i); SKv *kv = taosArrayGet(pRsp->info, i);
if (tEncodeSKv(pEncoder, kv) < 0) return -1; TAOS_CHECK_RETURN(tEncodeSKv(pEncoder, kv));
} }
return 0; return 0;
} }
static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) { static int32_t tDeserializeSClientHbRsp(SDecoder *pDecoder, SClientHbRsp *pRsp) {
if (tDecodeSClientHbKey(pDecoder, &pRsp->connKey) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSClientHbKey(pDecoder, &pRsp->connKey));
if (tDecodeI32(pDecoder, &pRsp->status) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->status));
int32_t queryNum = 0; int32_t queryNum = 0;
if (tDecodeI32(pDecoder, &queryNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &queryNum));
if (queryNum) { if (queryNum) {
pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query)); pRsp->query = taosMemoryCalloc(1, sizeof(*pRsp->query));
if (NULL == pRsp->query) return -1; if (NULL == pRsp->query) return -1;
if (tDecodeU32(pDecoder, &pRsp->query->connId) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pRsp->query->connId));
if (tDecodeU64(pDecoder, &pRsp->query->killRid) < 0) return -1; TAOS_CHECK_RETURN(tDecodeU64(pDecoder, &pRsp->query->killRid));
if (tDecodeI32(pDecoder, &pRsp->query->totalDnodes) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->totalDnodes));
if (tDecodeI32(pDecoder, &pRsp->query->onlineDnodes) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pRsp->query->onlineDnodes));
if (tDecodeI8(pDecoder, &pRsp->query->killConnection) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pRsp->query->killConnection));
if (tDecodeSEpSet(pDecoder, &pRsp->query->epSet) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSEpSet(pDecoder, &pRsp->query->epSet));
int32_t pQnodeNum = 0; int32_t pQnodeNum = 0;
if (tDecodeI32(pDecoder, &pQnodeNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pQnodeNum));
if (pQnodeNum > 0) { if (pQnodeNum > 0) {
pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad)); pRsp->query->pQnodeList = taosArrayInit(pQnodeNum, sizeof(SQueryNodeLoad));
if (NULL == pRsp->query->pQnodeList) return -1; if (NULL == pRsp->query->pQnodeList) return terrno;
for (int32_t i = 0; i < pQnodeNum; ++i) { for (int32_t i = 0; i < pQnodeNum; ++i) {
SQueryNodeLoad load = {0}; SQueryNodeLoad load = {0};
if (tDecodeSQueryNodeLoad(pDecoder, &load) < 0) return -1; TAOS_CHECK_RETURN(tDecodeSQueryNodeLoad(pDecoder, &load));
if (!taosArrayPush(pRsp->query->pQnodeList, &load)) return -1; if (!taosArrayPush(pRsp->query->pQnodeList, &load)) return terrno;
} }
} }
} }
int32_t kvNum = 0; int32_t kvNum = 0;
if (tDecodeI32(pDecoder, &kvNum) < 0) return -1; TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &kvNum));
pRsp->info = taosArrayInit(kvNum, sizeof(SKv)); pRsp->info = taosArrayInit(kvNum, sizeof(SKv));
if (pRsp->info == NULL) return -1; if (pRsp->info == NULL) return -1;
for (int32_t i = 0; i < kvNum; i++) { for (int32_t i = 0; i < kvNum; i++) {
SKv kv = {0}; SKv kv = {0};
if (tDecodeSKv(pDecoder, &kv)) return -1; TAOS_CHECK_RETURN(tDecodeSKv(pDecoder, &kv));
if (!taosArrayPush(pRsp->info, &kv)) return -1; if (!taosArrayPush(pRsp->info, &kv)) return terrno;
} }
return 0; return 0;
@ -5308,7 +5310,7 @@ int32_t tSerializeSMTimerMsg(void *buf, int32_t bufLen, SMTimerReq *pReq) {
// return 0; // return 0;
// } // }
int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { int32_t tSerializeDropOrphanTaskMsg(void *buf, int32_t bufLen, SMStreamDropOrphanMsg *pMsg) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
@ -5331,7 +5333,7 @@ int32_t tSerializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrpha
return tlen; return tlen;
} }
int32_t tDeserializeDropOrphanTaskMsg(void* buf, int32_t bufLen, SMStreamDropOrphanMsg* pMsg) { int32_t tDeserializeDropOrphanTaskMsg(void *buf, int32_t bufLen, SMStreamDropOrphanMsg *pMsg) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);

View File

@ -1180,7 +1180,7 @@ int tdbPagerRestoreJournals(SPager *pPager) {
if (code) { if (code) {
taosArrayDestroy(pTxnList); taosArrayDestroy(pTxnList);
(void)tdbCloseDir(&pDir); (void)tdbCloseDir(&pDir);
tdbError("failed to restore file due to %s. jFileName:%s", strerror(code), jname); tdbError("failed to restore file due to %s. jFileName:%s", tstrerror(code), jname);
return code; return code;
} }
} }