fix(tmq): set config
This commit is contained in:
parent
e839c432b8
commit
d7e34a6642
|
@ -61,7 +61,7 @@ int32_t init_env() {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes =
|
pRes =
|
||||||
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)");
|
taos_query(pConn, "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -106,8 +106,8 @@ int32_t create_topic() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
|
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");
|
||||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -185,6 +185,7 @@ typedef struct {
|
||||||
int32_t async;
|
int32_t async;
|
||||||
tsem_t rspSem;
|
tsem_t rspSem;
|
||||||
tmq_resp_err_t rspErr;
|
tmq_resp_err_t rspErr;
|
||||||
|
SArray* offsets;
|
||||||
} SMqCommitCbParam;
|
} SMqCommitCbParam;
|
||||||
|
|
||||||
tmq_conf_t* tmq_conf_new() {
|
tmq_conf_t* tmq_conf_new() {
|
||||||
|
@ -246,10 +247,13 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
if (strcmp(key, "msg.with.table.name") == 0) {
|
if (strcmp(key, "msg.with.table.name") == 0) {
|
||||||
if (strcmp(value, "true") == 0) {
|
if (strcmp(value, "true") == 0) {
|
||||||
conf->withTbName = 1;
|
conf->withTbName = 1;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
} else if (strcmp(value, "false") == 0) {
|
} else if (strcmp(value, "false") == 0) {
|
||||||
conf->withTbName = 0;
|
conf->withTbName = 0;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
} else if (strcmp(value, "none") == 0) {
|
} else if (strcmp(value, "none") == 0) {
|
||||||
conf->withTbName = -1;
|
conf->withTbName = -1;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
} else {
|
} else {
|
||||||
return TMQ_CONF_INVALID;
|
return TMQ_CONF_INVALID;
|
||||||
}
|
}
|
||||||
|
@ -395,6 +399,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
if (!pParam->async)
|
if (!pParam->async)
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
else {
|
else {
|
||||||
|
if (pParam->offsets) {
|
||||||
|
taosArrayDestroy(pParam->offsets);
|
||||||
|
}
|
||||||
tsem_destroy(&pParam->rspSem);
|
tsem_destroy(&pParam->rspSem);
|
||||||
/*if (pParam->pArray) {*/
|
/*if (pParam->pArray) {*/
|
||||||
/*taosArrayDestroy(pParam->pArray);*/
|
/*taosArrayDestroy(pParam->pArray);*/
|
||||||
|
@ -540,10 +547,10 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
||||||
// build msg
|
// build msg
|
||||||
// send to mnode
|
// send to mnode
|
||||||
SMqCMCommitOffsetReq req;
|
SMqCMCommitOffsetReq req;
|
||||||
SArray* pArray = NULL;
|
SArray* pOffsets = NULL;
|
||||||
|
|
||||||
if (offsets == NULL) {
|
if (offsets == NULL) {
|
||||||
pArray = taosArrayInit(0, sizeof(SMqOffset));
|
pOffsets = taosArrayInit(0, sizeof(SMqOffset));
|
||||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||||
|
@ -553,11 +560,11 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
||||||
strcpy(offset.cgroup, tmq->groupId);
|
strcpy(offset.cgroup, tmq->groupId);
|
||||||
offset.vgId = pVg->vgId;
|
offset.vgId = pVg->vgId;
|
||||||
offset.offset = pVg->currentOffset;
|
offset.offset = pVg->currentOffset;
|
||||||
taosArrayPush(pArray, &offset);
|
taosArrayPush(pOffsets, &offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
req.num = pArray->size;
|
req.num = pOffsets->size;
|
||||||
req.offsets = pArray->pData;
|
req.offsets = pOffsets->pData;
|
||||||
} else {
|
} else {
|
||||||
req.num = taosArrayGetSize(&offsets->container);
|
req.num = taosArrayGetSize(&offsets->container);
|
||||||
req.offsets = (SMqOffset*)offsets->container.pData;
|
req.offsets = (SMqOffset*)offsets->container.pData;
|
||||||
|
@ -591,6 +598,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
tsem_init(&pParam->rspSem, 0, 0);
|
tsem_init(&pParam->rspSem, 0, 0);
|
||||||
pParam->async = async;
|
pParam->async = async;
|
||||||
|
pParam->offsets = pOffsets;
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){
|
pRequest->body.requestMsg = (SDataBuf){
|
||||||
.pData = buf,
|
.pData = buf,
|
||||||
|
@ -613,8 +621,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
|
||||||
tsem_destroy(&pParam->rspSem);
|
tsem_destroy(&pParam->rspSem);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
||||||
if (pArray) {
|
if (pOffsets) {
|
||||||
taosArrayDestroy(pArray);
|
taosArrayDestroy(pOffsets);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1015,7 +1023,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
atomic_store_32(&tmq->epSkipCnt, 0);
|
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||||
#endif
|
#endif
|
||||||
int32_t tlen = sizeof(SMqAskEpReq);
|
int32_t tlen = sizeof(SMqAskEpReq);
|
||||||
SMqAskEpReq* req = taosMemoryMalloc(tlen);
|
SMqAskEpReq* req = taosMemoryCalloc(1, tlen);
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tscError("failed to malloc get subscribe ep buf");
|
tscError("failed to malloc get subscribe ep buf");
|
||||||
/*atomic_store_8(&tmq->epStatus, 0);*/
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
|
@ -1025,7 +1033,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||||
req->epoch = htonl(tmq->epoch);
|
req->epoch = htonl(tmq->epoch);
|
||||||
strcpy(req->cgroup, tmq->groupId);
|
strcpy(req->cgroup, tmq->groupId);
|
||||||
|
|
||||||
SMqAskEpCbParam* pParam = taosMemoryMalloc(sizeof(SMqAskEpCbParam));
|
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
tscError("failed to malloc subscribe param");
|
tscError("failed to malloc subscribe param");
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
|
@ -1107,7 +1115,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
|
||||||
reqOffset = tmq->resetOffsetCfg;
|
reqOffset = tmq->resetOffsetCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqPollReq* pReq = taosMemoryMalloc(sizeof(SMqPollReq));
|
SMqPollReq* pReq = taosMemoryCalloc(1, sizeof(SMqPollReq));
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -559,6 +559,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
// db subscribe
|
// db subscribe
|
||||||
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
|
||||||
|
rsp.withSchema = 1;
|
||||||
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
STqReadHandle* pReader = pExec->pExecReader[workerId];
|
||||||
tqReadHandleSetMsg(pReader, pCont, 0);
|
tqReadHandleSetMsg(pReader, pCont, 0);
|
||||||
while (tqNextDataBlock(pReader)) {
|
while (tqNextDataBlock(pReader)) {
|
||||||
|
|
Loading…
Reference in New Issue