Merge pull request #22561 from taosdata/fix/TD-25651
fix:[TD-25651] reset epoch if consumer changed to avoid consumeing no…
This commit is contained in:
commit
6f514b2bee
|
@ -218,7 +218,16 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) {
|
||||||
|
|
||||||
if (strlen(oneTable->childTableName) == 0) {
|
if (strlen(oneTable->childTableName) == 0) {
|
||||||
SArray *dst = taosArrayDup(oneTable->tags, NULL);
|
SArray *dst = taosArrayDup(oneTable->tags, NULL);
|
||||||
RandTableName rName = {dst, oneTable->sTableName, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
|
ASSERT(oneTable->sTableNameLen < TSDB_TABLE_NAME_LEN);
|
||||||
|
char superName[TSDB_TABLE_NAME_LEN] = {0};
|
||||||
|
RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
|
||||||
|
if(tsSmlDot2Underline){
|
||||||
|
memcpy(superName, oneTable->sTableName, oneTable->sTableNameLen);
|
||||||
|
smlStrReplace(superName, oneTable->sTableNameLen);
|
||||||
|
rName.stbFullName = superName;
|
||||||
|
}else{
|
||||||
|
rName.stbFullName = oneTable->sTableName;
|
||||||
|
}
|
||||||
|
|
||||||
buildChildTableName(&rName);
|
buildChildTableName(&rName);
|
||||||
taosArrayDestroy(dst);
|
taosArrayDestroy(dst);
|
||||||
|
@ -230,6 +239,9 @@ void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tin
|
||||||
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
|
char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0};
|
||||||
size_t nLen = strlen(tinfo->childTableName);
|
size_t nLen = strlen(tinfo->childTableName);
|
||||||
memcpy(key, currElement->measure, currElement->measureLen);
|
memcpy(key, currElement->measure, currElement->measureLen);
|
||||||
|
if(tsSmlDot2Underline){
|
||||||
|
smlStrReplace(key, currElement->measureLen);
|
||||||
|
}
|
||||||
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
|
memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen);
|
||||||
void *uid =
|
void *uid =
|
||||||
taosHashGet(info->tableUids, key,
|
taosHashGet(info->tableUids, key,
|
||||||
|
|
|
@ -879,20 +879,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
} else {
|
} else {
|
||||||
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
// atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_store_32(&pHandle->epoch, 0);
|
||||||
|
|
||||||
// kill executing task
|
|
||||||
// if(tqIsHandleExec(pHandle)) {
|
|
||||||
// qTaskInfo_t pTaskInfo = pHandle->execHandle.task;
|
|
||||||
// if (pTaskInfo != NULL) {
|
|
||||||
// qKillTask(pTaskInfo, TSDB_CODE_SUCCESS);
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
// qStreamCloseTsdbReader(pTaskInfo);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// remove if it has been register in the push manager, and return one empty block to consumer
|
|
||||||
tqUnregisterPushHandle(pTq, pHandle);
|
tqUnregisterPushHandle(pTq, pHandle);
|
||||||
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,7 +67,7 @@ class TDTestCase:
|
||||||
tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`")
|
tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
|
|
||||||
tdSql.query(f"select * from {dbname}.t_fc70dec6677d4277c5d9799c4da806da order by times")
|
tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times")
|
||||||
tdSql.checkRows(2)
|
tdSql.checkRows(2)
|
||||||
tdSql.checkData(0, 1, 1.300000000)
|
tdSql.checkData(0, 1, 1.300000000)
|
||||||
tdSql.checkData(1, 1, 13.000000000)
|
tdSql.checkData(1, 1, 13.000000000)
|
||||||
|
|
|
@ -237,7 +237,7 @@ class TDTestCase:
|
||||||
for i in range(expectRows):
|
for i in range(expectRows):
|
||||||
totalConsumeRows += resultList[i]
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
if totalConsumeRows != expectrowcnt:
|
if totalConsumeRows < expectrowcnt:
|
||||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||||
tdLog.exit("tmq consume rows error!")
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
|
|
@ -1533,6 +1533,7 @@ int sml_ts3724_Test() {
|
||||||
|
|
||||||
const char *sql[] = {
|
const char *sql[] = {
|
||||||
"stb.2,t1=1 f1=283i32 1632299372000",
|
"stb.2,t1=1 f1=283i32 1632299372000",
|
||||||
|
"stb_2,t1=1 f1=283i32 1632299372000",
|
||||||
".stb2,t1=1 f1=106i32 1632299378000",
|
".stb2,t1=1 f1=106i32 1632299378000",
|
||||||
"stb2.,t1=1 f1=106i32 1632299378000",
|
"stb2.,t1=1 f1=106i32 1632299378000",
|
||||||
};
|
};
|
||||||
|
@ -1547,6 +1548,18 @@ int sml_ts3724_Test() {
|
||||||
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "select * from stb_2");
|
||||||
|
TAOS_ROW row = taos_fetch_row(pRes);
|
||||||
|
int numRows = taos_affected_rows(pRes);
|
||||||
|
ASSERT(numRows == 1);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "show stables");
|
||||||
|
row = taos_fetch_row(pRes);
|
||||||
|
numRows = taos_affected_rows(pRes);
|
||||||
|
ASSERT(numRows == 3);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
taos_close(taos);
|
taos_close(taos);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue