fix:[TS-4540]seperate block if insert in one sql when subscribe query
This commit is contained in:
parent
dbc76c5361
commit
80f55abc61
|
@ -223,10 +223,10 @@ typedef struct SStoreTqReader {
|
|||
bool (*tqReaderCurrentBlockConsumed)();
|
||||
|
||||
struct SWalReader* (*tqReaderGetWalReader)(); // todo remove it
|
||||
int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
// int32_t (*tqReaderRetrieveTaosXBlock)(); // todo remove it
|
||||
|
||||
int32_t (*tqReaderSetSubmitMsg)(); // todo remove it
|
||||
bool (*tqReaderNextBlockFilterOut)();
|
||||
// bool (*tqReaderNextBlockFilterOut)();
|
||||
} SStoreTqReader;
|
||||
|
||||
typedef struct SStoreSnapshotFn {
|
||||
|
|
|
@ -368,24 +368,11 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
|
|||
}
|
||||
}
|
||||
|
||||
// todo ignore the error in wal?
|
||||
bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
||||
SWalReader* pWalReader = pReader->pWalReader;
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
|
||||
uint64_t st = taosGetTimestampMs();
|
||||
while (1) {
|
||||
// try next message in wal file
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
|
||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||
pReader->nextBlk = 0;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
|
||||
while (pReader->nextBlk < numOfBlocks) {
|
||||
tqTrace("tq reader next data block %d/%d, len:%d %" PRId64, pReader->nextBlk, numOfBlocks, pReader->msg.msgLen,
|
||||
|
@ -400,33 +387,32 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) {
|
|||
tqTrace("tq reader return submit block, uid:%" PRId64, pSubmitTbData->uid);
|
||||
SSDataBlock* pRes = NULL;
|
||||
int32_t code = tqRetrieveDataBlock(pReader, &pRes, NULL);
|
||||
if (code == TSDB_CODE_SUCCESS && pRes->info.rows > 0) {
|
||||
if (pDataBlock == NULL) {
|
||||
pDataBlock = createOneDataBlock(pRes, true);
|
||||
} else {
|
||||
blockDataMerge(pDataBlock, pRes);
|
||||
}
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
return true;
|
||||
}
|
||||
} else {
|
||||
pReader->nextBlk += 1;
|
||||
tqTrace("tq reader discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
|
||||
}
|
||||
}
|
||||
|
||||
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
|
||||
pReader->msg.msgStr = NULL;
|
||||
|
||||
if (pDataBlock != NULL) {
|
||||
blockDataCleanup(pReader->pResBlock);
|
||||
copyDataBlock(pReader->pResBlock, pDataBlock);
|
||||
blockDataDestroy(pDataBlock);
|
||||
return true;
|
||||
} else {
|
||||
qTrace("stream scan return empty, all %d submit blocks consumed, %s", numOfBlocks, id);
|
||||
}
|
||||
|
||||
if (taosGetTimestampMs() - st > 1000) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// try next message in wal file
|
||||
if (walNextValidMsg(pWalReader) < 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
void* pBody = POINTER_SHIFT(pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg));
|
||||
int32_t bodyLen = pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg);
|
||||
int64_t ver = pWalReader->pHead->head.version;
|
||||
tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver);
|
||||
pReader->nextBlk = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -128,12 +128,12 @@ void initTqAPI(SStoreTqReader* pTq) {
|
|||
pTq->tqReaderCurrentBlockConsumed = tqCurrentBlockConsumed;
|
||||
|
||||
pTq->tqReaderGetWalReader = tqGetWalReader; // todo remove it
|
||||
pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
// pTq->tqReaderRetrieveTaosXBlock = tqRetrieveTaosxBlock; // todo remove it
|
||||
|
||||
pTq->tqReaderSetSubmitMsg = tqReaderSetSubmitMsg; // todo remove it
|
||||
pTq->tqGetResultBlock = tqGetResultBlock;
|
||||
|
||||
pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
// pTq->tqReaderNextBlockFilterOut = tqNextDataBlockFilterOut;
|
||||
pTq->tqGetResultBlockTime = tqGetResultBlockTime;
|
||||
|
||||
pTq->tqGetStreamExecProgress = tqGetStreamExecInfo;
|
||||
|
|
|
@ -359,8 +359,46 @@ class TDTestCase:
|
|||
finally:
|
||||
consumer.close()
|
||||
|
||||
def consume_TS_4540_Test(self):
|
||||
tdSql.execute(f'create database if not exists test')
|
||||
tdSql.execute(f'use test')
|
||||
tdSql.execute(f'CREATE STABLE `test`.`b` ( `time` TIMESTAMP , `task_id` NCHAR(1000) ) TAGS( `key` NCHAR(1000))')
|
||||
tdSql.execute(f"insert into `test`.b1 using `test`.`b`(key) tags('1') (time, task_id) values ('2024-03-04 12:50:01.000', '32') `test`.b2 using `test`.`b`(key) tags('2') (time, task_id) values ('2024-03-04 12:50:01.000', '43') `test`.b3 using `test`.`b`(key) tags('3') (time, task_id) values ('2024-03-04 12:50:01.000', '123456')")
|
||||
|
||||
tdSql.execute(f'create topic tt as select tbname,task_id,key from b')
|
||||
consumer_dict = {
|
||||
"group.id": "g1",
|
||||
"td.connect.user": "root",
|
||||
"td.connect.pass": "taosdata",
|
||||
"auto.offset.reset": "earliest",
|
||||
}
|
||||
consumer = Consumer(consumer_dict)
|
||||
|
||||
try:
|
||||
consumer.subscribe(["tt"])
|
||||
except TmqError:
|
||||
tdLog.exit(f"subscribe error")
|
||||
|
||||
try:
|
||||
while True:
|
||||
res = consumer.poll(1)
|
||||
if not res:
|
||||
break
|
||||
val = res.value()
|
||||
if val is None:
|
||||
continue
|
||||
for block in val:
|
||||
data = block.fetchall()
|
||||
print(data)
|
||||
if data != [('b1', '32', '1')] and data != [('b2', '43', '2')] and data != [('b3', '123456', '3')]:
|
||||
tdLog.exit(f"index = 0 table b1 error")
|
||||
|
||||
finally:
|
||||
consumer.close()
|
||||
|
||||
def run(self):
|
||||
self.consumeTest()
|
||||
self.consume_TS_4540_Test()
|
||||
|
||||
tdSql.prepare()
|
||||
self.checkWal1VgroupOnlyMeta()
|
||||
|
|
Loading…
Reference in New Issue