diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ff9b573464..e76422ee34 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -80,6 +80,14 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v */ qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols, SSchemaWrapper** pSchema); +/** + * set the task Id, usually used by message queue process + * @param tinfo + * @param taskId + * @param queryId + */ +void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId); + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo); /** * Set multiple input data blocks for the stream scan. diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e95f716313..60c4850dde 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1855,14 +1855,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 " vg total:%" PRId64 " total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, tmq->totalRows, pVg->numOfRows, + tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); return pRsp; } } else { - tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pDataRsp->head.epoch, consumerEpoch); + SMqClientVg* pVg = pollRspWrapper->vgHandle; + tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pVg->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1881,8 +1882,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { taosFreeQitem(pollRspWrapper); return pRsp; } else { - tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } @@ -1928,8 +1929,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { return pRsp; } else { - tscDebug("consumer:0x%" PRIx64 " msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", - tmq->consumerId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + tscDebug("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tmq->consumerId, pollRspWrapper->vgHandle->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 0c7e95f8eb..5523e1f777 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -112,7 +112,7 @@ void createNewTable(TAOS* pConn, int32_t index) { } taos_free_result(pRes); - for(int32_t i = 0; i < 2000; i += 20) { + for(int32_t i = 0; i < 100; i += 20) { char sql[1024] = {0}; sprintf(sql, "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" @@ -167,6 +167,80 @@ void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { printf("success, code:%d\n", code); } +void* doConsumeData(void* param) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + tmq_conf_set(conf, "group.id", "cgrpName12"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "topic_t2"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 25000; + + int32_t count = 0; + + while (1) { + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[1024]; + + const char* topicName = tmq_get_topic_name(pRes); + const char* dbName = tmq_get_db_name(pRes); + int32_t vgroupId = tmq_get_vgroup_id(pRes); + + printf("topic: %s\n", topicName); + printf("db: %s\n", dbName); + printf("vgroup id: %d\n", vgroupId); + + while (1) { + TAOS_ROW row = taos_fetch_row(pRes); + if (row == NULL) { + break; + } + + fields = taos_fetch_fields(pRes); + numOfFields = taos_field_count(pRes); + precision = taos_result_precision(pRes); + taos_print_row(buf, row, fields, numOfFields); + totalRows += 1; +// printf("precision: %d, row content: %s\n", precision, buf); + } + + taos_free_result(pRes); + } else { + break; + } + } + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return NULL; +} + } // namespace int main(int argc, char** argv) { @@ -188,7 +262,6 @@ TEST(clientCase, driverInit_Test) { TEST(clientCase, connect_Test) { taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); @@ -708,7 +781,7 @@ TEST(clientCase, projection_query_tables) { // } // taos_free_result(pRes); - TAOS_RES* pRes = taos_query(pConn, "use abc2"); + TAOS_RES* pRes = taos_query(pConn, "use abc1"); taos_free_result(pRes); pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); @@ -730,7 +803,7 @@ TEST(clientCase, projection_query_tables) { } taos_free_result(pRes); - for (int32_t i = 0; i < 2; ++i) { + for (int32_t i = 0; i < 10000; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); } @@ -970,28 +1043,23 @@ TEST(clientCase, sub_db_test) { taos_print_row(buf, row, fields, numOfFields); printf("precision: %d, row content: %s\n", precision, buf); } + taos_free_result(pRes); } -// return rows; } fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } TEST(clientCase, sub_tb_test) { + taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); - // TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1"); - // if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { - // printf("failed to create topic, code:%s", taos_errstr(pRes)); - // taos_free_result(pRes); - // return; - // } - tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); - tmq_conf_set(conf, "group.id", "cgrpName"); + tmq_conf_set(conf, "group.id", "cgrpName27"); tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "auto.offset.reset", "earliest"); @@ -1004,10 +1072,11 @@ TEST(clientCase, sub_tb_test) { // 创建订阅 topics 列表 tmq_list_t* topicList = tmq_list_new(); - tmq_list_append(topicList, "topic_t1"); + tmq_list_append(topicList, "topic_t2"); // 启动订阅 tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); TAOS_FIELD* fields = NULL; @@ -1015,7 +1084,7 @@ TEST(clientCase, sub_tb_test) { int32_t precision = 0; int32_t totalRows = 0; int32_t msgCnt = 0; - int32_t timeout = 5000; + int32_t timeout = 25000; int32_t count = 0; @@ -1023,7 +1092,6 @@ TEST(clientCase, sub_tb_test) { TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); if (pRes) { char buf[1024]; - int32_t rows = 0; const char* topicName = tmq_get_topic_name(pRes); const char* dbName = tmq_get_db_name(pRes); @@ -1033,27 +1101,45 @@ TEST(clientCase, sub_tb_test) { printf("db: %s\n", dbName); printf("vgroup id: %d\n", vgroupId); - if (count ++ > 200) { - tmq_unsubscribe(tmq); - break; - } - while (1) { TAOS_ROW row = taos_fetch_row(pRes); - if (row == NULL) break; + if (row == NULL) { + break; + } fields = taos_fetch_fields(pRes); numOfFields = taos_field_count(pRes); precision = taos_result_precision(pRes); - rows++; taos_print_row(buf, row, fields, numOfFields); + totalRows += 1; printf("precision: %d, row content: %s\n", precision, buf); } + + taos_free_result(pRes); +// if ((++count) > 1) { +// break; +// } + } else { + break; } -// return rows; } + tmq_consumer_close(tmq); + taos_close(pConn); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +TEST(clientCase, sub_tb_mt_test) { + taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + TdThread qid[20] = {0}; + + for(int32_t i = 0; i < 1; ++i) { + taosThreadCreate(&qid[i], NULL, doConsumeData, NULL); + } + + for(int32_t i = 0; i < 4; ++i) { + taosThreadJoin(qid[i], NULL); + } +} + #pragma GCC diagnostic pop diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6a707bf4b0..c5cd17cbf4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -639,6 +639,8 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // lock taosWLockLatch(&pTq->pushLock); + + qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, &offset); // till now, all data has been transferred to consumer, new data needs to push client once arrived. diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0617c83113..fd610b5bcd 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -835,6 +835,8 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void doDestroyTask(SExecTaskInfo* pTaskInfo); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); +char* buildTaskId(uint64_t taskId, uint64_t queryId); + int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, int32_t vgId, char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index c01a297f80..7a6e737d48 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -159,6 +159,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } } +void qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId) { + SExecTaskInfo* pTaskInfo = tinfo; + pTaskInfo->id.queryId = queryId; + + taosMemoryFreeClear(pTaskInfo->id.str); + pTaskInfo->id.str = buildTaskId(taskId, queryId); +} + int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) { if (tinfo == NULL) { return TSDB_CODE_APP_ERROR; @@ -1099,7 +1107,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList); - qDebug("switch to next table %" PRId64 " ts %" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); + qDebug("switch to table uid:%" PRId64 " ts:%" PRId64 "% "PRId64 " rows returned", uid, ts, pInfo->pTableScanOp->resultInfo.totalRows); pInfo->pTableScanOp->resultInfo.totalRows = 0; bool found = false; @@ -1138,7 +1146,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT pTableScanInfo->base.cond.twindows.skey = oldSkey; pTableScanInfo->scanTimes = 0; - qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, + qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d", uid, ts, pTableScanInfo->currentTable, numOfTables); } else { qError("invalid pOffset->type:%d", pOffset->type); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index b32041847c..67174c3267 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1959,7 +1959,7 @@ void destroyAggOperatorInfo(void* param) { taosMemoryFreeClear(param); } -static char* buildTaskId(uint64_t taskId, uint64_t queryId) { +char* buildTaskId(uint64_t taskId, uint64_t queryId) { char* p = taosMemoryMalloc(64); int32_t offset = 6; @@ -1971,7 +1971,6 @@ static char* buildTaskId(uint64_t taskId, uint64_t queryId) { offset += tintToHex(queryId, &p[offset]); p[offset] = 0; - return p; }