feat(tmq): add snapshot test
This commit is contained in:
parent
f324a50b2e
commit
4695dc2dae
|
@ -199,7 +199,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
|
||||
tmq_conf_set(conf, "experiment.use.snapshot", "false");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
|
|
@ -36,6 +36,7 @@ typedef struct SReadHandle {
|
|||
void* vnode;
|
||||
void* mnd;
|
||||
SMsgCb* pMsgCb;
|
||||
bool tqReader;
|
||||
} SReadHandle;
|
||||
|
||||
typedef enum {
|
||||
|
@ -133,7 +134,6 @@ int32_t qKillTask(qTaskInfo_t tinfo);
|
|||
*/
|
||||
int32_t qAsyncKillTask(qTaskInfo_t tinfo);
|
||||
|
||||
|
||||
/**
|
||||
* destroy query info structure
|
||||
* @param qHandle
|
||||
|
@ -172,6 +172,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
|
|||
*/
|
||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts);
|
||||
|
||||
int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -54,7 +54,8 @@ struct tmq_conf_t {
|
|||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t useSnapshot;
|
||||
int8_t spEnable;
|
||||
int32_t spBatchSize;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
|
@ -288,18 +289,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
|
|||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "experiment.use.snapshot") == 0) {
|
||||
if (strcmp(key, "experimental.snapshot.enable") == 0) {
|
||||
if (strcmp(value, "true") == 0) {
|
||||
conf->useSnapshot = true;
|
||||
conf->spEnable = true;
|
||||
return TMQ_CONF_OK;
|
||||
} else if (strcmp(value, "false") == 0) {
|
||||
conf->useSnapshot = false;
|
||||
conf->spEnable = false;
|
||||
return TMQ_CONF_OK;
|
||||
} else {
|
||||
return TMQ_CONF_INVALID;
|
||||
}
|
||||
}
|
||||
|
||||
if (strcmp(key, "experimental.snapshot.batch.size") == 0) {
|
||||
conf->spBatchSize = atoi(value);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcmp(key, "td.connect.ip") == 0) {
|
||||
conf->ip = strdup(value);
|
||||
return TMQ_CONF_OK;
|
||||
|
@ -918,7 +924,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
strcpy(pTmq->clientId, conf->clientId);
|
||||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->withTbName = conf->withTbName;
|
||||
pTmq->useSnapshot = conf->useSnapshot;
|
||||
pTmq->useSnapshot = conf->spEnable;
|
||||
pTmq->autoCommit = conf->autoCommit;
|
||||
pTmq->autoCommitInterval = conf->autoCommitInterval;
|
||||
pTmq->commitCb = conf->commitCb;
|
||||
|
|
|
@ -116,6 +116,7 @@ typedef void *tsdbReaderT;
|
|||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||
#define BLOCK_LOAD_TABLE_RR_ORDER 3
|
||||
|
||||
int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid);
|
||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray *tableList);
|
||||
tsdbReaderT tsdbReaderOpen(SVnode *pVnode, SQueryTableDataCond *pCond, SArray *tableList, uint64_t qId,
|
||||
uint64_t taskId);
|
||||
|
|
|
@ -161,7 +161,7 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
|
|||
|
||||
// tqExec
|
||||
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId);
|
||||
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId);
|
||||
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId);
|
||||
int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp);
|
||||
|
||||
// tqMeta
|
||||
|
@ -183,6 +183,17 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore);
|
|||
// tqSink
|
||||
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
|
||||
pOffsetVal->uid = uid;
|
||||
pOffsetVal->ts = ts;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
|
||||
pOffsetVal->type = TMQ_OFFSET__LOG;
|
||||
pOffsetVal->version = ver;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -228,17 +228,6 @@ static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t su
|
|||
|
||||
static int32_t tqInitMetaRsp(SMqMetaRsp* pRsp, const SMqPollReq* pReq) { return 0; }
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToData(STqOffsetVal* pOffsetVal, int64_t uid, int64_t ts) {
|
||||
pOffsetVal->type = TMQ_OFFSET__SNAPSHOT_DATA;
|
||||
pOffsetVal->uid = uid;
|
||||
pOffsetVal->ts = ts;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ver) {
|
||||
pOffsetVal->type = TMQ_OFFSET__LOG;
|
||||
pOffsetVal->version = ver;
|
||||
}
|
||||
|
||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||
SMqPollReq* pReq = pMsg->pCont;
|
||||
int64_t consumerId = pReq->consumerId;
|
||||
|
@ -394,7 +383,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
char formatBuf[50];
|
||||
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
|
||||
tqInfo("retrieve using snapshot req offset %s", formatBuf);
|
||||
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, workerId) < 0) {
|
||||
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
|
@ -655,6 +644,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.tqReader = true,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
|
|
|
@ -59,13 +59,18 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, int32_t workerId) {
|
||||
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
|
||||
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
|
||||
qTaskInfo_t task = pExec->execCol.task[workerId];
|
||||
// TODO set uid and ts
|
||||
|
||||
if (qStreamScanSnapshot(task) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
if (qPrepareScan(task, offset.uid, offset.ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
@ -86,7 +91,16 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, i
|
|||
/*tqAddTbNameToRsp(pTq, uid, pRsp, workerId);*/
|
||||
}
|
||||
pRsp->blockNum++;
|
||||
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt >= 4096) break;
|
||||
}
|
||||
int64_t uid;
|
||||
int64_t ts;
|
||||
if (qGetStreamScanStatus(task, &uid, &ts) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
tqOffsetResetToData(&pRsp->rspOffset, uid, ts);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -112,9 +112,9 @@ typedef struct STsdbReadHandle {
|
|||
STimeWindow window; // the primary query time window that applies to all queries
|
||||
// SColumnDataAgg* statis; // query level statistics, only one table block statistics info exists at any time
|
||||
// SColumnDataAgg** pstatis;// the ptr array list to return to caller
|
||||
int32_t numOfBlocks;
|
||||
int32_t numOfBlocks;
|
||||
SSDataBlock* pResBlock;
|
||||
// SArray* pColumns; // column list, SColumnInfoData array list
|
||||
// SArray* pColumns; // column list, SColumnInfoData array list
|
||||
bool locateStart;
|
||||
int32_t outputCapacity;
|
||||
int32_t realNumOfRows;
|
||||
|
@ -223,6 +223,22 @@ int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle) {
|
|||
return rows;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromUid(STsdbReadHandle* pTsdbReadHandle, int64_t uid) {
|
||||
SArray* pTableCheckInfo = taosArrayInit(1, sizeof(STableCheckInfo));
|
||||
if (pTableCheckInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
STableCheckInfo info = {
|
||||
.tableId = uid,
|
||||
};
|
||||
info.suid = pTsdbReadHandle->suid;
|
||||
|
||||
taosArrayPush(pTableCheckInfo, &info);
|
||||
tsdbDebug("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pTsdbReadHandle, info.tableId, info.lastKey,
|
||||
pTsdbReadHandle->idStr);
|
||||
return pTableCheckInfo;
|
||||
}
|
||||
|
||||
static SArray* createCheckInfoFromTableGroup(STsdbReadHandle* pTsdbReadHandle, SArray* pTableList) {
|
||||
size_t tableSize = taosArrayGetSize(pTableList);
|
||||
|
||||
|
@ -428,8 +444,8 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
|
|||
|
||||
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {.info = pCond->colList[i], 0};
|
||||
int32_t code = blockDataAppendColInfo(pReadHandle->pResBlock, &colInfo);
|
||||
if (code != TSDB_CODE_SUCCESS){
|
||||
int32_t code = blockDataAppendColInfo(pReadHandle->pResBlock, &colInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
|
@ -494,9 +510,19 @@ static int32_t setCurrentSchema(SVnode* pVnode, STsdbReadHandle* pTsdbReadHandle
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
|
||||
int32_t tsdbSetTableId(tsdbReaderT reader, int64_t uid) {
|
||||
STsdbReadHandle* pTsdbReadHandle = reader;
|
||||
if(pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
|
||||
if (pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
|
||||
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromUid(pTsdbReadHandle, uid);
|
||||
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
|
||||
return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
}
|
||||
return TDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList) {
|
||||
STsdbReadHandle* pTsdbReadHandle = reader;
|
||||
if (pTsdbReadHandle->pTableCheckInfo) taosArrayDestroy(pTsdbReadHandle->pTableCheckInfo);
|
||||
pTsdbReadHandle->pTableCheckInfo = createCheckInfoFromTableGroup(pTsdbReadHandle, tableList);
|
||||
if (pTsdbReadHandle->pTableCheckInfo == NULL) {
|
||||
return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -505,10 +531,10 @@ int32_t tsdbSetTableList(tsdbReaderT reader, SArray* tableList){
|
|||
}
|
||||
|
||||
tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* tableList, uint64_t qId,
|
||||
uint64_t taskId) {
|
||||
if(taosArrayGetSize(tableList) == 0){
|
||||
return NULL;
|
||||
}
|
||||
uint64_t taskId) {
|
||||
/*if (taosArrayGetSize(tableList) == 0) {*/
|
||||
/*return NULL;*/
|
||||
/*}*/
|
||||
STsdbReadHandle* pTsdbReadHandle = tsdbQueryTablesImpl(pVnode, pCond, qId, taskId);
|
||||
if (pTsdbReadHandle == NULL) {
|
||||
return NULL;
|
||||
|
@ -553,8 +579,7 @@ tsdbReaderT tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* t
|
|||
}
|
||||
|
||||
tsdbDebug("%p total numOfTable:%" PRIzu " in this query, table %" PRIzu " %s", pTsdbReadHandle,
|
||||
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList),
|
||||
pTsdbReadHandle->idStr);
|
||||
taosArrayGetSize(pTsdbReadHandle->pTableCheckInfo), taosArrayGetSize(tableList), pTsdbReadHandle->idStr);
|
||||
|
||||
return (tsdbReaderT)pTsdbReadHandle;
|
||||
}
|
||||
|
@ -1073,7 +1098,7 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
|
|||
}
|
||||
|
||||
int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerMin[precision])); // set the starting fileId
|
||||
if (fid < 0LL && llabs(fid) > INT32_MAX) { // data value overflow for INT32
|
||||
if (fid < 0LL && llabs(fid) > INT32_MAX) { // data value overflow for INT32
|
||||
fid = INT32_MIN;
|
||||
}
|
||||
|
||||
|
@ -2612,7 +2637,7 @@ int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* queryHandle, STableBlockDistInfo*
|
|||
tsdbGetFidKeyRange(pCfg->days, pCfg->precision, pTsdbReadHandle->pFileGroup->fid, &win.skey, &win.ekey);
|
||||
|
||||
// current file are not overlapped with query time window, ignore remain files
|
||||
if ((win.skey > pTsdbReadHandle->window.ekey)/* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
|
||||
if ((win.skey > pTsdbReadHandle->window.ekey) /* || (!ascTraverse && win.ekey < pTsdbReadHandle->window.ekey)*/) {
|
||||
tsdbUnLockFS(REPO_FS(pTsdbReadHandle->pTsdb));
|
||||
tsdbDebug("%p remain files are not qualified for qrange:%" PRId64 "-%" PRId64 ", ignore, %s", pTsdbReadHandle,
|
||||
pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
|
||||
|
@ -2886,7 +2911,7 @@ int32_t tsdbGetCtbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
|||
*/
|
||||
int32_t tsdbGetStbIdList(SMeta* pMeta, int64_t suid, SArray* list) {
|
||||
SMStbCursor* pCur = metaOpenStbCursor(pMeta, suid);
|
||||
if(!pCur) {
|
||||
if (!pCur) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
|
|
|
@ -248,6 +248,11 @@ typedef struct SSampleExecInfo {
|
|||
uint32_t seed; // random seed value
|
||||
} SSampleExecInfo;
|
||||
|
||||
enum {
|
||||
TABLE_SCAN__TABLE_ORDER = 1,
|
||||
TABLE_SCAN__BLOCK_ORDER = 1,
|
||||
};
|
||||
|
||||
typedef struct STableScanInfo {
|
||||
void* dataReader;
|
||||
SReadHandle readHandle;
|
||||
|
@ -272,13 +277,21 @@ typedef struct STableScanInfo {
|
|||
int32_t curTWinIdx;
|
||||
|
||||
int32_t currentGroupId;
|
||||
int32_t currentTable;
|
||||
uint64_t queryId; // todo remove it
|
||||
uint64_t taskId; // todo remove it
|
||||
|
||||
struct {
|
||||
uint64_t uid;
|
||||
int64_t t;
|
||||
} scanStatus;
|
||||
int64_t ts;
|
||||
} lastStatus;
|
||||
|
||||
struct {
|
||||
uint64_t uid;
|
||||
int64_t ts;
|
||||
} expStatus;
|
||||
|
||||
int8_t scanMode;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
@ -713,6 +726,7 @@ void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
|||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
||||
|
||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts);
|
||||
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts);
|
||||
|
||||
SSDataBlock* loadNextDataBlock(void* param);
|
||||
|
|
|
@ -222,7 +222,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
|||
}
|
||||
|
||||
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
|
||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*) tinfo;
|
||||
SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
|
||||
|
||||
if (pTaskInfo == NULL || pInput == NULL || len == 0) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -231,11 +231,14 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le
|
|||
return decodeOperator(pTaskInfo->pRoot, pInput, len);
|
||||
}
|
||||
|
||||
int32_t qPrepareScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) tinfo;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return doPrepareScan(pTaskInfo->pRoot, uid, ts);
|
||||
}
|
||||
|
||||
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
|
||||
}
|
||||
|
|
|
@ -1033,7 +1033,7 @@ static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSData
|
|||
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||
uint32_t status = BLK_DATA_NOT_LOAD;
|
||||
|
||||
int32_t numOfOutput = 0;//pTableScanInfo->numOfOutput;
|
||||
int32_t numOfOutput = 0; // pTableScanInfo->numOfOutput;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
|
||||
|
@ -2821,13 +2821,35 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
|
|||
}
|
||||
}
|
||||
|
||||
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
|
||||
/**uid = pSnapShotScanInfo->scanStatus.uid;*/
|
||||
/**ts = pSnapShotScanInfo->scanStatus.t;*/
|
||||
if (pSnapShotScanInfo->lastStatus.uid != uid || pSnapShotScanInfo->lastStatus.ts != ts) {
|
||||
// rebuild scan
|
||||
//
|
||||
}
|
||||
} else {
|
||||
if (pOperator->pDownstream[0] == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
} else {
|
||||
doPrepareScan(pOperator->pDownstream[0], uid, ts);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
|
||||
*uid = pSnapShotScanInfo->scanStatus.uid;
|
||||
*ts = pSnapShotScanInfo->scanStatus.t;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
|
||||
*uid = pSnapShotScanInfo->lastStatus.uid;
|
||||
*ts = pSnapShotScanInfo->lastStatus.ts;
|
||||
} else {
|
||||
if (pOperator->pDownstream[0] == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
|
|
@ -416,8 +416,8 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
|||
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
|
||||
|
||||
// todo refactor
|
||||
pTableScanInfo->scanStatus.uid = pBlock->info.uid;
|
||||
pTableScanInfo->scanStatus.t = pBlock->info.window.ekey;
|
||||
pTableScanInfo->lastStatus.uid = pBlock->info.uid;
|
||||
pTableScanInfo->lastStatus.ts = pBlock->info.window.ekey;
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
@ -513,6 +513,44 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
// if scan table by table
|
||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||
// check status
|
||||
if (pInfo->lastStatus.uid == pInfo->expStatus.uid && pInfo->lastStatus.ts == pInfo->expStatus.ts) {
|
||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||
if (result) {
|
||||
return result;
|
||||
}
|
||||
// if no data, switch to next table and continue scan
|
||||
pInfo->currentTable++;
|
||||
if (pInfo->currentTable >= taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList)) {
|
||||
return NULL;
|
||||
}
|
||||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
|
||||
/*pTableInfo->uid */
|
||||
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
|
||||
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
|
||||
pInfo->scanTimes = 0;
|
||||
pInfo->curTWinIdx = 0;
|
||||
pInfo->lastStatus.ts = pInfo->expStatus.ts;
|
||||
pInfo->lastStatus.uid = pInfo->expStatus.uid;
|
||||
return doTableScan(pOperator);
|
||||
}
|
||||
// reset to exp table and window start from ts
|
||||
tsdbSetTableId(pInfo->dataReader, pInfo->expStatus.uid);
|
||||
SQueryTableDataCond tmpCond = pInfo->cond;
|
||||
tmpCond.twindows[0] = (STimeWindow){
|
||||
.skey = pInfo->expStatus.ts,
|
||||
.ekey = INT64_MAX,
|
||||
};
|
||||
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
|
||||
pInfo->scanTimes = 0;
|
||||
pInfo->curTWinIdx = 0;
|
||||
pInfo->lastStatus.ts = pInfo->expStatus.ts;
|
||||
pInfo->lastStatus.uid = pInfo->expStatus.uid;
|
||||
return doTableScan(pOperator);
|
||||
}
|
||||
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
pInfo->currentGroupId++;
|
||||
if (pInfo->currentGroupId >= taosArrayGetSize(pTaskInfo->tableqinfoList.pGroupList)) {
|
||||
|
@ -1207,6 +1245,13 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (pHandle) {
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
|
||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||
if (pHandle->tqReader) {
|
||||
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pSTInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0);
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
|
||||
} else {
|
||||
|
|
|
@ -129,6 +129,7 @@
|
|||
./test.sh -f tsim/tmq/basic2Of2ConsOverlap.sim
|
||||
./test.sh -f tsim/tmq/topic.sim
|
||||
./test.sh -f tsim/tmq/snapshot.sim
|
||||
./test.sh -f tsim/tmq/snapshot1.sim
|
||||
|
||||
# --- stable
|
||||
./test.sh -f tsim/stable/disk.sim
|
||||
|
|
|
@ -0,0 +1,308 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic2Of2ConsOverlap.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
|
||||
run tsim/tmq/prepareBasicEnv-1vgrp.sim
|
||||
|
||||
#---- global parameters start ----#
|
||||
$dbName = db
|
||||
$vgroups = 1
|
||||
$stbPrefix = stb
|
||||
$ctbPrefix = ctb
|
||||
$ntbPrefix = ntb
|
||||
$stbNum = 1
|
||||
$ctbNum = 10
|
||||
$ntbNum = 10
|
||||
$rowsPerCtb = 10
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#---- global parameters end ----#
|
||||
|
||||
$pullDelay = 5
|
||||
$ifcheckdata = 1
|
||||
$ifmanualcommit = 1
|
||||
$showMsg = 1
|
||||
$showRow = 0
|
||||
|
||||
sql connect
|
||||
sql use $dbName
|
||||
|
||||
print == create topics from super table
|
||||
sql create topic topic_stb_column as select ts, c3 from stb
|
||||
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
print == create topics from child table
|
||||
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||
sql create topic topic_ctb_all as select * from ctb0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||
|
||||
print == create topics from normal table
|
||||
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||
sql create topic topic_ntb_all as select * from ntb0
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||
|
||||
#sql show topics
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||
$keyList = ' . group.id:cgrp1
|
||||
$keyList = $keyList . ,
|
||||
$keyList = $keyList . enable.auto.commit:false
|
||||
#$keyList = $keyList . ,
|
||||
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||
#$keyList = $keyList . ,
|
||||
#$keyList = $keyList . auto.offset.reset:earliest
|
||||
$keyList = $keyList . '
|
||||
print ========== key list: $keyList
|
||||
|
||||
$topicNum = 2
|
||||
|
||||
#=============================== start consume =============================#
|
||||
|
||||
|
||||
print ================ test consume from stb
|
||||
print == overlap toipcs: topic_stb_column + topic_stb_all, topic_stb_function + topic_stb_all
|
||||
$topicList = ' . topic_stb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_all
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
|
||||
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfStb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
|
||||
$topicList = ' . topic_stb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_function
|
||||
$topicList = $topicList . '
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
print == start consumer to pull msgs from stb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start -e 1
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_stb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_stb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
# $data[0][3]/$data[1][3] should be between $totalMsgOfOneTopic and $totalMsgOfStb.
|
||||
if $data[0][3] < $totalMsgOfOneTopic then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] > $totalMsgOfStb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] < $totalMsgOfOneTopic then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] > $totalMsgOfStb then
|
||||
return -1
|
||||
endi
|
||||
|
||||
$totalMsgCons = $totalMsgOfOneTopic + $totalMsgOfStb
|
||||
$sumOfRows = $data[0][3] + $data[1][3]
|
||||
if $sumOfRows != $totalMsgCons then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb1
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ctb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ctb
|
||||
print == overlap toipcs: topic_ctb_column + topic_ctb_all, topic_ctb_function + topic_ctb_all
|
||||
$topicList = ' . topic_ctb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_all
|
||||
$topicList = $topicList . '
|
||||
$consumerId = 0
|
||||
|
||||
$totalMsgOfOneTopic = $rowsPerCtb
|
||||
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfCtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
$topicList = ' . topic_ctb_function
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_all
|
||||
$topicList = $topicList . '
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
print == start consumer to pull msgs from ctb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_ctb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ctb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] == $totalMsgOfOneTopic then
|
||||
if $data[1][3] == $totalMsgOfCtb then
|
||||
goto check_ok_1
|
||||
endi
|
||||
elif $data[1][3] == $totalMsgOfOneTopic then
|
||||
if $data[0][3] == $totalMsgOfCtb then
|
||||
goto check_ok_1
|
||||
endi
|
||||
endi
|
||||
return -1
|
||||
check_ok_1:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb2
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ntb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ntb
|
||||
print == overlap toipcs: topic_ntb_column + topic_ntb_all, topic_ntb_function + topic_ntb_all
|
||||
$topicList = ' . topic_ntb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_all
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfOneTopic = $rowsPerCtb
|
||||
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfNtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
|
||||
$topicList = ' . topic_ntb_function
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_all
|
||||
$topicList = $topicList . '
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||
|
||||
print == start consumer to pull msgs from ntb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start -e 1
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start -e 1
|
||||
|
||||
print == check consume result from ntb
|
||||
wait_consumer_end_from_ntb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ntb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
|
||||
if $data[0][3] == $totalMsgOfOneTopic then
|
||||
if $data[1][3] == $totalMsgOfNtb then
|
||||
goto check_ok_3
|
||||
endi
|
||||
elif $data[1][3] == $totalMsgOfOneTopic then
|
||||
if $data[0][3] == $totalMsgOfNtb then
|
||||
goto check_ok_3
|
||||
endi
|
||||
endi
|
||||
return -1
|
||||
check_ok_3:
|
||||
|
||||
sql select * from performance_schema.`consumers`
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#sql select * from performance_schema.`subscriptions`
|
||||
#if $rows != 0 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -124,15 +124,13 @@ char* getCurrentTimeString(char* timeString) {
|
|||
return timeString;
|
||||
}
|
||||
|
||||
static void tmqStop(int signum, void *info, void *ctx) {
|
||||
static void tmqStop(int signum, void* info, void* ctx) {
|
||||
running = 0;
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
|
||||
taosFprintfFile(g_fp, "%s tmqStop() receive stop signal[%d]\n", getCurrentTimeString(tmpString), signum);
|
||||
}
|
||||
|
||||
static void tmqSetSignalHandle() {
|
||||
taosSetSignal(SIGINT, tmqStop);
|
||||
}
|
||||
static void tmqSetSignalHandle() { taosSetSignal(SIGINT, tmqStop); }
|
||||
|
||||
void initLogFile() {
|
||||
char filename[256];
|
||||
|
@ -463,16 +461,16 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
|||
int32_t precision = taos_result_precision(msg);
|
||||
const char* tbName = tmq_get_table_name(msg);
|
||||
|
||||
#if 0
|
||||
#if 0
|
||||
// get schema
|
||||
//============================== stub =================================================//
|
||||
for (int32_t i = 0; i < numOfFields; i++) {
|
||||
taosFprintfFile(g_fp, "%02d: name: %s, type: %d, len: %d\n", i, fields[i].name, fields[i].type, fields[i].bytes);
|
||||
}
|
||||
//============================== stub =================================================//
|
||||
#endif
|
||||
#endif
|
||||
|
||||
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
||||
dumpToFileForCheck(pInfo->pConsumeRowsFile, row, fields, length, numOfFields, precision);
|
||||
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
|
||||
|
@ -529,7 +527,7 @@ static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
|||
g_once_commit_flag = 1;
|
||||
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||
}
|
||||
|
||||
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
||||
}
|
||||
|
@ -565,7 +563,7 @@ void build_consumer(SThreadInfo* pInfo) {
|
|||
// tmq_conf_set(conf, "auto.offset.reset", "latest");
|
||||
//
|
||||
if (g_stConfInfo.useSnapshot) {
|
||||
tmq_conf_set(conf, "experiment.use.snapshot", "true");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
}
|
||||
|
||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
@ -683,13 +681,13 @@ void* consumeThreadFunc(void* param) {
|
|||
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (pInfo->taos == NULL) {
|
||||
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
|
||||
return NULL;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
build_consumer(pInfo);
|
||||
build_topic_list(pInfo);
|
||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
||||
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
|
||||
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
|
||||
assert(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -697,7 +695,7 @@ void* consumeThreadFunc(void* param) {
|
|||
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
|
||||
if (err != 0) {
|
||||
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
|
||||
assert(0);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -718,13 +716,13 @@ void* consumeThreadFunc(void* param) {
|
|||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
if (err != 0) {
|
||||
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(pInfo->tmq);
|
||||
if (err != 0) {
|
||||
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
|
||||
}
|
||||
pInfo->tmq = NULL;
|
||||
|
||||
|
|
Loading…
Reference in New Issue