From d96cdff8f39f283c3bdb0c9ac6fb37b727ff545f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 28 Jan 2023 18:25:43 +0800 Subject: [PATCH 01/11] fix:add logic for auto create table while inserting data for taosx --- source/dnode/vnode/inc/vnode.h | 4 +- source/dnode/vnode/src/tq/tqExec.c | 89 ++++++++++++++++--------- source/dnode/vnode/src/tq/tqRead.c | 9 +-- source/libs/executor/src/scanoperator.c | 4 +- 4 files changed, 65 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3f3e287bb9..33325d41f3 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -266,8 +266,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock2(STqReader *pReader); bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader); -int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas); +int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet); +int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet); // int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); // int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 40a82cc8e8..562e188927 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -241,7 +241,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { @@ -255,23 +256,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int64_t uid = pExec->pExecReader->lastBlkUid; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } -#endif + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; + } + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); @@ -294,7 +305,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR /*}*/ taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { @@ -307,22 +319,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } -#endif + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; + } + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ /*pTq->pVnode->config.tsdbCfg.precision);*/ diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index eb9c0c3eeb..e6d81f5b10 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,7 +332,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (tqNextDataBlock2(pReader)) { // TODO mem free memset(&ret->data, 0, sizeof(SSDataBlock)); - int32_t code = tqRetrieveDataBlock2(&ret->data, pReader); + int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); if (code != 0 || ret->data.info.rows == 0) { continue; } @@ -550,7 +550,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) } #endif -int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { +int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); ASSERT(pReader->nextBlk < blockSz); @@ -559,6 +559,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -1006,9 +1007,9 @@ FAIL: } #endif -int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) { +int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { SSDataBlock block = {0}; - if (tqRetrieveDataBlock2(&block, pReader) == 0) { + if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) { taosArrayPush(blocks, &block); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper); taosArrayPush(schemas, &pSW); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 37c33c44e2..7529a5f8f7 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1532,7 +1532,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; @@ -1941,7 +1941,7 @@ FETCH_NEXT_BLOCK: while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; From d7e15881f5fef3e86742bd6007301412424ac80f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 29 Jan 2023 14:08:56 +0800 Subject: [PATCH 02/11] fix:length error in data block --- source/libs/parser/src/parInsertUtil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 7cc93b448f..07266364c4 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -681,7 +681,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); - pStart += colLength[c]; + pStart += htonl(colLength[c]); } end: From 94a3fa4ff3229766171ee7a8ad7e0d92ec9145b0 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 30 Jan 2023 09:55:22 +0800 Subject: [PATCH 03/11] fix:htol(length) & set vgId,uid correct for auto create table in taosx --- source/client/src/clientRawBlockWrite.c | 33 ++++++++++++++++++++----- source/dnode/vnode/src/tq/tq.c | 3 +++ source/dnode/vnode/src/tq/tqExec.c | 15 +++++------ utils/test/c/tmq_taosx_ci.c | 3 ++- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3fd6fed4fc..d84cbfe409 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("smlBuildOutput failed"); - return code; + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1538,6 +1538,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; + SVCreateTbReq* pCreateReqDst = NULL; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); @@ -1605,17 +1606,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) strcpy(pName.tname, tbName); // find schema data info - SVCreateTbReq pCreateReq = {0}; - for (int j = 0; j < rspObj.rsp.createTableNum; j++) { void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); SDecoder decoderTmp = {0}; + SVCreateTbReq pCreateReq = {0}; tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - memset(&pCreateReq, 0, sizeof(SVCreateTbReq)); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); + uError("WriteRaw: tDecodeSVCreateTbReq error"); + code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } @@ -1625,13 +1626,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { - strcpy(pName.tname, pCreateReq.ctb.stbName); + cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); tDecoderClear(&decoderTmp); break; } tDecoderClear(&decoderTmp); } + if(pCreateReqDst){ + strcpy(pName.tname, pCreateReqDst->ctb.stbName); + }else{ + strcpy(pName.tname, tbName); + } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); @@ -1650,16 +1656,27 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } + if(pCreateReqDst){ + pTableMeta->vgId = vg.vgId; + pTableMeta->uid = pCreateReqDst->uid; + } void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); if (hData == NULL) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0); + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; } + pCreateReqDst = NULL; + } + + code = smlBuildOutput(pQuery, pVgHash); + if (code != TSDB_CODE_SUCCESS) { + uError("smlBuildOutput failed"); + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1672,6 +1689,10 @@ end: destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); + if (pCreateReqDst) { + tdDestroySVCreateTbReq(pCreateReqDst); + taosMemoryFree(pCreateReqDst); + } return code; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a27b6988c5..bda0256b51 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -683,6 +683,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, + TD_VID(pTq->pVnode), req.subKey); + return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 562e188927..ae9b7fdcf3 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -34,7 +34,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t return 0; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) { +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper); if (pSW == NULL) { return -1; @@ -43,7 +43,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success @@ -153,7 +153,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { continue; } } else { @@ -163,7 +163,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); + tqAddBlockSchemaToRsp(pExec, pRsp); } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); @@ -248,7 +248,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR if (pRsp->withTbName) { /*int64_t uid = pExec->pExecReader->msgIter.uid;*/ int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -311,7 +311,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -364,9 +364,6 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } - taosArrayDestroy(pBlocks); - taosArrayDestroy(pSchemas); - if (pRsp->blockNum == 0) { return -1; } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 69bf52bb1a..dfa0edc66e 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -70,6 +70,7 @@ static void msg_process(TAOS_RES* msg) { tmq_get_raw(msg, &raw); printf("write raw data type: %d\n", raw.raw_type); int32_t ret = tmq_write_raw(pConn, raw); + ASSERT(ret == 0); printf("write raw data: %s\n", tmq_err2str(ret)); tmq_free_raw(raw); taos_close(pConn); @@ -361,7 +362,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " + pRes = taos_query(pConn, "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); if (taos_errno(pRes) != 0) { printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes)); From 4128c5653c718b93de8990f13b4bea6cf794f354 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 30 Jan 2023 17:23:04 +0800 Subject: [PATCH 04/11] fix:error in tmq for taosx --- source/dnode/vnode/src/tq/tqRead.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index e6d81f5b10..12f7af3a71 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -671,7 +671,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD sourceIdx++; } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { - tColDataGetValue(pCol, sourceIdx, &colVal); + tColDataGetValue(pCol, i, &colVal); #if 0 void* val = NULL; if (IS_STR_DATA_TYPE(colVal.type)) { @@ -711,7 +711,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD for (int32_t i = 0; i < numOfRows; i++) { SRow* pRow = taosArrayGetP(pRows, i); - int32_t targetIdx = 0; int32_t sourceIdx = 0; for (int32_t j = 0; j < colActual; j++) { @@ -744,7 +743,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbD } sourceIdx++; - targetIdx++; break; } else { ASSERT(0); From 2ad5869659280411049727b56f1d6fdd2825b835 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 31 Jan 2023 14:25:39 +0800 Subject: [PATCH 05/11] fix:add log for block encode --- source/common/src/tdatablock.c | 1 + source/libs/parser/src/parInsertUtil.c | 1 + 2 files changed, 2 insertions(+) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 9b5b32cf69..22fe3c6538 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2483,6 +2483,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; colSizes[col] = htonl(colSizes[col]); + uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); } *actualLen = dataLen; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 07266364c4..62feff798c 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -678,6 +678,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate pStart += BitmapLen(numOfRows); } char* pData = pStart; + uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c])); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); From 936b15629fe17b26d4e89709d9f3daf7d3d8090e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 31 Jan 2023 15:26:58 +0800 Subject: [PATCH 06/11] fix:remove htonl length in block encode --- source/client/src/clientImpl.c | 11 +++++++---- source/common/src/tdatablock.c | 6 +++--- source/libs/parser/src/parInsertUtil.c | 5 +++-- 3 files changed, 13 insertions(+), 9 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a119408a08..26d5d5fcc0 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1771,8 +1771,10 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { - int32_t colLen = htonl(colLength[i]); - int32_t colLen1 = htonl(colLength1[i]); +// int32_t colLen = htonl(colLength[i]); +// int32_t colLen1 = htonl(colLength1[i]); + int32_t colLen = colLength[i]; + int32_t colLen1 = colLength1[i]; if(ASSERT(colLen < dataLen)){ tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1831,7 +1833,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; - colLength1[i] = htonl(len); +// colLength1[i] = htonl(len); + colLength1[i] = len; } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -1919,7 +1922,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { - colLength[i] = htonl(colLength[i]); +// colLength[i] = htonl(colLength[i]); if (colLength[i] >= dataLen) { tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 22fe3c6538..64a87d16e0 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2482,8 +2482,8 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } data += colSizes[col]; - colSizes[col] = htonl(colSizes[col]); - uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); +// colSizes[col] = htonl(colSizes[col]); +// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); } *actualLen = dataLen; @@ -2547,7 +2547,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { - colLen[i] = htonl(colLen[i]); +// colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 62feff798c..fcacaa0093 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -678,11 +678,12 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate pStart += BitmapLen(numOfRows); } char* pData = pStart; - uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c])); +// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c])); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); - pStart += htonl(colLength[c]); + pStart += colLength[c]; +// pStart += htonl(colLength[c]); } end: From 2bf0916bcf74e88cc8e8372f76f6493027cd6ab3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 1 Feb 2023 16:12:59 +0800 Subject: [PATCH 07/11] fix:split block data if there are none value in block for taosx --- source/client/src/clientRawBlockWrite.c | 26 +- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 256 +++++++++++++++++- tests/system-test/7-tmq/tmq_taosx.py | 20 +- 4 files changed, 285 insertions(+), 19 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d84cbfe409..c3dcdbd221 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1505,7 +1505,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1665,7 +1676,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index dcb63f6524..4eb0151c58 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -39,7 +39,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pMsg->msgType)]; if (msgFp == NULL) { terrno = TSDB_CODE_MSG_NOT_PROCESSED; - dGError("msg:%p, not processed since no handler", pMsg); + dGError("msg:%p,info:%s not processed since no handler", pMsg, TMSG_INFO(pMsg->msgType)); return -1; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 12f7af3a71..bbc21d498b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1006,13 +1006,257 @@ FAIL: #endif int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock2(&block, pReader, pSubmitTbDataRet) == 0) { - taosArrayPush(blocks, &block); - SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper); - taosArrayPush(schemas, &pSW); - return 0; + tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); + + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); + pReader->nextBlk++; + + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + int32_t sversion = pSubmitTbData->sver; + int64_t suid = pSubmitTbData->suid; + int64_t uid = pSubmitTbData->uid; + pReader->lastBlkUid = uid; + + taosMemoryFree(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchema == NULL) { + tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 + "), version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; } + + tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchemaWrapper == NULL) { + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + + STSchema* pTschema = pReader->pSchema; + SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + int32_t numOfRows = 0; + + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + SColData* pCol = taosArrayGet(pCols, 0); + numOfRows = pCol->nVal; + } else { + SArray* pRows = pSubmitTbData->aRowP; + numOfRows = taosArrayGetSize(pRows); + } + + int32_t curRow = 0; + int32_t lastRow = 0; + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + if (assigned == NULL) return -1; + + // convert and scan one block + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + int32_t numOfCols = taosArrayGetSize(pCols); + for (int32_t i = 0; i < numOfRows; i++) { + bool buildNew = false; + + for (int32_t j = 0; j < numOfCols; j++){ + SColData* pCol = taosArrayGet(pCols, j); + SColVal colVal; + tColDataGetValue(pCol, i, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColData* pCol = taosArrayGet(pCols, sourceIdx); + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + + if (pCol->cid < pColData->info.colId) { + sourceIdx++; + } else if (pCol->cid == pColData->info.colId) { + tColDataGetValue(pCol, i, &colVal); + + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + + curRow++; + } + } else { + SArray* pRows = pSubmitTbData->aRowP; + for (int32_t i = 0; i < numOfRows; i++) { + SRow* pRow = taosArrayGetP(pRows, i); + bool buildNew = false; + + for (int32_t j = 0; j < pTschema->numOfCols; j++){ + SColVal colVal; + tRowGet(pRow, pTschema, j, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + tRowGet(pRow, pTschema, sourceIdx, &colVal); + + if (colVal.cid < pColData->info.colId) { + sourceIdx++; + } else if (colVal.cid == pColData->info.colId) { + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + curRow++; + } + } + + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + + taosMemoryFree(assigned); + return 0; + + FAIL: + taosMemoryFree(assigned); return -1; } diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 7ca18f293c..c056451978 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -45,7 +45,7 @@ class TDTestCase: if drop: tdSql.checkRows(10) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -63,20 +63,20 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") tdSql.execute('use abc1') tdSql.query("show tables") if drop: tdSql.checkRows(10) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -94,13 +94,13 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") return From c8d49b603d293641e51d5ec429af5c327422c8f4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 2 Feb 2023 17:23:01 +0800 Subject: [PATCH 08/11] fix:roll back htol length for block raw data --- include/libs/parser/parser.h | 2 +- source/client/src/clientImpl.c | 11 ++++------- source/client/src/clientRawBlockWrite.c | 8 ++++---- source/common/src/tdatablock.c | 4 ++-- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 5 +++-- source/libs/parser/src/parInsertUtil.c | 9 ++++++--- 7 files changed, 21 insertions(+), 20 deletions(-) diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 8f22745973..0878e267c3 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -115,7 +115,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); -int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields); +int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index a21d244c08..2c53fe4080 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1771,10 +1771,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int char* pStart = p; char* pStart1 = p1; for (int32_t i = 0; i < numOfCols; ++i) { -// int32_t colLen = htonl(colLength[i]); -// int32_t colLen1 = htonl(colLength1[i]); - int32_t colLen = colLength[i]; - int32_t colLen1 = colLength1[i]; + int32_t colLen = htonl(colLength[i]); + int32_t colLen1 = htonl(colLength1[i]); if(ASSERT(colLen < dataLen)){ tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; @@ -1833,8 +1831,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int } colLen1 = len; totalLen += colLen1; -// colLength1[i] = htonl(len); - colLength1[i] = len; + colLength1[i] = htonl(len); } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { len = numOfRows * sizeof(int32_t); memcpy(pStart1, pStart, len); @@ -1922,7 +1919,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 char* pStart = p; for (int32_t i = 0; i < numOfCols; ++i) { -// colLength[i] = htonl(colLength[i]); + colLength[i] = htonl(colLength[i]); if (colLength[i] >= dataLen) { tscError("invalid colLength %d, dataLen %d", colLength[i], dataLen); return TSDB_CODE_TSC_INTERNAL_ERROR; diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index c6f08d55c1..0ae9a67451 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1307,7 +1307,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields); + code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1387,7 +1387,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0); + code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1515,7 +1515,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols); + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); @@ -1686,7 +1686,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) fields[i].bytes = pSW->pSchema[i].bytes; tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols); + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true); taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 64a87d16e0..69f98c5e30 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2482,7 +2482,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { } data += colSizes[col]; -// colSizes[col] = htonl(colSizes[col]); + colSizes[col] = htonl(colSizes[col]); // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); } @@ -2547,7 +2547,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { pStart += sizeof(int32_t) * numOfCols; for (int32_t i = 0; i < numOfCols; ++i) { -// colLen[i] = htonl(colLen[i]); + colLen[i] = htonl(colLen[i]); ASSERT(colLen[i] >= 0); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12cfd3a9e3..753c42ff33 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -707,7 +707,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } else { /*A(pHandle->fetchMeta);*/ /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bbc21d498b..99f0ed7703 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -454,12 +454,13 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pReader->tbIdHash == NULL) return true; + if (filterOutUids == NULL) return true; - void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); + void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); if (ret == NULL) { return true; } + pReader->nextBlk++; } tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index fcacaa0093..78b810c1a1 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -608,7 +608,7 @@ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* f } int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, - int numFields) { + int numFields, bool needChangeLength) { STableDataCxt* pTableCxt = NULL; int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); @@ -682,8 +682,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); - pStart += colLength[c]; -// pStart += htonl(colLength[c]); + if(needChangeLength) { + pStart += htonl(colLength[c]); + }else{ + pStart += colLength[c]; + } } end: From c576d1fa913dd1e6044e28afdc8a79ef8d15bc1c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Feb 2023 14:54:25 +0800 Subject: [PATCH 09/11] fix:error for subscribe stable --- source/dnode/vnode/src/tq/tqExec.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index ae9b7fdcf3..0a18b7bd11 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -364,9 +364,9 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } - if (pRsp->blockNum == 0) { - return -1; - } +// if (pRsp->blockNum == 0) { +// return -1; +// } return 0; } From c330588d5560b8d032388dd24bd6ec352b7085d7 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 3 Feb 2023 15:35:13 +0800 Subject: [PATCH 10/11] fix:error in subscribe table for taosx --- source/client/src/clientRawBlockWrite.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 5 ++--- tests/system-test/7-tmq/tmq_taosx.py | 4 ++-- utils/test/c/tmq_taosx_ci.c | 3 ++- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 0ae9a67451..f290a83df0 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1126,7 +1126,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { TAOS_RES* res = taos_query(taos, sql); SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { code = TSDB_CODE_SUCCESS; } taos_free_result(res); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c243e83a15..73a296ebe4 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -971,9 +971,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { code = 0; goto _OVER; } else if (pStb->uid != createReq.suid) { - mError("stb:%s, already exist while create, input suid:%" PRId64 " not match with exist suid:%" PRId64, - createReq.name, createReq.suid, pStb->uid); - terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH; + mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); + code = 0; goto _OVER; } else if (createReq.tagVer > 0 || createReq.colVer > 0) { int32_t tagDelta = createReq.tagVer - pStb->tagVer; diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index c056451978..b2bf6eec0b 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -43,7 +43,7 @@ class TDTestCase: tdSql.execute('use db_taosx') tdSql.query("show tables") if drop: - tdSql.checkRows(10) + tdSql.checkRows(11) else: tdSql.checkRows(16) tdSql.query("select * from jt order by i") @@ -74,7 +74,7 @@ class TDTestCase: tdSql.execute('use abc1') tdSql.query("show tables") if drop: - tdSql.checkRows(10) + tdSql.checkRows(11) else: tdSql.checkRows(16) tdSql.query("select * from jt order by i") diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index dfa0edc66e..fb05ae262b 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -70,8 +70,9 @@ static void msg_process(TAOS_RES* msg) { tmq_get_raw(msg, &raw); printf("write raw data type: %d\n", raw.raw_type); int32_t ret = tmq_write_raw(pConn, raw); - ASSERT(ret == 0); printf("write raw data: %s\n", tmq_err2str(ret)); + ASSERT(ret == 0); + tmq_free_raw(raw); taos_close(pConn); } From 33de4e2aba857b98ff30433d27606d49bb071201 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 6 Feb 2023 09:21:46 +0800 Subject: [PATCH 11/11] fix:memory leak --- source/dnode/vnode/src/tq/tqExec.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 0a18b7bd11..7896b931dc 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -363,7 +363,8 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } } - + taosArrayDestroy(pBlocks); + taosArrayDestroy(pSchemas); // if (pRsp->blockNum == 0) { // return -1; // }