mq support stable subscribe

This commit is contained in:
Liu Jicong 2022-01-28 14:32:21 +08:00
parent 60a1ae8ce4
commit 610c812ed7
8 changed files with 71 additions and 13 deletions

View File

@ -624,14 +624,64 @@ _return:
return pRequest; return pRequest;
} }
/*typedef SMqConsumeRsp tmq_message_t;*/ static char *formatTimestamp(char *buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm *ptm = localtime(&tt);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == -1) { if (code == -1) {
printf("discard\n"); printf("msg discard\n");
return 0; return 0;
} }
SMqClientVg* pVg = (SMqClientVg*)param; char pBuf[128];
SMqConsumeCbParam* pParam = (SMqConsumeCbParam*)param;
SMqClientVg* pVg = pParam->pVg;
SMqConsumeRsp rsp; SMqConsumeRsp rsp;
tDecodeSMqConsumeRsp(pMsg->pData, &rsp); tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
if (rsp.numOfTopics == 0) { if (rsp.numOfTopics == 0) {
@ -644,10 +694,11 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
/*printf("-----msg begin----\n");*/ /*printf("-----msg begin----\n");*/
printf("|"); printf("|");
for (int32_t i = 0; i < colNum; i++) { for (int32_t i = 0; i < colNum; i++) {
printf(" %15s |", rsp.schemas->pSchema[i].name); if (i == 0) printf(" %25s |", rsp.schemas->pSchema[i].name);
else printf(" %15s |", rsp.schemas->pSchema[i].name);
} }
printf("\n"); printf("\n");
printf("=====================================\n"); printf("===============================================\n");
int32_t sz = taosArrayGetSize(rsp.pBlockData); int32_t sz = taosArrayGetSize(rsp.pBlockData);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i); SSDataBlock* pDataBlock = taosArrayGet(rsp.pBlockData, i);
@ -659,7 +710,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch(pColInfoData->info.type) { switch(pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
printf(" %15lu |", *(uint64_t*)var); formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break; break;
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT:
@ -789,6 +841,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blocking_time, int32_t
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs)); pTopic->nextVgIdx = (pTopic->nextVgIdx + 1 % taosArrayGetSize(pTopic->vgs));
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
pReq->offset = pVg->currentOffset+1; pReq->offset = pVg->currentOffset+1;
*ppVg = pVg;
pReq->head.vgId = htonl(pVg->vgId); pReq->head.vgId = htonl(pVg->vgId);
pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); pReq->head.contLen = htonl(sizeof(SMqConsumeReq));

View File

@ -606,7 +606,7 @@ TEST(testCase, create_topic_stb_Test) {
taos_free_result(pRes); taos_free_result(pRes);
char* sql = "select * from st1"; char* sql = "select ts, k from st1";
pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); pRes = taos_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
taos_free_result(pRes); taos_free_result(pRes);
taos_close(pConn); taos_close(pConn);
@ -657,18 +657,18 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_subscribe(tmq, topic_list); tmq_subscribe(tmq, topic_list);
while (1) { while (1) {
tmq_message_t* msg = tmq_consume_poll(tmq, 1000); tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
//printf("get msg\n"); //printf("get msg\n");
//if (msg == NULL) break; //if (msg == NULL) break;
} }
} }
#endif
TEST(testCase, tmq_consume_Test) { TEST(testCase, tmq_consume_Test) {
} }
TEST(testCase, tmq_commit_TEST) { TEST(testCase, tmq_commit_TEST) {
} }
#endif
#if 0 #if 0
TEST(testCase, projection_query_tables) { TEST(testCase, projection_query_tables) {

View File

@ -212,7 +212,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA
//pHandle->tbUid = pTableIdList; //pHandle->tbUid = pTableIdList;
//} //}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) { static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK); pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) { if (pHandle->tbIdHash == NULL) {
return -1; return -1;

View File

@ -824,8 +824,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
SReadHandle handle = { .reader = pReadHandle, .meta = pTq->pMeta };
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer); tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);

View File

@ -5071,6 +5071,8 @@ static SSDataBlock* doStreamBlockScan(void* param, bool* newgroup) {
SStreamBlockScanInfo* pInfo = pOperator->info; SStreamBlockScanInfo* pInfo = pOperator->info;
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
pBlockInfo->rows = 0;
while (tqNextDataBlock(pInfo->readerHandle)) { while (tqNextDataBlock(pInfo->readerHandle)) {
pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo); pTaskInfo->code = tqRetrieveDataBlockInfo(pInfo->readerHandle, pBlockInfo);
if (pTaskInfo->code != TSDB_CODE_SUCCESS) { if (pTaskInfo->code != TSDB_CODE_SUCCESS) {

View File

@ -219,7 +219,7 @@ TEST(testCase, build_executor_tree_Test) {
SExecTaskInfo* pTaskInfo = nullptr; SExecTaskInfo* pTaskInfo = nullptr;
DataSinkHandle sinkHandle = nullptr; DataSinkHandle sinkHandle = nullptr;
//int32_t code = qCreateExecTask((void*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); int32_t code = qCreateExecTask((SReadHandle*) 1, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle);
} }
#pragma GCC diagnostic pop #pragma GCC diagnostic pop

View File

@ -216,7 +216,8 @@ static SPhyNode* createMultiTableScanNode(SQueryPlanNode* pPlanNode, SQueryTable
} else if (needSeqScan(pPlanNode)) { } else if (needSeqScan(pPlanNode)) {
return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan); return createUserTableScanNode(pPlanNode, pTable, OP_TableSeqScan);
} }
return createUserTableScanNode(pPlanNode, pTable, OP_DataBlocksOptScan); int32_t type = (pPlanNode->info.type == QNODE_TABLESCAN)? OP_DataBlocksOptScan:OP_StreamScan;
return createUserTableScanNode(pPlanNode, pTable, type);
} }
static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) {

@ -0,0 +1 @@
Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d