diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7e95d55ec7..fe65911780 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -624,14 +624,64 @@ _return: return pRequest; } -/*typedef SMqConsumeRsp tmq_message_t;*/ +static char *formatTimestamp(char *buf, int64_t val, int precision) { + time_t tt; + int32_t ms = 0; + if (precision == TSDB_TIME_PRECISION_NANO) { + tt = (time_t)(val / 1000000000); + ms = val % 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + tt = (time_t)(val / 1000000); + ms = val % 1000000; + } else { + tt = (time_t)(val / 1000); + ms = val % 1000; + } + + /* comment out as it make testcases like select_with_tags.sim fail. + but in windows, this may cause the call to localtime crash if tt < 0, + need to find a better solution. + if (tt < 0) { + tt = 0; + } + */ + +#ifdef WINDOWS + if (tt < 0) tt = 0; +#endif + if (tt <= 0 && ms < 0) { + tt--; + if (precision == TSDB_TIME_PRECISION_NANO) { + ms += 1000000000; + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + ms += 1000000; + } else { + ms += 1000; + } + } + + struct tm *ptm = localtime(&tt); + size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); + + if (precision == TSDB_TIME_PRECISION_NANO) { + sprintf(buf + pos, ".%09d", ms); + } else if (precision == TSDB_TIME_PRECISION_MICRO) { + sprintf(buf + pos, ".%06d", ms); + } else { + sprintf(buf + pos, ".%03d", ms); + } + + return buf; +} int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { if (code == -1) { - printf("discard\n"); + printf("msg discard\n"); return 0; } - SMqClientVg* pVg = (SMqClientVg*)param; + char pBuf[128]; + SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param; + SMqClientVg* pVg = pParam->pVg; SMqConsumeRsp rsp; tDecodeSMqConsumeRsp(pMsg->pData, &rsp); if (rsp.numOfTopics == 0) { @@ -644,10 +694,11 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("-----msg begin----\n");*/ printf("|"); for (int32_t i = 0; i < colNum; i++) { - printf(" %15s |", rsp.schemas->pSchema[i].name); + if (i == 0) printf(" %25s |", rsp.schemas->pSchema[i].name); + else printf(" %15s |", rsp.schemas->pSchema[i].name); } printf("\n"); - printf("=====================================\n"); + printf("===============================================\n"); int32_t sz = taosArrayGetSize(rsp.pBlockData); for (int32_t i = 0; i < sz; i++) { SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); @@ -659,7 +710,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); switch(pColInfoData->info.type) { case TSDB_DATA_TYPE_TIMESTAMP: - printf(" %15lu |", *(uint64_t*)var); + formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); + printf(" %25s |", pBuf); break; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_UINT: @@ -789,6 +841,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); pReq->offset = pVg->currentOffset+1; + *ppVg = pVg; pReq->head.vgId = htonl(pVg->vgId); pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 8cca1e6152..13f438f8f5 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) { taos_free_result(pRes); - char* sql = "select * from st1"; + char* sql = "select ts, k from st1"; pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); taos_free_result(pRes); taos_close(pConn); @@ -657,18 +657,18 @@ TEST(testCase, tmq_subscribe_stb_Test) { tmq_subscribe(tmq, topic_list); while (1) { - tmq_message_t* msg = tmq_consume_poll(tmq, 1000); + tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); //printf("get msg\n"); //if (msg == NULL) break; } } -#endif TEST(testCase, tmq_consume_Test) { } TEST(testCase, tmq_commit_TEST) { } +#endif #if 0 TEST(testCase, projection_query_tables) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index daa3fa3722..d979ab06f6 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -212,7 +212,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA //pHandle->tbUid = pTableIdList; //} -static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { +static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); if (pHandle->tbIdHash == NULL) { return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4e5246b0a8..ade623a736 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -824,8 +824,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); + SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pMeta }; pTopic->buffer.output[i].pReadHandle = pReadHandle; - pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } taosArrayPush(pConsumer->topics, pTopic); tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8e9a4ae861..2d3085c82b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5071,6 +5071,8 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) { SStreamBlockScanInfo* pInfo = pOperator->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; + pBlockInfo->rows = 0; + while (tqNextDataBlock(pInfo->readerHandle)) { pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index b59be7d102..8381a7c585 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) { SExecTaskInfo* pTaskInfo = nullptr; DataSinkHandle sinkHandle = nullptr; - //int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); + int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); } #pragma GCC diagnostic pop diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 5d0600c820..20e8378c26 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -216,7 +216,8 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable } else if (needSeqScan(pPlanNode)) { return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); } - return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan); + int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan; + return createUserTableScanNode(pPlanNode, pTable, type); } static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { diff --git a/src/connector/grafanaplugin b/src/connector/grafanaplugin new file mode 160000 index 0000000000..4a4d79099b --- /dev/null +++ b/src/connector/grafanaplugin @@ -0,0 +1 @@ +Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d