fix(tmq): fix error in tmq.
This commit is contained in:
parent
7f2aae69d4
commit
731a2138df
|
@ -3185,6 +3185,7 @@ typedef struct {
|
||||||
SArray* blockData;
|
SArray* blockData;
|
||||||
SArray* blockTbName;
|
SArray* blockTbName;
|
||||||
SArray* blockSchema;
|
SArray* blockSchema;
|
||||||
|
// the following attributes are extended from SMqDataRsp
|
||||||
int32_t createTableNum;
|
int32_t createTableNum;
|
||||||
SArray* createTableLen;
|
SArray* createTableLen;
|
||||||
SArray* createTableReq;
|
SArray* createTableReq;
|
||||||
|
|
|
@ -24,6 +24,8 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
|
||||||
|
#define VG_POLL_IGNORE_TICK 100
|
||||||
|
|
||||||
struct SMqMgmt {
|
struct SMqMgmt {
|
||||||
int8_t inited;
|
int8_t inited;
|
||||||
tmr_h timer;
|
tmr_h timer;
|
||||||
|
@ -126,16 +128,14 @@ enum {
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// statistics
|
int64_t pollCnt;
|
||||||
int64_t pollCnt;
|
|
||||||
// offset
|
|
||||||
STqOffsetVal committedOffset;
|
STqOffsetVal committedOffset;
|
||||||
STqOffsetVal currentOffset;
|
STqOffsetVal currentOffset;
|
||||||
// connection info
|
int32_t vgId;
|
||||||
int32_t vgId;
|
int32_t vgStatus;
|
||||||
int32_t vgStatus;
|
int32_t vgSkipCnt;
|
||||||
int32_t vgSkipCnt;
|
int32_t vgIgnoreCnt; // once empty block is received, idle for ignoreCnt then start to poll data
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqClientVg;
|
} SMqClientVg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1347,14 +1347,17 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
|
tDecodeSTaosxRsp(&decoder, &pRspWrapper->taosxRsp);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
|
memcpy(&pRspWrapper->taosxRsp, pMsg->pData, sizeof(SMqRspHead));
|
||||||
|
} else { // invalid rspType
|
||||||
|
tscError("consumer:0x%"PRIx64" invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 ", put poll res into mqueue, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId,
|
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
|
||||||
tmq->mqueue->numOfItems, requestId);
|
tmq->consumerId, rspType, vgId, tmq->mqueue->numOfItems, requestId);
|
||||||
|
|
||||||
tsem_post(&tmq->rspSem);
|
tsem_post(&tmq->rspSem);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
|
@ -1400,6 +1403,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
||||||
.epSet = pVgEp->epSet,
|
.epSet = pVgEp->epSet,
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
.vgSkipCnt = 0,
|
.vgSkipCnt = 0,
|
||||||
|
.vgIgnoreCnt = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
taosArrayPush(pTopic->vgs, &clientVg);
|
taosArrayPush(pTopic->vgs, &clientVg);
|
||||||
|
@ -1700,6 +1704,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||||
|
|
||||||
for (int j = 0; j < numOfVg; j++) {
|
for (int j = 0; j < numOfVg; j++) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
|
if (pVg->vgIgnoreCnt > 0) {
|
||||||
|
pVg->vgIgnoreCnt -= 1;
|
||||||
|
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %d tick before poll", tmq->consumerId, tmq->epoch,
|
||||||
|
pVg->vgId, pVg->vgIgnoreCnt);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||||
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
if (vgStatus == TMQ_VG_STATUS__WAIT) {
|
||||||
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
|
||||||
|
@ -1810,8 +1821,6 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
|
|
||||||
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
|
||||||
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
SMqClientVg* pVg = pollRspWrapper->vgHandle;
|
||||||
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
|
|
||||||
* rspMsg->msg.rspOffset);*/
|
|
||||||
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
|
pVg->currentOffset = pollRspWrapper->metaRsp.rspOffset;
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
// build rsp
|
// build rsp
|
||||||
|
@ -1836,9 +1845,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
|
||||||
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
if (pollRspWrapper->taosxRsp.blockNum == 0) {
|
||||||
taosFreeQitem(pollRspWrapper);
|
taosFreeQitem(pollRspWrapper);
|
||||||
rspWrapper = NULL;
|
rspWrapper = NULL;
|
||||||
|
|
||||||
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId,
|
||||||
pollRspWrapper->reqId);
|
pollRspWrapper->reqId);
|
||||||
|
pVg->vgIgnoreCnt = VG_POLL_IGNORE_TICK;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -898,7 +898,86 @@ TEST(clientCase, update_test) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(clientCase, subscription_test) {
|
TEST(clientCase, sub_db_test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
|
||||||
|
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
|
// printf("failed to create topic, code:%s", taos_errstr(pRes));
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
|
tmq_conf_set(conf, "group.id", "cgrpNamedb");
|
||||||
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
|
// 创建订阅 topics 列表
|
||||||
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
|
tmq_list_append(topicList, "topic_db1");
|
||||||
|
|
||||||
|
// 启动订阅
|
||||||
|
tmq_subscribe(tmq, topicList);
|
||||||
|
tmq_list_destroy(topicList);
|
||||||
|
|
||||||
|
TAOS_FIELD* fields = NULL;
|
||||||
|
int32_t numOfFields = 0;
|
||||||
|
int32_t precision = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
int32_t msgCnt = 0;
|
||||||
|
int32_t timeout = 5000;
|
||||||
|
|
||||||
|
int32_t count = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
|
if (pRes) {
|
||||||
|
char buf[1024];
|
||||||
|
int32_t rows = 0;
|
||||||
|
|
||||||
|
const char* topicName = tmq_get_topic_name(pRes);
|
||||||
|
const char* dbName = tmq_get_db_name(pRes);
|
||||||
|
int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||||
|
|
||||||
|
printf("topic: %s\n", topicName);
|
||||||
|
printf("db: %s\n", dbName);
|
||||||
|
printf("vgroup id: %d\n", vgroupId);
|
||||||
|
|
||||||
|
if (count ++ > 200) {
|
||||||
|
tmq_unsubscribe(tmq);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_ROW row = taos_fetch_row(pRes);
|
||||||
|
if (row == NULL) break;
|
||||||
|
|
||||||
|
fields = taos_fetch_fields(pRes);
|
||||||
|
numOfFields = taos_field_count(pRes);
|
||||||
|
precision = taos_result_precision(pRes);
|
||||||
|
rows++;
|
||||||
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
printf("precision: %d, row content: %s\n", precision, buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(clientCase, sub_tb_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
|
|
@ -6822,8 +6822,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
||||||
taosArrayDestroy(pRsp->blockDataLen);
|
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);;
|
||||||
pRsp->blockDataLen = NULL;
|
|
||||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||||
pRsp->blockData = NULL;
|
pRsp->blockData = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
|
|
@ -103,9 +103,9 @@ typedef struct {
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqDataRsp dataRsp;
|
SMqDataRsp* pDataRsp;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRpcHandleInfo pInfo;
|
SRpcHandleInfo info;
|
||||||
} STqPushEntry;
|
} STqPushEntry;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
|
@ -147,7 +147,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||||
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
||||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
|
||||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
||||||
|
|
||||||
// tqMeta
|
// tqMeta
|
||||||
|
|
|
@ -192,7 +192,7 @@ void tqCleanUp();
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp);
|
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
|
||||||
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
|
|
|
@ -68,7 +68,13 @@ static void destroySTqHandle(void* data) {
|
||||||
|
|
||||||
static void tqPushEntryFree(void* data) {
|
static void tqPushEntryFree(void* data) {
|
||||||
STqPushEntry* p = *(void**)data;
|
STqPushEntry* p = *(void**)data;
|
||||||
tDeleteSMqDataRsp(&p->dataRsp);
|
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tDeleteSMqDataRsp(p->pDataRsp);
|
||||||
|
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(p->pDataRsp);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,8 +172,57 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||||
|
int64_t consumerId, int32_t type) {
|
||||||
|
int32_t len = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
|
void* buf = rpcMallocCont(tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMqRspHead*)buf)->mqMsgType = type;
|
||||||
|
((SMqRspHead*)buf)->epoch = epoch;
|
||||||
|
((SMqRspHead*)buf)->consumerId = consumerId;
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, abuf, len);
|
||||||
|
|
||||||
|
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
SRpcMsg rsp = {
|
||||||
|
.info = *pRpcHandleInfo,
|
||||||
|
.pCont = buf,
|
||||||
|
.contLen = tlen,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
|
@ -181,37 +236,40 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t len = 0;
|
// int32_t len = 0;
|
||||||
int32_t code = 0;
|
// int32_t code = 0;
|
||||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
// if (code < 0) {
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
|
// void* buf = rpcMallocCont(tlen);
|
||||||
|
// if (buf == NULL) {
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// SEncoder encoder = {0};
|
||||||
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
|
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pPushEntry->pInfo,
|
||||||
|
// .pCont = buf,
|
||||||
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// tmsgSendRsp(&rsp);
|
||||||
|
//
|
||||||
|
|
||||||
if (code < 0) {
|
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||||
return -1;
|
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
|
||||||
tEncoderInit(&encoder, abuf, len);
|
|
||||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
SRpcMsg rsp = {
|
|
||||||
.info = pPushEntry->pInfo,
|
|
||||||
.pCont = buf,
|
|
||||||
.contLen = tlen,
|
|
||||||
.code = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
|
@ -219,11 +277,10 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
|
||||||
#if 0
|
#if 0
|
||||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
|
@ -240,109 +297,127 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t len = 0;
|
// int32_t len = 0;
|
||||||
int32_t code = 0;
|
// int32_t code = 0;
|
||||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
//
|
||||||
if (code < 0) {
|
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
return -1;
|
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
}
|
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||||
void* buf = rpcMallocCont(tlen);
|
// }
|
||||||
if (buf == NULL) {
|
//
|
||||||
return -1;
|
// if (code < 0) {
|
||||||
}
|
// return -1;
|
||||||
|
// }
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
//
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
// void* buf = rpcMallocCont(tlen);
|
||||||
|
// if (buf == NULL) {
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
// return -1;
|
||||||
|
// }
|
||||||
SEncoder encoder = {0};
|
//
|
||||||
tEncoderInit(&encoder, abuf, len);
|
// ((SMqRspHead*)buf)->mqMsgType = type;
|
||||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||||
tEncoderClear(&encoder);
|
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||||
|
//
|
||||||
SRpcMsg rsp = {
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
.info = pMsg->info,
|
//
|
||||||
.pCont = buf,
|
// SEncoder encoder = {0};
|
||||||
.contLen = tlen,
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
.code = 0,
|
//
|
||||||
};
|
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
tmsgSendRsp(&rsp);
|
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
char buf1[80] = {0};
|
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||||
char buf2[80] = {0};
|
// }
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
//
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
// tEncoderClear(&encoder);
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
|
//
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pMsg->info,
|
||||||
return 0;
|
// .pCont = buf,
|
||||||
}
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
// };
|
||||||
#if 0
|
//
|
||||||
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
// tmsgSendRsp(&rsp);
|
||||||
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
||||||
|
|
||||||
if (pRsp->withSchema) {
|
|
||||||
A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
|
||||||
} else {
|
|
||||||
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
|
||||||
if (pRsp->blockNum > 0) {
|
|
||||||
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
|
||||||
} else {
|
|
||||||
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t len = 0;
|
|
||||||
int32_t code = 0;
|
|
||||||
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
|
||||||
if (buf == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
|
||||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
|
||||||
tEncoderInit(&encoder, abuf, len);
|
|
||||||
tEncodeSTaosxRsp(&encoder, pRsp);
|
|
||||||
tEncoderClear(&encoder);
|
|
||||||
|
|
||||||
SRpcMsg rsp = {
|
|
||||||
.info = pMsg->info,
|
|
||||||
.pCont = buf,
|
|
||||||
.contLen = tlen,
|
|
||||||
.code = 0,
|
|
||||||
};
|
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
|
||||||
tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||||
|
//#if 0
|
||||||
|
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
|
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
|
//
|
||||||
|
// if (pRsp->withSchema) {
|
||||||
|
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||||
|
// } else {
|
||||||
|
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
// if (pRsp->blockNum > 0) {
|
||||||
|
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
// } else {
|
||||||
|
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//#endif
|
||||||
|
//
|
||||||
|
// int32_t len = 0;
|
||||||
|
// int32_t code = 0;
|
||||||
|
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||||
|
// if (code < 0) {
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
|
// void* buf = rpcMallocCont(tlen);
|
||||||
|
// if (buf == NULL) {
|
||||||
|
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||||
|
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||||
|
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||||
|
//
|
||||||
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// SEncoder encoder = {0};
|
||||||
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
|
// tEncodeSTaosxRsp(&encoder, pRsp);
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pMsg->info,
|
||||||
|
// .pCont = buf,
|
||||||
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// tmsgSendRsp(&rsp);
|
||||||
|
//
|
||||||
|
// char buf1[80] = {0};
|
||||||
|
// char buf2[80] = {0};
|
||||||
|
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
|
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
//
|
||||||
|
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||||
|
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
// return 0;
|
||||||
|
//}
|
||||||
|
|
||||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
pLeft->val.version <= pRight->val.version;
|
pLeft->val.version <= pRight->val.version;
|
||||||
|
@ -441,9 +516,7 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/*A(subType == TOPIC_SUB_TYPE__COLUMN);*/
|
|
||||||
pRsp->withSchema = false;
|
pRsp->withSchema = false;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -506,7 +579,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||||
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
|
|
||||||
*pBlockReturned = true;
|
*pBlockReturned = true;
|
||||||
|
@ -515,7 +588,8 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, pRequest);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
|
// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
|
||||||
*pBlockReturned = true;
|
*pBlockReturned = true;
|
||||||
|
@ -570,13 +644,13 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp);
|
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
// NOTE: this pHandle->consumerId may have been changed already.
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
|
@ -611,7 +685,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosxRsp.blockNum > 0) {
|
if (taosxRsp.blockNum > 0) {
|
||||||
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
|
@ -622,13 +696,14 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
||||||
taosxRsp.rspOffset.version);
|
taosxRsp.rspOffset.version);
|
||||||
}
|
} else {
|
||||||
|
|
||||||
if (offset.type == TMQ_OFFSET__LOG) {
|
// if (offset.type == TMQ_OFFSET__LOG) {
|
||||||
int64_t fetchVer = offset.version + 1;
|
int64_t fetchVer = offset.version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -646,14 +721,18 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
taosMemoryFreeClear(pCkHead);
|
// return code;
|
||||||
return code;
|
// } else { // error happens, return to consumers
|
||||||
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
taosMemoryFreeClear(pCkHead);
|
||||||
|
return code;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||||
pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
||||||
|
|
||||||
|
@ -663,6 +742,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
||||||
.ver = pHead->version,
|
.ver = pHead->version,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
||||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
|
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
|
||||||
pRequest->subKey);
|
pRequest->subKey);
|
||||||
|
@ -671,7 +751,7 @@ static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq*
|
||||||
|
|
||||||
if (taosxRsp.blockNum > 0) {
|
if (taosxRsp.blockNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return code;
|
return code;
|
||||||
|
@ -1027,15 +1107,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||||
|
|
||||||
int32_t version = 1;
|
int32_t ver1 = 1;
|
||||||
SMetaInfo info = {0};
|
SMetaInfo info = {0};
|
||||||
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
version = info.skmVer;
|
ver1 = info.skmVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->tbSink.pTSchema =
|
pTask->tbSink.pTSchema =
|
||||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1);
|
||||||
ASSERT(pTask->tbSink.pTSchema);
|
ASSERT(pTask->tbSink.pTSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -209,6 +209,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
|
@ -217,7 +218,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
||||||
if (numOfRegisteredPush > 0) {
|
if (numOfRegisteredPush > 0) {
|
||||||
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
||||||
pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||||
|
|
||||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||||
|
@ -239,7 +240,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||||
|
@ -248,23 +252,23 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
if (pRsp->reqOffset.version >= ver) {
|
||||||
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
|
||||||
|
pRsp->reqOffset.version, ver);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqExecHandle* pExec = &pHandle->execHandle;
|
STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
|
||||||
|
|
||||||
// prepare scan mem data
|
// prepare scan mem data
|
||||||
SPackedData submit = {
|
SPackedData submit = {
|
||||||
.msgStr = data,
|
.msgStr = data,
|
||||||
.msgLen = len,
|
.msgLen = len,
|
||||||
.ver = ver,
|
.ver = ver,
|
||||||
};
|
};
|
||||||
|
|
||||||
qStreamSetScanMemData(task, submit);
|
qStreamSetScanMemData(task, submit);
|
||||||
|
|
||||||
// here start to scan submit block to extract the subscribed data
|
// here start to scan submit block to extract the subscribed data
|
||||||
|
@ -272,7 +276,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -283,11 +287,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
|
||||||
pRsp->blockNum);
|
|
||||||
if (pRsp->blockNum > 0) {
|
if (pRsp->blockNum > 0) {
|
||||||
// set offset
|
// set offset
|
||||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||||
|
|
||||||
// remove from hash
|
// remove from hash
|
||||||
size_t kLen;
|
size_t kLen;
|
||||||
void* key = taosHashGetKey(pIter, &kLen);
|
void* key = taosHashGetKey(pIter, &kLen);
|
||||||
|
@ -309,6 +313,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||||
taosArrayDestroy(cachedKeyLens);
|
taosArrayDestroy(cachedKeyLens);
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
|
@ -334,9 +339,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
};
|
};
|
||||||
|
|
||||||
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
|
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
|
||||||
|
|
||||||
tqProcessSubmitReq(pTq, submit);
|
tqProcessSubmitReq(pTq, submit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_DELETE) {
|
if (msgType == TDMT_VND_DELETE) {
|
||||||
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
||||||
}
|
}
|
||||||
|
@ -346,7 +351,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
||||||
SMqDataRsp* pDataRsp) {
|
SMqDataRsp* pDataRsp, int32_t type) {
|
||||||
uint64_t consumerId = pRequest->consumerId;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
STqHandle* pTqHandle = pHandle;
|
STqHandle* pTqHandle = pHandle;
|
||||||
|
@ -359,14 +364,22 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pPushEntry->pInfo = pRpcMsg->info;
|
pPushEntry->info = pRpcMsg->info;
|
||||||
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
pDataRsp->withTbName = 0;
|
|
||||||
|
|
||||||
memcpy(&pPushEntry->dataRsp, pDataRsp, sizeof(SMqDataRsp));
|
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
pPushEntry->dataRsp.head.consumerId = consumerId;
|
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
|
||||||
pPushEntry->dataRsp.head.epoch = pRequest->epoch;
|
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
|
||||||
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
|
||||||
|
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
|
||||||
|
pHead->consumerId = consumerId;
|
||||||
|
pHead->epoch = pRequest->epoch;
|
||||||
|
pHead->mqMsgType = type;
|
||||||
|
|
||||||
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
|
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId,
|
||||||
|
@ -377,12 +390,14 @@ int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest,
|
||||||
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
|
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
|
||||||
|
|
||||||
if (pEntry != NULL) {
|
if (pEntry != NULL) {
|
||||||
uint64_t cId = (*pEntry)->dataRsp.head.consumerId;
|
uint64_t cId = (*pEntry)->pDataRsp->head.consumerId;
|
||||||
ASSERT(consumerId == cId);
|
ASSERT(consumerId == cId);
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
|
||||||
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
|
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
|
||||||
|
|
||||||
if (rspConsumer) { // rsp the old consumer with empty block.
|
if (rspConsumer) { // rsp the old consumer with empty block.
|
||||||
tqPushDataRsp(pTq, *pEntry);
|
tqPushDataRsp(pTq, *pEntry);
|
||||||
}
|
}
|
||||||
|
|
|
@ -221,12 +221,12 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
// create raw scan
|
// create raw scan
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||||
if (NULL == pTaskInfo) {
|
if (NULL == pTaskInfo) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
|
|
||||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||||
|
|
|
@ -123,8 +123,9 @@ class TDTestCase:
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
sql = pre_insert
|
sql = pre_insert
|
||||||
|
|
||||||
t = time.time()
|
# t = 1678609778776 #time.time()
|
||||||
startTs = int(round(t * 1000))
|
t = 1600000000000
|
||||||
|
startTs = t #int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s_%d values "%(stbName,i)
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
|
Loading…
Reference in New Issue