diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d9d3c7e297..ee9a08b504 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3185,6 +3185,7 @@ typedef struct { SArray* blockData; SArray* blockTbName; SArray* blockSchema; + // the following attributes are extended from SMqDataRsp int32_t createTableNum; SArray* createTableLen; SArray* createTableReq; diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 55d0453fbf..3167a4c361 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,6 +24,8 @@ #include "tref.h" #include "ttimer.h" +#define VG_POLL_IGNORE_TICK 100 + struct SMqMgmt { int8_t inited; tmr_h timer; @@ -126,16 +128,14 @@ enum { }; typedef struct { - // statistics - int64_t pollCnt; - // offset + int64_t pollCnt; STqOffsetVal committedOffset; STqOffsetVal currentOffset; - // connection info - int32_t vgId; - int32_t vgStatus; - int32_t vgSkipCnt; - SEpSet epSet; + int32_t vgId; + int32_t vgStatus; + int32_t vgSkipCnt; + int32_t vgIgnoreCnt; // once empty block is received, idle for ignoreCnt then start to poll data + SEpSet epSet; } SMqClientVg; typedef struct { @@ -1347,14 +1347,17 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp); tDecoderClear(&decoder); memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead)); + } else { // invalid rspType + tscError("consumer:0x%"PRIx64" invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); taosWriteQitem(tmq->mqueue, pRspWrapper); - tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, - tmq->mqueue->numOfItems, requestId); + tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, + tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId); + tsem_post(&tmq->rspSem); return 0; @@ -1400,6 +1403,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, + .vgIgnoreCnt = 0, }; taosArrayPush(pTopic->vgs, &clientVg); @@ -1700,6 +1704,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg->vgIgnoreCnt > 0) { + pVg->vgIgnoreCnt -= 1; + tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %d tick before poll", tmq->consumerId, tmq->epoch, + pVg->vgId, pVg->vgIgnoreCnt); + continue; + } + int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); @@ -1810,8 +1821,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { SMqClientVg* pVg = pollRspWrapper->vgHandle; - /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, - * rspMsg->msg.rspOffset);*/ pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp @@ -1836,9 +1845,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (pollRspWrapper->taosxRsp.blockNum == 0) { taosFreeQitem(pollRspWrapper); rspWrapper = NULL; - tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pollRspWrapper->reqId); + pVg->vgIgnoreCnt = VG_POLL_IGNORE_TICK; continue; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 82d29b37eb..0c7e95f8eb 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -898,7 +898,86 @@ TEST(clientCase, update_test) { } } -TEST(clientCase, subscription_test) { +TEST(clientCase, sub_db_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + // TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1"); + // if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + // printf("failed to create topic, code:%s", taos_errstr(pRes)); + // taos_free_result(pRes); + // return; + // } + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + tmq_conf_set(conf, "group.id", "cgrpNamedb"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "topic_db1"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 5000; + + int32_t count = 0; + + while (1) { + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[1024]; + int32_t rows = 0; + + const char* topicName = tmq_get_topic_name(pRes); + const char* dbName = tmq_get_db_name(pRes); + int32_t vgroupId = tmq_get_vgroup_id(pRes); + + printf("topic: %s\n", topicName); + printf("db: %s\n", dbName); + printf("vgroup id: %d\n", vgroupId); + + if (count ++ > 200) { + tmq_unsubscribe(tmq); + break; + } + + while (1) { + TAOS_ROW row = taos_fetch_row(pRes); + if (row == NULL) break; + + fields = taos_fetch_fields(pRes); + numOfFields = taos_field_count(pRes); + precision = taos_result_precision(pRes); + rows++; + taos_print_row(buf, row, fields, numOfFields); + printf("precision: %d, row content: %s\n", precision, buf); + } + } +// return rows; + } + + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + +TEST(clientCase, sub_tb_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9c7d68f256..9d3adf8692 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6822,8 +6822,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) { } void tDeleteSMqDataRsp(SMqDataRsp *pRsp) { - taosArrayDestroy(pRsp->blockDataLen); - pRsp->blockDataLen = NULL; + pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);; taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree); pRsp->blockData = NULL; taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index a45d6d1dec..ee2bd007ce 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -103,9 +103,9 @@ typedef struct { } STqHandle; typedef struct { - SMqDataRsp dataRsp; + SMqDataRsp* pDataRsp; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; - SRpcHandleInfo pInfo; + SRpcHandleInfo info; } STqPushEntry; struct STQ { @@ -147,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp); // int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp); int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); -int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp); +int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type); int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry); // tqMeta diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 27d351f915..0fe7f9a773 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -192,7 +192,7 @@ void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp); +int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type); int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer); int tqCommit(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 76a6a8a840..9dc862ec63 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -68,7 +68,13 @@ static void destroySTqHandle(void* data) { static void tqPushEntryFree(void* data) { STqPushEntry* p = *(void**)data; - tDeleteSMqDataRsp(&p->dataRsp); + if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { + tDeleteSMqDataRsp(p->pDataRsp); + } else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) { + tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp); + } + + taosMemoryFree(p->pDataRsp); taosMemoryFree(p); } @@ -166,8 +172,57 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, return 0; } +static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, + int64_t consumerId, int32_t type) { + int32_t len = 0; + int32_t code = 0; + + if (type == TMQ_MSG_TYPE__POLL_RSP) { + tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); + } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); + } + + if (code < 0) { + return -1; + } + + int32_t tlen = sizeof(SMqRspHead) + len; + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + return -1; + } + + ((SMqRspHead*)buf)->mqMsgType = type; + ((SMqRspHead*)buf)->epoch = epoch; + ((SMqRspHead*)buf)->consumerId = consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + + SEncoder encoder = {0}; + tEncoderInit(&encoder, abuf, len); + + if (type == TMQ_MSG_TYPE__POLL_RSP) { + tEncodeSMqDataRsp(&encoder, pRsp); + } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp); + } + + tEncoderClear(&encoder); + + SRpcMsg rsp = { + .info = *pRpcHandleInfo, + .pCont = buf, + .contLen = tlen, + .code = 0, + }; + + tmsgSendRsp(&rsp); + return 0; +} + int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { - SMqDataRsp* pRsp = &pPushEntry->dataRsp; + SMqDataRsp* pRsp = pPushEntry->pDataRsp; #if 0 A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); @@ -181,37 +236,40 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { } #endif - int32_t len = 0; - int32_t code = 0; - tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); +// int32_t len = 0; +// int32_t code = 0; +// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); +// if (code < 0) { +// return -1; +// } +// +// int32_t tlen = sizeof(SMqRspHead) + len; +// void* buf = rpcMallocCont(tlen); +// if (buf == NULL) { +// return -1; +// } +// +// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead)); +// +// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); +// +// SEncoder encoder = {0}; +// tEncoderInit(&encoder, abuf, len); +// tEncodeSMqDataRsp(&encoder, pRsp); +// tEncoderClear(&encoder); +// +// SRpcMsg rsp = { +// .info = pPushEntry->pInfo, +// .pCont = buf, +// .contLen = tlen, +// .code = 0, +// }; +// +// tmsgSendRsp(&rsp); +// - if (code < 0) { - return -1; - } - - int32_t tlen = sizeof(SMqRspHead) + len; - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - - memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead)); - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, abuf, len); - tEncodeSMqDataRsp(&encoder, pRsp); - tEncoderClear(&encoder); - - SRpcMsg rsp = { - .info = pPushEntry->pInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - - tmsgSendRsp(&rsp); + SMqRspHead* pHeader = &pPushEntry->pDataRsp->head; + doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType); char buf1[80] = {0}; char buf2[80] = {0}; @@ -219,11 +277,10 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) { tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset); tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2); - return 0; } -int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) { +int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) { #if 0 A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); @@ -240,109 +297,127 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con } #endif - int32_t len = 0; - int32_t code = 0; - tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); - if (code < 0) { - return -1; - } - int32_t tlen = sizeof(SMqRspHead) + len; - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = pReq->consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, abuf, len); - tEncodeSMqDataRsp(&encoder, pRsp); - tEncoderClear(&encoder); - - SRpcMsg rsp = { - .info = pMsg->info, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&rsp); - - char buf1[80] = {0}; - char buf2[80] = {0}; - tFormatOffset(buf1, 80, &pRsp->reqOffset); - tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s", - TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); - - return 0; -} - -int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { -#if 0 - A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); - A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); - - if (pRsp->withSchema) { - A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); - } else { - A(taosArrayGetSize(pRsp->blockSchema) == 0); - } - - if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { - if (pRsp->blockNum > 0) { - A(pRsp->rspOffset.version > pRsp->reqOffset.version); - } else { - A(pRsp->rspOffset.version >= pRsp->reqOffset.version); - } - } -#endif - - int32_t len = 0; - int32_t code = 0; - tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); - if (code < 0) { - return -1; - } - int32_t tlen = sizeof(SMqRspHead) + len; - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; - ((SMqRspHead*)buf)->epoch = pReq->epoch; - ((SMqRspHead*)buf)->consumerId = pReq->consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - - SEncoder encoder = {0}; - tEncoderInit(&encoder, abuf, len); - tEncodeSTaosxRsp(&encoder, pRsp); - tEncoderClear(&encoder); - - SRpcMsg rsp = { - .info = pMsg->info, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - - tmsgSendRsp(&rsp); +// int32_t len = 0; +// int32_t code = 0; +// +// if (type == TMQ_MSG_TYPE__POLL_RSP) { +// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code); +// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { +// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code); +// } +// +// if (code < 0) { +// return -1; +// } +// +// int32_t tlen = sizeof(SMqRspHead) + len; +// void* buf = rpcMallocCont(tlen); +// if (buf == NULL) { +// return -1; +// } +// +// ((SMqRspHead*)buf)->mqMsgType = type; +// ((SMqRspHead*)buf)->epoch = pReq->epoch; +// ((SMqRspHead*)buf)->consumerId = pReq->consumerId; +// +// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); +// +// SEncoder encoder = {0}; +// tEncoderInit(&encoder, abuf, len); +// +// if (type == TMQ_MSG_TYPE__POLL_RSP) { +// tEncodeSMqDataRsp(&encoder, pRsp); +// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) { +// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp); +// } +// +// tEncoderClear(&encoder); +// +// SRpcMsg rsp = { +// .info = pMsg->info, +// .pCont = buf, +// .contLen = tlen, +// .code = 0, +// }; +// +// tmsgSendRsp(&rsp); + doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type); char buf1[80] = {0}; char buf2[80] = {0}; tFormatOffset(buf1, 80, &pRsp->reqOffset); tFormatOffset(buf2, 80, &pRsp->rspOffset); - tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", + tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s", TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); + return 0; } +//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) { +//#if 0 +// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum); +// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum); +// +// if (pRsp->withSchema) { +// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum); +// } else { +// A(taosArrayGetSize(pRsp->blockSchema) == 0); +// } +// +// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) { +// if (pRsp->blockNum > 0) { +// A(pRsp->rspOffset.version > pRsp->reqOffset.version); +// } else { +// A(pRsp->rspOffset.version >= pRsp->reqOffset.version); +// } +// } +//#endif +// +// int32_t len = 0; +// int32_t code = 0; +// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code); +// if (code < 0) { +// return -1; +// } +// +// int32_t tlen = sizeof(SMqRspHead) + len; +// void* buf = rpcMallocCont(tlen); +// if (buf == NULL) { +// terrno = TSDB_CODE_OUT_OF_MEMORY; +// return -1; +// } +// +// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP; +// ((SMqRspHead*)buf)->epoch = pReq->epoch; +// ((SMqRspHead*)buf)->consumerId = pReq->consumerId; +// +// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); +// +// SEncoder encoder = {0}; +// tEncoderInit(&encoder, abuf, len); +// tEncodeSTaosxRsp(&encoder, pRsp); +// tEncoderClear(&encoder); +// +// SRpcMsg rsp = { +// .info = pMsg->info, +// .pCont = buf, +// .contLen = tlen, +// .code = 0, +// }; +// +// tmsgSendRsp(&rsp); +// +// char buf1[80] = {0}; +// char buf2[80] = {0}; +// tFormatOffset(buf1, 80, &pRsp->reqOffset); +// tFormatOffset(buf2, 80, &pRsp->rspOffset); +// +// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s", +// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2); +// return 0; +//} + static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) { return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG && pLeft->val.version <= pRight->val.version; @@ -441,9 +516,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su } #endif - /*A(subType == TOPIC_SUB_TYPE__COLUMN);*/ pRsp->withSchema = false; - return 0; } @@ -506,7 +579,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version); - int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); tDeleteSMqDataRsp(&dataRsp); *pBlockReturned = true; @@ -515,7 +588,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand STaosxRsp taosxRsp = {0}; tqInitTaosxRsp(&taosxRsp, pRequest); tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal)); - int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); + int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); +// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); tDeleteSTaosxRsp(&taosxRsp); *pBlockReturned = true; @@ -570,13 +644,13 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { - code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp); + code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); taosWUnLockLatch(&pTq->pushLock); return code; } taosWUnLockLatch(&pTq->pushLock); - code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); // NOTE: this pHandle->consumerId may have been changed already. tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 @@ -611,7 +685,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* } if (taosxRsp.blockNum > 0) { - code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); return code; } else { @@ -622,13 +696,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* ",version:%" PRId64, consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version); - } + } else { - if (offset.type == TMQ_OFFSET__LOG) { +// if (offset.type == TMQ_OFFSET__LOG) { int64_t fetchVer = offset.version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { tDeleteSTaosxRsp(&taosxRsp); + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -646,14 +721,18 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; +// if (terrno == 0) { // failed to seek to given ver, but no errors happen. +// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); +// return code; +// } else { // error happens, return to consumers + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); + tDeleteSTaosxRsp(&taosxRsp); + taosMemoryFreeClear(pCkHead); + return code; +// } } SWalCont* pHead = &pCkHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); @@ -663,6 +742,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* .msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg), .ver = pHead->version, }; + if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) { tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId, pRequest->subKey); @@ -671,7 +751,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* if (taosxRsp.blockNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp); + code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); return code; @@ -1027,15 +1107,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2; - int32_t version = 1; + int32_t ver1 = 1; SMetaInfo info = {0}; int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); if (code == TSDB_CODE_SUCCESS) { - version = info.skmVer; + ver1 = info.skmVer; } pTask->tbSink.pTSchema = - tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version); + tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1); ASSERT(pTask->tbSink.pTSchema); } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index c0ed787176..1ba44739d6 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -209,6 +209,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); int32_t len = msgLen - sizeof(SSubmitReq2Msg); + int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { // lock push mgr to avoid potential msg lost @@ -217,7 +218,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); if (numOfRegisteredPush > 0) { tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", - pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); + vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); SArray* cachedKeys = taosArrayInit(0, sizeof(void*)); SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t)); @@ -239,7 +240,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) void* pIter = NULL; while (1) { pIter = taosHashIterate(pTq->pPushMgr, pIter); - if (pIter == NULL) break; + if (pIter == NULL) { + break; + } + STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); @@ -248,23 +252,23 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) continue; } - if (pPushEntry->dataRsp.reqOffset.version >= ver) { - tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", - pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver); + SMqDataRsp* pRsp = pPushEntry->pDataRsp; + if (pRsp->reqOffset.version >= ver) { + tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId, + pRsp->reqOffset.version, ver); continue; } STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; - SMqDataRsp* pRsp = &pPushEntry->dataRsp; - // prepare scan mem data SPackedData submit = { .msgStr = data, .msgLen = len, .ver = ver, }; + qStreamSetScanMemData(task, submit); // here start to scan submit block to extract the subscribed data @@ -272,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; if (qExecTask(task, &pDataBlock, &ts) < 0) { - tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr()); + tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr()); } if (pDataBlock == NULL) { @@ -283,11 +287,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) pRsp->blockNum++; } - tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey, - pRsp->blockNum); + tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum); if (pRsp->blockNum > 0) { // set offset tqOffsetResetToLog(&pRsp->rspOffset, ver); + // remove from hash size_t kLen; void* key = taosHashGetKey(pIter, &kLen); @@ -309,6 +313,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key); } } + taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree); taosArrayDestroy(cachedKeyLens); taosMemoryFree(data); @@ -334,9 +339,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) }; tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq); - tqProcessSubmitReq(pTq, submit); } + if (msgType == TDMT_VND_DELETE) { tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver); } @@ -346,7 +351,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) } int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, - SMqDataRsp* pDataRsp) { + SMqDataRsp* pDataRsp, int32_t type) { uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); STqHandle* pTqHandle = pHandle; @@ -359,14 +364,22 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, return -1; } - pPushEntry->pInfo = pRpcMsg->info; + pPushEntry->info = pRpcMsg->info; memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN); - pDataRsp->withTbName = 0; - memcpy(&pPushEntry->dataRsp, pDataRsp, sizeof(SMqDataRsp)); - pPushEntry->dataRsp.head.consumerId = consumerId; - pPushEntry->dataRsp.head.epoch = pRequest->epoch; - pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + if (type == TMQ_MSG_TYPE__TAOSX_RSP) { + pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp)); + memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp)); + } else if (type == TMQ_MSG_TYPE__POLL_RSP) { + pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp)); + memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp)); + } + + SMqRspHead* pHead = &pPushEntry->pDataRsp->head; + pHead->consumerId = consumerId; + pHead->epoch = pRequest->epoch; + pHead->mqMsgType = type; + taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*)); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId, @@ -377,12 +390,14 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) { int32_t vgId = TD_VID(pTq->pVnode); STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen); + if (pEntry != NULL) { - uint64_t cId = (*pEntry)->dataRsp.head.consumerId; + uint64_t cId = (*pEntry)->pDataRsp->head.consumerId; ASSERT(consumerId == cId); tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId, (*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1); + if (rspConsumer) { // rsp the old consumer with empty block. tqPushDataRsp(pTq, *pEntry); } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index baf74a83e4..ffab7d92b8 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -221,12 +221,12 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) { if (msg == NULL) { // create raw scan - SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo)); if (NULL == pTaskInfo) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTaskInfo->cost.created = taosGetTimestampUs(); diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index c5ae44214a..89bf337a20 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -123,8 +123,9 @@ class TDTestCase: pre_insert = "insert into " sql = pre_insert - t = time.time() - startTs = int(round(t * 1000)) + # t = 1678609778776 #time.time() + t = 1600000000000 + startTs = t #int(round(t * 1000)) #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) for i in range(ctbNum): sql += " %s_%d values "%(stbName,i)