diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3fd6fed4fc..d84cbfe409 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("smlBuildOutput failed"); - return code; + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1538,6 +1538,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; + SVCreateTbReq* pCreateReqDst = NULL; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); @@ -1605,17 +1606,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) strcpy(pName.tname, tbName); // find schema data info - SVCreateTbReq pCreateReq = {0}; - for (int j = 0; j < rspObj.rsp.createTableNum; j++) { void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); SDecoder decoderTmp = {0}; + SVCreateTbReq pCreateReq = {0}; tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - memset(&pCreateReq, 0, sizeof(SVCreateTbReq)); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); + uError("WriteRaw: tDecodeSVCreateTbReq error"); + code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } @@ -1625,13 +1626,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { - strcpy(pName.tname, pCreateReq.ctb.stbName); + cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); tDecoderClear(&decoderTmp); break; } tDecoderClear(&decoderTmp); } + if(pCreateReqDst){ + strcpy(pName.tname, pCreateReqDst->ctb.stbName); + }else{ + strcpy(pName.tname, tbName); + } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); @@ -1650,16 +1656,27 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } + if(pCreateReqDst){ + pTableMeta->vgId = vg.vgId; + pTableMeta->uid = pCreateReqDst->uid; + } void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); if (hData == NULL) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0); + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; } + pCreateReqDst = NULL; + } + + code = smlBuildOutput(pQuery, pVgHash); + if (code != TSDB_CODE_SUCCESS) { + uError("smlBuildOutput failed"); + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1672,6 +1689,10 @@ end: destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); + if (pCreateReqDst) { + tdDestroySVCreateTbReq(pCreateReqDst); + taosMemoryFree(pCreateReqDst); + } return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a27b6988c5..bda0256b51 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -683,6 +683,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, + TD_VID(pTq->pVnode), req.subKey); + return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 562e188927..ae9b7fdcf3 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -34,7 +34,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t return 0; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) { +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper); if (pSW == NULL) { return -1; @@ -43,7 +43,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success @@ -153,7 +153,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { continue; } } else { @@ -163,7 +163,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); + tqAddBlockSchemaToRsp(pExec, pRsp); } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); @@ -248,7 +248,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pRsp->withTbName) { /*int64_t uid = pExec->pExecReader->msgIter.uid;*/ int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -364,9 +364,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } - taosArrayDestroy(pBlocks); - taosArrayDestroy(pSchemas); - if (pRsp->blockNum == 0) { return -1; } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 69bf52bb1a..dfa0edc66e 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -70,6 +70,7 @@ static void msg_process(TAOS_RES* msg) { tmq_get_raw(msg, &raw); printf("write raw data type: %d\n", raw.raw_type); int32_t ret = tmq_write_raw(pConn, raw); + ASSERT(ret == 0); printf("write raw data: %s\n", tmq_err2str(ret)); tmq_free_raw(raw); taos_close(pConn); @@ -361,7 +362,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " + pRes = taos_query(pConn, "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); if (taos_errno(pRes) != 0) { printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));