fix:[TD-31017]process return value in vnode for tmq
This commit is contained in:
parent
99ac957290
commit
680115e8c5
|
@ -356,22 +356,25 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
tDecoderInit(&dc, pVal, vLen);
|
tDecoderInit(&dc, pVal, vLen);
|
||||||
ret = metaDecodeEntry(&dc, &me);
|
ret = metaDecodeEntry(&dc, &me);
|
||||||
tDecoderClear(&dc);
|
|
||||||
|
|
||||||
if (ret < 0){
|
if (ret < 0){
|
||||||
|
tDecoderClear(&dc);
|
||||||
return TAOS_GET_TERRNO(ret);
|
return TAOS_GET_TERRNO(ret);
|
||||||
}
|
}
|
||||||
if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
|
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
|
||||||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
|
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
|
||||||
|
tDecoderClear(&dc);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayPush(ctx->idList, &tmp->uid) == NULL){
|
if (taosArrayPush(ctx->idList, &tmp->uid) == NULL){
|
||||||
|
tDecoderClear(&dc);
|
||||||
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
|
metaDebug("tmqsnap init idlist name:%s, uid:%" PRIi64, me.name, tmp->uid);
|
||||||
|
tDecoderClear(&dc);
|
||||||
|
|
||||||
SIdInfo info = {0};
|
SIdInfo info = {0};
|
||||||
if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
|
if (taosHashPut(ctx->idVersion, &tmp->uid, sizeof(tb_uid_t), &info, sizeof(SIdInfo)) != 0) {
|
||||||
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
return TAOS_GET_TERRNO(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
@ -402,14 +405,15 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8
|
||||||
SMetaEntry me = {0};
|
SMetaEntry me = {0};
|
||||||
tDecoderInit(&dc, pVal, vLen);
|
tDecoderInit(&dc, pVal, vLen);
|
||||||
ret = metaDecodeEntry(&dc, &me);
|
ret = metaDecodeEntry(&dc, &me);
|
||||||
tDecoderClear(&dc);
|
|
||||||
if (ret < 0){
|
if (ret < 0){
|
||||||
|
tDecoderClear(&dc);
|
||||||
return TAOS_GET_TERRNO(ret);
|
return TAOS_GET_TERRNO(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (ctx->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
|
if ((me.uid != ctx->suid && me.type == TSDB_SUPER_TABLE) ||
|
||||||
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
|
(me.ctbEntry.suid != ctx->suid && me.type == TSDB_CHILD_TABLE)) {
|
||||||
|
tDecoderClear(&dc);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -418,9 +422,11 @@ int32_t buildSnapContext(SVnode* pVnode, int64_t snapVersion, int64_t suid, int8
|
||||||
(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
|
(ctx->subType == TOPIC_SUB_TYPE__TABLE && me.uid == ctx->suid)) {
|
||||||
ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
|
ret = saveSuperTableInfoForChildTable(&me, ctx->suidInfo);
|
||||||
if (ret != 0){
|
if (ret != 0){
|
||||||
|
tDecoderClear(&dc);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tDecoderClear(&dc);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
|
for (int i = 0; i < taosArrayGetSize(ctx->idList); i++) {
|
||||||
|
|
|
@ -357,10 +357,17 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, STaosxRsp* pRsp, int3
|
||||||
*totalRows += pBlock->info.rows;
|
*totalRows += pBlock->info.rows;
|
||||||
blockDataFreeRes(pBlock);
|
blockDataFreeRes(pBlock);
|
||||||
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
|
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
|
||||||
(void) taosArrayPush(pRsp->common.blockSchema, &pSW);
|
if (taosArrayPush(pRsp->common.blockSchema, &pSW) == NULL){
|
||||||
|
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
pRsp->common.blockNum++;
|
pRsp->common.blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pBlocks);
|
||||||
|
taosArrayDestroy(pSchemas);
|
||||||
|
return;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
|
||||||
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
|
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
|
||||||
|
|
Loading…
Reference in New Issue