diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 51b285cddd..558203052f 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -116,9 +116,7 @@ int32_t smlBindData(SQuery* handle, bool dataFormat, SArray* tags, SArray* colsS STableMeta* pTableMeta, char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash); - -int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* fields, - int numFields); +int rawBlockBindData(SQuery *query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD *fields, int numFields, bool needChangeLength); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); SArray* serializeVgroupsCreateTableBatch(SHashObj* pVgroupHashmap); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 72a00ef471..f290a83df0 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -1126,7 +1126,7 @@ static int32_t taosDeleteData(TAOS* taos, void* meta, int32_t metaLen) { TAOS_RES* res = taos_query(taos, sql); SRequestObj* pRequest = (SRequestObj*)res; code = pRequest->code; - if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { + if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST || code == TSDB_CODE_PAR_GET_META_ERROR) { code = TSDB_CODE_SUCCESS; } taos_free_result(res); @@ -1307,7 +1307,7 @@ int taos_write_raw_block_with_fields(TAOS* taos, int rows, char* pData, const ch pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields); + code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, fields, numFields, false); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1387,7 +1387,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname) pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); taosHashPut(pVgHash, (const char*)&vgData.vgId, sizeof(vgData.vgId), (char*)&vgData, sizeof(vgData)); - code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0); + code = rawBlockBindData(pQuery, pTableMeta, pData, NULL, NULL, 0, false); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1505,7 +1505,18 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, NULL, fields, pSW->nCols, true); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; @@ -1515,7 +1526,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { code = smlBuildOutput(pQuery, pVgHash); if (code != TSDB_CODE_SUCCESS) { uError("smlBuildOutput failed"); - return code; + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1538,6 +1549,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) SMqTaosxRspObj rspObj = {0}; SDecoder decoder = {0}; STableMeta* pTableMeta = NULL; + SVCreateTbReq* pCreateReqDst = NULL; terrno = TSDB_CODE_SUCCESS; SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT, 0); @@ -1605,17 +1617,17 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) strcpy(pName.tname, tbName); // find schema data info - SVCreateTbReq pCreateReq = {0}; - for (int j = 0; j < rspObj.rsp.createTableNum; j++) { void** dataTmp = taosArrayGet(rspObj.rsp.createTableReq, j); int32_t* lenTmp = taosArrayGet(rspObj.rsp.createTableLen, j); SDecoder decoderTmp = {0}; + SVCreateTbReq pCreateReq = {0}; tDecoderInit(&decoderTmp, *dataTmp, *lenTmp); - memset(&pCreateReq, 0, sizeof(SVCreateTbReq)); if (tDecodeSVCreateTbReq(&decoderTmp, &pCreateReq) < 0) { tDecoderClear(&decoderTmp); + uError("WriteRaw: tDecodeSVCreateTbReq error"); + code = TSDB_CODE_TMQ_INVALID_MSG; goto end; } @@ -1625,13 +1637,18 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } if (strcmp(tbName, pCreateReq.name) == 0) { - strcpy(pName.tname, pCreateReq.ctb.stbName); + cloneSVreateTbReq(&pCreateReq, &pCreateReqDst); tDecoderClear(&decoderTmp); break; } tDecoderClear(&decoderTmp); } + if(pCreateReqDst){ + strcpy(pName.tname, pCreateReqDst->ctb.stbName); + }else{ + strcpy(pName.tname, tbName); + } code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); if (code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { uError("WriteRaw:catalogGetTableMeta table not exist. table name: %s", tbName); @@ -1650,16 +1667,38 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) goto end; } + if(pCreateReqDst){ + pTableMeta->vgId = vg.vgId; + pTableMeta->uid = pCreateReqDst->uid; + } void* hData = taosHashGet(pVgHash, &vg.vgId, sizeof(vg.vgId)); if (hData == NULL) { taosHashPut(pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); } - code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, &pCreateReq, NULL, 0); + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.rsp.blockSchema, rspObj.resIter); + TAOS_FIELD* fields = taosMemoryCalloc(pSW->nCols, sizeof(TAOS_FIELD)); + if(fields == NULL){ + goto end; + } + for(int i = 0; i < pSW->nCols; i++){ + fields[i].type = pSW->pSchema[i].type; + fields[i].bytes = pSW->pSchema[i].bytes; + tstrncpy(fields[i].name, pSW->pSchema[i].name, tListLen(pSW->pSchema[i].name)); + } + code = rawBlockBindData(pQuery, pTableMeta, pRetrieve->data, pCreateReqDst, fields, pSW->nCols, true); + taosMemoryFree(fields); if (code != TSDB_CODE_SUCCESS) { uError("WriteRaw:rawBlockBindData failed"); goto end; } + pCreateReqDst = NULL; + } + + code = smlBuildOutput(pQuery, pVgHash); + if (code != TSDB_CODE_SUCCESS) { + uError("smlBuildOutput failed"); + goto end; } launchQueryImpl(pRequest, pQuery, true, NULL); @@ -1672,6 +1711,10 @@ end: destroyRequest(pRequest); taosHashCleanup(pVgHash); taosMemoryFreeClear(pTableMeta); + if (pCreateReqDst) { + tdDestroySVCreateTbReq(pCreateReqDst); + taosMemoryFree(pCreateReqDst); + } return code; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index a4b1be5a12..2d4c571d31 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2484,6 +2484,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { data += colSizes[col]; colSizes[col] = htonl(colSizes[col]); +// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, htonl(colSizes[col]), colSizes[col]); } *actualLen = dataLen; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 52f7190dd6..6dedeaf8f3 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -973,9 +973,8 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) { code = 0; goto _OVER; } else if (pStb->uid != createReq.suid) { - mError("stb:%s, already exist while create, input suid:%" PRId64 " not match with exist suid:%" PRId64, - createReq.name, createReq.suid, pStb->uid); - terrno = TSDB_CODE_MND_STABLE_UID_NOT_MATCH; + mInfo("stb:%s, alter table does not need to be done, because table is deleted", createReq.name); + code = 0; goto _OVER; } else if (createReq.tagVer > 0 || createReq.colVer > 0) { int32_t tagDelta = createReq.tagVer - pStb->tagVer; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index fd91d438e7..680cbbf929 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -270,8 +270,8 @@ int32_t tqReaderSetSubmitReq2(STqReader *pReader, void *msgStr, int32_t msgLen, // int32_t tqReaderSetDataMsg(STqReader *pReader, const SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock2(STqReader *pReader); bool tqNextDataBlockFilterOut2(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader); -int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas); +int32_t tqRetrieveDataBlock2(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData** pSubmitTbDataRet); +int32_t tqRetrieveTaosxBlock2(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData** pSubmitTbDataRet); // int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader); // int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 1b6da6c50d..753c42ff33 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -688,6 +688,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { .ver = pHead->version, }; if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, + TD_VID(pTq->pVnode), req.subKey); + return -1; } if (taosxRsp.blockNum > 0 /* threshold */) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); @@ -704,7 +707,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } else { /*A(pHandle->fetchMeta);*/ /*A(IS_META_MSG(pHead->msgType));*/ - tqDebug("fetch meta msg, ver:%" PRId64 ", type:%d", pHead->version, pHead->msgType); + tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); tqOffsetResetToLog(&metaRsp.rspOffset, fetchVer); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; diff --git a/source/dnode/vnode/src/tq/tqExec.c b/source/dnode/vnode/src/tq/tqExec.c index 40a82cc8e8..7896b931dc 100644 --- a/source/dnode/vnode/src/tq/tqExec.c +++ b/source/dnode/vnode/src/tq/tqExec.c @@ -34,7 +34,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t return 0; } -static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRsp) { +static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader->pSchemaWrapper); if (pSW == NULL) { return -1; @@ -43,7 +43,7 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, SMqDataRsp* pRs return 0; } -static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { +static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); // TODO add reference to gurantee success @@ -153,7 +153,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta if (pRsp->withTbName) { if (pOffset->type == TMQ_OFFSET__LOG) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, 1) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, 1) < 0) { continue; } } else { @@ -163,7 +163,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta } if (pRsp->withSchema) { if (pOffset->type == TMQ_OFFSET__LOG) { - tqAddBlockSchemaToRsp(pExec, (SMqDataRsp*)pRsp); + tqAddBlockSchemaToRsp(pExec, pRsp); } else { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); taosArrayPush(pRsp->blockSchema, &pSW); @@ -241,13 +241,14 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { /*int64_t uid = pExec->pExecReader->msgIter.uid;*/ int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -255,23 +256,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int64_t uid = pExec->pExecReader->lastBlkUid; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } -#endif + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; + } + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); @@ -294,12 +305,13 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR /*}*/ taosArrayClear(pBlocks); taosArrayClear(pSchemas); - if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas) < 0) { + SSubmitTbData* pSubmitTbDataRet = NULL; + if (tqRetrieveTaosxBlock2(pReader, pBlocks, pSchemas, &pSubmitTbDataRet) < 0) { if (terrno == TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND) continue; } if (pRsp->withTbName) { int64_t uid = pExec->pExecReader->lastBlkUid; - if (tqAddTbNameToRsp(pTq, uid, (SMqDataRsp*)pRsp, taosArrayGetSize(pBlocks)) < 0) { + if (tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)) < 0) { taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); taosArrayDestroyP(pSchemas, (FDelete)tDeleteSSchemaWrapper); pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); @@ -307,22 +319,33 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR continue; } } - if (pHandle->fetchMeta) { -#if 0 - SSubmitBlk* pBlk = pReader->pBlock; - int32_t schemaLen = htonl(pBlk->schemaLen); - if (schemaLen > 0) { - if (pRsp->createTableNum == 0) { - pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - } - void* createReq = taosMemoryCalloc(1, schemaLen); - memcpy(createReq, pBlk->data, schemaLen); - taosArrayPush(pRsp->createTableLen, &schemaLen); - taosArrayPush(pRsp->createTableReq, &createReq); - pRsp->createTableNum++; + if (pHandle->fetchMeta && pSubmitTbDataRet->pCreateTbReq != NULL) { + if (pRsp->createTableNum == 0) { + pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); + pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); } -#endif + + int32_t code = TSDB_CODE_SUCCESS; + uint32_t len = 0; + tEncodeSize(tEncodeSVCreateTbReq, pSubmitTbDataRet->pCreateTbReq, len, code); + if (TSDB_CODE_SUCCESS != code) { + continue; + } + void* createReq = taosMemoryCalloc(1, len); + SEncoder encoder = {0}; + tEncoderInit(&encoder, createReq, len); + code = tEncodeSVCreateTbReq(&encoder, pSubmitTbDataRet->pCreateTbReq); + if (code < 0) { + tEncoderClear(&encoder); + taosMemoryFree(createReq); + continue; + } + + taosArrayPush(pRsp->createTableLen, &len); + taosArrayPush(pRsp->createTableReq, &createReq); + pRsp->createTableNum++; + + tEncoderClear(&encoder); } /*tqAddBlockDataToRsp(&block, (SMqDataRsp*)pRsp, taosArrayGetSize(block.pDataBlock),*/ /*pTq->pVnode->config.tsdbCfg.precision);*/ @@ -340,13 +363,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxR } } } - taosArrayDestroy(pBlocks); taosArrayDestroy(pSchemas); - - if (pRsp->blockNum == 0) { - return -1; - } +// if (pRsp->blockNum == 0) { +// return -1; +// } return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index eb9c0c3eeb..99f0ed7703 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,7 +332,7 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) { while (tqNextDataBlock2(pReader)) { // TODO mem free memset(&ret->data, 0, sizeof(SSDataBlock)); - int32_t code = tqRetrieveDataBlock2(&ret->data, pReader); + int32_t code = tqRetrieveDataBlock2(&ret->data, pReader, NULL); if (code != 0 || ret->data.info.rows == 0) { continue; } @@ -454,12 +454,13 @@ bool tqNextDataBlockFilterOut2(STqReader* pReader, SHashObj* filterOutUids) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pReader->tbIdHash == NULL) return true; + if (filterOutUids == NULL) return true; - void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); + void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); if (ret == NULL) { return true; } + pReader->nextBlk++; } tDestroySSubmitReq2(&pReader->submit, TSDB_MSG_FLG_DECODE); @@ -550,7 +551,7 @@ int32_t tqScanSubmitSplit(SArray* pBlocks, SArray* schemas, STqReader* pReader) } #endif -int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { +int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); ASSERT(pReader->nextBlk < blockSz); @@ -559,6 +560,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); pReader->nextBlk++; + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; int32_t sversion = pSubmitTbData->sver; int64_t suid = pSubmitTbData->suid; int64_t uid = pSubmitTbData->uid; @@ -670,7 +672,7 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { sourceIdx++; } else if (pCol->cid == pColData->info.colId) { for (int32_t i = 0; i < pCol->nVal; i++) { - tColDataGetValue(pCol, sourceIdx, &colVal); + tColDataGetValue(pCol, i, &colVal); #if 0 void* val = NULL; if (IS_STR_DATA_TYPE(colVal.type)) { @@ -710,7 +712,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { for (int32_t i = 0; i < numOfRows; i++) { SRow* pRow = taosArrayGetP(pRows, i); - int32_t targetIdx = 0; int32_t sourceIdx = 0; for (int32_t j = 0; j < colActual; j++) { @@ -743,7 +744,6 @@ int32_t tqRetrieveDataBlock2(SSDataBlock* pBlock, STqReader* pReader) { } sourceIdx++; - targetIdx++; break; } else { ASSERT(0); @@ -1006,14 +1006,258 @@ FAIL: } #endif -int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas) { - SSDataBlock block = {0}; - if (tqRetrieveDataBlock2(&block, pReader) == 0) { - taosArrayPush(blocks, &block); - SSchemaWrapper* pSW = tCloneSSchemaWrapper(pReader->pSchemaWrapper); - taosArrayPush(schemas, &pSW); - return 0; +int32_t tqRetrieveTaosxBlock2(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { + tqDebug("tq reader retrieve data block %p, %d", pReader->msg2.msgStr, pReader->nextBlk); + + SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); + pReader->nextBlk++; + + if(pSubmitTbDataRet) *pSubmitTbDataRet = pSubmitTbData; + int32_t sversion = pSubmitTbData->sver; + int64_t suid = pSubmitTbData->suid; + int64_t uid = pSubmitTbData->uid; + pReader->lastBlkUid = uid; + + taosMemoryFree(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchema == NULL) { + tqWarn("vgId:%d, cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 + "), version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, suid, sversion); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; } + + tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, uid, sversion, 1); + if (pReader->pSchemaWrapper == NULL) { + tqWarn("vgId:%d, cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", + pReader->pWalReader->pWal->cfg.vgId, uid, pReader->cachedSchemaVer); + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + + STSchema* pTschema = pReader->pSchema; + SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + int32_t numOfRows = 0; + + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + SColData* pCol = taosArrayGet(pCols, 0); + numOfRows = pCol->nVal; + } else { + SArray* pRows = pSubmitTbData->aRowP; + numOfRows = taosArrayGetSize(pRows); + } + + int32_t curRow = 0; + int32_t lastRow = 0; + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + if (assigned == NULL) return -1; + + // convert and scan one block + if (pSubmitTbData->flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { + SArray* pCols = pSubmitTbData->aCol; + int32_t numOfCols = taosArrayGetSize(pCols); + for (int32_t i = 0; i < numOfRows; i++) { + bool buildNew = false; + + for (int32_t j = 0; j < numOfCols; j++){ + SColData* pCol = taosArrayGet(pCols, j); + SColVal colVal; + tColDataGetValue(pCol, i, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColData* pCol = taosArrayGet(pCols, sourceIdx); + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + + if (pCol->cid < pColData->info.colId) { + sourceIdx++; + } else if (pCol->cid == pColData->info.colId) { + tColDataGetValue(pCol, i, &colVal); + + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + + curRow++; + } + } else { + SArray* pRows = pSubmitTbData->aRowP; + for (int32_t i = 0; i < numOfRows; i++) { + SRow* pRow = taosArrayGetP(pRows, i); + bool buildNew = false; + + for (int32_t j = 0; j < pTschema->numOfCols; j++){ + SColVal colVal; + tRowGet(pRow, pTschema, j, &colVal); + if (curRow == 0) { + assigned[j] = !COL_VAL_IS_NONE(&colVal); + buildNew = true; + } else { + bool currentRowAssigned = !COL_VAL_IS_NONE(&colVal); + if (currentRowAssigned != assigned[j]) { + assigned[j] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + if (taosArrayGetSize(blocks) > 0) { + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + lastRow = curRow; + } + + SSDataBlock block = {0}; + SSchemaWrapper* pSW = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if(pSW == NULL){ + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + if (tqMaskBlock(pSW, &block, pSchemaWrapper, assigned) < 0) { + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + tqDebug("vgId:%d, build new block, col %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(block.pDataBlock)); + + block.info.id.uid = uid; + block.info.version = pReader->msg2.ver; + if (blockDataEnsureCapacity(&block, numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + blockDataFreeRes(&block); + tDeleteSSchemaWrapper(pSW); + goto FAIL; + } + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &pSW); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + + tqDebug("vgId:%d, taosx scan, block num: %d", pReader->pWalReader->pWal->cfg.vgId, + (int32_t)taosArrayGetSize(blocks)); + + int32_t targetIdx = 0; + int32_t sourceIdx = 0; + int32_t colActual = blockDataGetNumOfCols(pBlock); + while (targetIdx < colActual) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx); + SColVal colVal; + tRowGet(pRow, pTschema, sourceIdx, &colVal); + + if (colVal.cid < pColData->info.colId) { + sourceIdx++; + } else if (colVal.cid == pColData->info.colId) { + if (IS_STR_DATA_TYPE(colVal.type)) { + if (colVal.value.pData != NULL) { + char val[65535 + 2]; + memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); + varDataSetLen(val, colVal.value.nData); + if (colDataAppend(pColData, curRow - lastRow, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } else { + colDataAppendNULL(pColData, curRow - lastRow); + } + } else { + if (colDataAppend(pColData, curRow - lastRow, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { + goto FAIL; + } + } + sourceIdx++; + targetIdx++; + } + } + curRow++; + } + } + + SSDataBlock* pLastBlock = taosArrayGetLast(blocks); + pLastBlock->info.rows = curRow - lastRow; + + taosMemoryFree(assigned); + return 0; + + FAIL: + taosMemoryFree(assigned); return -1; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0d27332502..7213cb65da 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1554,7 +1554,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; @@ -1963,7 +1963,7 @@ FETCH_NEXT_BLOCK: while (tqNextDataBlock2(pInfo->tqReader)) { SSDataBlock block = {0}; - int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader); + int32_t code = tqRetrieveDataBlock2(&block, pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || block.info.rows == 0) { continue; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index 5791a634df..fbdaf6fc9f 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -609,7 +609,7 @@ static int bindFileds(SBoundColInfo* pBoundInfo, SSchema* pSchema, TAOS_FIELD* f } int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreateTbReq* pCreateTb, TAOS_FIELD* tFields, - int numFields) { + int numFields, bool needChangeLength) { STableDataCxt* pTableCxt = NULL; int ret = insGetTableDataCxt(((SVnodeModifyOpStmt*)(query->pRoot))->pTableBlockHashObj, &pTableMeta->uid, sizeof(pTableMeta->uid), pTableMeta, &pCreateTb, &pTableCxt, true); @@ -679,10 +679,15 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate pStart += BitmapLen(numOfRows); } char* pData = pStart; +// uError("rawBlockBindData col bytes:%d, type:%d, size:%d, htonl size:%d", pColSchema->bytes, pColSchema->type, colLength[c], htonl(colLength[c])); tColDataAddValueByDataBlock(pCol, pColSchema->type, pColSchema->bytes, numOfRows, offset, pData); fields += sizeof(int8_t) + sizeof(int32_t); - pStart += colLength[c]; + if(needChangeLength) { + pStart += htonl(colLength[c]); + }else{ + pStart += colLength[c]; + } } end: diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 7ca18f293c..b2bf6eec0b 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -43,9 +43,9 @@ class TDTestCase: tdSql.execute('use db_taosx') tdSql.query("show tables") if drop: - tdSql.checkRows(10) + tdSql.checkRows(11) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -63,20 +63,20 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") tdSql.execute('use abc1') tdSql.query("show tables") if drop: - tdSql.checkRows(10) + tdSql.checkRows(11) else: - tdSql.checkRows(15) + tdSql.checkRows(16) tdSql.query("select * from jt order by i") tdSql.checkRows(2) tdSql.checkData(0, 1, 1) @@ -94,13 +94,13 @@ class TDTestCase: tdSql.checkData(1, 5, "sttb4") tdSql.query("select * from stt order by ts") - tdSql.checkRows(2) + tdSql.checkRows(3) tdSql.checkData(0, 1, 1) - tdSql.checkData(1, 1, 21) + tdSql.checkData(2, 1, 21) tdSql.checkData(0, 2, 2) - tdSql.checkData(1, 2, 21) + tdSql.checkData(2, 2, 21) tdSql.checkData(0, 5, "stt3") - tdSql.checkData(1, 5, "stt4") + tdSql.checkData(2, 5, "stt4") return diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index 69bf52bb1a..fb05ae262b 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -71,6 +71,8 @@ static void msg_process(TAOS_RES* msg) { printf("write raw data type: %d\n", raw.raw_type); int32_t ret = tmq_write_raw(pConn, raw); printf("write raw data: %s\n", tmq_err2str(ret)); + ASSERT(ret == 0); + tmq_free_raw(raw); taos_close(pConn); } @@ -361,7 +363,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes){ } taos_free_result(pRes); - pRes = taos_query(pConn, "insert into stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " + pRes = taos_query(pConn, "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + 1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') " "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')"); if (taos_errno(pRes) != 0) { printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));