Merge remote-tracking branch 'origin/3.0' into fix/dnode
This commit is contained in:
commit
725baf4a7c
|
@ -76,19 +76,41 @@ int32_t init_env() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
|
pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
|
printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "create table if not exists ct1 using st1 tags(2000)");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
|
pRes = taos_query(pConn, "create table if not exists ct3 using st1 tags(3000)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create child table tu3, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c')");
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -168,6 +190,9 @@ tmq_t* build_consumer() {
|
||||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||||
|
|
||||||
|
tmq_conf_set(conf, "experiment.use.snapshot", "true");
|
||||||
|
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
assert(tmq);
|
assert(tmq);
|
||||||
|
|
|
@ -116,8 +116,8 @@ typedef struct SQueryTableDataCond {
|
||||||
int32_t type; // data block load type:
|
int32_t type; // data block load type:
|
||||||
int32_t numOfTWindows;
|
int32_t numOfTWindows;
|
||||||
STimeWindow* twindows;
|
STimeWindow* twindows;
|
||||||
int32_t startVersion;
|
int64_t startVersion;
|
||||||
int32_t endVersion;
|
int64_t endVersion;
|
||||||
} SQueryTableDataCond;
|
} SQueryTableDataCond;
|
||||||
|
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
|
|
@ -696,12 +696,12 @@ typedef struct {
|
||||||
|
|
||||||
typedef STableCfg STableCfgRsp;
|
typedef STableCfg STableCfgRsp;
|
||||||
|
|
||||||
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
|
int32_t tSerializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq);
|
||||||
int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
|
int32_t tDeserializeSTableCfgReq(void* buf, int32_t bufLen, STableCfgReq* pReq);
|
||||||
|
|
||||||
int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
|
int32_t tSerializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp);
|
||||||
int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
|
int32_t tDeserializeSTableCfgRsp(void* buf, int32_t bufLen, STableCfgRsp* pRsp);
|
||||||
void tFreeSTableCfgRsp(STableCfgRsp *pRsp);
|
void tFreeSTableCfgRsp(STableCfgRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
@ -2677,6 +2677,7 @@ typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
int8_t useSnapshot;
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
uint64_t reqId;
|
uint64_t reqId;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
|
|
|
@ -36,11 +36,13 @@ typedef struct SReadHandle {
|
||||||
void* vnode;
|
void* vnode;
|
||||||
void* mnd;
|
void* mnd;
|
||||||
SMsgCb* pMsgCb;
|
SMsgCb* pMsgCb;
|
||||||
|
int8_t initTsdbReader;
|
||||||
} SReadHandle;
|
} SReadHandle;
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
STREAM_DATA_TYPE_SUBMIT_BLOCK = 1,
|
STREAM_DATA_TYPE_SUBMIT_BLOCK = 1,
|
||||||
STREAM_DATA_TYPE_SSDATA_BLOCK = 2,
|
STREAM_DATA_TYPE_SSDATA_BLOCK = 2,
|
||||||
|
STREAM_DATA_TYPE_FROM_SNAPSHOT = 3,
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -56,6 +58,13 @@ typedef enum {
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
|
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Switch the stream scan to snapshot mode
|
||||||
|
* @param tinfo
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the input data block for the stream scan.
|
* Set the input data block for the stream scan.
|
||||||
* @param tinfo
|
* @param tinfo
|
||||||
|
|
|
@ -54,6 +54,7 @@ struct tmq_conf_t {
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int8_t resetOffset;
|
int8_t resetOffset;
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
int8_t useSnapshot;
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
char* ip;
|
char* ip;
|
||||||
|
@ -69,6 +70,7 @@ struct tmq_t {
|
||||||
char groupId[TSDB_CGROUP_LEN];
|
char groupId[TSDB_CGROUP_LEN];
|
||||||
char clientId[256];
|
char clientId[256];
|
||||||
int8_t withTbName;
|
int8_t withTbName;
|
||||||
|
int8_t useSnapshot;
|
||||||
int8_t autoCommit;
|
int8_t autoCommit;
|
||||||
int32_t autoCommitInterval;
|
int32_t autoCommitInterval;
|
||||||
int32_t resetOffsetCfg;
|
int32_t resetOffsetCfg;
|
||||||
|
@ -282,6 +284,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (strcmp(key, "experiment.use.snapshot") == 0) {
|
||||||
|
if (strcmp(value, "true") == 0) {
|
||||||
|
conf->useSnapshot = true;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
} else if (strcmp(value, "false") == 0) {
|
||||||
|
conf->useSnapshot = false;
|
||||||
|
return TMQ_CONF_OK;
|
||||||
|
} else {
|
||||||
|
return TMQ_CONF_INVALID;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (strcmp(key, "td.connect.ip") == 0) {
|
if (strcmp(key, "td.connect.ip") == 0) {
|
||||||
conf->ip = strdup(value);
|
conf->ip = strdup(value);
|
||||||
return TMQ_CONF_OK;
|
return TMQ_CONF_OK;
|
||||||
|
@ -953,6 +967,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
pTmq->withTbName = conf->withTbName;
|
pTmq->withTbName = conf->withTbName;
|
||||||
|
pTmq->useSnapshot = conf->useSnapshot;
|
||||||
pTmq->autoCommit = conf->autoCommit;
|
pTmq->autoCommit = conf->autoCommit;
|
||||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||||
pTmq->commitCb = conf->commitCb;
|
pTmq->commitCb = conf->commitCb;
|
||||||
|
@ -1534,6 +1549,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic*
|
||||||
pReq->currentOffset = reqOffset;
|
pReq->currentOffset = reqOffset;
|
||||||
pReq->reqId = generateRequestId();
|
pReq->reqId = generateRequestId();
|
||||||
|
|
||||||
|
pReq->useSnapshot = tmq->useSnapshot;
|
||||||
|
|
||||||
pReq->head.vgId = htonl(pVg->vgId);
|
pReq->head.vgId = htonl(pVg->vgId);
|
||||||
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
pReq->head.contLen = htonl(sizeof(SMqPollReq));
|
||||||
return pReq;
|
return pReq;
|
||||||
|
|
|
@ -509,7 +509,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
mndAddTaskToTaskSet(taskOneLevel, pTask);
|
mndAddTaskToTaskSet(taskOneLevel, pTask);
|
||||||
|
|
||||||
// input
|
// source
|
||||||
pTask->isDataScan = 1;
|
pTask->isDataScan = 1;
|
||||||
|
|
||||||
// trigger
|
// trigger
|
||||||
|
|
|
@ -150,6 +150,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
|
||||||
|
|
||||||
// tqExec
|
// tqExec
|
||||||
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
|
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId);
|
||||||
|
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId);
|
||||||
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp);
|
int32_t tqSendPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataBlkRsp* pRsp);
|
||||||
|
|
||||||
// tqMeta
|
// tqMeta
|
||||||
|
|
|
@ -227,19 +227,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
|
||||||
if (pHeadWithCkSum == NULL) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
|
||||||
|
|
||||||
SMqDataBlkRsp rsp = {0};
|
SMqDataBlkRsp rsp = {0};
|
||||||
rsp.reqOffset = pReq->currentOffset;
|
rsp.reqOffset = pReq->currentOffset;
|
||||||
|
|
||||||
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
rsp.blockData = taosArrayInit(0, sizeof(void*));
|
||||||
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
|
||||||
|
|
||||||
|
if (rsp.blockData == NULL || rsp.blockDataLen == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
rsp.withTbName = pReq->withTbName;
|
rsp.withTbName = pReq->withTbName;
|
||||||
if (rsp.withTbName) {
|
if (rsp.withTbName) {
|
||||||
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
|
||||||
|
@ -253,6 +250,30 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
rsp.blockSchema = taosArrayInit(0, sizeof(void*));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
if (pReq->useSnapshot) {
|
||||||
|
tqInfo("retrieve using snapshot");
|
||||||
|
int64_t lastVer = walGetCommittedVer(pTq->pWal);
|
||||||
|
if (rsp.reqOffset < lastVer) {
|
||||||
|
tqScanSnapshot(pTq, &pHandle->execHandle, &rsp, workerId);
|
||||||
|
|
||||||
|
if (rsp.blockNum != 0) {
|
||||||
|
rsp.withTbName = false;
|
||||||
|
rsp.rspOffset = lastVer;
|
||||||
|
tqInfo("direct send by snapshot rsp offset %ld", lastVer);
|
||||||
|
goto SEND_RSP;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
SWalHead* pHeadWithCkSum = taosMemoryMalloc(sizeof(SWalHead) + 2048);
|
||||||
|
if (pHeadWithCkSum == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
|
@ -292,6 +313,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
metaRsp.metaRsp = pHead->body;
|
metaRsp.metaRsp = pHead->body;
|
||||||
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
if (tqSendMetaPollRsp(pTq, pMsg, pReq, &metaRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
goto OVER;
|
||||||
}
|
}
|
||||||
code = 0;
|
code = 0;
|
||||||
goto OVER;
|
goto OVER;
|
||||||
|
@ -308,6 +330,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
|
|
||||||
taosMemoryFree(pHeadWithCkSum);
|
taosMemoryFree(pHeadWithCkSum);
|
||||||
|
|
||||||
|
SEND_RSP:
|
||||||
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockData) == rsp.blockNum);
|
||||||
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
ASSERT(taosArrayGetSize(rsp.blockDataLen) == rsp.blockNum);
|
||||||
if (rsp.withSchema) {
|
if (rsp.withSchema) {
|
||||||
|
@ -376,6 +399,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SReadHandle handle = {
|
SReadHandle handle = {
|
||||||
.reader = pHandle->execHandle.pExecReader[i],
|
.reader = pHandle->execHandle.pExecReader[i],
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
|
.vnode = pTq->pVnode,
|
||||||
|
.initTsdbReader = 1,
|
||||||
};
|
};
|
||||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||||
|
@ -448,6 +473,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
.reader = pStreamReader,
|
.reader = pStreamReader,
|
||||||
.meta = pTq->pVnode->pMeta,
|
.meta = pTq->pVnode->pMeta,
|
||||||
.vnode = pTq->pVnode,
|
.vnode = pTq->pVnode,
|
||||||
|
.initTsdbReader = 1,
|
||||||
};
|
};
|
||||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||||
|
|
|
@ -60,6 +60,30 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, const STqExecHandle* pExec, SMqD
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataBlkRsp* pRsp, int32_t workerId) {
|
||||||
|
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||||
|
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||||
|
if (qStreamScanSnapshot(task) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
while (1) {
|
||||||
|
SSDataBlock* pDataBlock = NULL;
|
||||||
|
uint64_t ts = 0;
|
||||||
|
if (qExecTask(task, &pDataBlock, &ts) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
if (pDataBlock == NULL) break;
|
||||||
|
|
||||||
|
ASSERT(pDataBlock->info.rows != 0);
|
||||||
|
ASSERT(pDataBlock->info.numOfCols != 0);
|
||||||
|
|
||||||
|
tqAddBlockDataToRsp(pDataBlock, pRsp);
|
||||||
|
pRsp->blockNum++;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) {
|
int32_t tqDataExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataBlkRsp* pRsp, int32_t workerId) {
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||||
|
|
|
@ -67,6 +67,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock);
|
||||||
taosArrayPush(pInfo->pBlockLists, &p);
|
taosArrayPush(pInfo->pBlockLists, &p);
|
||||||
}
|
}
|
||||||
|
} else if (type == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
|
||||||
|
// do nothing
|
||||||
|
ASSERT(pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
@ -75,6 +78,14 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
||||||
|
if (tinfo == NULL) {
|
||||||
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
}
|
||||||
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
|
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
||||||
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
return qSetMultiStreamInput(tinfo, input, 1, type, assignUid);
|
||||||
}
|
}
|
||||||
|
@ -106,14 +117,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// print those info into log
|
|
||||||
#if 0
|
|
||||||
pMsg->sId = pMsg->sId;
|
|
||||||
pMsg->queryId = pMsg->queryId;
|
|
||||||
pMsg->taskId = pMsg->taskId;
|
|
||||||
pMsg->contentLen = pMsg->contentLen;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/*qDebugL("stream task string %s", (const char*)msg);*/
|
/*qDebugL("stream task string %s", (const char*)msg);*/
|
||||||
|
|
||||||
struct SSubplan* plan = NULL;
|
struct SSubplan* plan = NULL;
|
||||||
|
|
|
@ -4041,16 +4041,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
|
|
||||||
if (pHandle) {
|
if (pHandle) {
|
||||||
if (pHandle->vnode) {
|
if (pHandle->initTsdbReader) {
|
||||||
// for stram
|
// for stream
|
||||||
|
ASSERT(pHandle->vnode);
|
||||||
pDataReader =
|
pDataReader =
|
||||||
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
doCreateDataReader(pTableScanNode, pHandle, pTableListInfo, (uint64_t)queryId, taskId);
|
||||||
} else {
|
} else {
|
||||||
// for tq
|
// for tq
|
||||||
|
ASSERT(pHandle->meta);
|
||||||
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
|
getTableList(pHandle->meta, pScanPhyNode, pTableListInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
qDebug("%s pDataReader is NULL", GET_TASKID(pTaskInfo));
|
||||||
// return NULL;
|
// return NULL;
|
||||||
|
|
|
@ -928,7 +928,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||||
blockDataUpdateTsWindow(pBlock, 0);
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
return pBlock;
|
return pBlock;
|
||||||
} else {
|
} else if (pInfo->blockType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
|
||||||
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
if (pInfo->scanMode == STREAM_SCAN_FROM_RES) {
|
||||||
blockDataDestroy(pInfo->pUpdateRes);
|
blockDataDestroy(pInfo->pUpdateRes);
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||||
|
@ -1062,6 +1062,15 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
} else if (pInfo->blockType == STREAM_DATA_TYPE_FROM_SNAPSHOT) {
|
||||||
|
SSDataBlock* pResult = doTableScan(pInfo->pOperatorDumy);
|
||||||
|
if (pResult) {
|
||||||
|
return pResult->info.rows > 0 ? pResult : NULL;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -80,7 +80,6 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
}
|
}
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
/*qRes->sourceVg = pTask->nodeId;*/
|
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
|
|
|
@ -109,6 +109,7 @@
|
||||||
./test.sh -f tsim/tmq/basic4Of2Cons.sim
|
./test.sh -f tsim/tmq/basic4Of2Cons.sim
|
||||||
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
|
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
|
||||||
./test.sh -f tsim/tmq/topic.sim
|
./test.sh -f tsim/tmq/topic.sim
|
||||||
|
./test.sh -f tsim/tmq/snapshot.sim
|
||||||
|
|
||||||
# --- stable
|
# --- stable
|
||||||
./test.sh -f tsim/stable/disk.sim
|
./test.sh -f tsim/stable/disk.sim
|
||||||
|
|
|
@ -17,8 +17,9 @@ VALGRIND=0
|
||||||
SIGNAL=SIGINT
|
SIGNAL=SIGINT
|
||||||
SHOW_MSG=0
|
SHOW_MSG=0
|
||||||
SHOW_ROW=0
|
SHOW_ROW=0
|
||||||
|
EXP_USE_SNAPSHOT=0
|
||||||
|
|
||||||
while getopts "d:s:v:y:x:g:r:w:" arg
|
while getopts "d:s:v:y:x:g:r:w:e:" arg
|
||||||
do
|
do
|
||||||
case $arg in
|
case $arg in
|
||||||
d)
|
d)
|
||||||
|
@ -45,6 +46,9 @@ do
|
||||||
w)
|
w)
|
||||||
CDB_NAME=$OPTARG
|
CDB_NAME=$OPTARG
|
||||||
;;
|
;;
|
||||||
|
e)
|
||||||
|
EXP_USE_SNAPSHOT=$OPTARG
|
||||||
|
;;
|
||||||
?)
|
?)
|
||||||
echo "unkown argument"
|
echo "unkown argument"
|
||||||
;;
|
;;
|
||||||
|
@ -91,8 +95,8 @@ if [ "$EXEC_OPTON" = "start" ]; then
|
||||||
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
|
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
|
||||||
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
|
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
|
||||||
else
|
else
|
||||||
echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &"
|
echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME -e $EXP_USE_SNAPSHOT > /dev/null 2>&1 &"
|
||||||
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &
|
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME -e $EXP_USE_SNAPSHOT > /dev/null 2>&1 &
|
||||||
fi
|
fi
|
||||||
else
|
else
|
||||||
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
|
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
|
||||||
|
|
|
@ -0,0 +1,289 @@
|
||||||
|
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||||
|
#basic1.sim: vgroups=1, one topic for one consumer, firstly insert data, then start consume. Include six topics
|
||||||
|
#basic2.sim: vgroups=1, multi topics for one consumer, firstly insert data, then start consume. Include six topics
|
||||||
|
#basic3.sim: vgroups=4, one topic for one consumer, firstly insert data, then start consume. Include six topics
|
||||||
|
#basic4.sim: vgroups=4, multi topics for one consumer, firstly insert data, then start consume. Include six topics
|
||||||
|
|
||||||
|
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||||
|
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||||
|
#
|
||||||
|
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||||
|
#
|
||||||
|
|
||||||
|
run tsim/tmq/prepareBasicEnv-1vgrp.sim
|
||||||
|
|
||||||
|
#---- global parameters start ----#
|
||||||
|
$dbName = db
|
||||||
|
$vgroups = 1
|
||||||
|
$stbPrefix = stb
|
||||||
|
$ctbPrefix = ctb
|
||||||
|
$ntbPrefix = ntb
|
||||||
|
$stbNum = 1
|
||||||
|
$ctbNum = 10
|
||||||
|
$ntbNum = 10
|
||||||
|
$rowsPerCtb = 10
|
||||||
|
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
#---- global parameters end ----#
|
||||||
|
|
||||||
|
$pullDelay = 3
|
||||||
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
|
$showMsg = 1
|
||||||
|
$showRow = 0
|
||||||
|
|
||||||
|
sql connect
|
||||||
|
sql use $dbName
|
||||||
|
|
||||||
|
print == create topics from super table
|
||||||
|
sql create topic topic_stb_column as select ts, c3 from stb
|
||||||
|
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||||
|
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||||
|
|
||||||
|
print == create topics from child table
|
||||||
|
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||||
|
sql create topic topic_ctb_all as select * from ctb0
|
||||||
|
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||||
|
|
||||||
|
print == create topics from normal table
|
||||||
|
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||||
|
sql create topic topic_ntb_all as select * from ntb0
|
||||||
|
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
|
|
||||||
|
#sql show topics
|
||||||
|
#if $rows != 9 then
|
||||||
|
# return -1
|
||||||
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
|
|
||||||
|
$cdb_index = 0
|
||||||
|
#=============================== start consume =============================#
|
||||||
|
|
||||||
|
print ================ test consume from stb
|
||||||
|
$loop_cnt = 0
|
||||||
|
loop_consume_diff_topic_from_stb:
|
||||||
|
|
||||||
|
#######################################################################################
|
||||||
|
# clear consume info and consume result
|
||||||
|
#run tsim/tmq/clearConsume.sim
|
||||||
|
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||||
|
$cdb_index = $cdb_index + 1
|
||||||
|
$cdbName = cdb . $cdb_index
|
||||||
|
sql create database $cdbName vgroups 1
|
||||||
|
sleep 500
|
||||||
|
sql use $cdbName
|
||||||
|
|
||||||
|
print == create consume info table and consume result table
|
||||||
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
|
sql show tables
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
#######################################################################################
|
||||||
|
|
||||||
|
if $loop_cnt == 0 then
|
||||||
|
print == scenario 1: topic_stb_column
|
||||||
|
$topicList = ' . topic_stb_column
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 1 then
|
||||||
|
print == scenario 2: topic_stb_all
|
||||||
|
$topicList = ' . topic_stb_all
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 2 then
|
||||||
|
print == scenario 3: topic_stb_function
|
||||||
|
$topicList = ' . topic_stb_function
|
||||||
|
$topicList = $topicList . '
|
||||||
|
else
|
||||||
|
goto loop_consume_diff_topic_from_stb_end
|
||||||
|
endi
|
||||||
|
|
||||||
|
$consumerId = 0
|
||||||
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
|
$expectmsgcnt = 1
|
||||||
|
$expectrowcnt = 100
|
||||||
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
print == start consumer to pull msgs from stb
|
||||||
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start
|
||||||
|
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -e 1 -s start
|
||||||
|
|
||||||
|
print == check consume result
|
||||||
|
wait_consumer_end_from_stb:
|
||||||
|
sql select * from consumeresult
|
||||||
|
print ==> rows: $rows
|
||||||
|
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||||
|
if $rows != 1 then
|
||||||
|
sleep 1000
|
||||||
|
goto wait_consumer_end_from_stb
|
||||||
|
endi
|
||||||
|
if $data[0][1] != $consumerId then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][2] != $expectmsgcnt then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][3] != $expectrowcnt then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
$loop_cnt = $loop_cnt + 1
|
||||||
|
goto loop_consume_diff_topic_from_stb
|
||||||
|
loop_consume_diff_topic_from_stb_end:
|
||||||
|
|
||||||
|
print ================ test consume from ctb
|
||||||
|
$loop_cnt = 0
|
||||||
|
loop_consume_diff_topic_from_ctb:
|
||||||
|
|
||||||
|
#######################################################################################
|
||||||
|
# clear consume info and consume result
|
||||||
|
#run tsim/tmq/clearConsume.sim
|
||||||
|
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||||
|
$cdb_index = $cdb_index + 1
|
||||||
|
$cdbName = cdb . $cdb_index
|
||||||
|
sql create database $cdbName vgroups 1
|
||||||
|
sleep 500
|
||||||
|
sql use $cdbName
|
||||||
|
|
||||||
|
print == create consume info table and consume result table
|
||||||
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
|
sql show tables
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
#######################################################################################
|
||||||
|
|
||||||
|
if $loop_cnt == 0 then
|
||||||
|
print == scenario 1: topic_ctb_column
|
||||||
|
$topicList = ' . topic_ctb_column
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 1 then
|
||||||
|
print == scenario 2: topic_ctb_all
|
||||||
|
$topicList = ' . topic_ctb_all
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 2 then
|
||||||
|
print == scenario 3: topic_ctb_function
|
||||||
|
$topicList = ' . topic_ctb_function
|
||||||
|
$topicList = $topicList . '
|
||||||
|
else
|
||||||
|
goto loop_consume_diff_topic_from_ctb_end
|
||||||
|
endi
|
||||||
|
|
||||||
|
$consumerId = 0
|
||||||
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
|
$expectmsgcnt = 1
|
||||||
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
print == start consumer to pull msgs from ctb
|
||||||
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
|
||||||
|
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
|
||||||
|
|
||||||
|
print == check consume result
|
||||||
|
wait_consumer_end_from_ctb:
|
||||||
|
sql select * from consumeresult
|
||||||
|
print ==> rows: $rows
|
||||||
|
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||||
|
if $rows != 1 then
|
||||||
|
sleep 1000
|
||||||
|
goto wait_consumer_end_from_ctb
|
||||||
|
endi
|
||||||
|
if $data[0][1] != $consumerId then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][2] != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][3] != 10 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
$loop_cnt = $loop_cnt + 1
|
||||||
|
goto loop_consume_diff_topic_from_ctb
|
||||||
|
loop_consume_diff_topic_from_ctb_end:
|
||||||
|
|
||||||
|
print ================ test consume from ntb
|
||||||
|
$loop_cnt = 0
|
||||||
|
loop_consume_diff_topic_from_ntb:
|
||||||
|
|
||||||
|
#######################################################################################
|
||||||
|
# clear consume info and consume result
|
||||||
|
#run tsim/tmq/clearConsume.sim
|
||||||
|
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||||
|
$cdb_index = $cdb_index + 1
|
||||||
|
$cdbName = cdb . $cdb_index
|
||||||
|
sql create database $cdbName vgroups 1
|
||||||
|
sleep 500
|
||||||
|
sql use $cdbName
|
||||||
|
|
||||||
|
print == create consume info table and consume result table
|
||||||
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
|
sql show tables
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
#######################################################################################
|
||||||
|
|
||||||
|
if $loop_cnt == 0 then
|
||||||
|
print == scenario 1: topic_ntb_column
|
||||||
|
$topicList = ' . topic_ntb_column
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 1 then
|
||||||
|
print == scenario 2: topic_ntb_all
|
||||||
|
$topicList = ' . topic_ntb_all
|
||||||
|
$topicList = $topicList . '
|
||||||
|
elif $loop_cnt == 2 then
|
||||||
|
print == scenario 3: topic_ntb_function
|
||||||
|
$topicList = ' . topic_ntb_function
|
||||||
|
$topicList = $topicList . '
|
||||||
|
else
|
||||||
|
goto loop_consume_diff_topic_from_ntb_end
|
||||||
|
endi
|
||||||
|
|
||||||
|
$consumerId = 0
|
||||||
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
print == start consumer to pull msgs from ntb
|
||||||
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
|
||||||
|
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
|
||||||
|
|
||||||
|
print == check consume result from ntb
|
||||||
|
wait_consumer_end_from_ntb:
|
||||||
|
sql select * from consumeresult
|
||||||
|
print ==> rows: $rows
|
||||||
|
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||||
|
if $rows != 1 then
|
||||||
|
sleep 1000
|
||||||
|
goto wait_consumer_end_from_ntb
|
||||||
|
endi
|
||||||
|
if $data[0][1] != $consumerId then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][2] != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data[0][3] != $totalMsgOfNtb then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
$loop_cnt = $loop_cnt + 1
|
||||||
|
goto loop_consume_diff_topic_from_ntb
|
||||||
|
loop_consume_diff_topic_from_ntb_end:
|
||||||
|
|
||||||
|
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||||
|
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -22,9 +22,9 @@
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
|
#include "taosdef.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
|
|
||||||
#define GREEN "\033[1;32m"
|
#define GREEN "\033[1;32m"
|
||||||
|
@ -36,11 +36,7 @@
|
||||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||||
#define MAX_VGROUP_CNT (32)
|
#define MAX_VGROUP_CNT (32)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum { NOTIFY_CMD_START_CONSUM, NOTIFY_CMD_START_COMMIT, NOTIFY_CMD_ID_BUTT } NOTIFY_CMD_ID;
|
||||||
NOTIFY_CMD_START_CONSUM,
|
|
||||||
NOTIFY_CMD_START_COMMIT,
|
|
||||||
NOTIFY_CMD_ID_BUTT
|
|
||||||
}NOTIFY_CMD_ID;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
|
@ -89,6 +85,7 @@ typedef struct {
|
||||||
int32_t saveRowFlag;
|
int32_t saveRowFlag;
|
||||||
int32_t consumeDelay; // unit s
|
int32_t consumeDelay; // unit s
|
||||||
int32_t numOfThread;
|
int32_t numOfThread;
|
||||||
|
int32_t useSnapshot;
|
||||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||||
} SConfInfo;
|
} SConfInfo;
|
||||||
|
|
||||||
|
@ -96,6 +93,8 @@ static SConfInfo g_stConfInfo;
|
||||||
TdFilePtr g_fp = NULL;
|
TdFilePtr g_fp = NULL;
|
||||||
static int running = 1;
|
static int running = 1;
|
||||||
|
|
||||||
|
int8_t useSnapshot = 0;
|
||||||
|
|
||||||
// char* g_pRowValue = NULL;
|
// char* g_pRowValue = NULL;
|
||||||
// TdFilePtr g_fp = NULL;
|
// TdFilePtr g_fp = NULL;
|
||||||
|
|
||||||
|
@ -205,6 +204,8 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
g_stConfInfo.saveRowFlag = atol(argv[++i]);
|
g_stConfInfo.saveRowFlag = atol(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-y") == 0) {
|
} else if (strcmp(argv[i], "-y") == 0) {
|
||||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-e") == 0) {
|
||||||
|
useSnapshot = (int8_t)atol(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
|
@ -298,11 +299,11 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
|
||||||
//if (shell.args.is_raw_time) {
|
// if (shell.args.is_raw_time) {
|
||||||
// sprintf(buf, "%" PRId64, val);
|
// sprintf(buf, "%" PRId64, val);
|
||||||
// return buf;
|
// return buf;
|
||||||
//}
|
// }
|
||||||
|
|
||||||
time_t tt;
|
time_t tt;
|
||||||
int32_t ms = 0;
|
int32_t ms = 0;
|
||||||
|
@ -340,7 +341,7 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
struct tm *ptm = taosLocalTime(&tt, NULL);
|
struct tm* ptm = taosLocalTime(&tt, NULL);
|
||||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||||
|
|
||||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
|
@ -354,7 +355,8 @@ static char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *field, int32_t length, int32_t precision) {
|
static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* field, int32_t length,
|
||||||
|
int32_t precision) {
|
||||||
if (val == NULL) {
|
if (val == NULL) {
|
||||||
taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
|
taosFprintfFile(pFile, "%s", TSDB_DATA_NULL_STR);
|
||||||
return;
|
return;
|
||||||
|
@ -364,31 +366,31 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
|
||||||
char buf[TSDB_MAX_BYTES_PER_ROW];
|
char buf[TSDB_MAX_BYTES_PER_ROW];
|
||||||
switch (field->type) {
|
switch (field->type) {
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char *)val))) == 1) ? 1 : 0));
|
taosFprintfFile(pFile, "%d", ((((int32_t)(*((char*)val))) == 1) ? 1 : 0));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
taosFprintfFile(pFile, "%d", *((int8_t *)val));
|
taosFprintfFile(pFile, "%d", *((int8_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
taosFprintfFile(pFile, "%u", *((uint8_t *)val));
|
taosFprintfFile(pFile, "%u", *((uint8_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
taosFprintfFile(pFile, "%d", *((int16_t *)val));
|
taosFprintfFile(pFile, "%d", *((int16_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
taosFprintfFile(pFile, "%u", *((uint16_t *)val));
|
taosFprintfFile(pFile, "%u", *((uint16_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
taosFprintfFile(pFile, "%d", *((int32_t *)val));
|
taosFprintfFile(pFile, "%d", *((int32_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
taosFprintfFile(pFile, "%u", *((uint32_t *)val));
|
taosFprintfFile(pFile, "%u", *((uint32_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
taosFprintfFile(pFile, "%" PRId64, *((int64_t *)val));
|
taosFprintfFile(pFile, "%" PRId64, *((int64_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t *)val));
|
taosFprintfFile(pFile, "%" PRIu64, *((uint64_t*)val));
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
|
taosFprintfFile(pFile, "%.5f", GET_FLOAT_VAL(val));
|
||||||
|
@ -409,7 +411,7 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
|
||||||
taosFprintfFile(pFile, "\'%s\'", buf);
|
taosFprintfFile(pFile, "\'%s\'", buf);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
shellFormatTimestamp(buf, *(int64_t *)val, precision);
|
shellFormatTimestamp(buf, *(int64_t*)val, precision);
|
||||||
taosFprintfFile(pFile, "'%s'", buf);
|
taosFprintfFile(pFile, "'%s'", buf);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
@ -417,12 +419,13 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char *val, TAOS_FIELD *f
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields, int32_t precision) {
|
static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields, int32_t* length, int32_t num_fields,
|
||||||
|
int32_t precision) {
|
||||||
for (int32_t i = 0; i < num_fields; i++) {
|
for (int32_t i = 0; i < num_fields; i++) {
|
||||||
if (i > 0) {
|
if (i > 0) {
|
||||||
taosFprintfFile(pFile, "\n");
|
taosFprintfFile(pFile, "\n");
|
||||||
}
|
}
|
||||||
shellDumpFieldToFile(pFile, (const char *)row[i], fields + i, length[i], precision);
|
shellDumpFieldToFile(pFile, (const char*)row[i], fields + i, length[i], precision);
|
||||||
}
|
}
|
||||||
taosFprintfFile(pFile, "\n");
|
taosFprintfFile(pFile, "\n");
|
||||||
}
|
}
|
||||||
|
@ -436,7 +439,8 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
||||||
const char* dbName = tmq_get_db_name(msg);
|
const char* dbName = tmq_get_db_name(msg);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
||||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", tmq_get_topic_name(msg), vgroupId);
|
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
||||||
|
tmq_get_topic_name(msg), vgroupId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
@ -454,9 +458,9 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
||||||
|
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
|
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
|
||||||
//if (0 != g_stConfInfo.saveRowFlag) {
|
// if (0 != g_stConfInfo.saveRowFlag) {
|
||||||
// saveConsumeContentToTbl(pInfo, buf);
|
// saveConsumeContentToTbl(pInfo, buf);
|
||||||
//}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
totalRows++;
|
totalRows++;
|
||||||
|
@ -479,8 +483,7 @@ int queryDB(TAOS* taos, char* command) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
|
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
|
||||||
}
|
|
||||||
|
|
||||||
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
char sqlStr[1024] = {0};
|
char sqlStr[1024] = {0};
|
||||||
|
@ -488,10 +491,7 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)",
|
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
|
||||||
g_stConfInfo.cdbName,
|
|
||||||
now,
|
|
||||||
cmdId,
|
|
||||||
pInfo->consumerId);
|
pInfo->consumerId);
|
||||||
|
|
||||||
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
||||||
|
@ -541,6 +541,10 @@ void build_consumer(SThreadInfo* pInfo) {
|
||||||
// tmq_conf_set(conf, "auto.offset.reset", "none");
|
// tmq_conf_set(conf, "auto.offset.reset", "none");
|
||||||
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||||
// tmq_conf_set(conf, "auto.offset.reset", "latest");
|
// tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||||
|
//
|
||||||
|
if (useSnapshot) {
|
||||||
|
tmq_conf_set(conf, "experiment.use.snapshot", "true");
|
||||||
|
}
|
||||||
|
|
||||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||||
|
|
||||||
|
@ -599,7 +603,8 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
if (pInfo->ifCheckData) {
|
if (pInfo->ifCheckData) {
|
||||||
char filename[256] = {0};
|
char filename[256] = {0};
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
//sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId, getCurrentTimeString(tmpString));
|
// sprintf(filename, "%s/../log/consumerid_%d_%s.txt", configDir, pInfo->consumerId,
|
||||||
|
// getCurrentTimeString(tmpString));
|
||||||
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||||
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
if (pInfo->pConsumeRowsFile == NULL) {
|
if (pInfo->pConsumeRowsFile == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue