test(tmq): enable drop table case
This commit is contained in:
parent
9ad18b8c2d
commit
dacbfc03cd
|
@ -106,8 +106,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -167,6 +167,7 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
|
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
assert(tmq);
|
assert(tmq);
|
||||||
|
@ -239,7 +240,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||||
msg_process(tmqmessage);
|
msg_process(tmqmessage);
|
||||||
taos_free_result(tmqmessage);
|
taos_free_result(tmqmessage);
|
||||||
|
|
||||||
tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);
|
/*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/
|
||||||
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -90,6 +90,10 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
int32_t sversion = 1;
|
int32_t sversion = 1;
|
||||||
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
if (pHandle->sver != sversion || pHandle->cachedSchemaUid != pHandle->msgIter.suid) {
|
||||||
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
pHandle->pSchema = metaGetTbTSchema(pHandle->pVnodeMeta, pHandle->msgIter.uid, sversion);
|
||||||
|
if (pHandle->pSchema == NULL) {
|
||||||
|
tqError("cannot found schema for table: %ld, version %d", pHandle->msgIter.suid, pHandle->sver);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
// this interface use suid instead of uid
|
// this interface use suid instead of uid
|
||||||
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
|
pHandle->pSchemaWrapper = metaGetTableSchema(pHandle->pVnodeMeta, pHandle->msgIter.suid, sversion, true);
|
||||||
|
@ -190,7 +194,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
FAIL:
|
FAIL:
|
||||||
taosArrayDestroy(*ppCols);
|
if (*ppCols) taosArrayDestroy(*ppCols);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -235,8 +239,8 @@ int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
||||||
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
int tqReadHandleRemoveTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
|
||||||
ASSERT(pHandle->tbIdHash != NULL);
|
ASSERT(pHandle->tbIdHash != NULL);
|
||||||
|
|
||||||
for(int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
for (int32_t i = 0; i < taosArrayGetSize(tbUidList); i++) {
|
||||||
int64_t* pKey = (int64_t*) taosArrayGet(tbUidList, i);
|
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
|
||||||
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
|
taosHashRemove(pHandle->tbIdHash, pKey, sizeof(int64_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1377,7 +1377,7 @@ class TDTestCase:
|
||||||
|
|
||||||
self.tmqCase1(cfgPath, buildPath)
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
self.tmqCase2(cfgPath, buildPath)
|
self.tmqCase2(cfgPath, buildPath)
|
||||||
#self.tmqCase3(cfgPath, buildPath)
|
self.tmqCase3(cfgPath, buildPath)
|
||||||
self.tmqCase4(cfgPath, buildPath)
|
self.tmqCase4(cfgPath, buildPath)
|
||||||
self.tmqCase5(cfgPath, buildPath)
|
self.tmqCase5(cfgPath, buildPath)
|
||||||
self.tmqCase6(cfgPath, buildPath)
|
self.tmqCase6(cfgPath, buildPath)
|
||||||
|
|
Loading…
Reference in New Issue