Merge pull request #20422 from taosdata/fix/liaohj
fix(tmq): fix some race condition and do some internal refactor.
This commit is contained in:
commit
7743518557
|
@ -25,13 +25,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// TODO remove it
|
|
||||||
enum {
|
|
||||||
TMQ_CONF__RESET_OFFSET__NONE = -3,
|
|
||||||
TMQ_CONF__RESET_OFFSET__EARLIEAST = -2,
|
|
||||||
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
|
||||||
};
|
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
#define IS_META_MSG(x) ( \
|
#define IS_META_MSG(x) ( \
|
||||||
x == TDMT_VND_CREATE_STB \
|
x == TDMT_VND_CREATE_STB \
|
||||||
|
|
|
@ -3186,6 +3186,7 @@ typedef struct {
|
||||||
SArray* blockData;
|
SArray* blockData;
|
||||||
SArray* blockTbName;
|
SArray* blockTbName;
|
||||||
SArray* blockSchema;
|
SArray* blockSchema;
|
||||||
|
// the following attributes are extended from SMqDataRsp
|
||||||
int32_t createTableNum;
|
int32_t createTableNum;
|
||||||
SArray* createTableLen;
|
SArray* createTableLen;
|
||||||
SArray* createTableReq;
|
SArray* createTableReq;
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -898,7 +898,7 @@ TEST(clientCase, update_test) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(clientCase, subscription_test) {
|
TEST(clientCase, sub_db_test) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
@ -912,7 +912,7 @@ TEST(clientCase, subscription_test) {
|
||||||
tmq_conf_t* conf = tmq_conf_new();
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
tmq_conf_set(conf, "group.id", "cgrpName");
|
tmq_conf_set(conf, "group.id", "cgrpNamedb");
|
||||||
tmq_conf_set(conf, "td.connect.user", "root");
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
@ -925,7 +925,7 @@ TEST(clientCase, subscription_test) {
|
||||||
|
|
||||||
// 创建订阅 topics 列表
|
// 创建订阅 topics 列表
|
||||||
tmq_list_t* topicList = tmq_list_new();
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
// tmq_list_append(topicList, "topic_t1");
|
tmq_list_append(topicList, "topic_db1");
|
||||||
|
|
||||||
// 启动订阅
|
// 启动订阅
|
||||||
tmq_subscribe(tmq, topicList);
|
tmq_subscribe(tmq, topicList);
|
||||||
|
@ -954,7 +954,86 @@ TEST(clientCase, subscription_test) {
|
||||||
printf("db: %s\n", dbName);
|
printf("db: %s\n", dbName);
|
||||||
printf("vgroup id: %d\n", vgroupId);
|
printf("vgroup id: %d\n", vgroupId);
|
||||||
|
|
||||||
if (count ++ > 20) {
|
if (count ++ > 200) {
|
||||||
|
tmq_unsubscribe(tmq);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_ROW row = taos_fetch_row(pRes);
|
||||||
|
if (row == NULL) break;
|
||||||
|
|
||||||
|
fields = taos_fetch_fields(pRes);
|
||||||
|
numOfFields = taos_field_count(pRes);
|
||||||
|
precision = taos_result_precision(pRes);
|
||||||
|
rows++;
|
||||||
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
printf("precision: %d, row content: %s\n", precision, buf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// return rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(clientCase, sub_tb_test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
||||||
|
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
|
||||||
|
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
|
||||||
|
// printf("failed to create topic, code:%s", taos_errstr(pRes));
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
// return;
|
||||||
|
// }
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||||
|
tmq_conf_set(conf, "group.id", "cgrpName");
|
||||||
|
tmq_conf_set(conf, "td.connect.user", "root");
|
||||||
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
|
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
|
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||||
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
|
|
||||||
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
|
// 创建订阅 topics 列表
|
||||||
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
|
tmq_list_append(topicList, "topic_t1");
|
||||||
|
|
||||||
|
// 启动订阅
|
||||||
|
tmq_subscribe(tmq, topicList);
|
||||||
|
tmq_list_destroy(topicList);
|
||||||
|
|
||||||
|
TAOS_FIELD* fields = NULL;
|
||||||
|
int32_t numOfFields = 0;
|
||||||
|
int32_t precision = 0;
|
||||||
|
int32_t totalRows = 0;
|
||||||
|
int32_t msgCnt = 0;
|
||||||
|
int32_t timeout = 5000;
|
||||||
|
|
||||||
|
int32_t count = 0;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
|
if (pRes) {
|
||||||
|
char buf[1024];
|
||||||
|
int32_t rows = 0;
|
||||||
|
|
||||||
|
const char* topicName = tmq_get_topic_name(pRes);
|
||||||
|
const char* dbName = tmq_get_db_name(pRes);
|
||||||
|
int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||||
|
|
||||||
|
printf("topic: %s\n", topicName);
|
||||||
|
printf("db: %s\n", dbName);
|
||||||
|
printf("vgroup id: %d\n", vgroupId);
|
||||||
|
|
||||||
|
if (count ++ > 200) {
|
||||||
tmq_unsubscribe(tmq);
|
tmq_unsubscribe(tmq);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6639,8 +6639,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
|
||||||
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA || pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
|
||||||
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
|
snprintf(buf, maxLen, "offset(snapshot) uid:%" PRId64 " ts:%" PRId64, pVal->uid, pVal->ts);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
return TSDB_CODE_INVALID_PARA;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6823,8 +6824,7 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
void tDeleteSMqDataRsp(SMqDataRsp *pRsp) {
|
||||||
taosArrayDestroy(pRsp->blockDataLen);
|
pRsp->blockDataLen = taosArrayDestroy(pRsp->blockDataLen);;
|
||||||
pRsp->blockDataLen = NULL;
|
|
||||||
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(pRsp->blockData, (FDelete)taosMemoryFree);
|
||||||
pRsp->blockData = NULL;
|
pRsp->blockData = NULL;
|
||||||
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
|
|
@ -24,10 +24,10 @@ extern "C" {
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MQ_CONSUMER_STATUS__MODIFY = 1,
|
MQ_CONSUMER_STATUS__MODIFY = 1,
|
||||||
MQ_CONSUMER_STATUS__MODIFY_IN_REB,
|
MQ_CONSUMER_STATUS__MODIFY_IN_REB, // this value is not used anymore
|
||||||
MQ_CONSUMER_STATUS__READY,
|
MQ_CONSUMER_STATUS__READY,
|
||||||
MQ_CONSUMER_STATUS__LOST,
|
MQ_CONSUMER_STATUS__LOST,
|
||||||
MQ_CONSUMER_STATUS__LOST_IN_REB,
|
MQ_CONSUMER_STATUS__LOST_IN_REB, // this value is not used anymore
|
||||||
MQ_CONSUMER_STATUS__LOST_REBD,
|
MQ_CONSUMER_STATUS__LOST_REBD,
|
||||||
MQ_CONSUMER_STATUS__REMOVED,
|
MQ_CONSUMER_STATUS__REMOVED,
|
||||||
};
|
};
|
||||||
|
|
|
@ -317,7 +317,7 @@ static int32_t mndProcessMqTimerMsg(SRpcMsg *pMsg) {
|
||||||
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
}
|
}
|
||||||
taosRUnLockLatch(&pConsumer->lock);
|
taosRUnLockLatch(&pConsumer->lock);
|
||||||
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
} else if (status == MQ_CONSUMER_STATUS__MODIFY || status == MQ_CONSUMER_STATUS__MODIFY_IN_REB) {
|
||||||
taosRLockLatch(&pConsumer->lock);
|
taosRLockLatch(&pConsumer->lock);
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
int32_t newTopicNum = taosArrayGetSize(pConsumer->rebNewTopics);
|
||||||
|
|
|
@ -224,7 +224,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
.pVgEp = pVgEp,
|
.pVgEp = pVgEp,
|
||||||
};
|
};
|
||||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||||
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:%" PRIx64, sub, pVgEp->vgId, consumerId);
|
mInfo("sub:%s mq re-balance remove vgId:%d from consumer:0x%" PRIx64, sub, pVgEp->vgId, consumerId);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pConsumerEp->vgs);
|
taosArrayDestroy(pConsumerEp->vgs);
|
||||||
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
taosHashRemove(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||||
|
@ -329,7 +329,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||||
mInfo("sub:%s mq rebalance add new consumer:%" PRIx64, sub, consumerId);
|
mInfo("sub:%s mq rebalance add new consumer:0x%" PRIx64, sub, consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -357,7 +357,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (not enough)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -387,12 +387,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
||||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||||
mInfo("mq rebalance: skip vg %d for same consumer:%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: skip vg %d for same consumer:0x%" PRIx64 " (second scan)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
mInfo("mq rebalance: add vgId:%d to consumer:0x%" PRIx64 " (second scan) (unassigned)", pRebVg->pVgEp->vgId,
|
||||||
pConsumerEp->consumerId);
|
pConsumerEp->consumerId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1019,7 +1019,7 @@ int32_t mndRetrieveSubscribe(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pConsumerEp->consumerId, false);
|
||||||
|
|
||||||
mDebug("mnd show subscriptions: topic %s, consumer:%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
mDebug("mnd show subscriptions: topic %s, consumer:0x%" PRIx64 " cgroup %s vgid %d", varDataVal(topic),
|
||||||
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
pConsumerEp->consumerId, varDataVal(cgroup), pVgEp->vgId);
|
||||||
|
|
||||||
// offset
|
// offset
|
||||||
|
|
|
@ -79,43 +79,33 @@ typedef struct {
|
||||||
} STqExecDb;
|
} STqExecDb;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t subType;
|
int8_t subType;
|
||||||
|
STqReader* pExecReader;
|
||||||
STqReader* pExecReader;
|
qTaskInfo_t task;
|
||||||
qTaskInfo_t task;
|
|
||||||
union {
|
union {
|
||||||
STqExecCol execCol;
|
STqExecCol execCol;
|
||||||
STqExecTb execTb;
|
STqExecTb execTb;
|
||||||
STqExecDb execDb;
|
STqExecDb execDb;
|
||||||
};
|
};
|
||||||
int32_t numOfCols; // number of out pout column, temporarily used
|
int32_t numOfCols; // number of out pout column, temporarily used
|
||||||
} STqExecHandle;
|
} STqExecHandle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
// info
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
int64_t consumerId;
|
||||||
int64_t consumerId;
|
int32_t epoch;
|
||||||
int32_t epoch;
|
int8_t fetchMeta;
|
||||||
int8_t fetchMeta;
|
int64_t snapshotVer;
|
||||||
|
SWalReader* pWalReader;
|
||||||
int64_t snapshotVer;
|
SWalRef* pRef;
|
||||||
|
STqPushHandle pushHandle; // push
|
||||||
SWalReader* pWalReader;
|
STqExecHandle execHandle; // exec
|
||||||
|
|
||||||
SWalRef* pRef;
|
|
||||||
|
|
||||||
// push
|
|
||||||
STqPushHandle pushHandle;
|
|
||||||
|
|
||||||
// exec
|
|
||||||
STqExecHandle execHandle;
|
|
||||||
|
|
||||||
} STqHandle;
|
} STqHandle;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqDataRsp dataRsp;
|
SMqDataRsp* pDataRsp;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
SRpcHandleInfo pInfo;
|
SRpcHandleInfo info;
|
||||||
} STqPushEntry;
|
} STqPushEntry;
|
||||||
|
|
||||||
struct STQ {
|
struct STQ {
|
||||||
|
@ -151,13 +141,13 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
||||||
// tqRead
|
// tqRead
|
||||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMetaRsp* pMetaRsp, STqOffsetVal* offset);
|
||||||
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset);
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** pHeadWithCkSum);
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, STaosxRsp* pRsp);
|
||||||
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
// int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SSubmitReq* pReq, STaosxRsp* pRsp);
|
||||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type);
|
||||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry);
|
||||||
|
|
||||||
// tqMeta
|
// tqMeta
|
||||||
|
|
|
@ -192,6 +192,9 @@ void tqCleanUp();
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
|
int tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp, int32_t type);
|
||||||
|
int tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer);
|
||||||
|
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||||
|
|
|
@ -68,7 +68,13 @@ static void destroySTqHandle(void* data) {
|
||||||
|
|
||||||
static void tqPushEntryFree(void* data) {
|
static void tqPushEntryFree(void* data) {
|
||||||
STqPushEntry* p = *(void**)data;
|
STqPushEntry* p = *(void**)data;
|
||||||
tDeleteSMqDataRsp(&p->dataRsp);
|
if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tDeleteSMqDataRsp(p->pDataRsp);
|
||||||
|
} else if (p->pDataRsp->head.mqMsgType == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tDeleteSTaosxRsp((STaosxRsp*)p->pDataRsp);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(p->pDataRsp);
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -166,12 +172,16 @@ int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
int64_t consumerId, int32_t type) {
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
|
||||||
|
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||||
|
}
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -183,23 +193,83 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
((SMqRspHead*)buf)->mqMsgType = type;
|
||||||
|
((SMqRspHead*)buf)->epoch = epoch;
|
||||||
|
((SMqRspHead*)buf)->consumerId = consumerId;
|
||||||
|
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, abuf, len);
|
tEncoderInit(&encoder, abuf, len);
|
||||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
|
||||||
|
if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
} else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||||
|
}
|
||||||
|
|
||||||
tEncoderClear(&encoder);
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = {
|
||||||
.info = pPushEntry->pInfo,
|
.info = *pRpcHandleInfo,
|
||||||
.pCont = buf,
|
.pCont = buf,
|
||||||
.contLen = tlen,
|
.contLen = tlen,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
|
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
|
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
|
|
||||||
|
A(!pRsp->withSchema);
|
||||||
|
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
|
|
||||||
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// int32_t len = 0;
|
||||||
|
// int32_t code = 0;
|
||||||
|
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
// if (code < 0) {
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
|
// void* buf = rpcMallocCont(tlen);
|
||||||
|
// if (buf == NULL) {
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// SEncoder encoder = {0};
|
||||||
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
|
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pPushEntry->pInfo,
|
||||||
|
// .pCont = buf,
|
||||||
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// tmsgSendRsp(&rsp);
|
||||||
|
//
|
||||||
|
|
||||||
|
SMqRspHead* pHeader = &pPushEntry->pDataRsp->head;
|
||||||
|
doSendDataRsp(&pPushEntry->info, pRsp, pHeader->epoch, pHeader->consumerId, pHeader->mqMsgType);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
|
@ -207,93 +277,146 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
|
||||||
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
tFormatOffset(buf2, tListLen(buf2), &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp) {
|
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type) {
|
||||||
int32_t len = 0;
|
#if 0
|
||||||
int32_t code = 0;
|
A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
if (code < 0) {
|
|
||||||
return -1;
|
A(!pRsp->withSchema);
|
||||||
}
|
A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
|
||||||
void* buf = rpcMallocCont(tlen);
|
if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
if (buf == NULL) {
|
if (pRsp->blockNum > 0) {
|
||||||
return -1;
|
A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
|
} else {
|
||||||
|
A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
// int32_t len = 0;
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
// int32_t code = 0;
|
||||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
//
|
||||||
|
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
// tEncodeSize(tEncodeSMqDataRsp, pRsp, len, code);
|
||||||
|
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
SEncoder encoder = {0};
|
// tEncodeSize(tEncodeSTaosxRsp, (STaosxRsp*)pRsp, len, code);
|
||||||
tEncoderInit(&encoder, abuf, len);
|
// }
|
||||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
//
|
||||||
tEncoderClear(&encoder);
|
// if (code < 0) {
|
||||||
|
// return -1;
|
||||||
SRpcMsg rsp = {
|
// }
|
||||||
.info = pMsg->info,
|
//
|
||||||
.pCont = buf,
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
.contLen = tlen,
|
// void* buf = rpcMallocCont(tlen);
|
||||||
.code = 0,
|
// if (buf == NULL) {
|
||||||
};
|
// return -1;
|
||||||
tmsgSendRsp(&rsp);
|
// }
|
||||||
|
//
|
||||||
|
// ((SMqRspHead*)buf)->mqMsgType = type;
|
||||||
|
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||||
|
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||||
|
//
|
||||||
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
|
//
|
||||||
|
// SEncoder encoder = {0};
|
||||||
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
|
//
|
||||||
|
// if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
// tEncodeSMqDataRsp(&encoder, pRsp);
|
||||||
|
// } else if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
// tEncodeSTaosxRsp(&encoder, (STaosxRsp*) pRsp);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pMsg->info,
|
||||||
|
// .pCont = buf,
|
||||||
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// tmsgSendRsp(&rsp);
|
||||||
|
doSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type);
|
||||||
|
|
||||||
char buf1[80] = {0};
|
char buf1[80] = {0};
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, req:%s, rsp:%s",
|
|
||||||
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s",
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
//int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const STaosxRsp* pRsp) {
|
||||||
int32_t len = 0;
|
//#if 0
|
||||||
int32_t code = 0;
|
// A(taosArrayGetSize(pRsp->blockData) == pRsp->blockNum);
|
||||||
tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
// A(taosArrayGetSize(pRsp->blockDataLen) == pRsp->blockNum);
|
||||||
if (code < 0) {
|
//
|
||||||
return -1;
|
// if (pRsp->withSchema) {
|
||||||
}
|
// A(taosArrayGetSize(pRsp->blockSchema) == pRsp->blockNum);
|
||||||
int32_t tlen = sizeof(SMqRspHead) + len;
|
// } else {
|
||||||
void* buf = rpcMallocCont(tlen);
|
// A(taosArrayGetSize(pRsp->blockSchema) == 0);
|
||||||
if (buf == NULL) {
|
// }
|
||||||
return -1;
|
//
|
||||||
}
|
// if (pRsp->reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
|
// if (pRsp->blockNum > 0) {
|
||||||
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
// A(pRsp->rspOffset.version > pRsp->reqOffset.version);
|
||||||
((SMqRspHead*)buf)->epoch = pReq->epoch;
|
// } else {
|
||||||
((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
// A(pRsp->rspOffset.version >= pRsp->reqOffset.version);
|
||||||
|
// }
|
||||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
// }
|
||||||
|
//#endif
|
||||||
SEncoder encoder = {0};
|
//
|
||||||
tEncoderInit(&encoder, abuf, len);
|
// int32_t len = 0;
|
||||||
tEncodeSTaosxRsp(&encoder, pRsp);
|
// int32_t code = 0;
|
||||||
tEncoderClear(&encoder);
|
// tEncodeSize(tEncodeSTaosxRsp, pRsp, len, code);
|
||||||
|
// if (code < 0) {
|
||||||
SRpcMsg rsp = {
|
// return -1;
|
||||||
.info = pMsg->info,
|
// }
|
||||||
.pCont = buf,
|
//
|
||||||
.contLen = tlen,
|
// int32_t tlen = sizeof(SMqRspHead) + len;
|
||||||
.code = 0,
|
// void* buf = rpcMallocCont(tlen);
|
||||||
};
|
// if (buf == NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
// return -1;
|
||||||
char buf1[80] = {0};
|
// }
|
||||||
char buf2[80] = {0};
|
//
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
// ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__TAOSX_RSP;
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
// ((SMqRspHead*)buf)->epoch = pReq->epoch;
|
||||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
// ((SMqRspHead*)buf)->consumerId = pReq->consumerId;
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
//
|
||||||
|
// void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
|
||||||
return 0;
|
//
|
||||||
}
|
// SEncoder encoder = {0};
|
||||||
|
// tEncoderInit(&encoder, abuf, len);
|
||||||
|
// tEncodeSTaosxRsp(&encoder, pRsp);
|
||||||
|
// tEncoderClear(&encoder);
|
||||||
|
//
|
||||||
|
// SRpcMsg rsp = {
|
||||||
|
// .info = pMsg->info,
|
||||||
|
// .pCont = buf,
|
||||||
|
// .contLen = tlen,
|
||||||
|
// .code = 0,
|
||||||
|
// };
|
||||||
|
//
|
||||||
|
// tmsgSendRsp(&rsp);
|
||||||
|
//
|
||||||
|
// char buf1[80] = {0};
|
||||||
|
// char buf2[80] = {0};
|
||||||
|
// tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
|
// tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
|
//
|
||||||
|
// tqDebug("taosx rsp, vgId:%d, consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||||
|
// TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
// return 0;
|
||||||
|
//}
|
||||||
|
|
||||||
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
static FORCE_INLINE bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
|
||||||
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
|
||||||
|
@ -382,8 +505,18 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
||||||
}
|
}
|
||||||
|
|
||||||
pRsp->withTbName = 0;
|
pRsp->withTbName = 0;
|
||||||
pRsp->withSchema = false;
|
#if 0
|
||||||
|
pRsp->withTbName = pReq->withTbName;
|
||||||
|
if (pRsp->withTbName) {
|
||||||
|
pRsp->blockTbName = taosArrayInit(0, sizeof(void*));
|
||||||
|
if (pRsp->blockTbName == NULL) {
|
||||||
|
// TODO free
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
pRsp->withSchema = false;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -404,170 +537,147 @@ static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, const SMqPollReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest,
|
||||||
SMqPollReq req = {0};
|
SRpcMsg* pMsg, bool* pBlockReturned) {
|
||||||
int32_t code = 0;
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
STqOffsetVal fetchOffsetNew;
|
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||||
SWalCkHead* pCkHead = NULL;
|
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pRequest->subKey);
|
||||||
|
*pBlockReturned = false;
|
||||||
|
|
||||||
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
// In this vnode, data has been polled by consumer for this topic, so let's continue from the last offset value.
|
||||||
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
|
if (pOffset != NULL) {
|
||||||
return -1;
|
*pOffsetVal = pOffset->val;
|
||||||
}
|
|
||||||
|
|
||||||
int64_t consumerId = req.consumerId;
|
char formatBuf[80];
|
||||||
int32_t reqEpoch = req.epoch;
|
tFormatOffset(formatBuf, 80, pOffsetVal);
|
||||||
STqOffsetVal reqOffset = req.reqOffset;
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, prev offset found, offset reset to %s and continue.",
|
||||||
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), formatBuf);
|
||||||
// 1. find handle
|
return 0;
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
|
||||||
if (pHandle == NULL) {
|
|
||||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s not found", consumerId, TD_VID(pTq->pVnode),
|
|
||||||
req.subKey);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. check rebalance
|
|
||||||
if (pHandle->consumerId != consumerId) {
|
|
||||||
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
|
||||||
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
|
||||||
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// update epoch if need
|
|
||||||
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
|
||||||
while (savedEpoch < reqEpoch) {
|
|
||||||
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
|
||||||
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
|
||||||
}
|
|
||||||
|
|
||||||
char buf[80];
|
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
|
|
||||||
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
|
||||||
|
|
||||||
// 2.reset offset if needed
|
|
||||||
if (reqOffset.type > 0) {
|
|
||||||
fetchOffsetNew = reqOffset;
|
|
||||||
} else {
|
} else {
|
||||||
STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, req.subKey);
|
// no poll occurs in this vnode for this topic, let's seek to the right offset value.
|
||||||
if (pOffset != NULL) {
|
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||||
fetchOffsetNew = pOffset->val;
|
if (pRequest->useSnapshot) {
|
||||||
char formatBuf[80];
|
if (pHandle->fetchMeta) {
|
||||||
tFormatOffset(formatBuf, 80, &fetchOffsetNew);
|
tqOffsetResetToMeta(pOffsetVal, 0);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d, offset reset to %s", consumerId, pHandle->subKey,
|
|
||||||
TD_VID(pTq->pVnode), formatBuf);
|
|
||||||
} else {
|
|
||||||
if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
|
||||||
if (req.useSnapshot) {
|
|
||||||
if (pHandle->fetchMeta) {
|
|
||||||
tqOffsetResetToMeta(&fetchOffsetNew, 0);
|
|
||||||
} else {
|
|
||||||
tqOffsetResetToData(&fetchOffsetNew, 0, 0);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
tqOffsetResetToData(pOffsetVal, 0, 0);
|
||||||
if (pHandle->pRef == NULL) {
|
}
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
} else {
|
||||||
return -1;
|
pHandle->pRef = walRefFirstVer(pTq->pVnode->pWal, pHandle->pRef);
|
||||||
}
|
if (pHandle->pRef == NULL) {
|
||||||
tqOffsetResetToLog(&fetchOffsetNew, pHandle->pRef->refVer - 1);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
|
||||||
SMqDataRsp dataRsp = {0};
|
|
||||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(pOffsetVal, pHandle->pRef->refVer - 1);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
|
||||||
return code;
|
|
||||||
} else {
|
|
||||||
STaosxRsp taosxRsp = {0};
|
|
||||||
tqInitTaosxRsp(&taosxRsp, &req);
|
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
|
||||||
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
|
||||||
" in vg %d, subkey %s, reset none failed",
|
|
||||||
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), req.subKey);
|
|
||||||
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
}
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
}
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
SMqDataRsp dataRsp = {0};
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
||||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||||
// lock
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
taosWLockLatch(&pTq->pushLock);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
if (tqScanData(pTq, pHandle, &dataRsp, &fetchOffsetNew) < 0) {
|
|
||||||
|
*pBlockReturned = true;
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
STaosxRsp taosxRsp = {0};
|
||||||
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
|
tqOffsetResetToLog(&taosxRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
|
int32_t code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
|
// int32_t code = tqSendTaosxRsp(pTq, pMsg, pRequest, &taosxRsp);
|
||||||
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
|
||||||
|
*pBlockReturned = true;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_NONE) {
|
||||||
|
tqError("tmq poll: subkey %s, no offset committed for consumer:0x%" PRIx64
|
||||||
|
" in vg %d, subkey %s, reset none failed",
|
||||||
|
pHandle->subKey, consumerId, TD_VID(pTq->pVnode), pRequest->subKey);
|
||||||
|
terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t extractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) {
|
||||||
|
int32_t code = -1;
|
||||||
|
STqOffsetVal offset = {0};
|
||||||
|
SWalCkHead* pCkHead = NULL;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
STqOffsetVal reqOffset = pRequest->reqOffset;
|
||||||
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
|
||||||
|
// 1. reset the offset if needed
|
||||||
|
if (reqOffset.type > 0) {
|
||||||
|
offset = reqOffset;
|
||||||
|
} else { // handle the reset offset cases, according to the consumer's choice.
|
||||||
|
bool blockReturned = false;
|
||||||
|
code = extractResetOffsetVal(&offset, pTq, pHandle, pRequest, pMsg, &blockReturned);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// empty block returned, quit
|
||||||
|
if (blockReturned) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// this is a normal subscription requirement
|
||||||
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
SMqDataRsp dataRsp = {0};
|
||||||
|
tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType);
|
||||||
|
|
||||||
|
// lock
|
||||||
|
taosWLockLatch(&pTq->pushLock);
|
||||||
|
code = tqScanData(pTq, pHandle, &dataRsp, &offset);
|
||||||
|
|
||||||
|
// till now, all data has been transferred to consumer, new data needs to push client once arrived.
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) {
|
||||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, &dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
if (pPushEntry != NULL) {
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
pPushEntry->pInfo = pMsg->info;
|
return code;
|
||||||
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
|
||||||
dataRsp.withTbName = 0;
|
|
||||||
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
|
|
||||||
pPushEntry->dataRsp.head.consumerId = consumerId;
|
|
||||||
pPushEntry->dataRsp.head.epoch = reqEpoch;
|
|
||||||
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
|
||||||
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
|
|
||||||
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
|
|
||||||
// unlock
|
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP);
|
||||||
|
|
||||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
// NOTE: this pHandle->consumerId may have been changed already.
|
||||||
code = -1;
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64
|
||||||
}
|
", ts:%" PRId64,
|
||||||
|
consumerId, pHandle->subKey, vgId, dataRsp.blockNum, dataRsp.rspOffset.type, dataRsp.rspOffset.uid,
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
dataRsp.rspOffset.ts);
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
|
||||||
|
|
||||||
tDeleteSMqDataRsp(&dataRsp);
|
tDeleteSMqDataRsp(&dataRsp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo handle the case where re-balance occurs.
|
||||||
// for taosx
|
// for taosx
|
||||||
SMqMetaRsp metaRsp = {0};
|
SMqMetaRsp metaRsp = {0};
|
||||||
STaosxRsp taosxRsp = {0};
|
STaosxRsp taosxRsp = {0};
|
||||||
tqInitTaosxRsp(&taosxRsp, &req);
|
tqInitTaosxRsp(&taosxRsp, pRequest);
|
||||||
|
|
||||||
if (fetchOffsetNew.type != TMQ_OFFSET__LOG) {
|
if (offset.type != TMQ_OFFSET__LOG) {
|
||||||
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &fetchOffsetNew) < 0) {
|
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, &offset) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaRsp.metaRspLen > 0) {
|
if (metaRsp.metaRspLen > 0) {
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp);
|
||||||
code = -1;
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64
|
||||||
}
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send meta offset type:%d,uid:%" PRId64
|
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid,
|
||||||
metaRsp.rspOffset.version);
|
metaRsp.rspOffset.version);
|
||||||
taosMemoryFree(metaRsp.metaRsp);
|
taosMemoryFree(metaRsp.metaRsp);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
@ -575,54 +685,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosxRsp.blockNum > 0) {
|
if (taosxRsp.blockNum > 0) {
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
fetchOffsetNew = taosxRsp.rspOffset;
|
offset = taosxRsp.rspOffset;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey %s, vg %d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64
|
||||||
",version:%" PRId64,
|
",version:%" PRId64,
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), taosxRsp.blockNum, taosxRsp.rspOffset.type,
|
consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,
|
||||||
taosxRsp.rspOffset.uid, taosxRsp.rspOffset.version);
|
taosxRsp.rspOffset.version);
|
||||||
}
|
} else {
|
||||||
|
|
||||||
if (fetchOffsetNew.type == TMQ_OFFSET__LOG) {
|
// if (offset.type == TMQ_OFFSET__LOG) {
|
||||||
int64_t fetchVer = fetchOffsetNew.version + 1;
|
int64_t fetchVer = offset.version + 1;
|
||||||
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048);
|
||||||
if (pCkHead == NULL) {
|
if (pCkHead == NULL) {
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
savedEpoch = atomic_load_32(&pHandle->epoch);
|
// todo refactor: this is not correct.
|
||||||
if (savedEpoch > reqEpoch) {
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
if (savedEpoch > pRequest->epoch) {
|
||||||
|
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
", found new consumer epoch %d, discard req epoch %d",
|
||||||
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
|
consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead) < 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
// if (terrno == 0) { // failed to seek to given ver, but no errors happen.
|
||||||
code = -1;
|
// code = tqRegisterPushEntry(pTq, pHandle, pRequest, pMsg, (SMqDataRsp*) &taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
}
|
// return code;
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
// } else { // error happens, return to consumers
|
||||||
taosMemoryFreeClear(pCkHead);
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
return code;
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
taosMemoryFreeClear(pCkHead);
|
||||||
|
return code;
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalCont* pHead = &pCkHead->head;
|
SWalCont* pHead = &pCkHead->head;
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", consumerId,
|
||||||
req.epoch, TD_VID(pTq->pVnode), fetchVer, pHead->msgType);
|
pRequest->epoch, vgId, fetchVer, pHead->msgType);
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
SPackedData submit = {
|
SPackedData submit = {
|
||||||
|
@ -630,16 +742,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
.msgLen = pHead->bodyLen - sizeof(SSubmitReq2Msg),
|
||||||
.ver = pHead->version,
|
.ver = pHead->version,
|
||||||
};
|
};
|
||||||
|
|
||||||
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp) < 0) {
|
||||||
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, TD_VID(pTq->pVnode),
|
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", consumerId, vgId,
|
||||||
req.subKey);
|
pRequest->subKey);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (taosxRsp.blockNum > 0 /* threshold */) {
|
|
||||||
|
if (taosxRsp.blockNum > 0) {
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
|
||||||
if (tqSendTaosxRsp(pTq, pMsg, &req, &taosxRsp) < 0) {
|
code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP);
|
||||||
code = -1;
|
|
||||||
}
|
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
return code;
|
return code;
|
||||||
|
@ -655,7 +767,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
metaRsp.resMsgType = pHead->msgType;
|
metaRsp.resMsgType = pHead->msgType;
|
||||||
metaRsp.metaRspLen = pHead->bodyLen;
|
metaRsp.metaRspLen = pHead->bodyLen;
|
||||||
metaRsp.metaRsp = pHead->body;
|
metaRsp.metaRsp = pHead->body;
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, &req, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
taosMemoryFreeClear(pCkHead);
|
taosMemoryFreeClear(pCkHead);
|
||||||
tDeleteSTaosxRsp(&taosxRsp);
|
tDeleteSTaosxRsp(&taosxRsp);
|
||||||
|
@ -674,6 +786,55 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
SMqPollReq req = {0};
|
||||||
|
if (tDeserializeSMqPollReq(pMsg->pCont, pMsg->contLen, &req) < 0) {
|
||||||
|
tqError("tDeserializeSMqPollReq %d failed", pMsg->contLen);
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t consumerId = req.consumerId;
|
||||||
|
int32_t reqEpoch = req.epoch;
|
||||||
|
STqOffsetVal reqOffset = req.reqOffset;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
|
// 1. find handle
|
||||||
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
|
if (pHandle == NULL) {
|
||||||
|
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. check re-balance status
|
||||||
|
taosRLockLatch(&pTq->pushLock);
|
||||||
|
if (pHandle->consumerId != consumerId) {
|
||||||
|
tqDebug("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
|
||||||
|
consumerId, TD_VID(pTq->pVnode), req.subKey, pHandle->consumerId);
|
||||||
|
terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
|
||||||
|
taosRUnLockLatch(&pTq->pushLock);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->pushLock);
|
||||||
|
// 3. update the epoch value
|
||||||
|
int32_t savedEpoch = pHandle->epoch;
|
||||||
|
if (savedEpoch < reqEpoch) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||||
|
pHandle->epoch = reqEpoch;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
|
|
||||||
|
char buf[80];
|
||||||
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s, reqId:0x%" PRIx64,
|
||||||
|
consumerId, req.epoch, pHandle->subKey, vgId, buf, req.reqId);
|
||||||
|
|
||||||
|
return extractDataForMq(pTq, pHandle, &req, pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
SMqVDeleteReq* pReq = (SMqVDeleteReq*)msg;
|
||||||
|
|
||||||
|
@ -744,9 +905,9 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t
|
||||||
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||||
SMqRebVgReq req = {0};
|
SMqRebVgReq req = {0};
|
||||||
tDecodeSMqRebVgReq(msg, &req);
|
tDecodeSMqRebVgReq(msg, &req);
|
||||||
// todo lock
|
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq process sub req %s", pTq->pVnode->config.vgId, req.subKey);
|
tqDebug("vgId:%d, tq process sub req %s, Id:0x%" PRIx64 " -> Id:0x%" PRIx64, pTq->pVnode->config.vgId, req.subKey,
|
||||||
|
req.oldConsumerId, req.newConsumerId);
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||||
if (pHandle == NULL) {
|
if (pHandle == NULL) {
|
||||||
|
@ -754,11 +915,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
|
tqError("vgId:%d, build new consumer handle %s for consumer:0x%" PRIx64 ", but old consumerId is %" PRId64 "",
|
||||||
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (req.newConsumerId == -1) {
|
if (req.newConsumerId == -1) {
|
||||||
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
|
||||||
taosMemoryFree(req.qmsg);
|
taosMemoryFree(req.qmsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqHandle tqHandle = {0};
|
STqHandle tqHandle = {0};
|
||||||
pHandle = &tqHandle;
|
pHandle = &tqHandle;
|
||||||
/*taosInitRWLatch(&pExec->lock);*/
|
/*taosInitRWLatch(&pExec->lock);*/
|
||||||
|
@ -774,8 +937,10 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
// TODO version should be assigned and refed during preprocess
|
// TODO version should be assigned and refed during preprocess
|
||||||
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
SWalRef* pRef = walRefCommittedVer(pTq->pVnode->pWal);
|
||||||
if (pRef == NULL) {
|
if (pRef == NULL) {
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t ver = pRef->refVer;
|
int64_t ver = pRef->refVer;
|
||||||
pHandle->pRef = pRef;
|
pHandle->pRef = pRef;
|
||||||
|
|
||||||
|
@ -786,6 +951,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
.initTqReader = true,
|
.initTqReader = true,
|
||||||
.version = ver,
|
.version = ver,
|
||||||
};
|
};
|
||||||
|
|
||||||
pHandle->snapshotVer = ver;
|
pHandle->snapshotVer = ver;
|
||||||
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
|
@ -800,6 +966,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__DB) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
pHandle->execHandle.pExecReader = tqOpenReader(pTq->pVnode);
|
||||||
|
|
||||||
pHandle->execHandle.execDb.pFilterOutTbUid =
|
pHandle->execHandle.execDb.pFilterOutTbUid =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||||
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
buildSnapContext(handle.meta, handle.version, 0, pHandle->execHandle.subType, pHandle->fetchMeta,
|
||||||
|
@ -808,7 +975,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
pHandle->execHandle.task = qCreateQueueExecTaskInfo(NULL, &handle, NULL, NULL);
|
||||||
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
} else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||||
|
|
||||||
pHandle->execHandle.execTb.suid = req.suid;
|
pHandle->execHandle.execTb.suid = req.suid;
|
||||||
|
|
||||||
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
SArray* tbUidList = taosArrayInit(0, sizeof(int64_t));
|
||||||
|
@ -828,28 +994,45 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle));
|
||||||
tqDebug("try to persist handle %s consumer:0x%" PRIx64" , old consumer:0x%"PRIx64, req.subKey, pHandle->consumerId,
|
tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey,
|
||||||
oldConsumerId);
|
pHandle->consumerId, oldConsumerId);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO handle qmsg and exec modification
|
if (pHandle->consumerId == req.newConsumerId) { // do nothing
|
||||||
tqInfo("update the consumer info, old consumer id:0x%"PRIx64", new Id:0x%"PRIx64, pHandle->consumerId, req.newConsumerId);
|
tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId);
|
||||||
|
atomic_store_32(&pHandle->epoch, -1);
|
||||||
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
|
return tqMetaSaveHandle(pTq, req.subKey, pHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
|
||||||
|
req.newConsumerId);
|
||||||
|
|
||||||
|
taosWLockLatch(&pTq->pushLock);
|
||||||
atomic_store_32(&pHandle->epoch, -1);
|
atomic_store_32(&pHandle->epoch, -1);
|
||||||
|
|
||||||
|
// remove if it has been register in the push manager, and return one empty block to consumer
|
||||||
|
tqRemovePushEntry(pTq, req.subKey, (int32_t) strlen(req.subKey), pHandle->consumerId, true);
|
||||||
|
|
||||||
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
|
||||||
atomic_add_fetch_32(&pHandle->epoch, 1);
|
atomic_add_fetch_32(&pHandle->epoch, 1);
|
||||||
taosMemoryFree(req.qmsg);
|
|
||||||
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
qStreamCloseTsdbReader(pHandle->execHandle.task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) {
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
// close handle
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(req.qmsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -924,15 +1107,15 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
||||||
pTask->tbSink.vnode = pTq->pVnode;
|
pTask->tbSink.vnode = pTq->pVnode;
|
||||||
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline2;
|
||||||
|
|
||||||
int32_t version = 1;
|
int32_t ver1 = 1;
|
||||||
SMetaInfo info = {0};
|
SMetaInfo info = {0};
|
||||||
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
int32_t code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
version = info.skmVer;
|
ver1 = info.skmVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->tbSink.pTSchema =
|
pTask->tbSink.pTSchema =
|
||||||
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, version);
|
tBuildTSchema(pTask->tbSink.pSchemaWrapper->pSchema, pTask->tbSink.pSchemaWrapper->nCols, ver1);
|
||||||
if(pTask->tbSink.pTSchema == NULL) {
|
if(pTask->tbSink.pTSchema == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,11 +46,13 @@ static int32_t tqAddBlockSchemaToRsp(const STqExecHandle* pExec, STaosxRsp* pRsp
|
||||||
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
|
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, int32_t n) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
|
||||||
|
|
||||||
// TODO add reference to gurantee success
|
// TODO add reference to gurantee success
|
||||||
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
if (metaGetTableEntryByUidCache(&mr, uid) < 0) {
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < n; i++) {
|
for (int32_t i = 0; i < n; i++) {
|
||||||
char* tbName = taosStrdup(mr.me.name);
|
char* tbName = taosStrdup(mr.me.name);
|
||||||
taosArrayPush(pRsp->blockTbName, &tbName);
|
taosArrayPush(pRsp->blockTbName, &tbName);
|
||||||
|
@ -83,13 +85,16 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
tqDebug("vgId:%d, tmq task start to execute", pTq->pVnode->config.vgId);
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
tqDebug("vgId:%d, tmq task executed, get %p", pTq->pVnode->config.vgId, pDataBlock);
|
||||||
|
|
||||||
|
// current scan should be stopped asap, since the rebalance occurs.
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -99,7 +104,9 @@ int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffs
|
||||||
|
|
||||||
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
|
||||||
rowCnt += pDataBlock->info.rows;
|
rowCnt += pDataBlock->info.rows;
|
||||||
if (rowCnt >= 4096) break;
|
if (rowCnt >= 4096) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -283,7 +283,7 @@ int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||||
tdbTbcMoveToFirst(pCur);
|
tdbTbcMoveToFirst(pCur);
|
||||||
|
|
||||||
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||||
STqHandle handle;
|
STqHandle handle = {0};
|
||||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||||
tDecodeSTqHandle(&decoder, &handle);
|
tDecodeSTqHandle(&decoder, &handle);
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
|
@ -209,6 +209,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
|
||||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
|
@ -217,7 +218,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr);
|
||||||
if (numOfRegisteredPush > 0) {
|
if (numOfRegisteredPush > 0) {
|
||||||
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d",
|
||||||
pTq->pVnode->config.vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush);
|
||||||
|
|
||||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||||
|
@ -239,7 +240,10 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
pIter = taosHashIterate(pTq->pPushMgr, pIter);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
STqPushEntry* pPushEntry = *(STqPushEntry**)pIter;
|
||||||
|
|
||||||
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey));
|
||||||
|
@ -248,17 +252,16 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
SMqDataRsp* pRsp = pPushEntry->pDataRsp;
|
||||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
if (pRsp->reqOffset.version >= ver) {
|
||||||
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip", vgId,
|
||||||
|
pRsp->reqOffset.version, ver);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqExecHandle* pExec = &pHandle->execHandle;
|
STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
SMqDataRsp* pRsp = &pPushEntry->dataRsp;
|
|
||||||
|
|
||||||
// prepare scan mem data
|
// prepare scan mem data
|
||||||
SPackedData submit = {
|
SPackedData submit = {
|
||||||
.msgStr = data,
|
.msgStr = data,
|
||||||
|
@ -274,7 +277,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
SSDataBlock* pDataBlock = NULL;
|
SSDataBlock* pDataBlock = NULL;
|
||||||
uint64_t ts = 0;
|
uint64_t ts = 0;
|
||||||
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
tqDebug("vgId:%d, tq exec error since %s", pTq->pVnode->config.vgId, terrstr());
|
tqDebug("vgId:%d, tq exec error since %s", vgId, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
|
@ -285,11 +288,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
pRsp->blockNum++;
|
pRsp->blockNum++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", pTq->pVnode->config.vgId, pPushEntry->subKey,
|
tqDebug("vgId:%d, tq handle push, subkey:%s, block num:%d", vgId, pPushEntry->subKey, pRsp->blockNum);
|
||||||
pRsp->blockNum);
|
|
||||||
if (pRsp->blockNum > 0) {
|
if (pRsp->blockNum > 0) {
|
||||||
// set offset
|
// set offset
|
||||||
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
tqOffsetResetToLog(&pRsp->rspOffset, ver);
|
||||||
|
|
||||||
// remove from hash
|
// remove from hash
|
||||||
size_t kLen;
|
size_t kLen;
|
||||||
void* key = taosHashGetKey(pIter, &kLen);
|
void* key = taosHashGetKey(pIter, &kLen);
|
||||||
|
@ -311,6 +314,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
tqError("vgId:%d, tq push hash remove key error, key: %s", pTq->pVnode->config.vgId, (char*)key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(cachedKeys, (FDelete)taosMemoryFree);
|
||||||
taosArrayDestroy(cachedKeyLens);
|
taosArrayDestroy(cachedKeyLens);
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
|
@ -336,9 +340,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
};
|
};
|
||||||
|
|
||||||
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
|
tqDebug("tq copy write msg %p %d %" PRId64 " from %p", data, len, ver, pReq);
|
||||||
|
|
||||||
tqProcessSubmitReq(pTq, submit);
|
tqProcessSubmitReq(pTq, submit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (msgType == TDMT_VND_DELETE) {
|
if (msgType == TDMT_VND_DELETE) {
|
||||||
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
tqProcessDelReq(pTq, POINTER_SHIFT(msg, sizeof(SMsgHead)), msgLen - sizeof(SMsgHead), ver);
|
||||||
}
|
}
|
||||||
|
@ -346,3 +350,61 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqRegisterPushEntry(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg,
|
||||||
|
SMqDataRsp* pDataRsp, int32_t type) {
|
||||||
|
uint64_t consumerId = pRequest->consumerId;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
STqHandle* pTqHandle = pHandle;
|
||||||
|
|
||||||
|
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||||
|
if (pPushEntry == NULL) {
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", vgId:%d failed to malloc, size:%d", consumerId, vgId,
|
||||||
|
(int32_t)sizeof(STqPushEntry));
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pPushEntry->info = pRpcMsg->info;
|
||||||
|
memcpy(pPushEntry->subKey, pTqHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
|
||||||
|
|
||||||
|
if (type == TMQ_MSG_TYPE__TAOSX_RSP) {
|
||||||
|
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(STaosxRsp));
|
||||||
|
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(STaosxRsp));
|
||||||
|
} else if (type == TMQ_MSG_TYPE__POLL_RSP) {
|
||||||
|
pPushEntry->pDataRsp = taosMemoryCalloc(1, sizeof(SMqDataRsp));
|
||||||
|
memcpy(pPushEntry->pDataRsp, pDataRsp, sizeof(SMqDataRsp));
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqRspHead* pHead = &pPushEntry->pDataRsp->head;
|
||||||
|
pHead->consumerId = consumerId;
|
||||||
|
pHead->epoch = pRequest->epoch;
|
||||||
|
pHead->mqMsgType = type;
|
||||||
|
|
||||||
|
taosHashPut(pTq->pPushMgr, pTqHandle->subKey, strlen(pTqHandle->subKey), &pPushEntry, sizeof(void*));
|
||||||
|
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr, total:%d", consumerId,
|
||||||
|
pTqHandle->subKey, pDataRsp->reqOffset.version, vgId, taosHashGetSize(pTq->pPushMgr));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tqRemovePushEntry(STQ* pTq, const char* pKey, int32_t keyLen, uint64_t consumerId, bool rspConsumer) {
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
STqPushEntry** pEntry = taosHashGet(pTq->pPushMgr, pKey, keyLen);
|
||||||
|
|
||||||
|
if (pEntry != NULL) {
|
||||||
|
uint64_t cId = (*pEntry)->pDataRsp->head.consumerId;
|
||||||
|
ASSERT(consumerId == cId);
|
||||||
|
|
||||||
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s vgId:%d remove from push mgr, remains:%d", consumerId,
|
||||||
|
(*pEntry)->subKey, vgId, taosHashGetSize(pTq->pPushMgr) - 1);
|
||||||
|
|
||||||
|
if (rspConsumer) { // rsp the old consumer with empty block.
|
||||||
|
tqPushDataRsp(pTq, *pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashRemove(pTq->pPushMgr, pKey, keyLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
|
@ -183,14 +183,15 @@ end:
|
||||||
return tbSuid == realTbSuid;
|
return tbSuid == realTbSuid;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHead** ppCkHead) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
taosThreadMutexLock(&pHandle->pWalReader->mutex);
|
||||||
int64_t offset = *fetchOffset;
|
int64_t offset = *fetchOffset;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
if (walFetchHead(pHandle->pWalReader, offset, *ppCkHead) < 0) {
|
||||||
tqDebug("tmq poll: consumer:%" PRId64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", (epoch %d) vgId:%d offset %" PRId64 ", no more log to return",
|
||||||
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode), offset);
|
||||||
*fetchOffset = offset - 1;
|
*fetchOffset = offset - 1;
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -241,6 +242,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalCkHea
|
||||||
offset++;
|
offset++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
taosThreadMutexUnlock(&pHandle->pWalReader->mutex);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -307,7 +307,7 @@ static void setCreateDBResultIntoDataBlock(SSDataBlock* pBlock, char* dbName, ch
|
||||||
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
|
bool existLeaderRole(TAOS_ROW row, TAOS_FIELD* fields, int nFields) {
|
||||||
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
|
// vgroup_id | db_name | tables | v1_dnode | v1_status | v2_dnode | v2_status | v3_dnode | v3_status | v4_dnode |
|
||||||
// v4_status | cacheload | tsma |
|
// v4_status | cacheload | tsma |
|
||||||
if (nFields != 13) {
|
if (nFields != 14) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -221,12 +221,12 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* readers, int32_t* numOfCols, SSchemaWrapper** pSchema) {
|
||||||
if (msg == NULL) {
|
if (msg == NULL) {
|
||||||
// create raw scan
|
// create raw scan
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||||
if (NULL == pTaskInfo) {
|
if (NULL == pTaskInfo) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
|
|
||||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||||
|
@ -715,7 +715,6 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
|
||||||
|
|
||||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||||
|
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
}
|
}
|
||||||
|
@ -723,7 +722,6 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) {
|
||||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
setTaskKilled(pTaskInfo, rspCode);
|
setTaskKilled(pTaskInfo, rspCode);
|
||||||
|
|
||||||
qStopTaskOperators(pTaskInfo);
|
qStopTaskOperators(pTaskInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -2774,11 +2774,17 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
|
||||||
}
|
}
|
||||||
|
|
||||||
void qStreamCloseTsdbReader(void* task) {
|
void qStreamCloseTsdbReader(void* task) {
|
||||||
if (task == NULL) return;
|
if (task == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)task;
|
||||||
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
SOperatorInfo* pOp = pTaskInfo->pRoot;
|
||||||
qDebug("stream close tsdb reader, reset status uid %" PRId64 " ts %" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
|
|
||||||
|
qDebug("stream close tsdb reader, reset status uid:%" PRId64 " ts:%" PRId64, pTaskInfo->streamInfo.lastStatus.uid,
|
||||||
pTaskInfo->streamInfo.lastStatus.ts);
|
pTaskInfo->streamInfo.lastStatus.ts);
|
||||||
|
|
||||||
|
// todo refactor, other thread may already use this read to extract data.
|
||||||
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
pTaskInfo->streamInfo.lastStatus = (STqOffsetVal){0};
|
||||||
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
while (pOp->numOfDownstream == 1 && pOp->pDownstream[0]) {
|
||||||
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
SOperatorInfo* pDownstreamOp = pOp->pDownstream[0];
|
||||||
|
@ -2786,8 +2792,19 @@ void qStreamCloseTsdbReader(void* task) {
|
||||||
SStreamScanInfo* pInfo = pDownstreamOp->info;
|
SStreamScanInfo* pInfo = pDownstreamOp->info;
|
||||||
if (pInfo->pTableScanOp) {
|
if (pInfo->pTableScanOp) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
|
|
||||||
|
setOperatorCompleted(pInfo->pTableScanOp);
|
||||||
|
while(pTaskInfo->owner != 0) {
|
||||||
|
taosMsleep(100);
|
||||||
|
qDebug("wait for the reader stopping");
|
||||||
|
}
|
||||||
|
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
|
// restore the status, todo refactor.
|
||||||
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
|
pTaskInfo->status = TASK_NOT_COMPLETED;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -751,7 +751,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||||
if (result) {
|
if (result || (pOperator->status == OP_EXEC_DONE)) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -985,6 +985,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
||||||
pTableScanInfo->scanTimes = 0;
|
pTableScanInfo->scanTimes = 0;
|
||||||
pTableScanInfo->currentGroupId = -1;
|
pTableScanInfo->currentGroupId = -1;
|
||||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||||
|
qDebug("1");
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1143,6 +1144,7 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
|
||||||
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
tsdbReaderClose(pTableScanInfo->base.dataReader);
|
||||||
|
qDebug("2");
|
||||||
pTableScanInfo->base.dataReader = NULL;
|
pTableScanInfo->base.dataReader = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1616,6 +1618,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (!pTaskInfo->streamInfo.returned) {
|
if (!pTaskInfo->streamInfo.returned) {
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
|
qDebug("3");
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
|
||||||
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
|
||||||
|
@ -1767,6 +1770,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
|
qDebug("4");
|
||||||
|
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
pInfo->pTableScanOp->status = OP_OPENED;
|
pInfo->pTableScanOp->status = OP_OPENED;
|
||||||
|
|
||||||
|
@ -1837,6 +1842,8 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE;
|
||||||
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
|
||||||
tsdbReaderClose(pTSInfo->base.dataReader);
|
tsdbReaderClose(pTSInfo->base.dataReader);
|
||||||
|
qDebug("5");
|
||||||
|
|
||||||
pTSInfo->base.dataReader = NULL;
|
pTSInfo->base.dataReader = NULL;
|
||||||
|
|
||||||
pTSInfo->base.cond.startVersion = -1;
|
pTSInfo->base.cond.startVersion = -1;
|
||||||
|
@ -2635,6 +2642,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("8");
|
||||||
|
|
||||||
tsdbReaderClose(pInfo->base.dataReader);
|
tsdbReaderClose(pInfo->base.dataReader);
|
||||||
pInfo->base.dataReader = NULL;
|
pInfo->base.dataReader = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -29,7 +29,7 @@ class TDTestCase:
|
||||||
self.master_dnode = self.TDDnodes.dnodes[0]
|
self.master_dnode = self.TDDnodes.dnodes[0]
|
||||||
self.host=self.master_dnode.cfgDict["fqdn"]
|
self.host=self.master_dnode.cfgDict["fqdn"]
|
||||||
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
||||||
tdSql.init(conn1.cursor())
|
tdSql.init(conn1.cursor(), True)
|
||||||
|
|
||||||
|
|
||||||
def getBuildPath(self):
|
def getBuildPath(self):
|
||||||
|
|
|
@ -123,8 +123,9 @@ class TDTestCase:
|
||||||
pre_insert = "insert into "
|
pre_insert = "insert into "
|
||||||
sql = pre_insert
|
sql = pre_insert
|
||||||
|
|
||||||
t = time.time()
|
# t = 1678609778776 #time.time()
|
||||||
startTs = int(round(t * 1000))
|
t = 1600000000000
|
||||||
|
startTs = t #int(round(t * 1000))
|
||||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||||
for i in range(ctbNum):
|
for i in range(ctbNum):
|
||||||
sql += " %s_%d values "%(stbName,i)
|
sql += " %s_%d values "%(stbName,i)
|
||||||
|
|
|
@ -386,7 +386,7 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||||
pInfo->numOfVgroups++;
|
pInfo->numOfVgroups++;
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
taosFprintfFile(g_fp, "consume id %d, add new vgroupId:%d\n", pInfo->consumerId, vgroupId);
|
||||||
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||||
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId,
|
||||||
pInfo->numOfVgroups, vgroupId);
|
pInfo->numOfVgroups, vgroupId);
|
||||||
|
@ -578,18 +578,25 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
|
|
||||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
|
||||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||||
const char* dbName = tmq_get_db_name(msg);
|
const char* dbName = tmq_get_db_name(msg);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
|
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
|
||||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
int32_t index = 0;
|
||||||
tmq_get_topic_name(msg), vgroupId);
|
for (index = 0; index < pInfo->numOfVgroups; index++) {
|
||||||
|
if (vgroupId == pInfo->rowsOfPerVgroups[index][0]) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId:%d, currentRows:%d\n", dbName != NULL ? dbName : "invalid table",
|
||||||
|
tmq_get_topic_name(msg), vgroupId, pInfo->rowsOfPerVgroups[index][1]);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
if (row == NULL) {
|
||||||
if (row == NULL) break;
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||||
int32_t numOfFields = taos_field_count(msg);
|
int32_t numOfFields = taos_field_count(msg);
|
||||||
|
@ -607,7 +614,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
||||||
|
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
|
@ -621,7 +627,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
}
|
}
|
||||||
|
|
||||||
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||||
|
|
||||||
return totalRows;
|
return totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -730,9 +735,7 @@ void build_consumer(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
|
||||||
tmq_conf_destroy(conf);
|
tmq_conf_destroy(conf);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -817,7 +820,6 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(tmqMsg);
|
taos_free_result(tmqMsg);
|
||||||
|
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
|
|
Loading…
Reference in New Issue