fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-02-16 00:15:01 +08:00
parent 39f77e45fc
commit 47a6836c27
2 changed files with 9 additions and 10 deletions

View File

@ -395,7 +395,6 @@ static void preProcessSubmitMsg(STqHandle* pHandle, const SMqPollReq* pRequest,
for (int32_t i = 0; i < blockSz; i++){
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, i);
if (pSubmitTbData== NULL){
tqReaderClearSubmitMsg(pReader);
taosArrayDestroy(*rawList);
*rawList = NULL;
return;
@ -456,15 +455,15 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
}
// this submit data is metadata and previous data is rawdata
if (pRequest->rawData != 0 && *totalRows > 0 && rawList == NULL){
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
if (pRequest->rawData != 0 && *totalRows > 0 && pRsp->createTableNum == 0 && rawList == NULL){
tqDebug("poll rawdata split,vgId:%d, this wal submit data contains metadata and previous data is data", pTq->pVnode->config.vgId);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
goto END;
}
// this submit data is rawdata and previous data is metadata
if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
tqDebug("poll rawdata split,vgId:%d, this wal submit data is data and previous data is metadata", pTq->pVnode->config.vgId);
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
goto END;
}

View File

@ -145,22 +145,22 @@ def stopTaosd(str):
os.system(cmd)
def cleanDb():
dropTopic = f"taos -c {dnode1}/{cfg} -s \"drop topic if exists test\""
dropTopic = f"{taosd}/bin/taos -c {dnode1}/{cfg} -s \"drop topic if exists test\""
print("dropTopic:%s" % dropTopic)
os.system(dropTopic)
dropDb = f"taos -c {dnode2}/{cfg} -s \"drop database if exists test\""
dropDb = f"{taosd}/bin/taos -c {dnode2}/{cfg} -s \"drop database if exists test\""
print("dropDb:%s" % dropDb)
os.system(dropDb)
createDb = f"taos -c {dnode2}/{cfg} -s \"create database test vgroups {vgroups}\""
createDb = f"{taosd}/bin/taos -c {dnode2}/{cfg} -s \"create database test vgroups {vgroups}\""
print("createDb:%s" % createDb)
os.system(createDb)
def restartTaosd():
cmd1 = f"{taosd} -c {dnode1}/{cfg} > /dev/null 2>&1 &"
cmd2 = f"{taosd} -c {dnode2}/{cfg} > /dev/null 2>&1 &"
cmd1 = f"{taosd}/bin/taosd -c {dnode1}/{cfg} > /dev/null 2>&1 &"
cmd2 = f"{taosd}/bin/taosd -c {dnode2}/{cfg} > /dev/null 2>&1 &"
print("start taosd1 :%s" % cmd1)
print("start taosd2 :%s" % cmd2)
os.system(cmd1)
@ -168,7 +168,7 @@ def restartTaosd():
def runTaosx():
cmd = f"{taosx} run -f \"tmq://root:taosdata@localhost:6030/test?group.id=taosx-new-`date +%s`&timeout={taosxTimeout}s&experimental.snapshot.enable=false&auto.offset.reset=earliest&prefer=raw\" -t \"taos://root:taosdata@localhost:7030/test\" > {taosxLog}"
cmd = f"{taosx} run -f \"tmq://root:taosdata@localhost:6030/test?group.id=taosx-new-`date +%s`&timeout={taosxTimeout}s&experimental.snapshot.enable=false&auto.offset.reset=earliest&prefer=raw&libraryPath={taosd}/lib/libtaos.so\" -t \"taos://root:taosdata@localhost:7030/test?libraryPath={taosd}/lib/libtaos.so\" > {taosxLog}"
print("run taosx:%s" % cmd)
os.system(cmd)