diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6b02d8f985..8501d88be0 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -210,7 +210,7 @@ SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo); const SSchemaWrapper* qExtractSchemaFromTask(qTaskInfo_t tinfo); const char* qExtractTbnameFromTask(qTaskInfo_t tinfo); -const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo); +//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo); void* qExtractReaderFromStreamScanner(void* scanner); diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 94162ff768..578f8148da 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -53,9 +53,7 @@ #define TMQ_META_VERSION "1.0" static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); - static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } - static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow, cJSON** pJson) { int32_t code = TSDB_CODE_SUCCESS; @@ -1856,6 +1854,12 @@ static threadlocal SHashObj* pCreateTbHash = NULL; static threadlocal SHashObj* pNameHash = NULL; static threadlocal SHashObj* pMetaHash = NULL; +typedef struct{ + SVgroupInfo vgInfo; + int64_t uid; + int64_t suid; +}tbInfo; + static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW){ char* p = (char*)rawData; // | version | total length | total rows | blankFill | total columns | flag seg| block group id | column schema | each @@ -1878,7 +1882,7 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe char* fieldName = pSW->pSchema[i].name; if (strcmp(pColSchema->name, fieldName) == 0) { - if (*fields != pColSchema->type && *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { + if (*fields != pColSchema->type || *(int32_t*)(fields + sizeof(int8_t)) != pColSchema->bytes) { return true; } break; @@ -1892,11 +1896,6 @@ static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrappe return false; } -static void tmqFreeMeta(void *data){ - STableMeta* pTableMeta = *(STableMeta**)data; - taosMemoryFree(pTableMeta); -} - static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t dataLen) { if (taos == NULL || data == NULL) { SET_ERROR_MSG("taos:%p or data:%p is NULL", taos, data); @@ -1933,23 +1932,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da goto end; } - if (pVgHash == NULL){ - pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pVgHash); - } - if (pCreateTbHash == NULL){ - pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - RAW_NULL_CHECK(pCreateTbHash); - } - if (pNameHash == NULL){ - pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - RAW_NULL_CHECK(pNameHash); - } - if (pMetaHash == NULL){ - pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - RAW_NULL_CHECK(pMetaHash); - taosHashSetFreeFp(pMetaHash, tmqFreeMeta); - } struct SCatalog* pCatalog = NULL; RAW_RETURN_CHECK(catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog)); @@ -1959,90 +1941,119 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da conn.requestObjRefId = pRequest->self; conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); - RAW_RETURN_CHECK(smlInitHandle(&pQuery)); + int retry = 0; + while(1){ + RAW_RETURN_CHECK(smlInitHandle(&pQuery)); - uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum); - while (++rspObj.resIter < rspObj.dataRsp.blockNum) { - void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); - RAW_NULL_CHECK(pRetrieve); - if (!rspObj.dataRsp.withSchema) { - goto end; - } - - const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); - RAW_NULL_CHECK(tbName); - - int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter); - RAW_NULL_CHECK(suid); - - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); - SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; - (void)strcpy(pName.dbname, pRequest->pDb); - (void)strcpy(pName.tname, tbName); - - // find schema data info - SVCreateTbReq* pCreateReqDst = NULL; - if (type == RES_TYPE__TMQ_METADATA){ - pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); - if (pCreateReqDst == NULL) { - RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); - pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); - } - } - - int32_t vgId = 0; - SVgroupInfo* vg = (SVgroupInfo*)taosHashGet(pNameHash, tbName, strlen(tbName)); - if (vg == NULL) { - SVgroupInfo vgTmp = {0}; - RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &vgTmp)); - RAW_RETURN_CHECK(taosHashPut(pNameHash, tbName, strlen(tbName), &vgTmp, sizeof(SVgroupInfo))); - code = taosHashPut(pVgHash, &vgTmp.vgId, sizeof(vgTmp.vgId), &vgTmp, sizeof(SVgroupInfo)); - code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; - RAW_RETURN_CHECK(code); - vgId = vgTmp.vgId; - } else { - vgId = vg->vgId; - } - - SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); - RAW_NULL_CHECK(pSW); - void* rawData = getRawDataFromRes(pRetrieve); - RAW_NULL_CHECK(rawData); - - STableMeta* pTableMeta = NULL; - STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, suid, LONG_BYTES); - if (pTableMetaTmp == NULL || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { - if (pCreateReqDst) { // change stable name to get meta - (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); - } - RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); - code = taosHashPut(pMetaHash, suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); - if (code != 0){ - taosMemoryFree(pTableMeta); + uDebug(LOG_ID_TAG " write raw data type:%d block num:%d", LOG_ID_VALUE, type, rspObj.dataRsp.blockNum); + while (++rspObj.resIter < rspObj.dataRsp.blockNum) { + void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter); + RAW_NULL_CHECK(pRetrieve); + if (!rspObj.dataRsp.withSchema) { goto end; } - if (pCreateReqDst) { - pTableMeta->vgId = vgId; - pTableMeta->uid = pCreateReqDst->uid; - pCreateReqDst->ctb.suid = pTableMeta->suid; + + const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter); + RAW_NULL_CHECK(tbName); + +// int64_t* suid = taosArrayGet(rspObj.dataRsp.blockSuid, rspObj.resIter); +// RAW_NULL_CHECK(suid); + + uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + (void)strcpy(pName.dbname, pRequest->pDb); + (void)strcpy(pName.tname, tbName); + + // find schema data info + SVCreateTbReq* pCreateReqDst = NULL; + if (type == RES_TYPE__TMQ_METADATA){ + pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); + if (pCreateReqDst == NULL) { + RAW_RETURN_CHECK(buildCreateTbMap(&rspObj.dataRsp, pCreateTbHash)); + pCreateReqDst = (SVCreateTbReq*)taosHashGet(pCreateTbHash, tbName, strlen(tbName)); + } } - }else{ - pTableMeta = *pTableMetaTmp; - } + STableMeta* pTableMeta = NULL; + tbInfo* tmpInfo = (tbInfo*)taosHashGet(pNameHash, tbName, strlen(tbName)); + if (tmpInfo == NULL || retry > 0) { + tbInfo info = {0}; - char err[ERR_MSG_LEN] = {0}; - code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); - if (code != TSDB_CODE_SUCCESS) { - SET_ERROR_MSG("table:%s, err:%s", tbName, err); - goto end; + RAW_RETURN_CHECK(catalogGetTableHashVgroup(pCatalog, &conn, &pName, &info.vgInfo)); + if (pCreateReqDst) { // change stable name to get meta + (void)strcpy(pName.tname, pCreateReqDst->ctb.stbName); + } + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); + info.uid = pTableMeta->uid; + if (pTableMeta->tableType == TSDB_CHILD_TABLE){ + info.suid = pTableMeta->suid; + } else { + info.suid = pTableMeta->uid; + } + code = taosHashPut(pMetaHash, &info.suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); + if (code != 0){ + taosMemoryFree(pTableMeta); + goto end; + } + if (pCreateReqDst) { + pTableMeta->vgId = info.vgInfo.vgId; + pTableMeta->uid = pCreateReqDst->uid; + pCreateReqDst->ctb.suid = pTableMeta->suid; + } + + code = taosHashPut(pNameHash, pName.tname, strlen(pName.tname), &info, sizeof(tbInfo)); + code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; + RAW_RETURN_CHECK(code); + code = taosHashPut(pVgHash, &info.vgInfo.vgId, sizeof(info.vgInfo.vgId), &info.vgInfo, sizeof(SVgroupInfo)); + code = (code == TSDB_CODE_DUP_KEY) ? 0 : code; + RAW_RETURN_CHECK(code); + } + + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter); + RAW_NULL_CHECK(pSW); + void* rawData = getRawDataFromRes(pRetrieve); + RAW_NULL_CHECK(rawData); + + if (pTableMeta == NULL || retry > 0){ + STableMeta** pTableMetaTmp = (STableMeta**)taosHashGet(pMetaHash, &tmpInfo->suid, LONG_BYTES); + if (pTableMetaTmp == NULL || retry > 0 || needRefreshMeta(rawData, *pTableMetaTmp, pSW)) { + RAW_RETURN_CHECK(catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta)); + code = taosHashPut(pMetaHash, &tmpInfo->suid, LONG_BYTES, &pTableMeta, POINTER_BYTES); + if (code != 0){ + taosMemoryFree(pTableMeta); + goto end; + } + + }else{ + pTableMeta = *pTableMetaTmp; + pTableMeta->uid = tmpInfo->uid; + pTableMeta->vgId = tmpInfo->vgInfo.vgId; + } + } + + char err[ERR_MSG_LEN] = {0}; + code = rawBlockBindData(pQuery, pTableMeta, rawData, pCreateReqDst, pSW, pSW->nCols, true, err, ERR_MSG_LEN, true); + if (code != TSDB_CODE_SUCCESS) { + SET_ERROR_MSG("table:%s, err:%s", tbName, err); + goto end; + } } + RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; + + if (NEED_CLIENT_HANDLE_ERROR(code)) { + uInfo("write raw retry:%d/3 end code:%d, msg:%s", retry, code, tstrerror(code)); + if (retry++ >= 3) { + break; + } + qDestroyQuery(pQuery); + pQuery = NULL; + rspObj.resIter = -1; + continue; + } + break; } - RAW_RETURN_CHECK(smlBuildOutput(pQuery, pVgHash)); - - launchQueryImpl(pRequest, pQuery, true, NULL); - code = pRequest->code; - end: uDebug(LOG_ID_TAG " write raw metadata return, msg:%s", LOG_ID_VALUE, tstrerror(code)); if (type == RES_TYPE__TMQ_METADATA){ @@ -2056,18 +2067,6 @@ static int32_t tmqWriteRawImpl(TAOS* taos, uint16_t type, void* data, int32_t da return code; } -void tmqClean() { - taosHashCleanup(pMetaHash); - taosHashCleanup(pNameHash); - void* pIter = taosHashIterate(pCreateTbHash, NULL); - while (pIter) { - tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); - pIter = taosHashIterate(pCreateTbHash, pIter); - } - taosHashCleanup(pCreateTbHash); - taosHashCleanup(pVgHash); -} - static void processSimpleMeta(SMqMetaRsp* pMetaRsp, cJSON** meta) { if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { processCreateStb(pMetaRsp, meta); @@ -2263,7 +2262,53 @@ void tmq_free_raw(tmq_raw_data raw) { (void)memset(terrMsg, 0, ERR_MSG_LEN); } +static void tmqFreeMeta(void *data){ + STableMeta* pTableMeta = *(STableMeta**)data; + taosMemoryFree(pTableMeta); +} + +void freeHash() { + taosHashCleanup(pMetaHash); + taosHashCleanup(pNameHash); + void* pIter = taosHashIterate(pCreateTbHash, NULL); + while (pIter) { + tDestroySVCreateTbReq(pIter, TSDB_MSG_FLG_DECODE); + pIter = taosHashIterate(pCreateTbHash, pIter); + } + taosHashCleanup(pCreateTbHash); + taosHashCleanup(pVgHash); +} + +static int32_t initHash(){ + int32_t code = 0; + if (pVgHash == NULL){ + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pVgHash); + } + if (pCreateTbHash == NULL){ + pCreateTbHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + RAW_NULL_CHECK(pCreateTbHash); + } + if (pNameHash == NULL){ + pNameHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pNameHash); + } + if (pMetaHash == NULL){ + pMetaHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + RAW_NULL_CHECK(pMetaHash); + taosHashSetFreeFp(pMetaHash, tmqFreeMeta); + } + return code; +end: + freeHash(); + return code; +} + static int32_t writeRawImpl(TAOS* taos, void* buf, uint32_t len, uint16_t type) { + int32_t code = initHash(); + if (code != 0) { + return code; + } if (type == TDMT_VND_CREATE_STB) { return taosCreateStb(taos, buf, len); } else if (type == TDMT_VND_ALTER_STB) { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4cbb808187..6d0323e20a 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1064,7 +1064,7 @@ END: tDestroySMqHbReq(&req); if (tmrId != NULL) { bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); - tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq hb:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag); + tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag); } int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId); if (ret != 0){ diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 41516d325a..e585212372 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10716,42 +10716,42 @@ _exit: return code; } -int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){ - for (int32_t i = 0; i < pRsp->blockNum; i++) { - if (pRsp->withTbName) { - int64_t* suid = taosArrayGet(pRsp->blockSuid, i); - if (suid != NULL){ - TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid)); - } - } - } - return 0; -} +//int32_t tEncodeSuidArray(SEncoder *pEncoder, const SMqDataRsp *pRsp){ +// for (int32_t i = 0; i < pRsp->blockNum; i++) { +// if (pRsp->withTbName) { +// int64_t* suid = taosArrayGet(pRsp->blockSuid, i); +// if (suid != NULL){ +// TAOS_CHECK_RETURN(tEncodeI64(pEncoder, *suid)); +// } +// } +// } +// return 0; +//} int32_t tEncodeMqDataRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_RETURN(tEncodeMqDataRspCommon(pEncoder, pRsp)); TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pRsp->sleepTime)); - TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp)); +// TAOS_CHECK_RETURN(tEncodeSuidArray(pEncoder, pRsp)); return 0; } -int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){ - if (!tDecodeIsEnd(pDecoder)) { - if (pRsp->withTbName) { - if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) { - TAOS_CHECK_RETURN(terrno); - } - } - - for (int32_t i = 0; i < pRsp->blockNum; i++) { - int64_t suid = 0; - TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid)); - if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) { - TAOS_CHECK_RETURN(terrno); - } - } - } - return 0; -} +//int32_t tDecodeSuidArray(SDecoder *pDecoder, SMqDataRsp *pRsp){ +// if (!tDecodeIsEnd(pDecoder)) { +// if (pRsp->withTbName) { +// if ((pRsp->blockSuid = taosArrayInit(pRsp->blockNum, sizeof(int64_t))) == NULL) { +// TAOS_CHECK_RETURN(terrno); +// } +// } +// +// for (int32_t i = 0; i < pRsp->blockNum; i++) { +// int64_t suid = 0; +// TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &suid)); +// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL) { +// TAOS_CHECK_RETURN(terrno); +// } +// } +// } +// return 0; +//} int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) { int32_t code = 0; int32_t lino; @@ -10828,9 +10828,9 @@ int32_t tDecodeMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { if (!tDecodeIsEnd(pDecoder)) { TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pRsp->sleepTime)); } - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp)); - } +// if (!tDecodeIsEnd(pDecoder)) { +// TAOS_CHECK_RETURN(tDecodeSuidArray(pDecoder, pRsp)); +// } return 0; } @@ -10844,8 +10844,8 @@ static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) { pRsp->blockSchema = NULL; taosArrayDestroyP(pRsp->blockTbName, (FDelete)taosMemoryFree); pRsp->blockTbName = NULL; - taosArrayDestroy(pRsp->blockSuid); - pRsp->blockSuid = NULL; +// taosArrayDestroy(pRsp->blockSuid); +// pRsp->blockSuid = NULL; tOffsetDestroy(&pRsp->reqOffset); tOffsetDestroy(&pRsp->rspOffset); } @@ -10865,7 +10865,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const SMqDataRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeBinary(pEncoder, createTableReq, createTableLen)); } } - TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp)); +// TAOS_CHECK_EXIT(tEncodeSuidArray(pEncoder, pRsp)); _exit: return code; @@ -10897,9 +10897,9 @@ int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } } } - if (!tDecodeIsEnd(pDecoder)) { - TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp)); - } +// if (!tDecodeIsEnd(pDecoder)) { +// TAOS_CHECK_EXIT(tDecodeSuidArray(pDecoder, pRsp)); +// } _exit: return code; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index a95437ab0d..68702754aa 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -77,14 +77,16 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i tqError("failed to push tbName to blockTbName:%s", tbName); continue; } - int64_t suid = 0; - if(mr.me.type == TSDB_CHILD_TABLE){ - suid = mr.me.ctbEntry.suid; - } - if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){ - tqError("failed to push suid to blockSuid:%"PRId64, suid); - continue; - } +// int64_t suid = 0; +// if(mr.me.type == TSDB_CHILD_TABLE){ +// suid = mr.me.ctbEntry.suid; +// }else{ +// suid = mr.me.uid; +// } +// if(taosArrayPush(pRsp->blockSuid, &suid) == NULL){ +// tqError("failed to push suid to blockSuid:%"PRId64, suid); +// continue; +// } } metaReaderClear(&mr); return 0; @@ -227,11 +229,11 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); continue; } - int64_t suid = qExtractSuidFromTask(task); - if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){ - tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId); - continue; - } +// int64_t suid = qExtractSuidFromTask(task); +// if (taosArrayPush(pRsp->blockSuid, &suid) == NULL){ +// tqError("vgId:%d, failed to add suid to rsp msg", pTq->pVnode->config.vgId); +// continue; +// } } if (pRsp->withSchema) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 891e55d91d..6acfe6b074 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -50,11 +50,10 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); - pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t)); +// pRsp->blockSuid = taosArrayInit(0, sizeof(int64_t)); if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || - pRsp->blockTbName == NULL || pRsp->blockSchema == NULL || - pRsp->blockSuid == NULL) { + pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { if (pRsp->blockData != NULL) { taosArrayDestroy(pRsp->blockData); pRsp->blockData = NULL; @@ -75,10 +74,10 @@ static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { pRsp->blockSchema = NULL; } - if (pRsp->blockSuid != NULL) { - taosArrayDestroy(pRsp->blockSuid); - pRsp->blockSuid = NULL; - } +// if (pRsp->blockSuid != NULL) { +// taosArrayDestroy(pRsp->blockSuid); +// pRsp->blockSuid = NULL; +// } return terrno; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 08b7ba0e05..b5af516f75 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1212,10 +1212,10 @@ const char* qExtractTbnameFromTask(qTaskInfo_t tinfo) { return pTaskInfo->streamInfo.tbName; } -const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) { - SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; - return pTaskInfo->streamInfo.suid; -} +//const int64_t qExtractSuidFromTask(qTaskInfo_t tinfo) { +// SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; +// return pTaskInfo->streamInfo.suid; +//} SMqBatchMetaRsp* qStreamExtractMetaMsg(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; @@ -1499,7 +1499,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT cleanupQueryTableDataCond(&pTaskInfo->streamInfo.tableCond); tstrncpy(pTaskInfo->streamInfo.tbName, mtInfo.tbName, TSDB_TABLE_NAME_LEN); - pTaskInfo->streamInfo.suid = mtInfo.suid; + pTaskInfo->streamInfo.suid = mtInfo.suid == 0 ? mtInfo.uid : mtInfo.suid; tDeleteSchemaWrapper(pTaskInfo->streamInfo.schema); pTaskInfo->streamInfo.schema = mtInfo.schema; diff --git a/source/libs/parser/src/parInsertUtil.c b/source/libs/parser/src/parInsertUtil.c index f29ed79412..8adf32d2dd 100644 --- a/source/libs/parser/src/parInsertUtil.c +++ b/source/libs/parser/src/parInsertUtil.c @@ -963,11 +963,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate ret = TSDB_CODE_INVALID_PARA; goto end; } - if (tFields != NULL && numFields > boundInfo->numOfBound) { - if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound); - ret = TSDB_CODE_INVALID_PARA; - goto end; - } +// if (tFields != NULL && numFields > boundInfo->numOfBound) { +// if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d bigger than num of bound cols:%d", numFields, boundInfo->numOfBound); +// ret = TSDB_CODE_INVALID_PARA; +// goto end; +// } if (tFields == NULL && numOfCols != boundInfo->numOfBound) { if (errstr != NULL) snprintf(errstr, errstrLen, "numFields:%d not equal to num of bound cols:%d", numOfCols, boundInfo->numOfBound); ret = TSDB_CODE_INVALID_PARA; @@ -1037,6 +1037,11 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate pStart += numOfRows * sizeof(int32_t); } else { pStart += BitmapLen(numOfRows); +// for(int k = 0; k < numOfRows; k++) { +// if(!colDataIsNull_f(offset, k) && pColSchema->type == TSDB_DATA_TYPE_INT){ +// printf("colName:%s,val:%d", fieldName, *(int32_t*)(pStart + k * sizeof(int32_t))); +// } +// } } char* pData = pStart; diff --git a/tests/system-test/7-tmq/tmq_taosx.py b/tests/system-test/7-tmq/tmq_taosx.py index 4e90aefe7c..5047ada1d1 100644 --- a/tests/system-test/7-tmq/tmq_taosx.py +++ b/tests/system-test/7-tmq/tmq_taosx.py @@ -131,14 +131,14 @@ class TDTestCase: tdSql.checkData(0, 2, 1) tdSql.query("select * from ct3 order by c1 desc") - tdSql.checkRows(2) + tdSql.checkRows(5) tdSql.checkData(0, 1, 51) tdSql.checkData(0, 4, 940) tdSql.checkData(1, 1, 23) tdSql.checkData(1, 4, None) tdSql.query("select * from st1 order by ts") - tdSql.checkRows(8) + tdSql.checkRows(14) tdSql.checkData(0, 1, 1) tdSql.checkData(1, 1, 3) tdSql.checkData(4, 1, 4) @@ -180,7 +180,7 @@ class TDTestCase: tdSql.checkData(6, 8, None) tdSql.query("select * from ct1") - tdSql.checkRows(4) + tdSql.checkRows(7) tdSql.query("select * from ct2") tdSql.checkRows(0) diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index d49c7e4ad4..cd70dd88f5 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -133,7 +133,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); if (taos_errno(pRes) != 0) { - printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + printf("failed to create child table ct0, reason:%s\n", taos_errstr(pRes)); return -1; } taos_free_result(pRes); @@ -176,7 +176,7 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { pRes = taos_query( pConn, "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " - "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(1626006833703, 23, 32, 's21ds')"); if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1; @@ -190,6 +190,41 @@ int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { } taos_free_result(pRes); + pRes = taos_query(pConn, "insert into ct1 values(1736006813600, -32222, 43, 'ewb', 99)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 drop column c4"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(1736006833600, -4223, 344, 'bfs')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(1766006833600, -4432, 4433, 'e23wb', 9349)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); if (taos_errno(pRes) != 0) { printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); @@ -597,6 +632,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); tmq_conf_set(conf, "msg.consume.excluded", "1"); +// tmq_conf_set(conf, "session.timeout.ms", "1000000"); // tmq_conf_set(conf, "max.poll.interval.ms", "20000"); if (g_conf.snapShot) { @@ -637,6 +673,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 5000); if (tmqmessage) { cnt++; + printf("cnt:%d\n", cnt); msg_process(tmqmessage); taos_free_result(tmqmessage); } else { @@ -845,6 +882,8 @@ void initLogFile() { "{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\"," "\"colType\":5}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\"," "\"colType\":8,\"colLength\":64}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\"," @@ -992,6 +1031,8 @@ void initLogFile() { "{\"name\":\"t1\",\"type\":4,\"value\":3000}],\"createList\":[]}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\"," "\"colType\":5}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":6,\"colName\":\"c4\"}", + "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":5,\"colName\":\"c4\",\"colType\":5}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":7,\"colName\":\"c3\"," "\"colType\":8,\"colLength\":64}", "{\"type\":\"alter\",\"tableType\":\"super\",\"tableName\":\"st1\",\"alterType\":1,\"colName\":\"t2\","