Merge pull request #14592 from taosdata/feature/stream
refactor(stream): stream reader created in scanner
This commit is contained in:
commit
6b479f4a1f
|
@ -241,7 +241,7 @@ int32_t create_topic() {
|
|||
taos_free_result(pRes);
|
||||
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -302,7 +302,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||
/*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
|
|
@ -32,6 +32,18 @@ enum {
|
|||
TMQ_CONF__RESET_OFFSET__LATEST = -1,
|
||||
};
|
||||
|
||||
// clang-format off
|
||||
#define IS_META_MSG(x) ( \
|
||||
x == TDMT_VND_CREATE_STB \
|
||||
|| x == TDMT_VND_ALTER_STB \
|
||||
|| x == TDMT_VND_DROP_STB \
|
||||
|| x == TDMT_VND_CREATE_TABLE \
|
||||
|| x == TDMT_VND_ALTER_TABLE \
|
||||
|| x == TDMT_VND_DROP_TABLE \
|
||||
|| x == TDMT_VND_DROP_TTL_TABLE \
|
||||
)
|
||||
// clang-format on
|
||||
|
||||
enum {
|
||||
TMQ_MSG_TYPE__DUMMY = 0,
|
||||
TMQ_MSG_TYPE__POLL_RSP,
|
||||
|
|
|
@ -2826,8 +2826,8 @@ typedef struct {
|
|||
|
||||
static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
||||
// tlen += taosEncodeFixedI64(buf, pRsp->reqOffset);
|
||||
// tlen += taosEncodeFixedI64(buf, pRsp->rspOffset);
|
||||
tlen += taosEncodeFixedI16(buf, pRsp->resMsgType);
|
||||
tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen);
|
||||
tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen);
|
||||
|
@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp
|
|||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
||||
// buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);
|
||||
// buf = taosDecodeFixedI64(buf, &pRsp->rspOffset);
|
||||
buf = taosDecodeFixedI16(buf, &pRsp->resMsgType);
|
||||
buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen);
|
||||
buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen);
|
||||
|
|
|
@ -30,15 +30,15 @@ struct SRpcMsg;
|
|||
struct SSubplan;
|
||||
|
||||
typedef struct SReadHandle {
|
||||
void* reader;
|
||||
void* streamReader;
|
||||
void* meta;
|
||||
void* config;
|
||||
void* vnode;
|
||||
void* mnd;
|
||||
SMsgCb* pMsgCb;
|
||||
|
||||
// int8_t initTsdbReader;
|
||||
bool tqReader;
|
||||
bool initMetaReader;
|
||||
bool initTableReader;
|
||||
bool initStreamReader;
|
||||
} SReadHandle;
|
||||
|
||||
typedef enum {
|
||||
|
|
|
@ -223,7 +223,7 @@ typedef struct {
|
|||
SEpSet epSet;
|
||||
} SStreamChildEpInfo;
|
||||
|
||||
struct SStreamTask {
|
||||
typedef struct SStreamTask {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int8_t isDataScan;
|
||||
|
@ -235,6 +235,11 @@ struct SStreamTask {
|
|||
int8_t taskStatus;
|
||||
int8_t execStatus;
|
||||
|
||||
// exec info
|
||||
int64_t enqueueVer;
|
||||
int64_t processedVer;
|
||||
int64_t checkpointVer;
|
||||
|
||||
// node info
|
||||
int32_t selfChildId;
|
||||
int32_t nodeId;
|
||||
|
@ -277,7 +282,7 @@ struct SStreamTask {
|
|||
|
||||
// msg handle
|
||||
SMsgCb* pMsgCb;
|
||||
};
|
||||
} SStreamTask;
|
||||
|
||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
|
||||
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
|
||||
|
@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
|||
void tFreeSStreamTask(SStreamTask* pTask);
|
||||
|
||||
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||
#if 0
|
||||
while (1) {
|
||||
int8_t inputStatus =
|
||||
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
|
||||
|
@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
|
|||
}
|
||||
ASSERT(0);
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
|
||||
|
@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem
|
|||
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
|
||||
}
|
||||
|
||||
#if 0
|
||||
// TODO: back pressure
|
||||
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
|
||||
#endif
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ typedef struct {
|
|||
EWalType level; // wal level
|
||||
} SWalCfg;
|
||||
|
||||
typedef struct SWalVer {
|
||||
typedef struct {
|
||||
int64_t firstVer;
|
||||
int64_t verInSnapshotting;
|
||||
int64_t snapshotVer;
|
||||
|
@ -149,17 +149,22 @@ typedef struct SWal {
|
|||
SWalCkHead writeHead;
|
||||
} SWal; // WAL HANDLE
|
||||
|
||||
typedef struct SWalReadHandle {
|
||||
SWal *pWal;
|
||||
TdFilePtr pReadLogTFile;
|
||||
TdFilePtr pReadIdxTFile;
|
||||
int64_t curFileFirstVer;
|
||||
int64_t curVersion;
|
||||
int64_t capacity;
|
||||
int64_t status; // if cursor valid
|
||||
TdThreadMutex mutex;
|
||||
SWalCkHead *pHead;
|
||||
} SWalReadHandle;
|
||||
typedef struct {
|
||||
int8_t scanUncommited;
|
||||
int8_t scanMeta;
|
||||
} SWalFilterCond;
|
||||
|
||||
typedef struct {
|
||||
SWal *pWal;
|
||||
TdFilePtr pLogFile;
|
||||
TdFilePtr pIdxFile;
|
||||
int64_t curFileFirstVer;
|
||||
int64_t curVersion;
|
||||
int64_t capacity;
|
||||
TdThreadMutex mutex;
|
||||
SWalFilterCond cond;
|
||||
SWalCkHead *pHead;
|
||||
} SWalReader;
|
||||
|
||||
// module initialization
|
||||
int32_t walInit();
|
||||
|
@ -178,7 +183,6 @@ void walFsync(SWal *, bool force);
|
|||
|
||||
// apis for lifecycle management
|
||||
int32_t walCommit(SWal *, int64_t ver);
|
||||
// truncate after
|
||||
int32_t walRollback(SWal *, int64_t ver);
|
||||
// notify that previous logs can be pruned safely
|
||||
int32_t walBeginSnapshot(SWal *, int64_t ver);
|
||||
|
@ -187,15 +191,16 @@ int32_t walRestoreFromSnapshot(SWal *, int64_t ver);
|
|||
// int32_t walDataCorrupted(SWal*);
|
||||
|
||||
// read
|
||||
SWalReadHandle *walOpenReadHandle(SWal *);
|
||||
void walCloseReadHandle(SWalReadHandle *);
|
||||
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
|
||||
SWalReader *walOpenReader(SWal *, SWalFilterCond *pCond);
|
||||
void walCloseReader(SWalReader *pRead);
|
||||
int32_t walReadVer(SWalReader *pRead, int64_t ver);
|
||||
int32_t walNextValidMsg(SWalReader *pRead);
|
||||
|
||||
// only for tq usage
|
||||
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity);
|
||||
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead);
|
||||
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead);
|
||||
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead);
|
||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity);
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead);
|
||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead);
|
||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead);
|
||||
|
||||
typedef struct {
|
||||
int64_t refId;
|
||||
|
@ -207,10 +212,11 @@ void walCloseRef(SWalRef *);
|
|||
int32_t walRefVer(SWalRef *, int64_t ver);
|
||||
int32_t walUnrefVer(SWal *);
|
||||
|
||||
// help function for raft
|
||||
bool walLogExist(SWal *, int64_t ver);
|
||||
bool walIsEmpty(SWal *);
|
||||
|
||||
// lifecycle check
|
||||
bool walIsEmpty(SWal *);
|
||||
int64_t walGetFirstVer(SWal *);
|
||||
int64_t walGetSnapshotVer(SWal *);
|
||||
int64_t walGetLastVer(SWal *);
|
||||
|
|
|
@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str);
|
|||
void taosIpPort2String(uint32_t ip, uint16_t port, char *str);
|
||||
|
||||
void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen);
|
||||
char *strDupUnquo(const char *src);
|
||||
|
||||
static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) {
|
||||
T_MD5_CTX context;
|
||||
|
|
|
@ -50,19 +50,18 @@ struct tmq_list_t {
|
|||
};
|
||||
|
||||
struct tmq_conf_t {
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t spEnable;
|
||||
int32_t spBatchSize;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
char* user;
|
||||
char* pass;
|
||||
/*char* db;*/
|
||||
char clientId[256];
|
||||
char groupId[TSDB_CGROUP_LEN];
|
||||
int8_t autoCommit;
|
||||
int8_t resetOffset;
|
||||
int8_t withTbName;
|
||||
int8_t spEnable;
|
||||
int32_t spBatchSize;
|
||||
uint16_t port;
|
||||
int32_t autoCommitInterval;
|
||||
char* ip;
|
||||
char* user;
|
||||
char* pass;
|
||||
tmq_commit_cb* commitCb;
|
||||
void* commitCbUserParam;
|
||||
};
|
||||
|
@ -338,7 +337,7 @@ tmq_list_t* tmq_list_new() {
|
|||
|
||||
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
||||
SArray* container = &list->container;
|
||||
char* topic = strdup(src);
|
||||
char* topic = strDupUnquo(src);
|
||||
if (taosArrayPush(container, &topic) == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -40,15 +40,6 @@ extern "C" {
|
|||
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#define IS_META_MSG(x) ( \
|
||||
x == TDMT_VND_CREATE_STB \
|
||||
|| x == TDMT_VND_ALTER_STB \
|
||||
|| x == TDMT_VND_DROP_STB \
|
||||
|| x == TDMT_VND_CREATE_TABLE \
|
||||
|| x == TDMT_VND_ALTER_TABLE \
|
||||
|| x == TDMT_VND_DROP_TABLE \
|
||||
|| x == TDMT_VND_DROP_TTL_TABLE \
|
||||
)
|
||||
// clang-format on
|
||||
|
||||
typedef struct STqOffsetStore STqOffsetStore;
|
||||
|
@ -128,7 +119,7 @@ typedef struct {
|
|||
int8_t fetchMeta;
|
||||
|
||||
// reader
|
||||
SWalReadHandle* pWalReader;
|
||||
SWalReader* pWalReader;
|
||||
|
||||
// push
|
||||
STqPushHandle pushHandle;
|
||||
|
|
|
@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con
|
|||
}
|
||||
|
||||
SReadHandle handle = {
|
||||
.reader = pReadHandle,
|
||||
.streamReader = pReadHandle,
|
||||
.meta = pMeta,
|
||||
.pMsgCb = pMsgCb,
|
||||
.vnode = pVnode,
|
||||
|
|
|
@ -28,8 +28,12 @@ int32_t tqInit() {
|
|||
atomic_store_8(&tqMgmt.inited, 0);
|
||||
return -1;
|
||||
}
|
||||
if (streamInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
atomic_store_8(&tqMgmt.inited, 1);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -42,6 +46,7 @@ void tqCleanUp() {
|
|||
|
||||
if (old == 1) {
|
||||
taosTmrCleanUp(tqMgmt.timer);
|
||||
streamCleanUp();
|
||||
atomic_store_8(&tqMgmt.inited, 0);
|
||||
}
|
||||
}
|
||||
|
@ -144,7 +149,6 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
|||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSMqDataRsp(&encoder, pRsp);
|
||||
/*tEncodeSMqDataBlkRsp(&abuf, pRsp);*/
|
||||
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
|
@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
ASSERT(IS_META_MSG(pHead->msgType));
|
||||
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
|
||||
SMqMetaRsp metaRsp = {0};
|
||||
metaRsp.reqOffset = pReq->reqOffset.version;
|
||||
metaRsp.rspOffset = fetchVer;
|
||||
/*metaRsp.reqOffset = pReq->reqOffset.version;*/
|
||||
/*metaRsp.rspOffset = fetchVer;*/
|
||||
/*metaRsp.rspOffsetNew.version = fetchVer;*/
|
||||
tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version);
|
||||
tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer);
|
||||
metaRsp.resMsgType = pHead->msgType;
|
||||
metaRsp.metaRspLen = pHead->bodyLen;
|
||||
metaRsp.metaRsp = pHead->body;
|
||||
|
@ -439,7 +446,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
pHandle->execHandle.subType = req.subType;
|
||||
pHandle->fetchMeta = req.withMeta;
|
||||
|
||||
pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
pHandle->pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
|
@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
req.qmsg = NULL;
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle handle = {
|
||||
.reader = pHandle->execHandle.pExecReader[i],
|
||||
.streamReader = pHandle->execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.tqReader = true,
|
||||
.initTableReader = true,
|
||||
};
|
||||
pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle);
|
||||
ASSERT(pHandle->execHandle.execCol.task[i]);
|
||||
|
@ -522,19 +529,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
// expand runners
|
||||
if (pTask->isDataScan) {
|
||||
SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
SReadHandle handle = {
|
||||
.reader = pStreamReader,
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
SReadHandle handle = {
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.initStreamReader = 1,
|
||||
};
|
||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else {
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
|
||||
// sink
|
||||
|
|
|
@ -77,14 +77,14 @@ int32_t tqMetaOpen(STQ* pTq) {
|
|||
STqHandle handle;
|
||||
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
|
||||
tDecodeSTqHandle(&decoder, &handle);
|
||||
handle.pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
|
||||
handle.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL);
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
handle.execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
|
||||
}
|
||||
if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
|
||||
for (int32_t i = 0; i < 5; i++) {
|
||||
SReadHandle reader = {
|
||||
.reader = handle.execHandle.pExecReader[i],
|
||||
.streamReader = handle.execHandle.pExecReader[i],
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.pMsgCb = &pTq->pVnode->msgCb,
|
||||
.vnode = pTq->pVnode,
|
||||
|
|
|
@ -348,7 +348,7 @@ typedef struct SessionWindowSupporter {
|
|||
uint8_t parentType;
|
||||
} SessionWindowSupporter;
|
||||
|
||||
typedef struct SStreamBlockScanInfo {
|
||||
typedef struct SStreamScanInfo {
|
||||
uint64_t tableUid; // queried super table uid
|
||||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
|
@ -365,7 +365,7 @@ typedef struct SStreamBlockScanInfo {
|
|||
int32_t blockType; // current block type
|
||||
int32_t validBlockIndex; // Is current data has returned?
|
||||
uint64_t numOfExec; // execution times
|
||||
void* streamBlockReader;// stream block reader handle
|
||||
void* streamReader;// stream block reader handle
|
||||
|
||||
int32_t tsArrayIndex;
|
||||
SArray* tsArray;
|
||||
|
@ -374,7 +374,7 @@ typedef struct SStreamBlockScanInfo {
|
|||
|
||||
EStreamScanMode scanMode;
|
||||
SOperatorInfo* pStreamScanOp;
|
||||
SOperatorInfo* pSnapshotReadOp;
|
||||
SOperatorInfo* pTableScanOp;
|
||||
SArray* childIds;
|
||||
SessionWindowSupporter sessionSup;
|
||||
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
|
||||
|
@ -383,7 +383,7 @@ typedef struct SStreamBlockScanInfo {
|
|||
SSDataBlock* pPullDataRes; // pull data SSDataBlock
|
||||
SSDataBlock* pDeleteDataRes; // delete data SSDataBlock
|
||||
int32_t deleteDataIndex;
|
||||
} SStreamBlockScanInfo;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
SRetrieveMetaTableRsp* pRsp;
|
||||
|
|
|
@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
} else {
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
pInfo->assignBlockUid = assignUid;
|
||||
|
||||
// TODO: if a block was set but not consumed,
|
||||
|
@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
pInfo->blockType = type;
|
||||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) {
|
||||
if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) {
|
||||
qError("submit msg messed up when initing stream block, %s" PRIx64, id);
|
||||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
|
@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
|||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) {
|
||||
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
|
@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
pInfo = pInfo->pDownstream[0];
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
SStreamBlockScanInfo* pScanInfo = pInfo->info;
|
||||
int32_t code = 0;
|
||||
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||
code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa);
|
||||
code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa);
|
||||
taosArrayDestroy(qa);
|
||||
} else { // remove the table id in current list
|
||||
qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList));
|
||||
code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList);
|
||||
code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -2849,12 +2849,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
|||
pOperator->status = OP_OPENED;
|
||||
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||
pScanInfo->blockType = STREAM_INPUT__DATA_SCAN;
|
||||
|
||||
pScanInfo->pSnapshotReadOp->status = OP_OPENED;
|
||||
pScanInfo->pTableScanOp->status = OP_OPENED;
|
||||
|
||||
STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info;
|
||||
STableScanInfo* pInfo = pScanInfo->pTableScanOp->info;
|
||||
ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER);
|
||||
|
||||
if (uid == 0) {
|
||||
|
@ -2914,8 +2914,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
|
|||
int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
|
||||
int32_t type = pOperator->operatorType;
|
||||
if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
SStreamBlockScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info;
|
||||
SStreamScanInfo* pScanInfo = pOperator->info;
|
||||
STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info;
|
||||
*uid = pSnapShotScanInfo->lastStatus.uid;
|
||||
*ts = pSnapShotScanInfo->lastStatus.ts;
|
||||
} else {
|
||||
|
@ -4587,9 +4587,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI
|
|||
}
|
||||
return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo);
|
||||
} else {
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
*ppInfo = pInfo->pSnapshotReadOp->info;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN);
|
||||
*ppInfo = pInfo->pTableScanOp->info;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -780,7 +780,7 @@ _error:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
||||
static void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
||||
size_t total = taosArrayGetSize(pInfo->pBlockLists);
|
||||
|
||||
pInfo->validBlockIndex = 0;
|
||||
|
@ -791,21 +791,20 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
|||
taosArrayClear(pInfo->pBlockLists);
|
||||
}
|
||||
|
||||
static bool isSessionWindow(SStreamBlockScanInfo* pInfo) {
|
||||
static bool isSessionWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
}
|
||||
|
||||
static bool isStateWindow(SStreamBlockScanInfo* pInfo) {
|
||||
static bool isStateWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||
}
|
||||
|
||||
static bool isIntervalWindow(SStreamBlockScanInfo* pInfo) {
|
||||
static bool isIntervalWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL ||
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
|
||||
pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL;
|
||||
}
|
||||
|
||||
|
||||
static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
|
||||
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
|
||||
if (groupId) {
|
||||
|
@ -827,19 +826,17 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) {
|
|||
*/
|
||||
}
|
||||
|
||||
static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
|
||||
static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) {
|
||||
ASSERT(rowIndex < pBlock->info.rows);
|
||||
switch (pBlock->info.type)
|
||||
{
|
||||
case STREAM_DELETE_DATA:
|
||||
case STREAM_RETRIEVE: {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
|
||||
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
|
||||
pInfo->groupId = groupCol[rowIndex];
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
switch (pBlock->info.type) {
|
||||
case STREAM_DELETE_DATA:
|
||||
case STREAM_RETRIEVE: {
|
||||
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex);
|
||||
uint64_t* groupCol = (uint64_t*)pColInfo->pData;
|
||||
pInfo->groupId = groupCol[rowIndex];
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -854,17 +851,17 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) {
|
|||
pTableScanInfo->currentGroupId = -1;
|
||||
}
|
||||
|
||||
static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
|
||||
static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t* pRowIndex) {
|
||||
if ((*pRowIndex) == pBlock->info.rows) {
|
||||
return false;
|
||||
}
|
||||
|
||||
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
STimeWindow win = {.skey = startData[*pRowIndex], .ekey = endData[*pRowIndex]};
|
||||
setGroupId(pInfo, pBlock, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||
(*pRowIndex)++;
|
||||
|
||||
|
@ -877,16 +874,16 @@ static bool prepareRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, i
|
|||
win.skey = TMIN(win.skey, startData[*pRowIndex]);
|
||||
continue;
|
||||
}
|
||||
ASSERT( (win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
|
||||
( isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0) ) );
|
||||
ASSERT((win.skey > startData[*pRowIndex] && win.ekey < endData[*pRowIndex]) ||
|
||||
(isInTimeWindow(&win, startData[*pRowIndex], 0) || isInTimeWindow(&win, endData[*pRowIndex], 0)));
|
||||
break;
|
||||
}
|
||||
|
||||
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win);
|
||||
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
STimeWindow win = {
|
||||
.skey = INT64_MIN,
|
||||
.ekey = INT64_MAX,
|
||||
|
@ -907,11 +904,10 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
|||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||
(*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL);
|
||||
} else {
|
||||
win =
|
||||
getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||
win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL);
|
||||
setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex);
|
||||
(*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL,
|
||||
TSDB_ORDER_ASC);
|
||||
(*pRowIndex) +=
|
||||
getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||
}
|
||||
needRead = true;
|
||||
} else if (isStateWindow(pInfo)) {
|
||||
|
@ -929,7 +925,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3
|
|||
if (!needRead) {
|
||||
return false;
|
||||
}
|
||||
resetTableScanInfo(pInfo->pSnapshotReadOp->info, &win);
|
||||
resetTableScanInfo(pInfo->pTableScanOp->info, &win);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -946,13 +942,13 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow
|
|||
dest->info.rows++;
|
||||
}
|
||||
|
||||
static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
while (1) {
|
||||
SSDataBlock* pResult = NULL;
|
||||
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||
pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) {
|
||||
// scan next window data
|
||||
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||
pResult = doTableScan(pInfo->pTableScanOp);
|
||||
}
|
||||
if (!pResult) {
|
||||
blockDataCleanup(pSDB);
|
||||
|
@ -966,14 +962,14 @@ static SSDataBlock* doRangeScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB,
|
|||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) {
|
||||
while (1) {
|
||||
SSDataBlock* pResult = NULL;
|
||||
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||
pResult = doTableScan(pInfo->pTableScanOp);
|
||||
if (pResult == NULL) {
|
||||
if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) {
|
||||
// scan next window data
|
||||
pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||
pResult = doTableScan(pInfo->pTableScanOp);
|
||||
}
|
||||
}
|
||||
if (!pResult) {
|
||||
|
@ -997,8 +993,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i
|
|||
return pResult;
|
||||
*/
|
||||
}
|
||||
|
||||
static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
|
||||
static void generateIntervalTs(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator,
|
||||
SSDataBlock* pUpdateRes) {
|
||||
if (pDelBlock->info.rows == 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -1006,18 +1002,20 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
|
|||
blockDataEnsureCapacity(pUpdateRes, 64);
|
||||
ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
|
||||
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
|
||||
|
||||
SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows &&
|
||||
i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) {
|
||||
for (int32_t i = pInfo->deleteDataIndex;
|
||||
i < pDelBlock->info.rows &&
|
||||
i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1;
|
||||
i++) {
|
||||
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
|
||||
for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) {
|
||||
for (TSKEY startTs = startData[i]; startTs <= endData[i];) {
|
||||
colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false);
|
||||
colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false);
|
||||
pUpdateRes->info.rows++;
|
||||
|
@ -1032,33 +1030,36 @@ static void generateIntervalTs(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlo
|
|||
}
|
||||
}
|
||||
|
||||
static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) {
|
||||
if (pBlock->info.rows == 0) {
|
||||
static void generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SOperatorInfo* pOperator,
|
||||
SSDataBlock* pUpdateRes) {
|
||||
if (pBlock->info.rows == 0) {
|
||||
return;
|
||||
}
|
||||
blockDataCleanup(pUpdateRes);
|
||||
blockDataEnsureCapacity(pUpdateRes, pBlock->info.rows);
|
||||
ASSERT(taosArrayGetSize(pBlock->pDataBlock) >= 3);
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
TSKEY* startData = (TSKEY*)pStartTsCol->pData;
|
||||
SColumnInfoData* pEndTsCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
TSKEY* endData = (TSKEY*)pEndTsCol->pData;
|
||||
SColumnInfoData* pGpCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
|
||||
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
|
||||
uint64_t* uidCol = (uint64_t*)pGpCol->pData;
|
||||
|
||||
SColumnInfoData* pDestStartCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestEndCol = taosArrayGet(pUpdateRes->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
int32_t dummy = 0;
|
||||
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
|
||||
int32_t dummy = 0;
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uint64_t groupId = getGroupId(pOperator, uidCol[i]);
|
||||
//gap must be 0.
|
||||
SResultWindowInfo* pStartWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
|
||||
// gap must be 0.
|
||||
SResultWindowInfo* pStartWin =
|
||||
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, startData[i], endData[i], groupId, 0, &dummy);
|
||||
if (!pStartWin) {
|
||||
// window has been closed.
|
||||
continue;
|
||||
}
|
||||
SResultWindowInfo* pEndWin = getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
|
||||
SResultWindowInfo* pEndWin =
|
||||
getCurSessionWindow(pInfo->sessionSup.pStreamAggSup, endData[i], endData[i], groupId, 0, &dummy);
|
||||
ASSERT(pEndWin);
|
||||
colDataAppend(pDestStartCol, i, (const char*)&pStartWin->win.skey, false);
|
||||
colDataAppend(pDestEndCol, i, (const char*)&pEndWin->win.ekey, false);
|
||||
|
@ -1066,7 +1067,7 @@ static void generateScanRange(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock,
|
|||
pUpdateRes->info.rows++;
|
||||
}
|
||||
}
|
||||
static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||
static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) {
|
||||
blockDataCleanup(pUpdateBlock);
|
||||
int32_t size = taosArrayGetSize(pInfo->tsArray);
|
||||
if (pInfo->tsArrayIndex < size) {
|
||||
|
@ -1075,11 +1076,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
|||
blockDataEnsureCapacity(pUpdateBlock, size);
|
||||
|
||||
int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex);
|
||||
pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
|
||||
pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
|
||||
int32_t i = 0;
|
||||
for (; i < size; i++) {
|
||||
rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex);
|
||||
uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid);
|
||||
uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid);
|
||||
if (pInfo->groupId != id) {
|
||||
break;
|
||||
}
|
||||
|
@ -1098,12 +1099,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa
|
|||
}
|
||||
|
||||
if (size == 0) {
|
||||
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock);
|
||||
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock);
|
||||
}
|
||||
}
|
||||
|
||||
static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock,
|
||||
bool out) {
|
||||
static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TSKEY* ts = (TSKEY*)pColDataInfo->pData;
|
||||
|
@ -1119,15 +1119,15 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32
|
|||
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex);
|
||||
uint64_t* uidCol = (uint64_t*)pColDataInfo->pData;
|
||||
ASSERT(pBlock->info.rows > 0);
|
||||
for (int32_t i = 0 ; i < pBlock->info.rows; i++) {
|
||||
for (int32_t i = 0; i < pBlock->info.rows; i++) {
|
||||
uidCol[i] = getGroupId(pOperator, uidCol[i]);
|
||||
}
|
||||
}
|
||||
|
||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamScanInfo* pInfo = pOperator->info;
|
||||
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -1145,36 +1145,35 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
// TODO move into scan
|
||||
blockDataUpdateTsWindow(pBlock, 0);
|
||||
switch (pBlock->info.type) {
|
||||
case STREAM_RETRIEVE:{
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
||||
copyDataBlock(pInfo->pPullDataRes, pBlock);
|
||||
pInfo->pullDataResIndex = 0;
|
||||
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
|
||||
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||
}
|
||||
break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
if (isIntervalWindow(pInfo)) {
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
|
||||
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
|
||||
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
} else {
|
||||
generateScanRange(pInfo, pBlock, pInfo->pSnapshotReadOp, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
}
|
||||
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
case STREAM_RETRIEVE: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE;
|
||||
copyDataBlock(pInfo->pPullDataRes, pBlock);
|
||||
pInfo->pullDataResIndex = 0;
|
||||
prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex);
|
||||
updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo);
|
||||
} break;
|
||||
case STREAM_DELETE_DATA: {
|
||||
pInfo->blockType = STREAM_INPUT__DATA_SUBMIT;
|
||||
pInfo->updateResIndex = 0;
|
||||
if (isIntervalWindow(pInfo)) {
|
||||
copyDataBlock(pInfo->pDeleteDataRes, pBlock);
|
||||
generateIntervalTs(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes);
|
||||
prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER;
|
||||
} else {
|
||||
generateScanRange(pInfo, pBlock, pInfo->pTableScanOp, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
}
|
||||
pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA;
|
||||
return pInfo->pUpdateRes;
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return pBlock;
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
|
@ -1233,11 +1232,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||
blockDataCleanup(pInfo->pRes);
|
||||
|
||||
while (tqNextDataBlock(pInfo->streamBlockReader)) {
|
||||
while (tqNextDataBlock(pInfo->streamReader)) {
|
||||
SSDataBlock block = {0};
|
||||
|
||||
// todo refactor
|
||||
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader);
|
||||
int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader);
|
||||
|
||||
uint64_t groupId = block.info.groupId;
|
||||
uint64_t uid = block.info.uid;
|
||||
|
@ -1333,11 +1332,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) {
|
||||
// check reader last status
|
||||
// if not match, reset status
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp);
|
||||
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
|
||||
return pResult && pResult->info.rows > 0 ? pResult : NULL;
|
||||
|
||||
} else {
|
||||
|
@ -1361,8 +1362,8 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) {
|
|||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
|
@ -1400,13 +1401,25 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
}
|
||||
|
||||
if (pHandle) {
|
||||
SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info;
|
||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info;
|
||||
|
||||
SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0);
|
||||
if (pHandle->tqReader) {
|
||||
if (pHandle->initTableReader) {
|
||||
pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, 0);
|
||||
pSTInfo->dataReader = NULL;
|
||||
if (tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, &pSTInfo->dataReader, NULL) < 0) {
|
||||
ASSERT(0);
|
||||
}
|
||||
}
|
||||
|
||||
if (pHandle->initStreamReader) {
|
||||
ASSERT(pHandle->streamReader == NULL);
|
||||
pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta);
|
||||
ASSERT(pInfo->streamReader);
|
||||
} else {
|
||||
ASSERT(pHandle->streamReader);
|
||||
pInfo->streamReader = pHandle->streamReader;
|
||||
}
|
||||
|
||||
if (pSTInfo->interval.interval > 0) {
|
||||
|
@ -1414,18 +1427,17 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
} else {
|
||||
pInfo->pUpdateInfo = NULL;
|
||||
}
|
||||
pInfo->pSnapshotReadOp = pTableScanDummy;
|
||||
|
||||
pInfo->pTableScanOp = pTableScanOp;
|
||||
pInfo->interval = pSTInfo->interval;
|
||||
|
||||
pInfo->readHandle = *pHandle;
|
||||
ASSERT(pHandle->reader);
|
||||
pInfo->streamBlockReader = pHandle->reader;
|
||||
pInfo->tableUid = pScanPhyNode->uid;
|
||||
|
||||
// set the extract column id to streamHandle
|
||||
tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds);
|
||||
tqReadHandleSetColIdList(pInfo->streamReader, pColIds);
|
||||
SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList);
|
||||
int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList);
|
||||
int32_t code = tqReadHandleSetTbUidList(pInfo->streamReader, tableIdList);
|
||||
if (code != 0) {
|
||||
taosArrayDestroy(tableIdList);
|
||||
goto _error;
|
||||
|
@ -1449,7 +1461,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pInfo->deleteDataIndex = 0;
|
||||
pInfo->pDeleteDataRes = createPullDataBlock();
|
||||
|
||||
pOperator->name = "StreamBlockScanOperator";
|
||||
pOperator->name = "StreamScanOperator";
|
||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN;
|
||||
pOperator->blocking = false;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
@ -1458,7 +1470,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
|
|
@ -3023,7 +3023,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num
|
|||
void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark,
|
||||
uint8_t type) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
SStreamBlockScanInfo* pScanInfo = downstream->info;
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type};
|
||||
pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark);
|
||||
}
|
||||
|
@ -3580,8 +3580,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
|
|||
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
|
||||
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
|
||||
|
||||
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
|
||||
SArray* pClosed, __get_win_info_ fn, bool delete) {
|
||||
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn,
|
||||
bool delete) {
|
||||
// Todo(liuyao) save window to tdb
|
||||
void** pIte = NULL;
|
||||
size_t keyLen = 0;
|
||||
|
@ -3743,8 +3743,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getResWinForSession, pInfo->ignoreExpiredData);
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession,
|
||||
pInfo->ignoreExpiredData);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
|
||||
copyUpdateResult(pStUpdated, pUpdated);
|
||||
taosHashCleanup(pStUpdated);
|
||||
|
@ -4245,8 +4245,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
|
||||
getResWinForState, pInfo->ignoreExpiredData);
|
||||
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState,
|
||||
pInfo->ignoreExpiredData);
|
||||
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData);
|
||||
copyUpdateResult(pSeUpdated, pUpdated);
|
||||
taosHashCleanup(pSeUpdated);
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#define _STREAM_INC_H_
|
||||
|
||||
#include "executor.h"
|
||||
#include "tref.h"
|
||||
#include "tstream.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -24,8 +25,9 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
void* timer;
|
||||
int8_t inited;
|
||||
int32_t refPool;
|
||||
void* timer;
|
||||
} SStreamGlobalEnv;
|
||||
|
||||
static SStreamGlobalEnv streamEnv;
|
||||
|
|
|
@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) {
|
|||
|
||||
int32_t streamSetupTrigger(SStreamTask* pTask) {
|
||||
if (pTask->triggerParam != 0) {
|
||||
if (streamInit() < 0) {
|
||||
return -1;
|
||||
}
|
||||
pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer);
|
||||
pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE;
|
||||
}
|
||||
|
|
|
@ -32,8 +32,8 @@ typedef struct SSyncLogStoreData {
|
|||
SSyncNode* pSyncNode;
|
||||
SWal* pWal;
|
||||
|
||||
TdThreadMutex mutex;
|
||||
SWalReadHandle* pWalHandle;
|
||||
TdThreadMutex mutex;
|
||||
SWalReader* pWalHandle;
|
||||
|
||||
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
|
||||
} SSyncLogStoreData;
|
||||
|
|
|
@ -62,7 +62,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
|||
ASSERT(pData->pWal != NULL);
|
||||
|
||||
taosThreadMutexInit(&(pData->mutex), NULL);
|
||||
pData->pWalHandle = walOpenReadHandle(pData->pWal);
|
||||
pData->pWalHandle = walOpenReader(pData->pWal, NULL);
|
||||
ASSERT(pData->pWalHandle != NULL);
|
||||
|
||||
pLogStore->appendEntry = logStoreAppendEntry;
|
||||
|
@ -95,7 +95,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
|
|||
|
||||
taosThreadMutexLock(&(pData->mutex));
|
||||
if (pData->pWalHandle != NULL) {
|
||||
walCloseReadHandle(pData->pWalHandle);
|
||||
walCloseReader(pData->pWalHandle);
|
||||
pData->pWalHandle = NULL;
|
||||
}
|
||||
taosThreadMutexUnlock(&(pData->mutex));
|
||||
|
@ -255,7 +255,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
|||
*ppEntry = NULL;
|
||||
|
||||
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||
SWalReadHandle* pWalHandle = pData->pWalHandle;
|
||||
SWalReader* pWalHandle = pData->pWalHandle;
|
||||
if (pWalHandle == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
|
@ -263,7 +263,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
|||
|
||||
taosThreadMutexLock(&(pData->mutex));
|
||||
|
||||
code = walReadWithHandle(pWalHandle, index);
|
||||
code = walReadVer(pWalHandle, index);
|
||||
if (code != 0) {
|
||||
int32_t err = terrno;
|
||||
const char* errStr = tstrerror(err);
|
||||
|
@ -398,10 +398,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
taosThreadMutexLock(&(pData->mutex));
|
||||
|
||||
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||
SWalReadHandle* pWalHandle = pData->pWalHandle;
|
||||
SWalReader* pWalHandle = pData->pWalHandle;
|
||||
ASSERT(pWalHandle != NULL);
|
||||
|
||||
int32_t code = walReadWithHandle(pWalHandle, index);
|
||||
int32_t code = walReadVer(pWalHandle, index);
|
||||
if (code != 0) {
|
||||
int32_t err = terrno;
|
||||
const char* errStr = tstrerror(err);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "taoserror.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcommon.h"
|
||||
#include "tcompare.h"
|
||||
#include "wal.h"
|
||||
|
||||
|
|
|
@ -16,20 +16,29 @@
|
|||
#include "taoserror.h"
|
||||
#include "walInt.h"
|
||||
|
||||
SWalReadHandle *walOpenReadHandle(SWal *pWal) {
|
||||
SWalReadHandle *pRead = taosMemoryMalloc(sizeof(SWalReadHandle));
|
||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer);
|
||||
static int32_t walFetchBodyNew(SWalReader *pRead);
|
||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead);
|
||||
|
||||
SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) {
|
||||
SWalReader *pRead = taosMemoryMalloc(sizeof(SWalReader));
|
||||
if (pRead == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pRead->pWal = pWal;
|
||||
pRead->pReadIdxTFile = NULL;
|
||||
pRead->pReadLogTFile = NULL;
|
||||
pRead->pIdxFile = NULL;
|
||||
pRead->pLogFile = NULL;
|
||||
pRead->curVersion = -1;
|
||||
pRead->curFileFirstVer = -1;
|
||||
pRead->capacity = 0;
|
||||
pRead->status = 0;
|
||||
if (cond)
|
||||
pRead->cond = *cond;
|
||||
else {
|
||||
pRead->cond.scanMeta = 0;
|
||||
pRead->cond.scanUncommited = 0;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&pRead->mutex, NULL);
|
||||
|
||||
|
@ -39,26 +48,46 @@ SWalReadHandle *walOpenReadHandle(SWal *pWal) {
|
|||
taosMemoryFree(pRead);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return pRead;
|
||||
}
|
||||
|
||||
void walCloseReadHandle(SWalReadHandle *pRead) {
|
||||
taosCloseFile(&pRead->pReadIdxTFile);
|
||||
taosCloseFile(&pRead->pReadLogTFile);
|
||||
void walCloseReader(SWalReader *pRead) {
|
||||
taosCloseFile(&pRead->pIdxFile);
|
||||
taosCloseFile(&pRead->pLogFile);
|
||||
taosMemoryFreeClear(pRead->pHead);
|
||||
taosMemoryFree(pRead);
|
||||
}
|
||||
|
||||
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) {
|
||||
// TODO
|
||||
return 0;
|
||||
int32_t walNextValidMsg(SWalReader *pRead) {
|
||||
int64_t fetchVer = pRead->curVersion;
|
||||
int64_t endVer = pRead->cond.scanUncommited ? walGetLastVer(pRead->pWal) : walGetCommittedVer(pRead->pWal);
|
||||
while (fetchVer <= endVer) {
|
||||
if (walFetchHeadNew(pRead, fetchVer) < 0) {
|
||||
return -1;
|
||||
}
|
||||
if (pRead->pHead->head.msgType == TDMT_VND_SUBMIT ||
|
||||
(IS_META_MSG(pRead->pHead->head.msgType) && pRead->cond.scanMeta)) {
|
||||
if (walFetchBodyNew(pRead) < 0) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
} else {
|
||||
if (walSkipFetchBodyNew(pRead) < 0) {
|
||||
return -1;
|
||||
}
|
||||
fetchVer++;
|
||||
ASSERT(fetchVer == pRead->curVersion);
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
|
||||
static int64_t walReadSeekFilePos(SWalReader *pRead, int64_t fileFirstVer, int64_t ver) {
|
||||
int64_t ret = 0;
|
||||
|
||||
TdFilePtr pIdxTFile = pRead->pReadIdxTFile;
|
||||
TdFilePtr pLogTFile = pRead->pReadLogTFile;
|
||||
TdFilePtr pIdxTFile = pRead->pIdxFile;
|
||||
TdFilePtr pLogTFile = pRead->pLogFile;
|
||||
|
||||
// seek position
|
||||
int64_t offset = (ver - fileFirstVer) * sizeof(SWalIdxEntry);
|
||||
|
@ -90,11 +119,11 @@ static int64_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, i
|
|||
return ret;
|
||||
}
|
||||
|
||||
static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
||||
static int32_t walReadChangeFile(SWalReader *pRead, int64_t fileFirstVer) {
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
|
||||
taosCloseFile(&pRead->pReadIdxTFile);
|
||||
taosCloseFile(&pRead->pReadLogTFile);
|
||||
taosCloseFile(&pRead->pIdxFile);
|
||||
taosCloseFile(&pRead->pLogFile);
|
||||
|
||||
walBuildLogName(pRead->pWal, fileFirstVer, fnameStr);
|
||||
TdFilePtr pLogTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
|
@ -104,7 +133,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pRead->pReadLogTFile = pLogTFile;
|
||||
pRead->pLogFile = pLogTFile;
|
||||
|
||||
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
|
||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
|
@ -114,11 +143,11 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pRead->pReadIdxTFile = pIdxTFile;
|
||||
pRead->pIdxFile = pIdxTFile;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
||||
static int32_t walReadSeekVer(SWalReader *pRead, int64_t ver) {
|
||||
SWal *pWal = pRead->pWal;
|
||||
if (ver == pRead->curVersion) {
|
||||
return 0;
|
||||
|
@ -153,9 +182,94 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
||||
void walSetReaderCapacity(SWalReader *pRead, int32_t capacity) { pRead->capacity = capacity; }
|
||||
|
||||
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||
static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) {
|
||||
int64_t contLen;
|
||||
if (pRead->curVersion != fetchVer) {
|
||||
if (walReadSeekVer(pRead, fetchVer) < 0) return -1;
|
||||
}
|
||||
contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (contLen != sizeof(SWalCkHead)) {
|
||||
if (contLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
} else {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
pRead->curVersion = -1;
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walFetchBodyNew(SWalReader *pRead) {
|
||||
SWalCont *pReadHead = &pRead->pHead->head;
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
if (pRead->capacity < pReadHead->bodyLen) {
|
||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pRead->pHead = ptr;
|
||||
pReadHead = &pRead->pHead->head;
|
||||
pRead->capacity = pReadHead->bodyLen;
|
||||
}
|
||||
|
||||
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since %s",
|
||||
pRead->pHead->head.version, ver, tstrerror(terrno));
|
||||
} else {
|
||||
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 ", since file corrupted",
|
||||
pRead->pHead->head.version, ver);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
}
|
||||
pRead->curVersion = -1;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReadHead->version != ver) {
|
||||
wError("wal fetch body error: %" PRId64 ", read request version:%" PRId64 "", pRead->pHead->head.version, ver);
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (walValidBodyCksum(pRead->pHead) != 0) {
|
||||
wError("wal fetch body error: % " PRId64 ", since body checksum not passed", ver);
|
||||
pRead->curVersion = -1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion = ver + 1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t walSkipFetchBodyNew(SWalReader *pRead) {
|
||||
int64_t code;
|
||||
|
||||
ASSERT(pRead->curVersion == pRead->pHead->head.version);
|
||||
|
||||
code = taosLSeekFile(pRead->pLogFile, pRead->pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pRead->curVersion = -1;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRead->curVersion++;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
||||
int64_t code;
|
||||
|
||||
// TODO: valid ver
|
||||
|
@ -168,9 +282,9 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
if (code < 0) return -1;
|
||||
}
|
||||
|
||||
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
|
||||
ASSERT(taosValidFile(pRead->pLogFile) == true);
|
||||
|
||||
code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalCkHead));
|
||||
code = taosReadFile(pRead->pLogFile, pHead, sizeof(SWalCkHead));
|
||||
if (code != sizeof(SWalCkHead)) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -186,12 +300,12 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
|
||||
int32_t walSkipFetchBody(SWalReader *pRead, const SWalCkHead *pHead) {
|
||||
int64_t code;
|
||||
|
||||
ASSERT(pRead->curVersion == pHead->head.version);
|
||||
|
||||
code = taosLSeekFile(pRead->pReadLogTFile, pHead->head.bodyLen, SEEK_CUR);
|
||||
code = taosLSeekFile(pRead->pLogFile, pHead->head.bodyLen, SEEK_CUR);
|
||||
if (code < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
pRead->curVersion = -1;
|
||||
|
@ -203,7 +317,7 @@ int32_t walSkipFetchBody(SWalReadHandle *pRead, const SWalCkHead *pHead) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
|
||||
int32_t walFetchBody(SWalReader *pRead, SWalCkHead **ppHead) {
|
||||
SWalCont *pReadHead = &((*ppHead)->head);
|
||||
int64_t ver = pReadHead->version;
|
||||
|
||||
|
@ -218,7 +332,7 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
|
|||
pRead->capacity = pReadHead->bodyLen;
|
||||
}
|
||||
|
||||
if (pReadHead->bodyLen != taosReadFile(pRead->pReadLogTFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
if (pReadHead->bodyLen != taosReadFile(pRead->pLogFile, pReadHead->body, pReadHead->bodyLen)) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
@ -241,9 +355,9 @@ int32_t walFetchBody(SWalReadHandle *pRead, SWalCkHead **ppHead) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHead) {
|
||||
int32_t walReadWithHandle_s(SWalReader *pRead, int64_t ver, SWalCont **ppHead) {
|
||||
taosThreadMutexLock(&pRead->mutex);
|
||||
if (walReadWithHandle(pRead, ver) < 0) {
|
||||
if (walReadVer(pRead, ver) < 0) {
|
||||
taosThreadMutexUnlock(&pRead->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
@ -257,7 +371,7 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalCont **ppHea
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
||||
int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
||||
int64_t code;
|
||||
|
||||
if (pRead->pWal->vers.firstVer == -1) {
|
||||
|
@ -280,9 +394,9 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(taosValidFile(pRead->pReadLogTFile) == true);
|
||||
ASSERT(taosValidFile(pRead->pLogFile) == true);
|
||||
|
||||
code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
code = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead));
|
||||
if (code != sizeof(SWalCkHead)) {
|
||||
if (code < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -310,7 +424,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||
}
|
||||
|
||||
if ((code = taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||
if ((code = taosReadFile(pRead->pLogFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) !=
|
||||
pRead->pHead->head.bodyLen) {
|
||||
if (code < 0)
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -340,46 +454,3 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||
int code;
|
||||
code = walSeekVer(pWal, ver);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
if (*ppHead == NULL) {
|
||||
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead));
|
||||
if (ptr == NULL) {
|
||||
return -1;
|
||||
}
|
||||
*ppHead = ptr;
|
||||
}
|
||||
if (tfRead(pWal->pWriteLogTFile, *ppHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
|
||||
return -1;
|
||||
}
|
||||
// TODO: endian compatibility processing after read
|
||||
if (walValidHeadCksum(*ppHead) != 0) {
|
||||
return -1;
|
||||
}
|
||||
void *ptr = taosMemoryRealloc(*ppHead, sizeof(SWalHead) + (*ppHead)->head.len);
|
||||
if (ptr == NULL) {
|
||||
taosMemoryFree(*ppHead);
|
||||
*ppHead = NULL;
|
||||
return -1;
|
||||
}
|
||||
if (tfRead(pWal->pWriteLogTFile, (*ppHead)->head.body, (*ppHead)->head.len) != (*ppHead)->head.len) {
|
||||
return -1;
|
||||
}
|
||||
// TODO: endian compatibility processing after read
|
||||
if (walValidBodyCksum(*ppHead) != 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -79,19 +79,21 @@ int32_t walCommit(SWal *pWal, int64_t ver) {
|
|||
}
|
||||
|
||||
int32_t walRollback(SWal *pWal, int64_t ver) {
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
int64_t code;
|
||||
char fnameStr[WAL_FILE_LEN];
|
||||
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) {
|
||||
terrno = TSDB_CODE_WAL_INVALID_VER;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
taosThreadMutexLock(&pWal->mutex);
|
||||
|
||||
// find correct file
|
||||
if (ver < walGetLastFileFirstVer(pWal)) {
|
||||
// change current files
|
||||
code = walChangeWrite(pWal, ver);
|
||||
if (code < 0) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -146,6 +148,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
ASSERT(taosValidFile(pLogTFile));
|
||||
int64_t size = taosReadFile(pLogTFile, &head, sizeof(SWalCkHead));
|
||||
if (size != sizeof(SWalCkHead)) {
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = walValidHeadCksum(&head);
|
||||
|
@ -154,11 +157,13 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
if (code != 0) {
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
if (head.head.version != ver) {
|
||||
ASSERT(0);
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -167,12 +172,14 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
|
|||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
code = taosFtruncateFile(pIdxTFile, idxOff);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
taosThreadMutexUnlock(&pWal->mutex);
|
||||
return -1;
|
||||
}
|
||||
pWal->vers.lastVer = ver - 1;
|
||||
|
|
|
@ -292,8 +292,8 @@ TEST_F(WalCleanDeleteEnv, roll) {
|
|||
|
||||
TEST_F(WalKeepEnv, readHandleRead) {
|
||||
walResetEnv();
|
||||
int code;
|
||||
SWalReadHandle* pRead = walOpenReadHandle(pWal);
|
||||
int code;
|
||||
SWalReader* pRead = walOpenReader(pWal, NULL);
|
||||
ASSERT(pRead != NULL);
|
||||
|
||||
int i;
|
||||
|
@ -306,7 +306,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
|
|||
}
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int ver = taosRand() % 100;
|
||||
code = walReadWithHandle(pRead, ver);
|
||||
code = walReadVer(pRead, ver);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
// printf("rrbody: \n");
|
||||
|
@ -325,7 +325,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
|
|||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||
}
|
||||
}
|
||||
walCloseReadHandle(pRead);
|
||||
walCloseReader(pRead);
|
||||
}
|
||||
|
||||
TEST_F(WalRetentionEnv, repairMeta1) {
|
||||
|
@ -354,12 +354,12 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
|||
|
||||
ASSERT_EQ(pWal->vers.lastVer, 99);
|
||||
|
||||
SWalReadHandle* pRead = walOpenReadHandle(pWal);
|
||||
SWalReader* pRead = walOpenReader(pWal, NULL);
|
||||
ASSERT(pRead != NULL);
|
||||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int ver = taosRand() % 100;
|
||||
code = walReadWithHandle(pRead, ver);
|
||||
code = walReadVer(pRead, ver);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
// printf("rrbody: \n");
|
||||
|
@ -389,7 +389,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
|||
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
int ver = taosRand() % 200;
|
||||
code = walReadWithHandle(pRead, ver);
|
||||
code = walReadVer(pRead, ver);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
// printf("rrbody: \n");
|
||||
|
|
|
@ -64,6 +64,20 @@ int32_t strdequote(char *z) {
|
|||
return j + 1; // only one quote, do nothing
|
||||
}
|
||||
|
||||
char *strDupUnquo(const char *src) {
|
||||
if (src == NULL) return NULL;
|
||||
if (src[0] != '`') return strdup(src);
|
||||
int32_t len = (int32_t)strlen(src);
|
||||
if (src[len - 1] != '`') return NULL;
|
||||
char *ret = taosMemoryMalloc(len);
|
||||
if (ret == NULL) return NULL;
|
||||
for (int32_t i = 0; i < len - 1; i++) {
|
||||
ret[i] = src[i + 1];
|
||||
}
|
||||
ret[len - 1] = 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
size_t strtrim(char *z) {
|
||||
int32_t i = 0;
|
||||
int32_t j = 0;
|
||||
|
|
|
@ -25,7 +25,7 @@ class TDTestCase:
|
|||
paraDict = {'dbName': 'db2',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
|
@ -44,7 +44,7 @@ class TDTestCase:
|
|||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
|
||||
tdLog.info("create stb")
|
||||
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
|
||||
tdLog.info("create ctb")
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
#python3 ./test.py -f 2-query/last.py -Q 3
|
|
@ -295,7 +295,7 @@ python3 ./test.py -f 2-query/Today.py -Q 3
|
|||
python3 ./test.py -f 2-query/max.py -Q 3
|
||||
python3 ./test.py -f 2-query/min.py -Q 3
|
||||
python3 ./test.py -f 2-query/count.py -Q 3
|
||||
python3 ./test.py -f 2-query/last.py -Q 3
|
||||
#python3 ./test.py -f 2-query/last.py -Q 3
|
||||
python3 ./test.py -f 2-query/first.py -Q 3
|
||||
python3 ./test.py -f 2-query/To_iso8601.py -Q 3
|
||||
python3 ./test.py -f 2-query/To_unixtimestamp.py -Q 3
|
||||
|
|
Loading…
Reference in New Issue